1use std::sync::Arc;
2
3use bytes::Bytes;
4use indexmap::IndexMap;
5
6use crate::{
7 DType, Error, Result,
8 address::{BlockAllocAddress, ChunkAddress},
9 layout::{
10 ArrayLayout, ArrayMeta, AttrIndexKind, AttributeValue, Attributes, ChunkEntry, FillValue,
11 StorageLayout,
12 },
13};
14
15use super::{
16 Delta,
17 allocator::{AllocatorOutput, DeltaAllocator},
18 immutable::DeltaImmutable,
19};
20
21pub struct DeltaMutable {
22 pub delta_index: u32,
23 pub array_meta: IndexMap<String, ArrayMeta>,
24 pub allocator: DeltaAllocator,
25 pub attr_keys: Vec<String>,
26 pub attr_values: Vec<AttributeValue>,
27}
28
29fn alloc_addr_from_chunk(addr: &ChunkAddress) -> BlockAllocAddress {
30 BlockAllocAddress::new(addr.block_id, addr.offset as u64, addr.size as u64)
31}
32
33impl Delta<DeltaMutable> {
34 pub fn new(
35 codec: Arc<dyn crate::codec::CompressionCodec>,
36 block_target_size: usize,
37 delta_index: u32,
38 ) -> Self {
39 Delta {
40 inner: DeltaMutable {
41 delta_index,
42 array_meta: IndexMap::new(),
43 allocator: DeltaAllocator::new(codec, block_target_size),
44 attr_keys: Vec::new(),
45 attr_values: Vec::new(),
46 },
47 }
48 }
49
50 pub fn define_array(
51 &mut self,
52 name: impl Into<String>,
53 dtype: DType,
54 shape: Vec<usize>,
55 dimension_names: Vec<String>,
56 chunk_shape: Option<Vec<usize>>,
57 fill_value: Option<FillValue>,
58 ) -> Result<()> {
59 let name = name.into();
60 let shape_u32: Vec<u32> = shape.iter().map(|&s| s as u32).collect();
61 let ndim = shape_u32.len();
62 let chunk_shape_u32: Vec<u32> = chunk_shape
63 .map(|cs| cs.iter().map(|&s| s as u32).collect())
64 .unwrap_or_else(|| shape_u32.clone());
65 let dim_names = if dimension_names.len() == ndim {
66 dimension_names
67 } else {
68 (0..ndim).map(|i| format!("dim{i}")).collect()
69 };
70 let layout = ArrayLayout {
71 shape: shape_u32,
72 dimension_names: dim_names,
73 storage: StorageLayout {
74 chunk_shape: chunk_shape_u32,
75 chunks: Vec::new(),
76 },
77 };
78 self.inner.array_meta.insert(
79 name.clone(),
80 ArrayMeta {
81 name,
82 dtype,
83 layout,
84 fill_value,
85 deleted: false,
86 attributes: Attributes::empty(AttrIndexKind::U16),
87 },
88 );
89 Ok(())
90 }
91
92 pub fn array_meta(&self, name: &str) -> Option<&ArrayMeta> {
93 self.inner.array_meta.get(name)
94 }
95
96 pub fn write_raw_chunk(&mut self, name: &str, coord: Vec<u32>, raw: &[u8]) -> Result<()> {
98 let alloc = self.inner.allocator.allocate(raw);
99 let address = ChunkAddress::from(alloc);
100 let meta = self
101 .inner
102 .array_meta
103 .get_mut(name)
104 .ok_or_else(|| Error::ArrayNotFound {
105 name: name.to_string(),
106 })?;
107 if let Some(entry) = meta
108 .layout
109 .storage
110 .chunks
111 .iter_mut()
112 .find(|e| e.coord == coord)
113 {
114 entry.address = address;
115 } else {
116 meta.layout
117 .storage
118 .chunks
119 .push(ChunkEntry { coord, address });
120 }
121 Ok(())
122 }
123
124 pub fn array_meta_mut(&mut self, name: &str) -> Option<&mut ArrayMeta> {
126 self.inner.array_meta.get_mut(name)
127 }
128
129 pub fn upsert_array_meta(&mut self, meta: ArrayMeta) {
131 self.inner.array_meta.insert(meta.name.clone(), meta);
132 }
133
134 pub fn mark_deleted(&mut self, mut meta: ArrayMeta) {
136 meta.deleted = true;
137 meta.layout.storage.chunks.clear();
138 self.upsert_array_meta(meta);
139 }
140
141 pub fn intern_attr_key(&mut self, key: &str) -> usize {
143 if let Some(i) = self.inner.attr_keys.iter().position(|k| k == key) {
144 i
145 } else {
146 self.inner.attr_keys.push(key.to_string());
147 self.inner.attr_keys.len() - 1
148 }
149 }
150
151 pub fn intern_attr_value(&mut self, value: AttributeValue) -> usize {
153 if let Some(i) = self.inner.attr_values.iter().position(|v| *v == value) {
154 i
155 } else {
156 self.inner.attr_values.push(value);
157 self.inner.attr_values.len() - 1
158 }
159 }
160
161 pub fn read_raw_chunk(&self, name: &str, coord: &[u32]) -> Option<Bytes> {
164 let meta = self.inner.array_meta.get(name)?;
165 let entry = meta
166 .layout
167 .storage
168 .chunks
169 .iter()
170 .find(|e| e.coord.as_slice() == coord)?;
171 self.inner
172 .allocator
173 .fetch(&alloc_addr_from_chunk(&entry.address))
174 }
175
176 pub async fn commit(
179 self,
180 storage: Arc<dyn crate::storage::Storage>,
181 path: Arc<str>,
182 cache: Option<Arc<super::DeltaCache>>,
183 base_file_hint: impl Into<String>,
184 ) -> Result<Delta<DeltaImmutable>> {
185 use crate::footer::{FOOTER_VERSION, Footer};
186
187 let overlay_index = self.inner.delta_index;
188 let arrays: Vec<ArrayMeta> = self.inner.array_meta.into_values().collect();
189 let attr_keys = self.inner.attr_keys;
190 let attr_values = self.inner.attr_values;
191 let AllocatorOutput {
192 mut file,
193 output_size,
194 blocks,
195 } = self.inner.allocator.commit().await;
196
197 let footer = Footer {
198 version: FOOTER_VERSION,
199 blocks,
200 arrays,
201 attr_keys,
202 attr_values,
203 overlay_index,
204 base_file_hint: base_file_hint.into(),
205 };
206 let footer_bytes = footer.serialize()?;
207
208 super::write_file_then_bytes(&mut file, output_size, &footer_bytes, &*storage).await?;
209 Delta::<DeltaImmutable>::open(storage, path, cache).await
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use std::sync::Arc;
216
217 use super::*;
218 use crate::{DType, NoCompression, codec::CompressionCodec, storage::InMemoryStorage};
219
220 fn codec() -> Arc<dyn CompressionCodec> {
221 Arc::new(NoCompression)
222 }
223
224 fn make_mutable() -> Delta<DeltaMutable> {
225 Delta::<DeltaMutable>::new(codec(), 512, 0)
226 }
227
228 #[test]
229 fn define_array_stores_meta() {
230 let mut d = make_mutable();
231 d.define_array(
232 "temp",
233 DType::Float32,
234 vec![100],
235 vec!["x".into()],
236 None,
237 None,
238 )
239 .unwrap();
240 let meta = d.array_meta("temp").expect("array_meta returned None");
241 assert_eq!(meta.name, "temp");
242 assert_eq!(meta.dtype, DType::Float32);
243 assert_eq!(meta.layout.shape, vec![100u32]);
244 assert!(!meta.deleted);
245 }
246
247 #[test]
248 fn define_array_default_chunk_shape_equals_shape() {
249 let mut d = make_mutable();
250 d.define_array("a", DType::Int32, vec![50], vec![], None, None)
251 .unwrap();
252 let meta = d.array_meta("a").unwrap();
253 assert_eq!(meta.layout.storage.chunk_shape, meta.layout.shape);
254 }
255
256 #[test]
257 fn define_array_custom_chunk_shape() {
258 let mut d = make_mutable();
259 d.define_array("a", DType::UInt8, vec![200], vec![], Some(vec![50]), None)
260 .unwrap();
261 let meta = d.array_meta("a").unwrap();
262 assert_eq!(meta.layout.storage.chunk_shape, vec![50u32]);
263 }
264
265 #[test]
266 fn write_raw_chunk_records_entry() {
267 let mut d = make_mutable();
268 d.define_array("x", DType::UInt8, vec![4], vec![], None, None)
269 .unwrap();
270 d.write_raw_chunk("x", vec![0], &[1u8, 2, 3, 4]).unwrap();
271 let meta = d.array_meta("x").unwrap();
272 assert_eq!(meta.layout.storage.chunks.len(), 1);
273 assert_eq!(meta.layout.storage.chunks[0].coord, vec![0u32]);
274 }
275
276 #[test]
277 fn write_raw_chunk_overwrites_same_coord() {
278 let mut d = make_mutable();
279 d.define_array("x", DType::UInt8, vec![4], vec![], None, None)
280 .unwrap();
281 d.write_raw_chunk("x", vec![0], &[0u8; 4]).unwrap();
282 d.write_raw_chunk("x", vec![0], &[99u8; 4]).unwrap();
283 let meta = d.array_meta("x").unwrap();
284 assert_eq!(meta.layout.storage.chunks.len(), 1);
285 }
286
287 #[test]
288 fn write_raw_chunk_unknown_array_returns_error() {
289 let mut d = make_mutable();
290 let err = d.write_raw_chunk("nope", vec![0], &[1, 2, 3]).unwrap_err();
291 assert!(matches!(err, crate::Error::ArrayNotFound { .. }));
292 }
293
294 #[tokio::test]
295 async fn commit_produces_readable_delta() {
296 let values: Vec<f64> = vec![1.0, 2.0, 3.0];
297 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
298 let mut d = make_mutable();
299 d.define_array(
300 "temps",
301 DType::Float64,
302 vec![3],
303 vec!["t".into()],
304 None,
305 None,
306 )
307 .unwrap();
308 d.write_raw_chunk("temps", vec![0], &raw).unwrap();
309 let storage = Arc::new(InMemoryStorage::new());
310 let immutable = d
311 .commit(storage, Arc::from("test"), None, "base")
312 .await
313 .unwrap();
314 let meta = immutable
315 .array_meta("temps")
316 .expect("array not found after commit");
317 assert_eq!(meta.dtype, DType::Float64);
318 assert_eq!(meta.layout.shape, vec![3u32]);
319 }
320
321 #[tokio::test]
322 async fn multiple_chunks_across_blocks() {
323 let d_codec = codec();
324 let mut d = Delta::<DeltaMutable>::new(Arc::clone(&d_codec), 16, 0);
325 d.define_array("m", DType::UInt8, vec![64], vec![], Some(vec![8]), None)
326 .unwrap();
327 let chunks: Vec<Vec<u8>> = (0..8u8).map(|i| vec![i; 8]).collect();
328 for (i, chunk) in chunks.iter().enumerate() {
329 d.write_raw_chunk("m", vec![i as u32 * 8], chunk).unwrap();
330 }
331 let storage = Arc::new(InMemoryStorage::new());
332 let immutable = d
333 .commit(storage, Arc::from("test"), None, "base")
334 .await
335 .unwrap();
336 for (i, expected) in chunks.iter().enumerate() {
337 let bytes = immutable
338 .read_raw_chunk("m", &[i as u32 * 8])
339 .await
340 .unwrap()
341 .expect("chunk missing");
342 assert_eq!(bytes.as_ref(), expected.as_slice(), "mismatch at chunk {i}");
343 }
344 }
345}