1use crate::structured_db::{
2 StructuredColumnValue, StructuredSchema, combined_resolver, decode_row,
3 load_structured_schema_from_cobble_schema, project_structured_schema_for_scan,
4};
5use bytes::Bytes;
6use cobble::{
7 Config, GlobalSnapshotManifest, MergeOperatorResolver, ReadOnlyDb, Result, ScanOptions,
8 ScanPlan, ScanSplit, ScanSplitScanner, ShardSnapshotRef,
9};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12
13pub struct StructuredScanPlan {
17 inner: ScanPlan,
18}
19
20impl StructuredScanPlan {
21 pub fn new(manifest: GlobalSnapshotManifest) -> Self {
22 Self {
23 inner: ScanPlan::new(manifest),
24 }
25 }
26
27 pub fn with_start(mut self, start: Vec<u8>) -> Self {
28 self.inner = self.inner.with_start(start);
29 self
30 }
31
32 pub fn with_end(mut self, end: Vec<u8>) -> Self {
33 self.inner = self.inner.with_end(end);
34 self
35 }
36
37 pub fn splits(&self) -> Vec<StructuredScanSplit> {
38 self.inner.splits().into_iter().map(Into::into).collect()
39 }
40}
41
42#[derive(Clone, Debug, Serialize, Deserialize)]
44pub struct StructuredScanSplit {
45 pub shard: ShardSnapshotRef,
46 pub start: Option<Vec<u8>>,
47 pub end: Option<Vec<u8>>,
48}
49
50impl From<ScanSplit> for StructuredScanSplit {
51 fn from(value: ScanSplit) -> Self {
52 Self {
53 shard: value.shard,
54 start: value.start,
55 end: value.end,
56 }
57 }
58}
59
60impl From<StructuredScanSplit> for ScanSplit {
61 fn from(value: StructuredScanSplit) -> Self {
62 Self {
63 shard: value.shard,
64 start: value.start,
65 end: value.end,
66 }
67 }
68}
69
70impl StructuredScanSplit {
71 pub fn create_scanner(
72 &self,
73 config: Config,
74 options: &ScanOptions,
75 ) -> Result<StructuredScanSplitScanner> {
76 self.create_scanner_internal(config, None, options)
77 }
78
79 pub fn create_scanner_with_resolver(
80 &self,
81 config: Config,
82 resolver: Arc<dyn MergeOperatorResolver>,
83 options: &ScanOptions,
84 ) -> Result<StructuredScanSplitScanner> {
85 self.create_scanner_internal(config, Some(resolver), options)
86 }
87
88 fn create_scanner_internal(
89 &self,
90 config: Config,
91 resolver: Option<Arc<dyn MergeOperatorResolver>>,
92 options: &ScanOptions,
93 ) -> Result<StructuredScanSplitScanner> {
94 let resolver = combined_resolver(resolver);
95 let read_only = ReadOnlyDb::open_with_db_id_and_resolver(
96 config.clone(),
97 self.shard.snapshot_id,
98 self.shard.db_id.clone(),
99 Arc::clone(&resolver),
100 )?;
101 let structured_schema = Arc::new(load_structured_schema_from_cobble_schema(
102 &read_only.current_schema(),
103 )?);
104 let projected_schema = project_structured_schema_for_scan(&structured_schema, options);
105 let scanner = ScanSplit::from(self.clone())
106 .create_scanner_with_resolver(config, resolver, options)?;
107 Ok(StructuredScanSplitScanner {
108 inner: scanner,
109 structured_schema: projected_schema,
110 })
111 }
112}
113
114pub struct StructuredScanSplitScanner {
115 inner: ScanSplitScanner,
116 structured_schema: Arc<StructuredSchema>,
117}
118
119impl Iterator for StructuredScanSplitScanner {
120 type Item = Result<(Bytes, Vec<Option<StructuredColumnValue>>)>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 self.inner.next().map(|item| {
124 let (key, columns) = item?;
125 let decoded = decode_row(&self.structured_schema, 0, columns)?;
126 Ok((key, decoded))
127 })
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use crate::list::{ListConfig, ListRetainMode};
    use crate::{StructuredColumnType, StructuredDb, StructuredSchema};
    use cobble::{
        CoordinatorConfig, DbCoordinator, ScanOptions, ShardSnapshotInput, VolumeDescriptor,
        VolumeUsageKind,
    };
    use std::collections::BTreeMap;

    /// Best-effort removal of a test directory; a missing directory is fine.
    fn cleanup_root(path: &str) {
        let _ = std::fs::remove_dir_all(path);
    }

    /// Per-test scratch directory that is removed again when the guard drops.
    ///
    /// The path lives under the system temp directory and includes the process
    /// id, so concurrent test processes do not share a directory. Declare the
    /// guard *before* any database or coordinator handle: locals drop in
    /// reverse declaration order, so the directory is deleted only after those
    /// handles are closed — including when an assertion panics.
    struct TestRoot {
        path: String,
    }

    impl TestRoot {
        /// Creates the guard for `name`, clearing any stale directory first.
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!("{}_{}", name, std::process::id()));
            let root = Self {
                path: dir.to_string_lossy().into_owned(),
            };
            cleanup_root(&root.path);
            root
        }

        /// Directory path as a string, suitable for building `file://` URLs.
        fn path(&self) -> &str {
            &self.path
        }
    }

    impl Drop for TestRoot {
        fn drop(&mut self) {
            cleanup_root(&self.path);
        }
    }

    /// Schema with a single list column at index 1.
    fn test_schema() -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(
                1,
                StructuredColumnType::List(ListConfig {
                    max_elements: Some(8),
                    retain_mode: ListRetainMode::Last,
                    preserve_element_ttl: false,
                }),
            )]),
        }
    }

    /// Database config with a single volume at `<root>/db`.
    fn test_config(root: &str) -> Config {
        Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}/db", root)),
            num_columns: 2,
            total_buckets: 4,
            ..Config::default()
        }
    }

    /// Opens a structured DB, registers list column 1 from `structured_schema`,
    /// runs `writes`, then takes and retains a snapshot.
    ///
    /// Returns the open DB (the caller must keep it alive while scanning) and
    /// the shard snapshot input for building a global snapshot.
    fn write_and_snapshot(
        config: &Config,
        structured_schema: StructuredSchema,
        writes: impl FnOnce(&StructuredDb),
    ) -> (StructuredDb, ShardSnapshotInput) {
        let mut db = StructuredDb::open(config.clone(), vec![0u16..=3u16]).unwrap();
        db.update_schema()
            .add_list_column(
                1,
                match structured_schema.columns.get(&1) {
                    Some(StructuredColumnType::List(cfg)) => cfg.clone(),
                    _ => panic!("test schema missing list column 1"),
                },
            )
            .commit()
            .unwrap();
        writes(&db);
        // The snapshot result is delivered through a callback; bridge it back
        // to this thread with a channel and bound the wait.
        let (tx, rx) = std::sync::mpsc::channel();
        db.snapshot_with_callback(move |result| {
            let _ = tx.send(result);
        })
        .unwrap();
        let shard_input = rx
            .recv_timeout(std::time::Duration::from_secs(10))
            .expect("snapshot callback timed out")
            .unwrap();
        db.retain_snapshot(shard_input.snapshot_id);
        (db, shard_input)
    }

    /// Opens a coordinator under `<root>/coordinator`, takes a global snapshot
    /// over `shard_input`, and materializes it.
    ///
    /// The coordinator is returned together with the manifest so the caller
    /// can keep it alive for the duration of the scan.
    fn materialized_global_snapshot(
        root: &str,
        shard_input: ShardSnapshotInput,
    ) -> (DbCoordinator, GlobalSnapshotManifest) {
        let coordinator = DbCoordinator::open(CoordinatorConfig {
            volumes: vec![VolumeDescriptor::new(
                format!("file://{}/coordinator", root),
                vec![
                    VolumeUsageKind::PrimaryDataPriorityHigh,
                    VolumeUsageKind::Meta,
                ],
            )],
            snapshot_retention: None,
        })
        .unwrap();
        let global = coordinator
            .take_global_snapshot(4, vec![shard_input])
            .unwrap();
        coordinator.materialize_global_snapshot(&global).unwrap();
        (coordinator, global)
    }

    #[test]
    fn test_structured_scan_split_scanner() {
        let root = TestRoot::new("structured_scan_split_scanner");

        let config = test_config(root.path());
        let (_db, shard_input) = write_and_snapshot(&config, test_schema(), |db| {
            db.put(0, b"k1", 0, b"v1".to_vec()).unwrap();
            db.merge(0, b"k1", 1, vec![b"a".to_vec()]).unwrap();
            db.merge(0, b"k1", 1, vec![b"b".to_vec()]).unwrap();
            db.put(0, b"k2", 0, b"v2".to_vec()).unwrap();
        });

        let (_coordinator, global) = materialized_global_snapshot(root.path(), shard_input);

        let plan = StructuredScanPlan::new(global);
        let splits = plan.splits();
        assert_eq!(splits.len(), 1);

        let scanner = splits[0]
            .create_scanner(config, &ScanOptions::default())
            .unwrap();
        let results: Vec<_> = scanner.map(|r| r.unwrap()).collect();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0.as_ref(), b"k1");
        assert_eq!(
            results[0].1[1],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
        assert_eq!(results[1].0.as_ref(), b"k2");
    }

    #[test]
    fn test_structured_scan_split_serialization() {
        let split = StructuredScanSplit {
            shard: ShardSnapshotRef {
                ranges: vec![0u16..=3u16],
                db_id: "test-db".to_string(),
                snapshot_id: 7,
                manifest_path: "test-db/snapshot/SNAPSHOT-7".to_string(),
                timestamp_seconds: 42,
            },
            start: Some(b"a".to_vec()),
            end: Some(b"z".to_vec()),
        };
        // Round-trip through JSON and check the fields survive.
        let json = serde_json::to_string(&split).unwrap();
        let decoded: StructuredScanSplit = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded.shard.db_id, "test-db");
        assert_eq!(decoded.shard.snapshot_id, 7);
        assert_eq!(decoded.start, Some(b"a".to_vec()));
        assert_eq!(decoded.end, Some(b"z".to_vec()));
    }

    #[test]
    fn test_structured_scan_projection_reindexes_schema() {
        let root = TestRoot::new("structured_scan_projection_reindex");

        let config = test_config(root.path());
        let (_db, shard_input) = write_and_snapshot(&config, test_schema(), |db| {
            db.put(0, b"k1", 0, b"v1".to_vec()).unwrap();
            db.merge(0, b"k1", 1, vec![b"a".to_vec()]).unwrap();
            db.merge(0, b"k1", 1, vec![b"b".to_vec()]).unwrap();
        });

        let (_coordinator, global) = materialized_global_snapshot(root.path(), shard_input);

        // Scanning only column 1 must yield a single-element row in which the
        // list value sits at index 0 of the projected output.
        let split = StructuredScanPlan::new(global).splits().remove(0);
        let scanner = split
            .create_scanner(config, &ScanOptions::for_column(1))
            .unwrap();
        let rows: Vec<_> = scanner.map(|r| r.unwrap()).collect();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].0.as_ref(), b"k1");
        assert_eq!(rows[0].1.len(), 1);
        assert_eq!(
            rows[0].1[0],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
    }
}