1use crate::structured_db::{
2 StructuredColumnValue, StructuredDbIterator, StructuredSchema, StructuredSchemaBuilder,
3 StructuredSchemaOwner, StructuredWriteBatch, decode_row, encode_for_write,
4 load_structured_schema_from_cobble_schema, persist_structured_schema_on_db,
5 project_decoded_row_for_read, project_structured_schema_for_scan,
6};
7use cobble::{Config, ReadOptions, Result, ScanOptions, SingleDb, WriteOptions};
8use std::ops::Range;
9use std::sync::Arc;
10
/// Structured-schema wrapper around a cobble [`SingleDb`].
///
/// Holds the underlying database handle together with the
/// [`StructuredSchema`] most recently loaded from that database's core schema.
pub struct StructuredSingleDb {
    /// Underlying cobble database; every operation is ultimately forwarded to it.
    db: SingleDb,
    /// Structured schema loaded from the core schema of `db`. Stored behind an
    /// `Arc` so it can be handed to write batches with a cheap `Arc::clone`.
    structured_schema: Arc<StructuredSchema>,
}
15
16impl StructuredSingleDb {
17 pub fn open(config: Config) -> Result<Self> {
18 let db = SingleDb::open(config)?;
19 let structured_schema =
20 load_structured_schema_from_cobble_schema(&db.db().current_schema())?;
21 Ok(Self {
22 db,
23 structured_schema: Arc::new(structured_schema),
24 })
25 }
26
27 pub fn db(&self) -> &SingleDb {
28 &self.db
29 }
30
31 pub fn current_schema(&self) -> StructuredSchema {
32 self.structured_schema.as_ref().clone()
33 }
34
35 pub fn update_schema(&mut self) -> StructuredSchemaBuilder<'_, Self> {
36 StructuredSchemaBuilder::new(self)
37 }
38}
39
40impl StructuredSingleDb {
41 pub fn reload_schema(&mut self) -> Result<()> {
42 let schema = load_structured_schema_from_cobble_schema(&self.db.db().current_schema())?;
43 self.structured_schema = Arc::new(schema);
44 Ok(())
45 }
46
47 pub fn apply_schema(
48 &mut self,
49 structured_schema: StructuredSchema,
50 ) -> Result<StructuredSchema> {
51 persist_structured_schema_on_db(self.db.db(), &structured_schema)?;
52 let reloaded = load_structured_schema_from_cobble_schema(&self.db.db().current_schema())?;
53 self.structured_schema = Arc::new(reloaded.clone());
54 Ok(reloaded)
55 }
56
57 pub fn put<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
60 where
61 K: AsRef<[u8]>,
62 V: Into<StructuredColumnValue>,
63 {
64 self.put_with_options(bucket, key, column, value, &WriteOptions::default())
65 }
66
67 pub fn put_with_options<K, V>(
68 &self,
69 bucket: u16,
70 key: K,
71 column: u16,
72 value: V,
73 options: &WriteOptions,
74 ) -> Result<()>
75 where
76 K: AsRef<[u8]>,
77 V: Into<StructuredColumnValue>,
78 {
79 let encoded = encode_for_write(
80 &self.structured_schema,
81 self.db.db().now_seconds(),
82 column,
83 value.into(),
84 options.ttl_seconds,
85 )?;
86 self.db
87 .put_with_options(bucket, key, column, encoded, options)
88 }
89
90 pub fn merge<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
91 where
92 K: AsRef<[u8]>,
93 V: Into<StructuredColumnValue>,
94 {
95 self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
96 }
97
98 pub fn merge_with_options<K, V>(
99 &self,
100 bucket: u16,
101 key: K,
102 column: u16,
103 value: V,
104 options: &WriteOptions,
105 ) -> Result<()>
106 where
107 K: AsRef<[u8]>,
108 V: Into<StructuredColumnValue>,
109 {
110 let encoded = encode_for_write(
111 &self.structured_schema,
112 self.db.db().now_seconds(),
113 column,
114 value.into(),
115 options.ttl_seconds,
116 )?;
117 self.db
118 .merge_with_options(bucket, key, column, encoded, options)
119 }
120
121 pub fn delete<K>(&self, bucket: u16, key: K, column: u16) -> Result<()>
122 where
123 K: AsRef<[u8]>,
124 {
125 self.db.delete(bucket, key, column)
126 }
127
128 pub fn new_write_batch(&self) -> StructuredWriteBatch {
129 StructuredWriteBatch::new(
130 Arc::clone(&self.structured_schema),
131 self.db.db().now_seconds(),
132 )
133 }
134
135 pub fn write_batch(&self, batch: StructuredWriteBatch) -> Result<()> {
136 self.db.write_batch(batch.into_inner())
137 }
138
139 pub fn get<K>(&self, bucket: u16, key: K) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
142 where
143 K: AsRef<[u8]>,
144 {
145 let raw = self.db.get(bucket, key.as_ref())?;
146 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
147 .transpose()
148 }
149
150 pub fn get_with_options<K>(
151 &self,
152 bucket: u16,
153 key: K,
154 options: &ReadOptions,
155 ) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
156 where
157 K: AsRef<[u8]>,
158 {
159 let raw = if options.column_indices.is_some() {
160 self.db.get(bucket, key.as_ref())?
161 } else {
162 self.db.get_with_options(bucket, key.as_ref(), options)?
163 };
164 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
165 .transpose()
166 .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
167 }
168
169 pub fn scan<'a>(
170 &'a self,
171 bucket: u16,
172 range: Range<&[u8]>,
173 ) -> Result<StructuredDbIterator<'a>> {
174 self.scan_with_options(bucket, range, &ScanOptions::default())
175 }
176
177 pub fn scan_with_options<'a>(
178 &'a self,
179 bucket: u16,
180 range: Range<&[u8]>,
181 options: &ScanOptions,
182 ) -> Result<StructuredDbIterator<'a>> {
183 let inner = self.db.scan_with_options(bucket, range, options)?;
184 let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
185 Ok(StructuredDbIterator::new(inner, projected_schema, 0))
186 }
187
188 pub fn snapshot(&self) -> Result<u64> {
191 self.db.snapshot()
192 }
193
194 pub fn snapshot_with_callback<F>(&self, callback: F) -> Result<u64>
195 where
196 F: Fn(Result<cobble::GlobalSnapshotManifest>) + Send + Sync + 'static,
197 {
198 self.db.snapshot_with_callback(callback)
199 }
200
201 pub fn retain_snapshot(&self, global_snapshot_id: u64) -> Result<bool> {
202 self.db.retain_snapshot(global_snapshot_id)
203 }
204
205 pub fn expire_snapshot(&self, global_snapshot_id: u64) -> Result<bool> {
206 self.db.expire_snapshot(global_snapshot_id)
207 }
208
209 pub fn list_snapshots(&self) -> Result<Vec<cobble::GlobalSnapshotManifest>> {
210 self.db.list_snapshots()
211 }
212
213 pub fn set_time(&self, next: u32) {
214 self.db.set_time(next)
215 }
216
217 pub fn close(&self) -> Result<()> {
218 self.db.close()
219 }
220}
221
222impl StructuredSchemaOwner for StructuredSingleDb {
223 fn current_structured_schema(&self) -> StructuredSchema {
224 self.current_schema()
225 }
226
227 fn begin_core_schema_update(&self) -> cobble::SchemaBuilder {
228 self.db.db().update_schema()
229 }
230
231 fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema> {
232 self.reload_schema()?;
233 Ok(self.current_schema())
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::list::{ListConfig, ListRetainMode};
    use bytes::Bytes;
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Commits a schema update that registers column 1 as a list column
    /// (at most 3 elements, `ListRetainMode::Last`, no per-element TTL).
    fn apply_test_schema(db: &mut StructuredSingleDb) {
        let list_config = ListConfig {
            max_elements: Some(3),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        };
        db.update_schema()
            .add_list_column(1, list_config)
            .commit()
            .unwrap();
    }

    /// Builds a two-column, two-bucket configuration backed by a single
    /// `file://` volume rooted at `root`, with `snapshot_on_flush` enabled.
    fn test_config(root: &str) -> Config {
        let volumes = VolumeDescriptor::single_volume(format!("file://{}", root));
        Config {
            volumes,
            num_columns: 2,
            total_buckets: 2,
            snapshot_on_flush: true,
            ..Config::default()
        }
    }

    /// Opens a database under `root` and applies the shared test schema.
    fn open_test_db(root: &str) -> StructuredSingleDb {
        let mut db = StructuredSingleDb::open(test_config(root)).unwrap();
        apply_test_schema(&mut db);
        db
    }

    /// Closes `db` and removes its on-disk directory; removal is best-effort
    /// and its result is deliberately ignored.
    fn close_and_cleanup(db: &StructuredSingleDb, root: &str) {
        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn test_structured_single_db_put_get_scan() {
        let root = format!("/tmp/ds_single_put_get_{}", Uuid::new_v4());
        let db = open_test_db(&root);

        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        // Two separate merges into the list column are expected to accumulate.
        for element in [Bytes::from_static(b"a"), Bytes::from_static(b"b")] {
            db.merge(0, b"k1", 1, vec![element]).unwrap();
        }

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        let expected_bytes = Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")));
        let expected_list = Some(StructuredColumnValue::List(vec![
            Bytes::from_static(b"a"),
            Bytes::from_static(b"b"),
        ]));
        assert_eq!(row[0], expected_bytes);
        assert_eq!(row[1], expected_list);

        let lower: &[u8] = b"k0";
        let upper: &[u8] = b"k9";
        let mut iter = db.scan(0, lower..upper).unwrap();
        let entry = iter.next().expect("one row").unwrap();
        assert_eq!(entry.0.as_ref(), b"k1");
        assert!(iter.next().is_none());

        close_and_cleanup(&db, &root);
    }

    #[test]
    fn test_structured_single_db_write_batch() {
        let root = format!("/tmp/ds_single_batch_{}", Uuid::new_v4());
        let db = open_test_db(&root);

        let mut batch = db.new_write_batch();
        batch.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        for element in [Bytes::from_static(b"a"), Bytes::from_static(b"b")] {
            batch.merge(0, b"k1", 1, vec![element]).unwrap();
        }
        db.write_batch(batch).unwrap();

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        let expected_list = Some(StructuredColumnValue::List(vec![
            Bytes::from_static(b"a"),
            Bytes::from_static(b"b"),
        ]));
        assert_eq!(row[1], expected_list);

        close_and_cleanup(&db, &root);
    }

    #[test]
    fn test_structured_single_db_delete() {
        let root = format!("/tmp/ds_single_delete_{}", Uuid::new_v4());
        let db = open_test_db(&root);

        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        assert!(db.get(0, b"k1").unwrap().is_some());

        db.delete(0, b"k1", 0).unwrap();
        // After the delete the row may be gone entirely or still present with
        // an empty column 0; both outcomes are accepted.
        if let Some(row) = db.get(0, b"k1").unwrap() {
            assert_eq!(row[0], None);
        }

        close_and_cleanup(&db, &root);
    }

    #[test]
    fn test_structured_single_db_snapshot_lifecycle() {
        let root = format!("/tmp/ds_single_snap_{}", Uuid::new_v4());
        let db = open_test_db(&root);

        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        let snap_id = db.snapshot().unwrap();
        // Pause before listing; presumably the snapshot is materialized in the
        // background — confirm against the cobble snapshot implementation.
        thread::sleep(Duration::from_millis(300));

        let snapshots = db.list_snapshots().unwrap();
        assert!(!snapshots.is_empty());
        assert_eq!(snapshots[0].id, snap_id);

        close_and_cleanup(&db, &root);
    }

    #[test]
    fn test_structured_single_db_get_with_projection_reindexes_schema() {
        let root = format!("/tmp/ds_single_get_projection_{}", Uuid::new_v4());
        let db = open_test_db(&root);

        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        for element in [Bytes::from_static(b"a"), Bytes::from_static(b"b")] {
            db.merge(0, b"k1", 1, vec![element]).unwrap();
        }

        // Projecting onto column 1 must yield a single-entry row whose only
        // slot holds the list value.
        let projection = ReadOptions::for_column(1);
        let row = db
            .get_with_options(0, b"k1", &projection)
            .unwrap()
            .expect("row exists");
        assert_eq!(row.len(), 1);
        let expected_list = Some(StructuredColumnValue::List(vec![
            Bytes::from_static(b"a"),
            Bytes::from_static(b"b"),
        ]));
        assert_eq!(row[0], expected_list);

        close_and_cleanup(&db, &root);
    }
}