Skip to main content

cobble_data_structure/
structured_scan.rs

1use crate::structured_db::{
2    StructuredColumnValue, StructuredSchema, combined_resolver, decode_row,
3    load_structured_schema_from_cobble_schema, project_structured_schema_for_scan,
4};
5use bytes::Bytes;
6use cobble::{
7    Config, GlobalSnapshotManifest, MergeOperatorResolver, ReadOnlyDb, Result, ScanOptions,
8    ScanPlan, ScanSplit, ScanSplitScanner, ShardSnapshotRef,
9};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12
13/// Structured distributed scan plan.
14///
15/// Wraps `cobble::ScanPlan` and produces structured scan splits/scanners.
16pub struct StructuredScanPlan {
17    inner: ScanPlan,
18}
19
20impl StructuredScanPlan {
21    pub fn new(manifest: GlobalSnapshotManifest) -> Self {
22        Self {
23            inner: ScanPlan::new(manifest),
24        }
25    }
26
27    pub fn with_start(mut self, start: Vec<u8>) -> Self {
28        self.inner = self.inner.with_start(start);
29        self
30    }
31
32    pub fn with_end(mut self, end: Vec<u8>) -> Self {
33        self.inner = self.inner.with_end(end);
34        self
35    }
36
37    pub fn splits(&self) -> Vec<StructuredScanSplit> {
38        self.inner.splits().into_iter().map(Into::into).collect()
39    }
40}
41
42/// Structured version of a distributed scan split.
43#[derive(Clone, Debug, Serialize, Deserialize)]
44pub struct StructuredScanSplit {
45    pub shard: ShardSnapshotRef,
46    pub start: Option<Vec<u8>>,
47    pub end: Option<Vec<u8>>,
48}
49
50impl From<ScanSplit> for StructuredScanSplit {
51    fn from(value: ScanSplit) -> Self {
52        Self {
53            shard: value.shard,
54            start: value.start,
55            end: value.end,
56        }
57    }
58}
59
60impl From<StructuredScanSplit> for ScanSplit {
61    fn from(value: StructuredScanSplit) -> Self {
62        Self {
63            shard: value.shard,
64            start: value.start,
65            end: value.end,
66        }
67    }
68}
69
70impl StructuredScanSplit {
71    pub fn create_scanner(
72        &self,
73        config: Config,
74        options: &ScanOptions,
75    ) -> Result<StructuredScanSplitScanner> {
76        self.create_scanner_internal(config, None, options)
77    }
78
79    pub fn create_scanner_with_resolver(
80        &self,
81        config: Config,
82        resolver: Arc<dyn MergeOperatorResolver>,
83        options: &ScanOptions,
84    ) -> Result<StructuredScanSplitScanner> {
85        self.create_scanner_internal(config, Some(resolver), options)
86    }
87
88    fn create_scanner_internal(
89        &self,
90        config: Config,
91        resolver: Option<Arc<dyn MergeOperatorResolver>>,
92        options: &ScanOptions,
93    ) -> Result<StructuredScanSplitScanner> {
94        let resolver = combined_resolver(resolver);
95        let read_only = ReadOnlyDb::open_with_db_id_and_resolver(
96            config.clone(),
97            self.shard.snapshot_id,
98            self.shard.db_id.clone(),
99            Arc::clone(&resolver),
100        )?;
101        let structured_schema = Arc::new(load_structured_schema_from_cobble_schema(
102            &read_only.current_schema(),
103        )?);
104        let projected_schema = project_structured_schema_for_scan(&structured_schema, options);
105        let scanner = ScanSplit::from(self.clone())
106            .create_scanner_with_resolver(config, resolver, options)?;
107        Ok(StructuredScanSplitScanner {
108            inner: scanner,
109            structured_schema: projected_schema,
110        })
111    }
112}
113
114pub struct StructuredScanSplitScanner {
115    inner: ScanSplitScanner,
116    structured_schema: Arc<StructuredSchema>,
117}
118
119impl Iterator for StructuredScanSplitScanner {
120    type Item = Result<(Bytes, Vec<Option<StructuredColumnValue>>)>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        self.inner.next().map(|item| {
124            let (key, columns) = item?;
125            let decoded = decode_row(&self.structured_schema, 0, columns)?;
126            Ok((key, decoded))
127        })
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use crate::list::{ListConfig, ListRetainMode};
    use crate::{StructuredColumnType, StructuredDb, StructuredSchema};
    use cobble::{
        CoordinatorConfig, DbCoordinator, ScanOptions, ShardSnapshotInput, VolumeDescriptor,
        VolumeUsageKind,
    };
    use std::collections::BTreeMap;

    /// Best-effort removal of a test directory; a missing directory is fine.
    fn cleanup_root(path: &str) {
        let _ = std::fs::remove_dir_all(path);
    }

    /// Owns a test root directory for the duration of one test.
    ///
    /// The directory is wiped on creation (stale state from an earlier run)
    /// and again on drop, so it is removed even when the test panics on a
    /// failed assertion or `unwrap`. Declare the guard first in a test so it
    /// is dropped last, after the database and coordinator handles.
    struct TestRoot(&'static str);

    impl TestRoot {
        fn new(path: &'static str) -> Self {
            cleanup_root(path);
            Self(path)
        }

        fn path(&self) -> &str {
            self.0
        }
    }

    impl Drop for TestRoot {
        fn drop(&mut self) {
            cleanup_root(self.0);
        }
    }

    /// Schema with a single list column at id 1.
    fn test_schema() -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(
                1,
                StructuredColumnType::List(ListConfig {
                    max_elements: Some(8),
                    retain_mode: ListRetainMode::Last,
                    preserve_element_ttl: false,
                }),
            )]),
        }
    }

    /// Two-column, four-bucket config stored under `<root>/db`.
    fn test_config(root: &str) -> Config {
        Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}/db", root)),
            num_columns: 2,
            total_buckets: 4,
            ..Config::default()
        }
    }

    /// Opens a `StructuredDb`, registers list column 1 from
    /// `structured_schema`, applies `writes`, then takes and retains a
    /// snapshot. Returns the live database together with the snapshot input.
    fn write_and_snapshot(
        config: &Config,
        structured_schema: StructuredSchema,
        writes: impl FnOnce(&StructuredDb),
    ) -> (StructuredDb, ShardSnapshotInput) {
        let mut db = StructuredDb::open(config.clone(), vec![0u16..=3u16]).unwrap();
        db.update_schema()
            .add_list_column(
                1,
                match structured_schema.columns.get(&1) {
                    Some(StructuredColumnType::List(cfg)) => cfg.clone(),
                    _ => panic!("test schema missing list column 1"),
                },
            )
            .commit()
            .unwrap();
        writes(&db);
        let (tx, rx) = std::sync::mpsc::channel();
        db.snapshot_with_callback(move |result| {
            let _ = tx.send(result);
        })
        .unwrap();
        let shard_input = rx
            .recv_timeout(std::time::Duration::from_secs(10))
            .expect("snapshot callback timed out")
            .unwrap();
        db.retain_snapshot(shard_input.snapshot_id);
        (db, shard_input)
    }

    /// Opens a coordinator under `<root>/coordinator`, takes a global
    /// snapshot from the single shard input and materializes it.
    ///
    /// The coordinator is returned alongside the manifest so callers can keep
    /// it alive while they scan.
    fn materialized_global_snapshot(
        root: &str,
        shard_input: ShardSnapshotInput,
    ) -> (DbCoordinator, GlobalSnapshotManifest) {
        let coordinator = DbCoordinator::open(CoordinatorConfig {
            volumes: vec![VolumeDescriptor::new(
                format!("file://{}/coordinator", root),
                vec![
                    VolumeUsageKind::PrimaryDataPriorityHigh,
                    VolumeUsageKind::Meta,
                ],
            )],
            snapshot_retention: None,
        })
        .unwrap();
        let global = coordinator
            .take_global_snapshot(4, vec![shard_input])
            .unwrap();
        coordinator.materialize_global_snapshot(&global).unwrap();
        (coordinator, global)
    }

    #[test]
    fn test_structured_scan_split_scanner() {
        let root = TestRoot::new("/tmp/structured_scan_split_scanner");

        let config = test_config(root.path());
        let (_db, shard_input) = write_and_snapshot(&config, test_schema(), |db| {
            db.put(0, b"k1", 0, b"v1".to_vec()).unwrap();
            db.merge(0, b"k1", 1, vec![b"a".to_vec()]).unwrap();
            db.merge(0, b"k1", 1, vec![b"b".to_vec()]).unwrap();
            db.put(0, b"k2", 0, b"v2".to_vec()).unwrap();
        });

        let (_coordinator, global) = materialized_global_snapshot(root.path(), shard_input);

        let plan = StructuredScanPlan::new(global);
        let splits = plan.splits();
        assert_eq!(splits.len(), 1);

        let scanner = splits[0]
            .create_scanner(config, &ScanOptions::default())
            .unwrap();
        let results: Vec<_> = scanner.map(|r| r.unwrap()).collect();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0.as_ref(), b"k1");
        assert_eq!(
            results[0].1[1],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
        assert_eq!(results[1].0.as_ref(), b"k2");
    }

    #[test]
    fn test_structured_scan_split_serialization() {
        let split = StructuredScanSplit {
            shard: ShardSnapshotRef {
                ranges: vec![0u16..=3u16],
                db_id: "test-db".to_string(),
                snapshot_id: 7,
                manifest_path: "test-db/snapshot/SNAPSHOT-7".to_string(),
                timestamp_seconds: 42,
            },
            start: Some(b"a".to_vec()),
            end: Some(b"z".to_vec()),
        };
        let json = serde_json::to_string(&split).unwrap();
        let decoded: StructuredScanSplit = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded.shard.db_id, "test-db");
        assert_eq!(decoded.shard.snapshot_id, 7);
        assert_eq!(decoded.start, Some(b"a".to_vec()));
        assert_eq!(decoded.end, Some(b"z".to_vec()));
    }

    #[test]
    fn test_structured_scan_projection_reindexes_schema() {
        let root = TestRoot::new("/tmp/structured_scan_projection_reindex");

        let config = test_config(root.path());
        let (_db, shard_input) = write_and_snapshot(&config, test_schema(), |db| {
            db.put(0, b"k1", 0, b"v1".to_vec()).unwrap();
            db.merge(0, b"k1", 1, vec![b"a".to_vec()]).unwrap();
            db.merge(0, b"k1", 1, vec![b"b".to_vec()]).unwrap();
        });

        let (_coordinator, global) = materialized_global_snapshot(root.path(), shard_input);

        let split = StructuredScanPlan::new(global).splits().remove(0);
        let scanner = split
            .create_scanner(config, &ScanOptions::for_column(1))
            .unwrap();
        let rows: Vec<_> = scanner.map(|r| r.unwrap()).collect();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].0.as_ref(), b"k1");
        // Only column 1 was requested, so the projected row has one slot.
        assert_eq!(rows[0].1.len(), 1);
        assert_eq!(
            rows[0].1[0],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
    }
}