// cobble_data_structure/structured_read_only_db.rs

1use crate::structured_db::{
2    StructuredColumnValue, StructuredDbIterator, StructuredSchema, combined_resolver, decode_row,
3    load_structured_schema_from_cobble_schema, project_decoded_row_for_read,
4    project_structured_schema_for_scan,
5};
6use cobble::{Config, MergeOperatorResolver, ReadOnlyDb, ReadOptions, Result, ScanOptions};
7use std::ops::Range;
8use std::sync::Arc;
9
10pub struct StructuredReadOnlyDb {
11    db: ReadOnlyDb,
12    structured_schema: Arc<StructuredSchema>,
13}
14
15impl StructuredReadOnlyDb {
16    pub fn open(config: Config, snapshot_id: u64, db_id: String) -> Result<Self> {
17        Self::open_with_resolver(config, snapshot_id, db_id, None)
18    }
19
20    pub fn open_with_resolver(
21        config: Config,
22        snapshot_id: u64,
23        db_id: String,
24        resolver: Option<Arc<dyn MergeOperatorResolver>>,
25    ) -> Result<Self> {
26        let db = ReadOnlyDb::open_with_db_id_and_resolver(
27            config,
28            snapshot_id,
29            db_id,
30            combined_resolver(resolver),
31        )?;
32        let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
33        Ok(Self {
34            db,
35            structured_schema: Arc::new(structured_schema),
36        })
37    }
38
39    pub fn id(&self) -> &str {
40        self.db.id()
41    }
42
43    pub fn current_schema(&self) -> StructuredSchema {
44        self.structured_schema.as_ref().clone()
45    }
46
47    pub fn get(
48        &self,
49        bucket: u16,
50        key: &[u8],
51    ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
52        let raw = self.db.get(bucket, key)?;
53        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
54            .transpose()
55    }
56
57    pub fn get_with_options(
58        &self,
59        bucket: u16,
60        key: &[u8],
61        options: &ReadOptions,
62    ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
63        let raw = if options.column_indices.is_some() {
64            self.db.get(bucket, key)?
65        } else {
66            self.db.get_with_options(bucket, key, options)?
67        };
68        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
69            .transpose()
70            .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
71    }
72
73    pub fn scan(&self, bucket: u16, range: Range<&[u8]>) -> Result<StructuredDbIterator<'static>> {
74        self.scan_with_options(bucket, range, &ScanOptions::default())
75    }
76
77    pub fn scan_with_options(
78        &self,
79        bucket: u16,
80        range: Range<&[u8]>,
81        options: &ScanOptions,
82    ) -> Result<StructuredDbIterator<'static>> {
83        let inner = self.db.scan_with_options(bucket, range, options)?;
84        let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
85        Ok(StructuredDbIterator::new(inner, projected_schema, 0))
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StructuredColumnType;
    use crate::list::{ListConfig, ListRetainMode};
    use crate::structured_db::StructuredDb;
    use bytes::Bytes;
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::collections::BTreeMap;
    use std::path::PathBuf;
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Unique scratch directory for one test, removed when the guard is dropped.
    ///
    /// Cleanup lives in `Drop` so it also runs when an assertion panics. A
    /// trailing `remove_dir_all` call would be skipped in that case and leak
    /// the directory. Declare the guard first in a test so it drops last, after
    /// every database handle that uses the directory.
    struct TestRoot(PathBuf);

    impl TestRoot {
        /// Picks `<system temp dir>/<prefix>_<uuid>`. The directory is not
        /// created here.
        fn new(prefix: &str) -> Self {
            TestRoot(std::env::temp_dir().join(format!("{}_{}", prefix, Uuid::new_v4())))
        }
    }

    impl Drop for TestRoot {
        fn drop(&mut self) {
            // Best-effort: ignore errors (e.g. the directory was never created)
            // so cleanup cannot mask the test's own outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Schema with a single list column (index 1) keeping the last 3 elements.
    fn test_schema() -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(
                1,
                StructuredColumnType::List(ListConfig {
                    max_elements: Some(3),
                    retain_mode: ListRetainMode::Last,
                    preserve_element_ttl: false,
                }),
            )]),
        }
    }

    /// Single-volume, two-column config rooted at `root`, with
    /// `snapshot_on_flush` enabled.
    fn test_config(root: &TestRoot) -> Config {
        Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root.0.display())),
            num_columns: 2,
            snapshot_on_flush: true,
            ..Config::default()
        }
    }

    /// Writes key `k1` through a writable `StructuredDb`, snapshots it and
    /// closes it.
    ///
    /// Column 0 is set to `v0`, and each entry of `list_items` is merged, in
    /// order, into list column 1. Returns `(snapshot_id, db_id)` for reopening
    /// the data read-only.
    fn write_fixture(config: &Config, list_items: &[&'static str]) -> (u64, String) {
        let mut db = StructuredDb::open(config.clone(), vec![0u16..=0u16]).unwrap();
        db.apply_schema(test_schema()).unwrap();
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        for &item in list_items {
            db.merge(0, b"k1", 1, vec![Bytes::from_static(item.as_bytes())])
                .unwrap();
        }
        let snap_id = db.snapshot().unwrap();
        // NOTE(review): presumably gives the snapshot time to be published before
        // closing; kept from the original tests. TODO confirm it is still needed.
        thread::sleep(Duration::from_millis(200));
        let db_id = db.id().to_string();
        db.close().unwrap();
        (snap_id, db_id)
    }

    #[test]
    fn test_structured_read_only_db_get_scan() {
        let root = TestRoot::new("ds_readonly_get_scan");
        let config = test_config(&root);
        let (snap_id, db_id) = write_fixture(&config, &["a", "b"]);

        let rodb = StructuredReadOnlyDb::open(config, snap_id, db_id).unwrap();

        // Verify schema was auto-loaded
        assert_eq!(rodb.current_schema(), test_schema());

        // get
        let row = rodb.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
        );
        assert_eq!(
            row[1],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );

        // scan
        let mut iter = rodb.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_structured_read_only_db_missing_key() {
        let root = TestRoot::new("ds_readonly_missing");
        let config = test_config(&root);
        let (snap_id, db_id) = write_fixture(&config, &[]);

        let rodb = StructuredReadOnlyDb::open(config, snap_id, db_id).unwrap();
        assert!(rodb.get(0, b"no-such-key").unwrap().is_none());
    }

    #[test]
    fn test_structured_read_only_db_get_with_projection_reindexes_schema() {
        let root = TestRoot::new("ds_readonly_get_projection");
        let config = test_config(&root);
        let (snap_id, db_id) = write_fixture(&config, &["a", "b"]);

        let rodb = StructuredReadOnlyDb::open(config, snap_id, db_id).unwrap();
        let row = rodb
            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
            .unwrap()
            .expect("row exists");
        // Projecting column 1 yields a one-element row holding the list value.
        assert_eq!(row.len(), 1);
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
    }
}