//! Mutable (not yet committed) delta — `array_format/delta/mutable.rs`.

use std::sync::Arc;

use bytes::Bytes;
use indexmap::IndexMap;

use crate::{
    DType, Error, Result,
    address::{BlockAllocAddress, ChunkAddress},
    layout::{
        ArrayLayout, ArrayMeta, AttrIndexKind, AttributeValue, Attributes, ChunkEntry, FillValue,
        StorageLayout,
    },
};

use super::{
    Delta,
    allocator::{AllocatorOutput, DeltaAllocator},
    immutable::DeltaImmutable,
};

21pub struct DeltaMutable {
22    pub delta_index: u32,
23    pub array_meta: IndexMap<String, ArrayMeta>,
24    pub allocator: DeltaAllocator,
25    pub attr_keys: Vec<String>,
26    pub attr_values: Vec<AttributeValue>,
27}
28
29fn alloc_addr_from_chunk(addr: &ChunkAddress) -> BlockAllocAddress {
30    BlockAllocAddress::new(addr.block_id, addr.offset as u64, addr.size as u64)
31}
32
33impl Delta<DeltaMutable> {
34    pub fn new(
35        codec: Arc<dyn crate::codec::CompressionCodec>,
36        block_target_size: usize,
37        delta_index: u32,
38    ) -> Self {
39        Delta {
40            inner: DeltaMutable {
41                delta_index,
42                array_meta: IndexMap::new(),
43                allocator: DeltaAllocator::new(codec, block_target_size),
44                attr_keys: Vec::new(),
45                attr_values: Vec::new(),
46            },
47        }
48    }
49
50    pub fn define_array(
51        &mut self,
52        name: impl Into<String>,
53        dtype: DType,
54        shape: Vec<usize>,
55        dimension_names: Vec<String>,
56        chunk_shape: Option<Vec<usize>>,
57        fill_value: Option<FillValue>,
58    ) -> Result<()> {
59        let name = name.into();
60        let shape_u32: Vec<u32> = shape.iter().map(|&s| s as u32).collect();
61        let ndim = shape_u32.len();
62        let chunk_shape_u32: Vec<u32> = chunk_shape
63            .map(|cs| cs.iter().map(|&s| s as u32).collect())
64            .unwrap_or_else(|| shape_u32.clone());
65        let dim_names = if dimension_names.len() == ndim {
66            dimension_names
67        } else {
68            (0..ndim).map(|i| format!("dim{i}")).collect()
69        };
70        let layout = ArrayLayout {
71            shape: shape_u32,
72            dimension_names: dim_names,
73            storage: StorageLayout {
74                chunk_shape: chunk_shape_u32,
75                chunks: Vec::new(),
76            },
77        };
78        self.inner.array_meta.insert(
79            name.clone(),
80            ArrayMeta {
81                name,
82                dtype,
83                layout,
84                fill_value,
85                deleted: false,
86                attributes: Attributes::empty(AttrIndexKind::U16),
87            },
88        );
89        Ok(())
90    }
91
92    pub fn array_meta(&self, name: &str) -> Option<&ArrayMeta> {
93        self.inner.array_meta.get(name)
94    }
95
96    /// Allocates chunk bytes and records the chunk address in the array meta.
97    pub fn write_raw_chunk(&mut self, name: &str, coord: Vec<u32>, raw: &[u8]) -> Result<()> {
98        let alloc = self.inner.allocator.allocate(raw);
99        let address = ChunkAddress::from(alloc);
100        let meta = self
101            .inner
102            .array_meta
103            .get_mut(name)
104            .ok_or_else(|| Error::ArrayNotFound {
105                name: name.to_string(),
106            })?;
107        if let Some(entry) = meta
108            .layout
109            .storage
110            .chunks
111            .iter_mut()
112            .find(|e| e.coord == coord)
113        {
114            entry.address = address;
115        } else {
116            meta.layout
117                .storage
118                .chunks
119                .push(ChunkEntry { coord, address });
120        }
121        Ok(())
122    }
123
124    /// Returns a mutable reference to the ArrayMeta for `name`, if present.
125    pub fn array_meta_mut(&mut self, name: &str) -> Option<&mut ArrayMeta> {
126        self.inner.array_meta.get_mut(name)
127    }
128
129    /// Inserts or replaces the ArrayMeta for `meta.name`.
130    pub fn upsert_array_meta(&mut self, meta: ArrayMeta) {
131        self.inner.array_meta.insert(meta.name.clone(), meta);
132    }
133
134    /// Stamps `meta` as deleted, clears its chunks, and upserts it.
135    pub fn mark_deleted(&mut self, mut meta: ArrayMeta) {
136        meta.deleted = true;
137        meta.layout.storage.chunks.clear();
138        self.upsert_array_meta(meta);
139    }
140
141    /// Interns `key` into the attribute key dictionary, returning its index.
142    pub fn intern_attr_key(&mut self, key: &str) -> usize {
143        if let Some(i) = self.inner.attr_keys.iter().position(|k| k == key) {
144            i
145        } else {
146            self.inner.attr_keys.push(key.to_string());
147            self.inner.attr_keys.len() - 1
148        }
149    }
150
151    /// Interns `value` into the attribute value dictionary, returning its index.
152    pub fn intern_attr_value(&mut self, value: AttributeValue) -> usize {
153        if let Some(i) = self.inner.attr_values.iter().position(|v| *v == value) {
154            i
155        } else {
156            self.inner.attr_values.push(value);
157            self.inner.attr_values.len() - 1
158        }
159    }
160
161    /// Reads raw (uncompressed) chunk bytes previously written into this
162    /// mutable delta. Returns `None` if the array or coord is not present.
163    pub fn read_raw_chunk(&self, name: &str, coord: &[u32]) -> Option<Bytes> {
164        let meta = self.inner.array_meta.get(name)?;
165        let entry = meta
166            .layout
167            .storage
168            .chunks
169            .iter()
170            .find(|e| e.coord.as_slice() == coord)?;
171        self.inner
172            .allocator
173            .fetch(&alloc_addr_from_chunk(&entry.address))
174    }
175
176    /// Commits this delta: compresses all buffered blocks, serializes the
177    /// footer, and writes the complete delta bytes to `storage`.
178    pub async fn commit(
179        self,
180        storage: Arc<dyn crate::storage::Storage>,
181        path: Arc<str>,
182        cache: Option<Arc<super::DeltaCache>>,
183        base_file_hint: impl Into<String>,
184    ) -> Result<Delta<DeltaImmutable>> {
185        use crate::footer::{FOOTER_VERSION, Footer};
186
187        let overlay_index = self.inner.delta_index;
188        let arrays: Vec<ArrayMeta> = self.inner.array_meta.into_values().collect();
189        let attr_keys = self.inner.attr_keys;
190        let attr_values = self.inner.attr_values;
191        let AllocatorOutput {
192            mut file,
193            output_size,
194            blocks,
195        } = self.inner.allocator.commit().await;
196
197        let footer = Footer {
198            version: FOOTER_VERSION,
199            blocks,
200            arrays,
201            attr_keys,
202            attr_values,
203            overlay_index,
204            base_file_hint: base_file_hint.into(),
205        };
206        let footer_bytes = footer.serialize()?;
207
208        super::write_file_then_bytes(&mut file, output_size, &footer_bytes, &*storage).await?;
209        Delta::<DeltaImmutable>::open(storage, path, cache).await
210    }
211}
212
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{DType, NoCompression, codec::CompressionCodec, storage::InMemoryStorage};

    fn codec() -> Arc<dyn CompressionCodec> {
        Arc::new(NoCompression)
    }

    fn make_mutable() -> Delta<DeltaMutable> {
        Delta::<DeltaMutable>::new(codec(), 512, 0)
    }

    #[test]
    fn define_array_stores_meta() {
        let mut d = make_mutable();
        d.define_array(
            "temp",
            DType::Float32,
            vec![100],
            vec!["x".into()],
            None,
            None,
        )
        .unwrap();
        let meta = d.array_meta("temp").expect("array_meta returned None");
        assert_eq!(meta.name, "temp");
        assert_eq!(meta.dtype, DType::Float32);
        assert_eq!(meta.layout.shape, vec![100u32]);
        assert!(!meta.deleted);
    }

    #[test]
    fn define_array_default_chunk_shape_equals_shape() {
        let mut d = make_mutable();
        d.define_array("a", DType::Int32, vec![50], vec![], None, None)
            .unwrap();
        let meta = d.array_meta("a").unwrap();
        assert_eq!(meta.layout.storage.chunk_shape, meta.layout.shape);
    }

    #[test]
    fn define_array_custom_chunk_shape() {
        let mut d = make_mutable();
        d.define_array("a", DType::UInt8, vec![200], vec![], Some(vec![50]), None)
            .unwrap();
        let meta = d.array_meta("a").unwrap();
        assert_eq!(meta.layout.storage.chunk_shape, vec![50u32]);
    }

    #[test]
    fn define_array_generates_dimension_names_on_length_mismatch() {
        let mut d = make_mutable();
        d.define_array(
            "a",
            DType::Int32,
            vec![2, 3],
            vec!["only_one".into()],
            None,
            None,
        )
        .unwrap();
        let meta = d.array_meta("a").unwrap();
        assert_eq!(
            meta.layout.dimension_names,
            vec!["dim0".to_string(), "dim1".to_string()]
        );
    }

    #[test]
    fn write_raw_chunk_records_entry() {
        let mut d = make_mutable();
        d.define_array("x", DType::UInt8, vec![4], vec![], None, None)
            .unwrap();
        d.write_raw_chunk("x", vec![0], &[1u8, 2, 3, 4]).unwrap();
        let meta = d.array_meta("x").unwrap();
        assert_eq!(meta.layout.storage.chunks.len(), 1);
        assert_eq!(meta.layout.storage.chunks[0].coord, vec![0u32]);
    }

    #[test]
    fn write_raw_chunk_overwrites_same_coord() {
        let mut d = make_mutable();
        d.define_array("x", DType::UInt8, vec![4], vec![], None, None)
            .unwrap();
        d.write_raw_chunk("x", vec![0], &[0u8; 4]).unwrap();
        d.write_raw_chunk("x", vec![0], &[99u8; 4]).unwrap();
        let meta = d.array_meta("x").unwrap();
        assert_eq!(meta.layout.storage.chunks.len(), 1);
        // The recorded address must point at the second write's bytes.
        let bytes = d.read_raw_chunk("x", &[0]).expect("chunk missing");
        assert_eq!(bytes.as_ref(), &[99u8; 4][..]);
    }

    #[test]
    fn write_raw_chunk_unknown_array_returns_error() {
        let mut d = make_mutable();
        let err = d.write_raw_chunk("nope", vec![0], &[1, 2, 3]).unwrap_err();
        assert!(matches!(err, crate::Error::ArrayNotFound { .. }));
    }

    #[test]
    fn read_raw_chunk_before_commit() {
        let mut d = make_mutable();
        d.define_array("x", DType::UInt8, vec![4], vec![], None, None)
            .unwrap();
        d.write_raw_chunk("x", vec![0], &[5u8, 6, 7, 8]).unwrap();
        let bytes = d.read_raw_chunk("x", &[0]).expect("chunk missing");
        assert_eq!(bytes.as_ref(), &[5u8, 6, 7, 8][..]);
        assert!(d.read_raw_chunk("x", &[1]).is_none());
        assert!(d.read_raw_chunk("missing", &[0]).is_none());
    }

    #[tokio::test]
    async fn commit_produces_readable_delta() {
        let values: Vec<f64> = vec![1.0, 2.0, 3.0];
        let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let mut d = make_mutable();
        d.define_array(
            "temps",
            DType::Float64,
            vec![3],
            vec!["t".into()],
            None,
            None,
        )
        .unwrap();
        d.write_raw_chunk("temps", vec![0], &raw).unwrap();
        let storage = Arc::new(InMemoryStorage::new());
        let immutable = d
            .commit(storage, Arc::from("test"), None, "base")
            .await
            .unwrap();
        let meta = immutable
            .array_meta("temps")
            .expect("array not found after commit");
        assert_eq!(meta.dtype, DType::Float64);
        assert_eq!(meta.layout.shape, vec![3u32]);
    }

    #[tokio::test]
    async fn multiple_chunks_across_blocks() {
        let d_codec = codec();
        let mut d = Delta::<DeltaMutable>::new(Arc::clone(&d_codec), 16, 0);
        d.define_array("m", DType::UInt8, vec![64], vec![], Some(vec![8]), None)
            .unwrap();
        let chunks: Vec<Vec<u8>> = (0..8u8).map(|i| vec![i; 8]).collect();
        for (i, chunk) in chunks.iter().enumerate() {
            d.write_raw_chunk("m", vec![i as u32 * 8], chunk).unwrap();
        }
        let storage = Arc::new(InMemoryStorage::new());
        let immutable = d
            .commit(storage, Arc::from("test"), None, "base")
            .await
            .unwrap();
        for (i, expected) in chunks.iter().enumerate() {
            let bytes = immutable
                .read_raw_chunk("m", &[i as u32 * 8])
                .await
                .unwrap()
                .expect("chunk missing");
            assert_eq!(bytes.as_ref(), expected.as_slice(), "mismatch at chunk {i}");
        }
    }
}