Skip to main content

cobble_data_structure/
structured_single_db.rs

1use crate::structured_db::{
2    StructuredColumnValue, StructuredDbIterator, StructuredSchema, StructuredSchemaBuilder,
3    StructuredSchemaOwner, StructuredWriteBatch, decode_row, encode_for_write,
4    load_structured_schema_from_cobble_schema, persist_structured_schema_on_db,
5    project_decoded_row_for_read, project_structured_schema_for_scan,
6};
7use cobble::{Config, ReadOptions, Result, ScanOptions, SingleDb, WriteOptions};
8use std::ops::Range;
9use std::sync::Arc;
10
11pub struct StructuredSingleDb {
12    db: SingleDb,
13    structured_schema: Arc<StructuredSchema>,
14}
15
16impl StructuredSingleDb {
17    pub fn open(config: Config) -> Result<Self> {
18        let db = SingleDb::open(config)?;
19        let structured_schema =
20            load_structured_schema_from_cobble_schema(&db.db().current_schema())?;
21        Ok(Self {
22            db,
23            structured_schema: Arc::new(structured_schema),
24        })
25    }
26
27    pub fn db(&self) -> &SingleDb {
28        &self.db
29    }
30
31    pub fn current_schema(&self) -> StructuredSchema {
32        self.structured_schema.as_ref().clone()
33    }
34
35    pub fn update_schema(&mut self) -> StructuredSchemaBuilder<'_, Self> {
36        StructuredSchemaBuilder::new(self)
37    }
38}
39
40impl StructuredSingleDb {
41    pub fn reload_schema(&mut self) -> Result<()> {
42        let schema = load_structured_schema_from_cobble_schema(&self.db.db().current_schema())?;
43        self.structured_schema = Arc::new(schema);
44        Ok(())
45    }
46
47    pub fn apply_schema(
48        &mut self,
49        structured_schema: StructuredSchema,
50    ) -> Result<StructuredSchema> {
51        persist_structured_schema_on_db(self.db.db(), &structured_schema)?;
52        let reloaded = load_structured_schema_from_cobble_schema(&self.db.db().current_schema())?;
53        self.structured_schema = Arc::new(reloaded.clone());
54        Ok(reloaded)
55    }
56
57    // ── Write operations ────────────────────────────────────────────────
58
59    pub fn put<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
60    where
61        K: AsRef<[u8]>,
62        V: Into<StructuredColumnValue>,
63    {
64        self.put_with_options(bucket, key, column, value, &WriteOptions::default())
65    }
66
67    pub fn put_with_options<K, V>(
68        &self,
69        bucket: u16,
70        key: K,
71        column: u16,
72        value: V,
73        options: &WriteOptions,
74    ) -> Result<()>
75    where
76        K: AsRef<[u8]>,
77        V: Into<StructuredColumnValue>,
78    {
79        let encoded = encode_for_write(
80            &self.structured_schema,
81            self.db.db().now_seconds(),
82            column,
83            value.into(),
84            options.ttl_seconds,
85        )?;
86        self.db
87            .put_with_options(bucket, key, column, encoded, options)
88    }
89
90    pub fn merge<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
91    where
92        K: AsRef<[u8]>,
93        V: Into<StructuredColumnValue>,
94    {
95        self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
96    }
97
98    pub fn merge_with_options<K, V>(
99        &self,
100        bucket: u16,
101        key: K,
102        column: u16,
103        value: V,
104        options: &WriteOptions,
105    ) -> Result<()>
106    where
107        K: AsRef<[u8]>,
108        V: Into<StructuredColumnValue>,
109    {
110        let encoded = encode_for_write(
111            &self.structured_schema,
112            self.db.db().now_seconds(),
113            column,
114            value.into(),
115            options.ttl_seconds,
116        )?;
117        self.db
118            .merge_with_options(bucket, key, column, encoded, options)
119    }
120
121    pub fn delete<K>(&self, bucket: u16, key: K, column: u16) -> Result<()>
122    where
123        K: AsRef<[u8]>,
124    {
125        self.db.delete(bucket, key, column)
126    }
127
128    pub fn new_write_batch(&self) -> StructuredWriteBatch {
129        StructuredWriteBatch::new(
130            Arc::clone(&self.structured_schema),
131            self.db.db().now_seconds(),
132        )
133    }
134
135    pub fn write_batch(&self, batch: StructuredWriteBatch) -> Result<()> {
136        self.db.write_batch(batch.into_inner())
137    }
138
139    // ── Read operations ─────────────────────────────────────────────────
140
141    pub fn get<K>(&self, bucket: u16, key: K) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
142    where
143        K: AsRef<[u8]>,
144    {
145        let raw = self.db.get(bucket, key.as_ref())?;
146        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
147            .transpose()
148    }
149
150    pub fn get_with_options<K>(
151        &self,
152        bucket: u16,
153        key: K,
154        options: &ReadOptions,
155    ) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
156    where
157        K: AsRef<[u8]>,
158    {
159        let raw = if options.column_indices.is_some() {
160            self.db.get(bucket, key.as_ref())?
161        } else {
162            self.db.get_with_options(bucket, key.as_ref(), options)?
163        };
164        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
165            .transpose()
166            .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
167    }
168
169    pub fn scan<'a>(
170        &'a self,
171        bucket: u16,
172        range: Range<&[u8]>,
173    ) -> Result<StructuredDbIterator<'a>> {
174        self.scan_with_options(bucket, range, &ScanOptions::default())
175    }
176
177    pub fn scan_with_options<'a>(
178        &'a self,
179        bucket: u16,
180        range: Range<&[u8]>,
181        options: &ScanOptions,
182    ) -> Result<StructuredDbIterator<'a>> {
183        let inner = self.db.scan_with_options(bucket, range, options)?;
184        let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
185        Ok(StructuredDbIterator::new(inner, projected_schema, 0))
186    }
187
188    // ── Snapshot lifecycle ───────────────────────────────────────────────
189
190    pub fn snapshot(&self) -> Result<u64> {
191        self.db.snapshot()
192    }
193
194    pub fn snapshot_with_callback<F>(&self, callback: F) -> Result<u64>
195    where
196        F: Fn(Result<cobble::GlobalSnapshotManifest>) + Send + Sync + 'static,
197    {
198        self.db.snapshot_with_callback(callback)
199    }
200
201    pub fn retain_snapshot(&self, global_snapshot_id: u64) -> Result<bool> {
202        self.db.retain_snapshot(global_snapshot_id)
203    }
204
205    pub fn expire_snapshot(&self, global_snapshot_id: u64) -> Result<bool> {
206        self.db.expire_snapshot(global_snapshot_id)
207    }
208
209    pub fn list_snapshots(&self) -> Result<Vec<cobble::GlobalSnapshotManifest>> {
210        self.db.list_snapshots()
211    }
212
213    pub fn set_time(&self, next: u32) {
214        self.db.set_time(next)
215    }
216
217    pub fn close(&self) -> Result<()> {
218        self.db.close()
219    }
220}
221
222impl StructuredSchemaOwner for StructuredSingleDb {
223    fn current_structured_schema(&self) -> StructuredSchema {
224        self.current_schema()
225    }
226
227    fn begin_core_schema_update(&self) -> cobble::SchemaBuilder {
228        self.db.db().update_schema()
229    }
230
231    fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema> {
232        self.reload_schema()?;
233        Ok(self.current_schema())
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::list::{ListConfig, ListRetainMode};
    use bytes::Bytes;
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Commits a schema update that registers column 1 as a list column
    /// (`max_elements: Some(3)`, `ListRetainMode::Last`, no per-element TTL).
    fn apply_test_schema(db: &mut StructuredSingleDb) {
        let list_config = ListConfig {
            max_elements: Some(3),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        };
        db.update_schema()
            .add_list_column(1, list_config)
            .commit()
            .unwrap();
    }

    /// Builds a two-column, two-bucket configuration stored under `root`.
    fn test_config(root: &str) -> Config {
        let volumes = VolumeDescriptor::single_volume(format!("file://{}", root));
        Config {
            volumes,
            num_columns: 2,
            total_buckets: 2,
            snapshot_on_flush: true,
            ..Config::default()
        }
    }

    /// Opens a database under `root` and applies the test schema to it.
    fn open_prepared(root: &str) -> StructuredSingleDb {
        let mut db = StructuredSingleDb::open(test_config(root)).unwrap();
        apply_test_schema(&mut db);
        db
    }

    /// Closes `db` and removes its directory; removal errors are ignored.
    fn close_and_cleanup(db: &StructuredSingleDb, root: String) {
        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    /// The list value expected after merging `a` and then `b` into column 1.
    fn list_a_b() -> Option<StructuredColumnValue> {
        let elements = vec![Bytes::from_static(b"a"), Bytes::from_static(b"b")];
        Some(StructuredColumnValue::List(elements))
    }

    /// Writes `v0` to column 0 and merges `a`, then `b`, into column 1 of
    /// key `k1` in bucket 0.
    fn write_k1_row(db: &StructuredSingleDb) {
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        for element in [Bytes::from_static(b"a"), Bytes::from_static(b"b")] {
            db.merge(0, b"k1", 1, vec![element]).unwrap();
        }
    }

    #[test]
    fn test_structured_single_db_put_get_scan() {
        let root = format!("/tmp/ds_single_put_get_{}", Uuid::new_v4());
        let db = open_prepared(&root);

        write_k1_row(&db);

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        let expected_bytes = Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")));
        assert_eq!(row[0], expected_bytes);
        assert_eq!(row[1], list_a_b());

        let mut rows = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = rows.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert!(rows.next().is_none());

        close_and_cleanup(&db, root);
    }

    #[test]
    fn test_structured_single_db_write_batch() {
        let root = format!("/tmp/ds_single_batch_{}", Uuid::new_v4());
        let db = open_prepared(&root);

        let mut batch = db.new_write_batch();
        batch.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        for element in [Bytes::from_static(b"a"), Bytes::from_static(b"b")] {
            batch.merge(0, b"k1", 1, vec![element]).unwrap();
        }
        db.write_batch(batch).unwrap();

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(row[1], list_a_b());

        close_and_cleanup(&db, root);
    }

    #[test]
    fn test_structured_single_db_delete() {
        let root = format!("/tmp/ds_single_delete_{}", Uuid::new_v4());
        let db = open_prepared(&root);

        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        assert!(db.get(0, b"k1").unwrap().is_some());
        db.delete(0, b"k1", 0).unwrap();
        // After deleting column 0 the row may still be present; if it is,
        // column 0 must read back as None.
        if let Some(columns) = db.get(0, b"k1").unwrap() {
            assert_eq!(columns[0], None);
        }

        close_and_cleanup(&db, root);
    }

    #[test]
    fn test_structured_single_db_snapshot_lifecycle() {
        let root = format!("/tmp/ds_single_snap_{}", Uuid::new_v4());
        let db = open_prepared(&root);

        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        // Snapshot ids are allocated from 0, so only success is checked here.
        let snap_id = db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(300));

        let snapshots = db.list_snapshots().unwrap();
        assert!(!snapshots.is_empty());
        assert_eq!(snapshots[0].id, snap_id);

        close_and_cleanup(&db, root);
    }

    #[test]
    fn test_structured_single_db_get_with_projection_reindexes_schema() {
        let root = format!("/tmp/ds_single_get_projection_{}", Uuid::new_v4());
        let db = open_prepared(&root);
        write_k1_row(&db);

        let only_column_one = ReadOptions::for_column(1);
        let row = db
            .get_with_options(0, b"k1", &only_column_one)
            .unwrap()
            .expect("row exists");
        // Projection leaves a single column, re-indexed to position 0.
        assert_eq!(row.len(), 1);
        assert_eq!(row[0], list_a_b());

        close_and_cleanup(&db, root);
    }
}
399}