Skip to main content

array_format/delta/
immutable.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::Bytes;
5
6use crate::{
7    Error, Result,
8    codec::decompress_by_id,
9    footer::{Footer, read_footer},
10    layout::ArrayMeta,
11    storage::Storage,
12};
13
14use super::{Delta, DeltaCache};
15
16pub struct DeltaImmutable {
17    pub footer: Footer,
18    pub storage: Arc<dyn Storage>,
19    pub path: Arc<str>,
20    pub cache: Option<Arc<DeltaCache>>,
21    /// Maps array name → index in `footer.arrays` for O(1) lookup.
22    pub array_index: HashMap<String, usize>,
23}
24
25impl Delta<DeltaImmutable> {
26    /// Opens a committed delta file from the given storage.
27    pub async fn open(
28        storage: Arc<dyn Storage>,
29        path: Arc<str>,
30        cache: Option<Arc<DeltaCache>>,
31    ) -> Result<Self> {
32        let footer = read_footer(&*storage).await?;
33        let array_index = footer
34            .arrays
35            .iter()
36            .enumerate()
37            .map(|(i, a)| (a.name.clone(), i))
38            .collect();
39        Ok(Delta {
40            inner: DeltaImmutable {
41                footer,
42                storage,
43                path,
44                cache,
45                array_index,
46            },
47        })
48    }
49
50    /// Returns the array metadata for `name` if present and not deleted.
51    pub fn array_meta(&self, name: &str) -> Option<&ArrayMeta> {
52        let idx = self.inner.array_index.get(name)?;
53        let a = &self.inner.footer.arrays[*idx];
54        if a.deleted { None } else { Some(a) }
55    }
56
57    /// Returns the raw (uncompressed) bytes for the chunk at `coord`, or
58    /// `None` if this delta does not have that chunk.
59    pub async fn read_raw_chunk(&self, name: &str, coord: &[u32]) -> Result<Option<Bytes>> {
60        let meta = match self
61            .inner
62            .array_index
63            .get(name)
64            .map(|&i| &self.inner.footer.arrays[i])
65        {
66            Some(m) => m,
67            None => return Ok(None),
68        };
69        if meta.deleted {
70            return Ok(None);
71        }
72        let entry = match meta
73            .layout
74            .storage
75            .chunks
76            .iter()
77            .find(|e| e.coord.as_slice() == coord)
78        {
79            Some(e) => e,
80            None => return Ok(None),
81        };
82        let block = self
83            .inner
84            .footer
85            .blocks
86            .iter()
87            .find(|b| b.id == entry.address.block_id)
88            .ok_or(Error::BlockOutOfRange {
89                block_id: entry.address.block_id.0,
90            })?;
91
92        let block_bytes = if let Some(cache) = &self.inner.cache {
93            cache
94                .get_or_load(&self.inner.path, block, &*self.inner.storage)
95                .await?
96        } else {
97            let compressed = self.inner.storage.read_range(block.file_range()).await?;
98            Bytes::from(decompress_by_id(
99                &block.codec,
100                &compressed,
101                block.uncompressed_size as usize,
102            )?)
103        };
104
105        let start = entry.address.offset as usize;
106        let end = start + entry.address.size as usize;
107        Ok(Some(block_bytes.slice(start..end)))
108    }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{
        DType, NoCompression,
        codec::CompressionCodec,
        delta::{Delta, DeltaMutable},
        storage::InMemoryStorage,
    };

    /// Every test uses the pass-through codec, so committed bytes are verbatim.
    fn codec() -> Arc<dyn CompressionCodec> {
        Arc::new(NoCompression)
    }

    /// Fresh mutable delta with a 512-byte block target and overlay index 0.
    fn make_mutable() -> Delta<DeltaMutable> {
        Delta::<DeltaMutable>::new(codec(), 512, 0)
    }

    /// A brand-new, empty in-memory storage backend.
    fn memory() -> Arc<InMemoryStorage> {
        Arc::new(InMemoryStorage::new())
    }

    /// Mutable delta holding one 4-element `UInt8` array named `"a"` whose only
    /// chunk, at coordinate `[0]`, is four zero bytes.
    fn single_zero_chunk_in_a() -> Delta<DeltaMutable> {
        let mut delta = make_mutable();
        delta
            .define_array("a", DType::UInt8, vec![4], vec![], None, None)
            .unwrap();
        delta.write_raw_chunk("a", vec![0], &[0u8; 4]).unwrap();
        delta
    }

    #[tokio::test]
    async fn immutable_read_raw_chunk_matches_written_bytes() {
        let payload = vec![0xCAu8; 32];
        let mut delta = make_mutable();
        delta
            .define_array("data", DType::UInt8, vec![32], vec![], None, None)
            .unwrap();
        delta.write_raw_chunk("data", vec![0], &payload).unwrap();

        let committed = delta
            .commit(memory(), Arc::from("test"), None, "base")
            .await
            .unwrap();
        let Some(chunk) = committed.read_raw_chunk("data", &[0]).await.unwrap() else {
            panic!("chunk missing");
        };
        assert_eq!(chunk.as_ref(), payload.as_slice());
    }

    #[tokio::test]
    async fn immutable_read_raw_chunk_unknown_array_returns_none() {
        let delta = single_zero_chunk_in_a();
        let committed = delta
            .commit(memory(), Arc::from("test"), None, "base")
            .await
            .unwrap();
        let lookup = committed.read_raw_chunk("missing", &[0]).await.unwrap();
        assert!(lookup.is_none());
    }

    #[tokio::test]
    async fn immutable_read_raw_chunk_unknown_coord_returns_none() {
        let delta = single_zero_chunk_in_a();
        let committed = delta
            .commit(memory(), Arc::from("test"), None, "base")
            .await
            .unwrap();
        let lookup = committed.read_raw_chunk("a", &[99]).await.unwrap();
        assert!(lookup.is_none());
    }

    #[tokio::test]
    async fn overlay_index_survives_commit() {
        let delta = Delta::<DeltaMutable>::new(codec(), 512, 7);
        let committed = delta
            .commit(memory(), Arc::from("test"), None, "myfile")
            .await
            .unwrap();
        let footer = &committed.inner.footer;
        assert_eq!(footer.overlay_index, 7);
        assert_eq!(footer.base_file_hint, "myfile");
    }
}