1use crate::structured_db::{
2 StructuredColumnValue, StructuredDbIterator, StructuredSchema, combined_resolver, decode_row,
3 load_structured_schema_from_cobble_schema, project_decoded_row_for_read,
4 project_structured_schema_for_scan,
5};
6use cobble::{Config, MergeOperatorResolver, ReadOnlyDb, ReadOptions, Result, ScanOptions};
7use std::ops::Range;
8use std::sync::Arc;
9
/// Read-only handle that pairs a cobble [`ReadOnlyDb`] with the structured
/// schema loaded from it when the handle was opened.
pub struct StructuredReadOnlyDb {
    /// Underlying read-only database that serves the raw rows.
    db: ReadOnlyDb,
    /// Structured schema loaded once in `open_with_resolver`; used to decode rows.
    structured_schema: Arc<StructuredSchema>,
}
14
15impl StructuredReadOnlyDb {
16 pub fn open(config: Config, snapshot_id: u64, db_id: impl Into<String>) -> Result<Self> {
17 Self::open_with_resolver(config, snapshot_id, db_id, None)
18 }
19
20 pub fn open_with_resolver(
21 config: Config,
22 snapshot_id: u64,
23 db_id: impl Into<String>,
24 resolver: Option<Arc<dyn MergeOperatorResolver>>,
25 ) -> Result<Self> {
26 let db = ReadOnlyDb::open_with_db_id_and_resolver(
27 config,
28 snapshot_id,
29 db_id,
30 combined_resolver(resolver),
31 )?;
32 let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
33 Ok(Self {
34 db,
35 structured_schema: Arc::new(structured_schema),
36 })
37 }
38
39 pub fn id(&self) -> &str {
40 self.db.id()
41 }
42
43 pub fn current_schema(&self) -> StructuredSchema {
44 self.structured_schema.as_ref().clone()
45 }
46
47 pub fn get(
48 &self,
49 bucket: u16,
50 key: &[u8],
51 ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
52 let raw = self.db.get(bucket, key)?;
53 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
54 .transpose()
55 }
56
57 pub fn get_with_options(
58 &self,
59 bucket: u16,
60 key: &[u8],
61 options: &ReadOptions,
62 ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
63 let raw = if options.column_indices.is_some() {
64 self.db.get(bucket, key)?
65 } else {
66 self.db.get_with_options(bucket, key, options)?
67 };
68 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
69 .transpose()
70 .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
71 }
72
73 pub fn scan(&self, bucket: u16, range: Range<&[u8]>) -> Result<StructuredDbIterator<'static>> {
74 self.scan_with_options(bucket, range, &ScanOptions::default())
75 }
76
77 pub fn scan_with_options(
78 &self,
79 bucket: u16,
80 range: Range<&[u8]>,
81 options: &ScanOptions,
82 ) -> Result<StructuredDbIterator<'static>> {
83 let inner = self.db.scan_with_options(bucket, range, options)?;
84 let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
85 Ok(StructuredDbIterator::new(inner, projected_schema, 0))
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StructuredColumnType;
    use crate::list::{ListConfig, ListRetainMode};
    use crate::structured_db::StructuredDb;
    use bytes::Bytes;
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::collections::BTreeMap;
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Unique scratch directory path under `/tmp` that is removed on drop.
    ///
    /// Cleaning up in `Drop` means the directory is also removed when a test
    /// panics on a failed `unwrap`/`assert`, instead of only on the success path.
    struct TempRoot(String);

    impl TempRoot {
        /// Builds `/tmp/<prefix>_<uuid>`; the directory itself is created by the db.
        fn new(prefix: &str) -> Self {
            Self(format!("/tmp/{}_{}", prefix, Uuid::new_v4()))
        }

        /// Returns the scratch directory path.
        fn path(&self) -> &str {
            &self.0
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // Best-effort cleanup: a missing directory is not a test failure.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Schema with a single list column at index 1 (at most 3 elements, retain last).
    fn test_schema() -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(
                1,
                StructuredColumnType::List(ListConfig {
                    max_elements: Some(3),
                    retain_mode: ListRetainMode::Last,
                    preserve_element_ttl: false,
                }),
            )]),
        }
    }

    /// Two-column, single-volume config rooted at `root` with `snapshot_on_flush` enabled.
    fn test_config(root: &str) -> Config {
        Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
            num_columns: 2,
            snapshot_on_flush: true,
            ..Config::default()
        }
    }

    /// Writes `v0` into column 0 and merges `a` then `b` into list column 1 of key `k1`.
    fn put_value_and_list(db: &mut StructuredDb) {
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
            .unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
            .unwrap();
    }

    /// Opens a writable db with `test_schema`, lets `populate` write to it, takes a
    /// snapshot, closes the db, and reopens that snapshot read-only.
    fn snapshot_and_reopen(
        config: Config,
        populate: impl FnOnce(&mut StructuredDb),
    ) -> StructuredReadOnlyDb {
        let mut db = StructuredDb::open(config.clone(), vec![0u16..=0u16]).unwrap();
        db.apply_schema(test_schema()).unwrap();
        populate(&mut db);
        let snap_id = db.snapshot().unwrap();
        // NOTE(review): presumably gives background snapshot work time to finish
        // before the db is closed — confirm, and prefer a deterministic wait if
        // one exists.
        thread::sleep(Duration::from_millis(200));
        let db_id = db.id().to_string();
        db.close().unwrap();

        StructuredReadOnlyDb::open(config, snap_id, db_id).unwrap()
    }

    #[test]
    fn test_structured_read_only_db_get_scan() {
        // Declared first so it is dropped last, after the read-only db.
        let root = TempRoot::new("ds_readonly_get_scan");
        let rodb = snapshot_and_reopen(test_config(root.path()), put_value_and_list);

        assert_eq!(rodb.current_schema(), test_schema());

        let row = rodb.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
        );
        assert_eq!(
            row[1],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );

        let mut iter = rodb.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_structured_read_only_db_missing_key() {
        // Declared first so it is dropped last, after the read-only db.
        let root = TempRoot::new("ds_readonly_missing");
        let rodb = snapshot_and_reopen(test_config(root.path()), |db| {
            db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        });

        assert!(rodb.get(0, b"no-such-key").unwrap().is_none());
    }

    #[test]
    fn test_structured_read_only_db_get_with_projection_reindexes_schema() {
        // Declared first so it is dropped last, after the read-only db.
        let root = TempRoot::new("ds_readonly_get_projection");
        let rodb = snapshot_and_reopen(test_config(root.path()), put_value_and_list);

        let row = rodb
            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
            .unwrap()
            .expect("row exists");
        // Only the requested column is returned, so it sits at index 0.
        assert_eq!(row.len(), 1);
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
    }
}