1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::Bytes;
5
6use crate::{
7 Error, Result,
8 codec::decompress_by_id,
9 footer::{Footer, read_footer},
10 layout::ArrayMeta,
11 storage::Storage,
12};
13
14use super::{Delta, DeltaCache};
15
16pub struct DeltaImmutable {
17 pub footer: Footer,
18 pub storage: Arc<dyn Storage>,
19 pub path: Arc<str>,
20 pub cache: Option<Arc<DeltaCache>>,
21 pub array_index: HashMap<String, usize>,
23}
24
25impl Delta<DeltaImmutable> {
26 pub async fn open(
28 storage: Arc<dyn Storage>,
29 path: Arc<str>,
30 cache: Option<Arc<DeltaCache>>,
31 ) -> Result<Self> {
32 let footer = read_footer(&*storage).await?;
33 let array_index = footer
34 .arrays
35 .iter()
36 .enumerate()
37 .map(|(i, a)| (a.name.clone(), i))
38 .collect();
39 Ok(Delta {
40 inner: DeltaImmutable {
41 footer,
42 storage,
43 path,
44 cache,
45 array_index,
46 },
47 })
48 }
49
50 pub fn array_meta(&self, name: &str) -> Option<&ArrayMeta> {
52 let idx = self.inner.array_index.get(name)?;
53 let a = &self.inner.footer.arrays[*idx];
54 if a.deleted { None } else { Some(a) }
55 }
56
57 pub async fn read_raw_chunk(&self, name: &str, coord: &[u32]) -> Result<Option<Bytes>> {
60 let meta = match self
61 .inner
62 .array_index
63 .get(name)
64 .map(|&i| &self.inner.footer.arrays[i])
65 {
66 Some(m) => m,
67 None => return Ok(None),
68 };
69 if meta.deleted {
70 return Ok(None);
71 }
72 let entry = match meta
73 .layout
74 .storage
75 .chunks
76 .iter()
77 .find(|e| e.coord.as_slice() == coord)
78 {
79 Some(e) => e,
80 None => return Ok(None),
81 };
82 let block = self
83 .inner
84 .footer
85 .blocks
86 .iter()
87 .find(|b| b.id == entry.address.block_id)
88 .ok_or(Error::BlockOutOfRange {
89 block_id: entry.address.block_id.0,
90 })?;
91
92 let block_bytes = if let Some(cache) = &self.inner.cache {
93 cache
94 .get_or_load(&self.inner.path, block, &*self.inner.storage)
95 .await?
96 } else {
97 let compressed = self.inner.storage.read_range(block.file_range()).await?;
98 Bytes::from(decompress_by_id(
99 &block.codec,
100 &compressed,
101 block.uncompressed_size as usize,
102 )?)
103 };
104
105 let start = entry.address.offset as usize;
106 let end = start + entry.address.size as usize;
107 Ok(Some(block_bytes.slice(start..end)))
108 }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{
        DType, NoCompression,
        codec::CompressionCodec,
        delta::{Delta, DeltaMutable},
        storage::InMemoryStorage,
    };

    /// Codec used by every test in this module.
    fn codec() -> Arc<dyn CompressionCodec> {
        Arc::new(NoCompression)
    }

    /// Fresh mutable delta built with the shared codec.
    fn make_mutable() -> Delta<DeltaMutable> {
        Delta::<DeltaMutable>::new(codec(), 512, 0)
    }

    /// Mutable delta holding one 4-byte `UInt8` array "a" with a single
    /// zero-filled chunk at coordinate `[0]`.
    fn mutable_with_small_array() -> Delta<DeltaMutable> {
        let mut delta = make_mutable();
        delta
            .define_array("a", DType::UInt8, vec![4], vec![], None, None)
            .unwrap();
        delta.write_raw_chunk("a", vec![0], &[0u8; 4]).unwrap();
        delta
    }

    /// A chunk written before commit reads back byte-for-byte afterwards.
    #[tokio::test]
    async fn immutable_read_raw_chunk_matches_written_bytes() {
        let payload = vec![0xCAu8; 32];
        let mut delta = make_mutable();
        delta
            .define_array("data", DType::UInt8, vec![32], vec![], None, None)
            .unwrap();
        delta.write_raw_chunk("data", vec![0], &payload).unwrap();

        let storage = Arc::new(InMemoryStorage::new());
        let committed = delta
            .commit(storage, Arc::from("test"), None, "base")
            .await
            .unwrap();

        let chunk = committed
            .read_raw_chunk("data", &[0])
            .await
            .unwrap()
            .expect("chunk missing");
        assert_eq!(&chunk[..], &payload[..]);
    }

    /// Asking for an array that was never defined yields `None`, not an error.
    #[tokio::test]
    async fn immutable_read_raw_chunk_unknown_array_returns_none() {
        let storage = Arc::new(InMemoryStorage::new());
        let committed = mutable_with_small_array()
            .commit(storage, Arc::from("test"), None, "base")
            .await
            .unwrap();

        let looked_up = committed.read_raw_chunk("missing", &[0]).await.unwrap();
        assert!(looked_up.is_none());
    }

    /// Asking for a coordinate that was never written yields `None`.
    #[tokio::test]
    async fn immutable_read_raw_chunk_unknown_coord_returns_none() {
        let storage = Arc::new(InMemoryStorage::new());
        let committed = mutable_with_small_array()
            .commit(storage, Arc::from("test"), None, "base")
            .await
            .unwrap();

        let looked_up = committed.read_raw_chunk("a", &[99]).await.unwrap();
        assert!(looked_up.is_none());
    }

    /// The overlay index and base-file hint given before commit appear in the
    /// committed footer.
    #[tokio::test]
    async fn overlay_index_survives_commit() {
        let delta = Delta::<DeltaMutable>::new(codec(), 512, 7);
        let storage = Arc::new(InMemoryStorage::new());
        let committed = delta
            .commit(storage, Arc::from("test"), None, "myfile")
            .await
            .unwrap();

        let footer = &committed.inner.footer;
        assert_eq!(footer.overlay_index, 7);
        assert_eq!(footer.base_file_hint, "myfile");
    }
}