Skip to main content

cobble_data_structure/
structured_reader.rs

1use crate::structured_db::{
2    StructuredColumnValue, StructuredDbIterator, StructuredSchema, combined_resolver, decode_row,
3    load_structured_schema_from_cobble_schema, project_decoded_row_for_read,
4    project_structured_schema_for_scan,
5};
6use cobble::{
7    GlobalSnapshotManifest, GlobalSnapshotSummary, ReadOnlyDb, ReadOptions, Reader, ReaderConfig,
8    Result, ScanOptions, VolumeDescriptor,
9};
10use std::ops::Range;
11use std::sync::Arc;
12
13pub struct StructuredReader {
14    reader: Reader,
15    structured_schema: Arc<StructuredSchema>,
16}
17
18impl StructuredReader {
19    pub fn open(read_config: ReaderConfig, global_snapshot_id: u64) -> Result<Self> {
20        let volumes = read_config.volumes.clone();
21        let resolver = combined_resolver(None);
22        let reader = Reader::open_with_resolver(read_config, global_snapshot_id, Some(resolver))?;
23        let structured_schema = load_schema_from_reader(&reader, &volumes)?;
24        Ok(Self {
25            reader,
26            structured_schema: Arc::new(structured_schema),
27        })
28    }
29
30    pub fn open_current(read_config: ReaderConfig) -> Result<Self> {
31        let volumes = read_config.volumes.clone();
32        let resolver = combined_resolver(None);
33        let reader = Reader::open_current_with_resolver(read_config, Some(resolver))?;
34        let structured_schema = load_schema_from_reader(&reader, &volumes)?;
35        Ok(Self {
36            reader,
37            structured_schema: Arc::new(structured_schema),
38        })
39    }
40
41    pub fn current_schema(&self) -> StructuredSchema {
42        self.structured_schema.as_ref().clone()
43    }
44
45    // ── Read operations ─────────────────────────────────────────────────
46
47    pub fn get(
48        &mut self,
49        bucket_id: u16,
50        key: &[u8],
51    ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
52        let raw = self.reader.get(bucket_id, key)?;
53        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
54            .transpose()
55    }
56
57    pub fn get_with_options(
58        &mut self,
59        bucket_id: u16,
60        key: &[u8],
61        options: &ReadOptions,
62    ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
63        let raw = if options.column_indices.is_some() {
64            self.reader.get(bucket_id, key)?
65        } else {
66            self.reader.get_with_options(bucket_id, key, options)?
67        };
68        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
69            .transpose()
70            .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
71    }
72
73    pub fn scan(
74        &mut self,
75        bucket_id: u16,
76        range: Range<&[u8]>,
77    ) -> Result<StructuredDbIterator<'static>> {
78        self.scan_with_options(bucket_id, range, &ScanOptions::default())
79    }
80
81    pub fn scan_with_options(
82        &mut self,
83        bucket_id: u16,
84        range: Range<&[u8]>,
85        options: &ScanOptions,
86    ) -> Result<StructuredDbIterator<'static>> {
87        let inner = self.reader.scan_with_options(bucket_id, range, options)?;
88        let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
89        Ok(StructuredDbIterator::new(inner, projected_schema, 0))
90    }
91
92    // ── Snapshot management ─────────────────────────────────────────────
93
94    pub fn refresh(&mut self) -> Result<()> {
95        self.reader.refresh()
96    }
97
98    pub fn read_mode(&self) -> &'static str {
99        self.reader.read_mode()
100    }
101
102    pub fn configured_snapshot_id(&self) -> Option<u64> {
103        self.reader.configured_snapshot_id()
104    }
105
106    pub fn current_global_snapshot(&self) -> &GlobalSnapshotManifest {
107        self.reader.current_global_snapshot()
108    }
109
110    pub fn list_global_snapshots(&self) -> Result<Vec<GlobalSnapshotSummary>> {
111        self.reader.list_global_snapshots()
112    }
113
114    pub fn list_global_snapshot_manifests(&self) -> Result<Vec<GlobalSnapshotManifest>> {
115        self.reader.list_global_snapshot_manifests()
116    }
117}
118
119/// Load the structured schema from the first shard of the reader's current global snapshot.
120fn load_schema_from_reader(
121    reader: &Reader,
122    volumes: &[VolumeDescriptor],
123) -> Result<StructuredSchema> {
124    let manifest = reader.current_global_snapshot();
125    let shard = manifest.shard_snapshots.first().ok_or_else(|| {
126        cobble::Error::ConfigError("global snapshot has no shard snapshots".to_string())
127    })?;
128    let config = cobble::Config {
129        volumes: volumes.to_vec(),
130        total_buckets: manifest.total_buckets,
131        ..cobble::Config::default()
132    };
133    let read_only = ReadOnlyDb::open_with_db_id_and_resolver(
134        config,
135        shard.snapshot_id,
136        shard.db_id.clone(),
137        combined_resolver(None),
138    )?;
139    load_structured_schema_from_cobble_schema(&read_only.current_schema())
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StructuredColumnType;
    use crate::list::{ListConfig, ListRetainMode};
    use crate::structured_single_db::StructuredSingleDb;
    use bytes::Bytes;
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::collections::BTreeMap;
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Unique per-test data directory that is removed when dropped.
    ///
    /// Declare it first in a test so it is dropped last, after any db or reader using it.
    /// Cleanup then also happens when an assertion panics partway through the test.
    struct TempRoot(String);

    impl TempRoot {
        fn new(prefix: &str) -> Self {
            TempRoot(format!("/tmp/{}_{}", prefix, Uuid::new_v4()))
        }

        fn path(&self) -> &str {
            &self.0
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // Best-effort: the directory may not exist if the test failed before writing.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// List configuration used for column 1 throughout these tests.
    fn list_config() -> ListConfig {
        ListConfig {
            max_elements: Some(3),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        }
    }

    /// Structured schema expected after `open_db_with_list_column` has run.
    fn test_schema() -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(1, StructuredColumnType::List(list_config()))]),
        }
    }

    fn test_config(root: &str) -> cobble::Config {
        cobble::Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
            num_columns: 2,
            total_buckets: 2,
            snapshot_on_flush: true,
            ..cobble::Config::default()
        }
    }

    fn reader_config(root: &str) -> ReaderConfig {
        ReaderConfig {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
            total_buckets: 2,
            ..ReaderConfig::default()
        }
    }

    /// Opens a `StructuredSingleDb` at `root` and registers column 1 as a list column.
    fn open_db_with_list_column(root: &str) -> StructuredSingleDb {
        let mut db = StructuredSingleDb::open(test_config(root)).unwrap();
        db.update_schema()
            .add_list_column(1, list_config())
            .commit()
            .unwrap();
        db
    }

    /// Takes a snapshot of `db`, closes it, and returns the snapshot id.
    fn snapshot_and_close(mut db: StructuredSingleDb) -> u64 {
        let snap_id = db.snapshot().unwrap();
        // NOTE(review): presumably gives the snapshot time to be persisted before a reader opens
        // it. A fixed sleep can be flaky; prefer waiting on completion if the API allows.
        thread::sleep(Duration::from_millis(200));
        db.close().unwrap();
        snap_id
    }

    #[test]
    fn test_structured_reader_get_scan() {
        let root = TempRoot::new("ds_reader_get_scan");

        // Write data via StructuredSingleDb and create a global snapshot.
        let mut db = open_db_with_list_column(root.path());
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")]).unwrap();
        let snap_id = snapshot_and_close(db);

        // Open as StructuredReader.
        let mut reader = StructuredReader::open(reader_config(root.path()), snap_id).unwrap();

        // Verify the schema was auto-loaded from the snapshot.
        assert_eq!(reader.current_schema(), test_schema());

        // get
        let row = reader.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
        );
        assert_eq!(
            row[1],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );

        // scan
        let mut iter = reader.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_structured_reader_open_current() {
        let root = TempRoot::new("ds_reader_current");

        let mut db = open_db_with_list_column(root.path());
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        snapshot_and_close(db);

        let mut reader = StructuredReader::open_current(reader_config(root.path())).unwrap();

        let row = reader.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
        );
    }

    #[test]
    fn test_structured_reader_get_with_projection_reindexes_schema() {
        let root = TempRoot::new("ds_reader_get_projection");

        let mut db = open_db_with_list_column(root.path());
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")]).unwrap();
        let snap_id = snapshot_and_close(db);

        let mut reader = StructuredReader::open(reader_config(root.path()), snap_id).unwrap();
        let row = reader
            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
            .unwrap()
            .expect("row exists");

        // Projecting onto column 1 yields a single-element row holding the list column.
        assert_eq!(row.len(), 1);
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
    }
}