1use crate::list::{
2 LIST_OPERATOR_ID, ListConfig, decode_list_for_read, encode_list_for_write, list_operator,
3 list_operator_from_metadata,
4};
5use bytes::Bytes;
6use cobble::{
7 BytesMergeOperator, Config, Db, DbIterator, Error, MergeOperatorResolver, ReadOptions, Result,
8 ScanOptions, Schema, SchemaBuilder, ShardSnapshotInput, WriteBatch, WriteOptions,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value as JsonValue;
12use std::collections::BTreeMap;
13use std::ops::{Range, RangeInclusive};
14use std::sync::Arc;
15
/// Structured-type description of a table's columns.
///
/// Derives `Serialize`/`Deserialize`, so it can be exchanged as plain data.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StructuredSchema {
    /// Structured type per column index. Columns without an entry are treated
    /// as [`StructuredColumnType::Bytes`] by the lookup helper in this file.
    pub columns: BTreeMap<u16, StructuredColumnType>,
}
20
/// How the raw bytes stored in a column are interpreted.
///
/// Serialized with an internal `kind` tag holding the snake_case variant name.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum StructuredColumnType {
    /// Opaque bytes, written and read back unchanged. This is the default.
    #[default]
    Bytes,
    /// A list column whose encoding/decoding is driven by the given config.
    List(ListConfig),
}
28
/// A decoded column value as seen by users of the structured API.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StructuredColumnValue {
    /// Value of a [`StructuredColumnType::Bytes`] column.
    Bytes(Bytes),
    /// Elements of a [`StructuredColumnType::List`] column.
    List(Vec<Bytes>),
}
34
35pub(crate) fn column_type(schema: &StructuredSchema, column: u16) -> &StructuredColumnType {
38 schema
39 .columns
40 .get(&column)
41 .unwrap_or(&StructuredColumnType::Bytes)
42}
43
44pub(crate) fn encode_for_write(
45 schema: &StructuredSchema,
46 now_seconds: u32,
47 column: u16,
48 value: StructuredColumnValue,
49 ttl_seconds: Option<u32>,
50) -> Result<Bytes> {
51 match (column_type(schema, column), value) {
52 (StructuredColumnType::Bytes, StructuredColumnValue::Bytes(value)) => Ok(value),
53 (StructuredColumnType::List(config), StructuredColumnValue::List(elements)) => {
54 encode_list_for_write(elements, config, ttl_seconds, now_seconds)
55 }
56 (_, _) => Err(Error::InputError(format!(
57 "column {} expects a different type of value",
58 column,
59 ))),
60 }
61}
62
63pub(crate) fn decode_row(
64 schema: &StructuredSchema,
65 now_seconds: u32,
66 columns: Vec<Option<Bytes>>,
67) -> Result<Vec<Option<StructuredColumnValue>>> {
68 columns
69 .into_iter()
70 .enumerate()
71 .map(|(idx, column)| {
72 let Some(raw) = column else {
73 return Ok(None);
74 };
75 match column_type(schema, idx as u16) {
76 StructuredColumnType::Bytes => Ok(Some(StructuredColumnValue::Bytes(raw))),
77 StructuredColumnType::List(config) => Ok(Some(StructuredColumnValue::List(
78 decode_list_for_read(&raw, config, now_seconds)?,
79 ))),
80 }
81 })
82 .collect()
83}
84
85pub(crate) fn project_structured_schema_for_indices(
86 schema: &StructuredSchema,
87 column_indices: Option<&[usize]>,
88) -> Arc<StructuredSchema> {
89 let Some(indices) = column_indices else {
90 return Arc::new(schema.clone());
91 };
92 let mut columns = BTreeMap::new();
93 for (projected_idx, original_idx) in indices.iter().enumerate() {
94 if let Some(column_type) = schema.columns.get(&(*original_idx as u16)) {
95 columns.insert(projected_idx as u16, column_type.clone());
96 }
97 }
98 Arc::new(StructuredSchema { columns })
99}
100
101pub(crate) fn project_structured_schema_for_scan(
102 schema: &StructuredSchema,
103 options: &ScanOptions,
104) -> Arc<StructuredSchema> {
105 project_structured_schema_for_indices(schema, options.column_indices.as_deref())
106}
107
108pub(crate) fn project_decoded_row_for_read(
109 row: Vec<Option<StructuredColumnValue>>,
110 options: &ReadOptions,
111) -> Vec<Option<StructuredColumnValue>> {
112 let Some(indices) = options.column_indices.as_deref() else {
113 return row;
114 };
115 indices
116 .iter()
117 .map(|&idx| row.get(idx).cloned().unwrap_or(None))
118 .collect()
119}
120
121pub(crate) fn load_structured_schema_from_cobble_schema(
122 schema: &Schema,
123) -> Result<StructuredSchema> {
124 let operator_ids = schema.all_operator_ids();
125 let mut columns = BTreeMap::new();
126 for column_idx in 0..schema.num_columns() {
127 let operator_id = operator_ids
128 .get(column_idx)
129 .map(|s| s.as_str())
130 .unwrap_or("");
131 if operator_id == LIST_OPERATOR_ID {
132 let metadata_value = schema.column_metadata_at(column_idx).ok_or_else(|| {
133 Error::FileFormatError(format!("list column {} missing metadata", column_idx))
134 })?;
135 let config =
136 serde_json::from_value::<ListConfig>(metadata_value.clone()).map_err(|err| {
137 Error::FileFormatError(format!(
138 "failed to decode list config at column {}: {}",
139 column_idx, err
140 ))
141 })?;
142 columns.insert(column_idx as u16, StructuredColumnType::List(config));
143 }
144 }
145 Ok(StructuredSchema { columns })
146}
147
148pub(crate) fn combined_resolver(
149 custom: Option<Arc<dyn MergeOperatorResolver>>,
150) -> Arc<dyn MergeOperatorResolver> {
151 Arc::new(move |id: &str, metadata: Option<&JsonValue>| {
152 if let Some(operator) = list_operator_from_metadata(id, metadata) {
153 return Some(operator);
154 }
155 custom
156 .as_ref()
157 .and_then(|resolver| resolver.resolve(id, metadata))
158 })
159}
160
161pub fn structured_merge_operator_resolver() -> Arc<dyn MergeOperatorResolver> {
164 combined_resolver(None)
165}
166
167pub fn structured_resolvable_operator_ids() -> Vec<String> {
169 vec![LIST_OPERATOR_ID.to_string()]
170}
171
/// A write batch that encodes structured values before handing them to the
/// wrapped core `WriteBatch`.
pub struct StructuredWriteBatch {
    // Schema used to validate and encode every value added to the batch.
    structured_schema: Arc<StructuredSchema>,
    // Timestamp captured when the batch was created; passed to the encoder for
    // every value added to this batch.
    now_seconds: u32,
    // The core batch that accumulates the encoded operations.
    inner: WriteBatch,
}
183
184impl StructuredWriteBatch {
185 pub(crate) fn new(structured_schema: Arc<StructuredSchema>, now_seconds: u32) -> Self {
186 Self {
187 structured_schema,
188 now_seconds,
189 inner: WriteBatch::new(),
190 }
191 }
192
193 pub fn put<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
194 where
195 K: AsRef<[u8]>,
196 V: Into<StructuredColumnValue>,
197 {
198 self.put_with_options(bucket, key, column, value, &WriteOptions::default())
199 }
200
201 pub fn put_with_options<K, V>(
202 &mut self,
203 bucket: u16,
204 key: K,
205 column: u16,
206 value: V,
207 options: &WriteOptions,
208 ) -> Result<()>
209 where
210 K: AsRef<[u8]>,
211 V: Into<StructuredColumnValue>,
212 {
213 let encoded = encode_for_write(
214 &self.structured_schema,
215 self.now_seconds,
216 column,
217 value.into(),
218 options.ttl_seconds,
219 )?;
220 self.inner
221 .put_with_options(bucket, key, column, encoded, options);
222 Ok(())
223 }
224
225 pub fn delete<K>(&mut self, bucket: u16, key: K, column: u16)
226 where
227 K: AsRef<[u8]>,
228 {
229 self.inner.delete(bucket, key, column);
230 }
231
232 pub fn merge<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
233 where
234 K: AsRef<[u8]>,
235 V: Into<StructuredColumnValue>,
236 {
237 self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
238 }
239
240 pub fn merge_with_options<K, V>(
241 &mut self,
242 bucket: u16,
243 key: K,
244 column: u16,
245 value: V,
246 options: &WriteOptions,
247 ) -> Result<()>
248 where
249 K: AsRef<[u8]>,
250 V: Into<StructuredColumnValue>,
251 {
252 let encoded = encode_for_write(
253 &self.structured_schema,
254 self.now_seconds,
255 column,
256 value.into(),
257 options.ttl_seconds,
258 )?;
259 self.inner
260 .merge_with_options(bucket, key, column, encoded, options);
261 Ok(())
262 }
263
264 pub(crate) fn into_inner(self) -> WriteBatch {
265 self.inner
266 }
267}
268
269impl From<Bytes> for StructuredColumnValue {
272 fn from(value: Bytes) -> Self {
273 Self::Bytes(value)
274 }
275}
276
277impl From<Vec<u8>> for StructuredColumnValue {
278 fn from(value: Vec<u8>) -> Self {
279 Self::Bytes(Bytes::from(value))
280 }
281}
282
283impl From<Vec<Bytes>> for StructuredColumnValue {
284 fn from(value: Vec<Bytes>) -> Self {
285 Self::List(value)
286 }
287}
288
289impl From<Vec<Vec<u8>>> for StructuredColumnValue {
290 fn from(value: Vec<Vec<u8>>) -> Self {
291 Self::List(value.into_iter().map(Bytes::from).collect())
292 }
293}
294
/// A core `Db` paired with the structured schema used to encode writes and
/// decode reads.
pub struct StructuredDb {
    // The wrapped core database that performs all storage operations.
    db: Db,
    // Structured schema currently in effect; replaced as a whole when the
    // schema is reloaded or applied.
    structured_schema: Arc<StructuredSchema>,
}
301
/// Operations a [`StructuredSchemaBuilder`] needs from the object whose schema
/// it edits.
pub trait StructuredSchemaOwner {
    /// Returns a copy of the structured schema currently in effect; the builder
    /// uses it as the starting point of its preview.
    fn current_structured_schema(&self) -> StructuredSchema;
    /// Starts a core schema update; the builder applies its column changes to
    /// the returned `SchemaBuilder`.
    fn begin_core_schema_update(&self) -> SchemaBuilder;
    /// Called by the builder after the core update has been committed; returns
    /// the structured schema as reloaded from the core schema.
    fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema>;
}
307
/// Builder that edits an owner's structured schema by mirroring each change
/// onto a core `SchemaBuilder`.
pub struct StructuredSchemaBuilder<'a, O: StructuredSchemaOwner> {
    // The object whose schema is being edited; asked to reload on commit.
    owner: &'a mut O,
    // Preview of the structured schema, exposed through `current_schema`.
    schema: StructuredSchema,
    // Core builder receiving the changes; `None` once it has been consumed.
    inner: Option<SchemaBuilder>,
    // First error hit while applying changes; reported by `commit`.
    pending_error: Option<Error>,
}
314
315impl<'a, O: StructuredSchemaOwner> StructuredSchemaBuilder<'a, O> {
316 pub fn new(owner: &'a mut O) -> Self {
317 let schema = owner.current_structured_schema();
318 let inner = owner.begin_core_schema_update();
319 Self {
320 owner,
321 schema,
322 inner: Some(inner),
323 pending_error: None,
324 }
325 }
326
327 pub fn add_bytes_column(&mut self, column: u16) -> &mut Self {
328 self.schema.columns.remove(&column);
329 self.apply_inner(|inner| {
330 inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
331 inner.clear_column_metadata(column as usize)?;
332 Ok(())
333 });
334 self
335 }
336
337 pub fn add_list_column(&mut self, column: u16, config: ListConfig) -> &mut Self {
338 self.schema
339 .columns
340 .insert(column, StructuredColumnType::List(config.clone()));
341 self.apply_inner(|inner| {
342 inner.set_column_operator(column as usize, list_operator(config.clone()))?;
343 inner.set_column_metadata(
344 column as usize,
345 serde_json::to_value(config).map_err(|err| {
346 Error::FileFormatError(format!(
347 "failed to encode list config metadata: {}",
348 err
349 ))
350 })?,
351 )?;
352 Ok(())
353 });
354 self
355 }
356
357 pub fn delete_column(&mut self, column: u16) -> &mut Self {
358 self.schema.columns.remove(&column);
360 self.apply_inner(|inner| {
361 inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
362 inner.clear_column_metadata(column as usize)?;
363 Ok(())
364 });
365 self
366 }
367
368 pub fn current_schema(&self) -> &StructuredSchema {
369 &self.schema
370 }
371
372 pub fn commit(&mut self) -> Result<StructuredSchema> {
373 if let Some(err) = self.pending_error.take() {
374 return Err(err);
375 }
376 let inner = self
377 .inner
378 .take()
379 .ok_or_else(|| Error::InvalidState("schema builder already committed".to_string()))?;
380 inner.commit();
381 self.owner.reload_structured_schema_from_core()
382 }
383
384 fn apply_inner<F>(&mut self, f: F)
385 where
386 F: FnOnce(&mut SchemaBuilder) -> Result<()>,
387 {
388 if self.pending_error.is_some() {
389 return;
390 }
391 let Some(inner) = self.inner.as_mut() else {
392 self.pending_error = Some(Error::InvalidState(
393 "schema builder already committed".to_string(),
394 ));
395 return;
396 };
397 if let Err(err) = f(inner) {
398 self.pending_error = Some(err);
399 }
400 }
401}
402
403impl StructuredDb {
404 pub fn open(config: Config, bucket_ranges: Vec<RangeInclusive<u16>>) -> Result<Self> {
405 let db = Db::open(config, bucket_ranges)?;
406 let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
407 Ok(Self {
408 db,
409 structured_schema: Arc::new(structured_schema),
410 })
411 }
412
413 pub fn open_from_snapshot(config: Config, snapshot_id: u64, db_id: String) -> Result<Self> {
414 Self::open_from_snapshot_with_resolver(config, snapshot_id, db_id, None)
415 }
416
417 pub fn open_from_snapshot_with_resolver(
418 config: Config,
419 snapshot_id: u64,
420 db_id: String,
421 resolver: Option<Arc<dyn MergeOperatorResolver>>,
422 ) -> Result<Self> {
423 let db = Db::open_from_snapshot_with_resolver(
424 config,
425 snapshot_id,
426 db_id,
427 Some(combined_resolver(resolver)),
428 )?;
429 let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
430 Ok(Self {
431 db,
432 structured_schema: Arc::new(structured_schema),
433 })
434 }
435
436 pub fn resume(config: Config, db_id: String) -> Result<Self> {
437 Self::resume_with_resolver(config, db_id, None)
438 }
439
440 pub fn resume_with_resolver(
441 config: Config,
442 db_id: String,
443 resolver: Option<Arc<dyn MergeOperatorResolver>>,
444 ) -> Result<Self> {
445 let db = Db::resume_with_resolver(config, db_id, Some(combined_resolver(resolver)))?;
446 let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
447 Ok(Self {
448 db,
449 structured_schema: Arc::new(structured_schema),
450 })
451 }
452
453 pub fn id(&self) -> &str {
454 self.db.id()
455 }
456
457 pub fn current_schema(&self) -> StructuredSchema {
458 self.structured_schema.as_ref().clone()
459 }
460
461 pub fn update_schema(&mut self) -> StructuredSchemaBuilder<'_, Self> {
462 StructuredSchemaBuilder::new(self)
463 }
464
465 pub fn reload_schema(&mut self) -> Result<()> {
466 let schema = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
467 self.structured_schema = Arc::new(schema);
468 Ok(())
469 }
470
471 pub fn apply_schema(
472 &mut self,
473 structured_schema: StructuredSchema,
474 ) -> Result<StructuredSchema> {
475 persist_structured_schema(&self.db, &structured_schema)?;
476 let reloaded = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
477 self.structured_schema = Arc::new(reloaded.clone());
478 Ok(reloaded)
479 }
480
481 pub fn put<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
482 where
483 K: AsRef<[u8]>,
484 V: Into<StructuredColumnValue>,
485 {
486 self.put_with_options(bucket, key, column, value, &WriteOptions::default())
487 }
488
489 pub fn put_with_options<K, V>(
490 &self,
491 bucket: u16,
492 key: K,
493 column: u16,
494 value: V,
495 options: &WriteOptions,
496 ) -> Result<()>
497 where
498 K: AsRef<[u8]>,
499 V: Into<StructuredColumnValue>,
500 {
501 let encoded = encode_for_write(
502 &self.structured_schema,
503 self.db.now_seconds(),
504 column,
505 value.into(),
506 options.ttl_seconds,
507 )?;
508 self.db
509 .put_with_options(bucket, key, column, encoded, options)
510 }
511
512 pub fn merge<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
513 where
514 K: AsRef<[u8]>,
515 V: Into<StructuredColumnValue>,
516 {
517 self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
518 }
519
520 pub fn merge_with_options<K, V>(
521 &self,
522 bucket: u16,
523 key: K,
524 column: u16,
525 value: V,
526 options: &WriteOptions,
527 ) -> Result<()>
528 where
529 K: AsRef<[u8]>,
530 V: Into<StructuredColumnValue>,
531 {
532 let encoded = encode_for_write(
533 &self.structured_schema,
534 self.db.now_seconds(),
535 column,
536 value.into(),
537 options.ttl_seconds,
538 )?;
539 self.db
540 .merge_with_options(bucket, key, column, encoded, options)
541 }
542
543 pub fn delete<K>(&self, bucket: u16, key: K, column: u16) -> Result<()>
544 where
545 K: AsRef<[u8]>,
546 {
547 self.db.delete(bucket, key, column)
548 }
549
550 pub fn new_write_batch(&self) -> StructuredWriteBatch {
551 StructuredWriteBatch::new(Arc::clone(&self.structured_schema), self.db.now_seconds())
552 }
553
554 pub fn write_batch(&self, batch: StructuredWriteBatch) -> Result<()> {
555 self.db.write_batch(batch.into_inner())
556 }
557
558 pub fn get<K>(&self, bucket: u16, key: K) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
559 where
560 K: AsRef<[u8]>,
561 {
562 let raw = self.db.get(bucket, key.as_ref())?;
563 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
564 .transpose()
565 }
566
567 pub fn get_with_options<K>(
568 &self,
569 bucket: u16,
570 key: K,
571 options: &ReadOptions,
572 ) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
573 where
574 K: AsRef<[u8]>,
575 {
576 let raw = if options.column_indices.is_some() {
577 self.db.get(bucket, key.as_ref())?
578 } else {
579 self.db.get_with_options(bucket, key.as_ref(), options)?
580 };
581 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
582 .transpose()
583 .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
584 }
585
586 pub fn scan<'a>(
587 &'a self,
588 bucket: u16,
589 range: Range<&[u8]>,
590 ) -> Result<StructuredDbIterator<'a>> {
591 self.scan_with_options(bucket, range, &ScanOptions::default())
592 }
593
594 pub fn scan_with_options<'a>(
595 &'a self,
596 bucket: u16,
597 range: Range<&[u8]>,
598 options: &ScanOptions,
599 ) -> Result<StructuredDbIterator<'a>> {
600 let inner = self.db.scan_with_options(bucket, range, options)?;
601 let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
602 Ok(StructuredDbIterator::new(inner, projected_schema, 0))
603 }
604
605 pub fn snapshot(&self) -> Result<u64> {
606 self.db.snapshot()
607 }
608
609 pub fn snapshot_with_callback<F>(&self, callback: F) -> Result<u64>
610 where
611 F: Fn(Result<ShardSnapshotInput>) + Send + Sync + 'static,
612 {
613 self.db.snapshot_with_callback(callback)
614 }
615
616 pub fn expire_snapshot(&self, snapshot_id: u64) -> Result<bool> {
617 self.db.expire_snapshot(snapshot_id)
618 }
619
620 pub fn retain_snapshot(&self, snapshot_id: u64) -> bool {
621 self.db.retain_snapshot(snapshot_id)
622 }
623
624 pub fn shard_snapshot_input(&self, snapshot_id: u64) -> Result<ShardSnapshotInput> {
625 self.db.shard_snapshot_input(snapshot_id)
626 }
627
628 pub fn set_time(&self, next: u32) {
629 self.db.set_time(next);
630 }
631
632 pub fn now_seconds(&self) -> u32 {
633 self.db.now_seconds()
634 }
635
636 pub fn get_raw_with_options(
637 &self,
638 bucket: u16,
639 key: &[u8],
640 options: &ReadOptions,
641 ) -> Result<Option<Vec<Option<Bytes>>>> {
642 self.db.get_with_options(bucket, key, options)
643 }
644
645 pub fn scan_raw<'a>(
646 &'a self,
647 bucket: u16,
648 range: Range<&[u8]>,
649 options: &ScanOptions,
650 ) -> Result<DbIterator<'a>> {
651 self.db.scan_with_options(bucket, range, options)
652 }
653
654 pub fn close(&self) -> Result<()> {
655 self.db.close()
656 }
657}
658
659impl StructuredSchemaOwner for StructuredDb {
660 fn current_structured_schema(&self) -> StructuredSchema {
661 self.current_schema()
662 }
663
664 fn begin_core_schema_update(&self) -> SchemaBuilder {
665 self.db.update_schema()
666 }
667
668 fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema> {
669 self.reload_schema()?;
670 Ok(self.current_schema())
671 }
672}
673
/// Alternative name for [`StructuredDb`]; both names refer to the same type.
pub type DataStructureDb = StructuredDb;
676
/// Iterator adapter that decodes the rows produced by a core `DbIterator`.
pub struct StructuredDbIterator<'a> {
    // The core iterator producing raw rows.
    inner: DbIterator<'a>,
    // Schema used to decode each row positionally.
    structured_schema: Arc<StructuredSchema>,
    // Timestamp passed to the row decoder for every row.
    now_seconds: u32,
}
684
685impl<'a> StructuredDbIterator<'a> {
686 pub(crate) fn new(
687 inner: DbIterator<'a>,
688 structured_schema: Arc<StructuredSchema>,
689 now_seconds: u32,
690 ) -> Self {
691 Self {
692 inner,
693 structured_schema,
694 now_seconds,
695 }
696 }
697}
698
699impl Iterator for StructuredDbIterator<'_> {
700 type Item = Result<(Bytes, Vec<Option<StructuredColumnValue>>)>;
701
702 fn next(&mut self) -> Option<Self::Item> {
703 self.inner.next().map(|item| {
704 let (key, columns) = item?;
705 let decoded = decode_row(&self.structured_schema, self.now_seconds, columns)?;
706 Ok((key, decoded))
707 })
708 }
709}
710
711pub(crate) fn persist_structured_schema_on_db(
714 db: &Db,
715 structured_schema: &StructuredSchema,
716) -> Result<()> {
717 let mut schema = db.update_schema();
718 apply_structured_schema(&mut schema, structured_schema)?;
719 schema.commit();
720 Ok(())
721}
722
/// File-private shorthand that forwards to `persist_structured_schema_on_db`.
fn persist_structured_schema(db: &Db, structured_schema: &StructuredSchema) -> Result<()> {
    persist_structured_schema_on_db(db, structured_schema)
}
726
727fn apply_structured_schema(
728 schema: &mut SchemaBuilder,
729 structured_schema: &StructuredSchema,
730) -> Result<()> {
731 for (column, column_type) in &structured_schema.columns {
732 match column_type {
733 StructuredColumnType::Bytes => {}
734 StructuredColumnType::List(config) => {
735 schema.set_column_operator(*column as usize, list_operator(config.clone()))?;
736 schema.set_column_metadata(
737 *column as usize,
738 serde_json::to_value(config).map_err(|err| {
739 Error::FileFormatError(format!(
740 "failed to encode list config metadata: {}",
741 err
742 ))
743 })?,
744 )?;
745 }
746 }
747 }
748 Ok(())
749}
750
#[cfg(test)]
mod tests {
    use super::*;
    use crate::list::{ListConfig, ListRetainMode};
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Two-column config whose single volume lives under `root`.
    fn two_column_config(root: &str) -> Config {
        Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
            num_columns: 2,
            ..Config::default()
        }
    }

    /// Schema declaring column 1 as a list with the given config.
    fn list_schema_on_column_one(list_config: ListConfig) -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(1, StructuredColumnType::List(list_config))]),
        }
    }

    /// Opens a database on bucket 0 and applies `schema` to it.
    fn open_with_schema(config: Config, schema: StructuredSchema) -> StructuredDb {
        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
        db.apply_schema(schema).unwrap();
        db
    }

    /// Shorthand for a static `Bytes` value.
    fn bytes(data: &'static [u8]) -> Bytes {
        Bytes::from_static(data)
    }

    /// Closes `db` and removes its on-disk directory (best effort).
    fn close_and_cleanup(db: StructuredDb, root: String) {
        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn test_structured_db_resume_loads_structured_schema() {
        let root = format!("/tmp/ds_structured_resume_{}", Uuid::new_v4());
        let config = Config {
            snapshot_on_flush: true,
            ..two_column_config(&root)
        };
        let structured_schema = list_schema_on_column_one(ListConfig {
            max_elements: Some(2),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: true,
        });
        let db = open_with_schema(config.clone(), structured_schema.clone());
        db.merge(0, b"k", 1, vec![bytes(b"a")]).unwrap();
        let _ = db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(200));
        let db_id = db.id().to_string();
        db.close().unwrap();

        let resumed = StructuredDb::resume(config, db_id).unwrap();
        assert_eq!(resumed.current_schema(), structured_schema);
        close_and_cleanup(resumed, root);
    }

    #[test]
    fn test_structured_db_get_and_scan_return_structured_values() {
        let root = format!("/tmp/ds_structured_get_scan_{}", Uuid::new_v4());
        let structured_schema = list_schema_on_column_one(ListConfig {
            max_elements: Some(2),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        db.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"c")]).unwrap();

        let expected_bytes = Some(StructuredColumnValue::Bytes(bytes(b"v0")));
        let expected_list = Some(StructuredColumnValue::List(vec![bytes(b"b"), bytes(b"c")]));

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(row[0], expected_bytes);
        assert_eq!(row[1], expected_list);

        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert_eq!(first.1.len(), 2, "scan row should have 2 columns");
        assert_eq!(first.1[0], expected_bytes);
        assert_eq!(first.1[1], expected_list);
        assert!(iter.next().is_none());
        drop(iter);

        close_and_cleanup(db, root);
    }

    #[test]
    fn test_structured_write_batch_round_trip() {
        let root = format!("/tmp/ds_structured_write_batch_{}", Uuid::new_v4());
        let structured_schema = list_schema_on_column_one(ListConfig {
            max_elements: Some(3),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        let mut batch = db.new_write_batch();
        batch.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        batch.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        batch.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();
        batch.merge(0, b"k1", 1, vec![bytes(b"c")]).unwrap();
        batch.put(0, b"k2", 0, bytes(b"v2")).unwrap();
        db.write_batch(batch).unwrap();

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(row[0], Some(StructuredColumnValue::Bytes(bytes(b"v0"))));
        assert_eq!(
            row[1],
            Some(StructuredColumnValue::List(vec![
                bytes(b"a"),
                bytes(b"b"),
                bytes(b"c")
            ]))
        );
        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("first row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        let second = iter.next().expect("second row").unwrap();
        assert_eq!(second.0.as_ref(), b"k2");
        assert!(iter.next().is_none());
        drop(iter);

        close_and_cleanup(db, root);
    }

    #[test]
    fn test_structured_write_batch_rejects_type_mismatch() {
        let root = format!("/tmp/ds_structured_write_batch_mismatch_{}", Uuid::new_v4());
        let structured_schema = list_schema_on_column_one(ListConfig {
            max_elements: None,
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        let mut batch = db.new_write_batch();
        let err = batch
            .put(0, b"k1", 1, bytes(b"not-a-list"))
            .expect_err("type mismatch should fail");
        match err {
            Error::InputError(msg) => assert!(msg.contains("column 1 expects")),
            other => panic!("unexpected error: {other:?}"),
        }
        close_and_cleanup(db, root);
    }

    #[test]
    fn test_structured_scan_with_projection_reindexes_schema() {
        let root = format!("/tmp/ds_structured_scan_projection_{}", Uuid::new_v4());
        let structured_schema = list_schema_on_column_one(ListConfig {
            max_elements: Some(8),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        db.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();

        let mut iter = db
            .scan_with_options(
                0,
                b"k0".as_ref()..b"k9".as_ref(),
                &ScanOptions::for_column(1),
            )
            .unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert_eq!(first.1.len(), 1);
        assert_eq!(
            first.1[0],
            Some(StructuredColumnValue::List(vec![bytes(b"a"), bytes(b"b")]))
        );
        drop(iter);

        close_and_cleanup(db, root);
    }

    #[test]
    fn test_structured_get_with_projection_reindexes_schema() {
        let root = format!("/tmp/ds_structured_get_projection_{}", Uuid::new_v4());
        let structured_schema = list_schema_on_column_one(ListConfig {
            max_elements: Some(8),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        db.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();

        let row = db
            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
            .unwrap()
            .expect("row exists");
        assert_eq!(row.len(), 1);
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::List(vec![bytes(b"a"), bytes(b"b")]))
        );

        close_and_cleanup(db, root);
    }
}