1use crate::list::{
2 LIST_OPERATOR_ID, ListConfig, decode_list_for_read, encode_list_for_write, list_operator,
3 list_operator_from_metadata,
4};
5use bytes::Bytes;
6use cobble::{
7 BytesMergeOperator, Config, Db, DbIterator, Error, MergeOperatorResolver, ReadOptions, Result,
8 ScanOptions, Schema, SchemaBuilder, ShardSnapshotInput, WriteBatch, WriteOptions,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value as JsonValue;
12use std::collections::BTreeMap;
13use std::ops::{Range, RangeInclusive};
14use std::sync::Arc;
15
/// Per-column type information layered on top of the core schema.
///
/// Only columns with an entry in `columns` carry an explicit type; lookups through
/// `column_type` treat every other column as [`StructuredColumnType::Bytes`].
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StructuredSchema {
    /// Declared column types, keyed by column index.
    pub columns: BTreeMap<u16, StructuredColumnType>,
}
20
/// The structured type of a single column.
///
/// Serialized by serde as an internally tagged value: the variant name is stored in a
/// `kind` field using snake_case.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum StructuredColumnType {
    /// Opaque byte payload; this is the default type.
    #[default]
    Bytes,
    /// List column governed by the given [`ListConfig`].
    List(ListConfig),
}
28
/// A decoded column value, mirroring the variants of [`StructuredColumnType`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StructuredColumnValue {
    /// Value of a bytes column.
    Bytes(Bytes),
    /// Elements of a list column.
    List(Vec<Bytes>),
}
34
35pub(crate) fn column_type(schema: &StructuredSchema, column: u16) -> &StructuredColumnType {
38 schema
39 .columns
40 .get(&column)
41 .unwrap_or(&StructuredColumnType::Bytes)
42}
43
44pub(crate) fn encode_for_write(
45 schema: &StructuredSchema,
46 now_seconds: u32,
47 column: u16,
48 value: StructuredColumnValue,
49 ttl_seconds: Option<u32>,
50) -> Result<Bytes> {
51 match (column_type(schema, column), value) {
52 (StructuredColumnType::Bytes, StructuredColumnValue::Bytes(value)) => Ok(value),
53 (StructuredColumnType::List(config), StructuredColumnValue::List(elements)) => {
54 encode_list_for_write(elements, config, ttl_seconds, now_seconds)
55 }
56 (_, _) => Err(Error::InputError(format!(
57 "column {} expects a different type of value",
58 column,
59 ))),
60 }
61}
62
63pub(crate) fn decode_row(
64 schema: &StructuredSchema,
65 now_seconds: u32,
66 columns: Vec<Option<Bytes>>,
67) -> Result<Vec<Option<StructuredColumnValue>>> {
68 columns
69 .into_iter()
70 .enumerate()
71 .map(|(idx, column)| {
72 let Some(raw) = column else {
73 return Ok(None);
74 };
75 match column_type(schema, idx as u16) {
76 StructuredColumnType::Bytes => Ok(Some(StructuredColumnValue::Bytes(raw))),
77 StructuredColumnType::List(config) => Ok(Some(StructuredColumnValue::List(
78 decode_list_for_read(&raw, config, now_seconds)?,
79 ))),
80 }
81 })
82 .collect()
83}
84
85pub(crate) fn project_structured_schema_for_indices(
86 schema: &StructuredSchema,
87 column_indices: Option<&[usize]>,
88) -> Arc<StructuredSchema> {
89 let Some(indices) = column_indices else {
90 return Arc::new(schema.clone());
91 };
92 let mut columns = BTreeMap::new();
93 for (projected_idx, original_idx) in indices.iter().enumerate() {
94 if let Some(column_type) = schema.columns.get(&(*original_idx as u16)) {
95 columns.insert(projected_idx as u16, column_type.clone());
96 }
97 }
98 Arc::new(StructuredSchema { columns })
99}
100
/// Projects `schema` onto the columns selected by `options.column_indices`.
///
/// Thin wrapper over `project_structured_schema_for_indices`.
pub(crate) fn project_structured_schema_for_scan(
    schema: &StructuredSchema,
    options: &ScanOptions,
) -> Arc<StructuredSchema> {
    project_structured_schema_for_indices(schema, options.column_indices.as_deref())
}
107
108pub(crate) fn project_decoded_row_for_read(
109 row: Vec<Option<StructuredColumnValue>>,
110 options: &ReadOptions,
111) -> Vec<Option<StructuredColumnValue>> {
112 let Some(indices) = options.column_indices.as_deref() else {
113 return row;
114 };
115 indices
116 .iter()
117 .map(|&idx| row.get(idx).cloned().unwrap_or(None))
118 .collect()
119}
120
121pub(crate) fn load_structured_schema_from_cobble_schema(
122 schema: &Schema,
123) -> Result<StructuredSchema> {
124 let operator_ids = schema.all_operator_ids();
125 let mut columns = BTreeMap::new();
126 for column_idx in 0..schema.num_columns() {
127 let operator_id = operator_ids
128 .get(column_idx)
129 .map(|s| s.as_str())
130 .unwrap_or("");
131 if operator_id == LIST_OPERATOR_ID {
132 let metadata_value = schema.column_metadata_at(column_idx).ok_or_else(|| {
133 Error::FileFormatError(format!("list column {} missing metadata", column_idx))
134 })?;
135 let config =
136 serde_json::from_value::<ListConfig>(metadata_value.clone()).map_err(|err| {
137 Error::FileFormatError(format!(
138 "failed to decode list config at column {}: {}",
139 column_idx, err
140 ))
141 })?;
142 columns.insert(column_idx as u16, StructuredColumnType::List(config));
143 }
144 }
145 Ok(StructuredSchema { columns })
146}
147
148pub(crate) fn combined_resolver(
149 custom: Option<Arc<dyn MergeOperatorResolver>>,
150) -> Arc<dyn MergeOperatorResolver> {
151 Arc::new(move |id: &str, metadata: Option<&JsonValue>| {
152 if let Some(operator) = list_operator_from_metadata(id, metadata) {
153 return Some(operator);
154 }
155 custom
156 .as_ref()
157 .and_then(|resolver| resolver.resolve(id, metadata))
158 })
159}
160
/// Returns the resolver produced by `combined_resolver(None)`, i.e. one without a custom
/// fallback resolver.
pub fn structured_merge_operator_resolver() -> Arc<dyn MergeOperatorResolver> {
    combined_resolver(None)
}
166
/// Returns the operator ids listed by this module; currently only `LIST_OPERATOR_ID`.
pub fn structured_resolvable_operator_ids() -> Vec<String> {
    vec![LIST_OPERATOR_ID.to_string()]
}
171
/// A write batch that encodes structured values before queuing them in a core
/// [`WriteBatch`].
pub struct StructuredWriteBatch {
    /// Schema used to validate and encode every value added to the batch.
    structured_schema: Arc<StructuredSchema>,
    /// Timestamp supplied at construction and passed to the encoder for every entry.
    now_seconds: u32,
    /// The underlying core batch holding the encoded operations.
    inner: WriteBatch,
}
183
184impl StructuredWriteBatch {
185 pub(crate) fn new(structured_schema: Arc<StructuredSchema>, now_seconds: u32) -> Self {
186 Self {
187 structured_schema,
188 now_seconds,
189 inner: WriteBatch::new(),
190 }
191 }
192
193 pub fn put<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
194 where
195 K: AsRef<[u8]>,
196 V: Into<StructuredColumnValue>,
197 {
198 self.put_with_options(bucket, key, column, value, &WriteOptions::default())
199 }
200
201 pub fn put_with_options<K, V>(
202 &mut self,
203 bucket: u16,
204 key: K,
205 column: u16,
206 value: V,
207 options: &WriteOptions,
208 ) -> Result<()>
209 where
210 K: AsRef<[u8]>,
211 V: Into<StructuredColumnValue>,
212 {
213 let encoded = encode_for_write(
214 &self.structured_schema,
215 self.now_seconds,
216 column,
217 value.into(),
218 options.ttl_seconds,
219 )?;
220 self.inner
221 .put_with_options(bucket, key, column, encoded, options);
222 Ok(())
223 }
224
225 pub fn delete<K>(&mut self, bucket: u16, key: K, column: u16)
226 where
227 K: AsRef<[u8]>,
228 {
229 self.inner.delete(bucket, key, column);
230 }
231
232 pub fn merge<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
233 where
234 K: AsRef<[u8]>,
235 V: Into<StructuredColumnValue>,
236 {
237 self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
238 }
239
240 pub fn merge_with_options<K, V>(
241 &mut self,
242 bucket: u16,
243 key: K,
244 column: u16,
245 value: V,
246 options: &WriteOptions,
247 ) -> Result<()>
248 where
249 K: AsRef<[u8]>,
250 V: Into<StructuredColumnValue>,
251 {
252 let encoded = encode_for_write(
253 &self.structured_schema,
254 self.now_seconds,
255 column,
256 value.into(),
257 options.ttl_seconds,
258 )?;
259 self.inner
260 .merge_with_options(bucket, key, column, encoded, options);
261 Ok(())
262 }
263
264 pub(crate) fn into_inner(self) -> WriteBatch {
265 self.inner
266 }
267}
268
269impl From<Bytes> for StructuredColumnValue {
272 fn from(value: Bytes) -> Self {
273 Self::Bytes(value)
274 }
275}
276
277impl From<Vec<u8>> for StructuredColumnValue {
278 fn from(value: Vec<u8>) -> Self {
279 Self::Bytes(Bytes::from(value))
280 }
281}
282
283impl From<Vec<Bytes>> for StructuredColumnValue {
284 fn from(value: Vec<Bytes>) -> Self {
285 Self::List(value)
286 }
287}
288
289impl From<Vec<Vec<u8>>> for StructuredColumnValue {
290 fn from(value: Vec<Vec<u8>>) -> Self {
291 Self::List(value.into_iter().map(Bytes::from).collect())
292 }
293}
294
/// A [`Db`] wrapper that encodes writes and decodes reads according to a
/// [`StructuredSchema`].
pub struct StructuredDb {
    /// The underlying core database.
    db: Db,
    /// Structured schema derived from the core schema; replaced on reload/apply.
    structured_schema: Arc<StructuredSchema>,
}
301
/// Hooks a [`StructuredSchemaBuilder`] needs from the object whose schema it edits.
pub trait StructuredSchemaOwner {
    /// Returns the owner's current structured schema; the builder starts from this value.
    fn current_structured_schema(&self) -> StructuredSchema;
    /// Starts an update of the core schema and returns its builder.
    fn begin_core_schema_update(&self) -> SchemaBuilder;
    /// Refreshes the owner's structured schema from the core schema and returns it.
    /// Called by the builder after the core update has been committed.
    fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema>;
}
307
/// Stages structured-schema changes against an owner and commits them through the core
/// [`SchemaBuilder`].
pub struct StructuredSchemaBuilder<'a, O: StructuredSchemaOwner> {
    /// The object whose schema is being edited.
    owner: &'a mut O,
    /// Working copy of the structured schema, updated by every staged change.
    schema: StructuredSchema,
    /// The core builder; `None` once it has been taken by `commit`.
    inner: Option<SchemaBuilder>,
    /// First error hit while staging changes; reported by `commit`.
    pending_error: Option<Error>,
}
314
315impl<'a, O: StructuredSchemaOwner> StructuredSchemaBuilder<'a, O> {
316 pub fn new(owner: &'a mut O) -> Self {
317 let schema = owner.current_structured_schema();
318 let inner = owner.begin_core_schema_update();
319 Self {
320 owner,
321 schema,
322 inner: Some(inner),
323 pending_error: None,
324 }
325 }
326
327 pub fn add_bytes_column(&mut self, column: u16) -> &mut Self {
328 self.schema.columns.remove(&column);
329 self.apply_inner(|inner| {
330 inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
331 inner.clear_column_metadata(column as usize)?;
332 Ok(())
333 });
334 self
335 }
336
337 pub fn add_list_column(&mut self, column: u16, config: ListConfig) -> &mut Self {
338 self.schema
339 .columns
340 .insert(column, StructuredColumnType::List(config.clone()));
341 self.apply_inner(|inner| {
342 inner.set_column_operator(column as usize, list_operator(config.clone()))?;
343 inner.set_column_metadata(
344 column as usize,
345 serde_json::to_value(config).map_err(|err| {
346 Error::FileFormatError(format!(
347 "failed to encode list config metadata: {}",
348 err
349 ))
350 })?,
351 )?;
352 Ok(())
353 });
354 self
355 }
356
357 pub fn delete_column(&mut self, column: u16) -> &mut Self {
358 self.schema.columns.remove(&column);
360 self.apply_inner(|inner| {
361 inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
362 inner.clear_column_metadata(column as usize)?;
363 Ok(())
364 });
365 self
366 }
367
368 pub fn current_schema(&self) -> &StructuredSchema {
369 &self.schema
370 }
371
372 pub fn commit(&mut self) -> Result<StructuredSchema> {
373 if let Some(err) = self.pending_error.take() {
374 return Err(err);
375 }
376 let inner = self
377 .inner
378 .take()
379 .ok_or_else(|| Error::InvalidState("schema builder already committed".to_string()))?;
380 inner.commit();
381 self.owner.reload_structured_schema_from_core()
382 }
383
384 fn apply_inner<F>(&mut self, f: F)
385 where
386 F: FnOnce(&mut SchemaBuilder) -> Result<()>,
387 {
388 if self.pending_error.is_some() {
389 return;
390 }
391 let Some(inner) = self.inner.as_mut() else {
392 self.pending_error = Some(Error::InvalidState(
393 "schema builder already committed".to_string(),
394 ));
395 return;
396 };
397 if let Err(err) = f(inner) {
398 self.pending_error = Some(err);
399 }
400 }
401}
402
403impl StructuredDb {
404 pub fn open(config: Config, bucket_ranges: Vec<RangeInclusive<u16>>) -> Result<Self> {
405 let db = Db::open(config, bucket_ranges)?;
406 let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
407 Ok(Self {
408 db,
409 structured_schema: Arc::new(structured_schema),
410 })
411 }
412
413 pub fn open_from_snapshot(
414 config: Config,
415 snapshot_id: u64,
416 db_id: impl Into<String>,
417 ) -> Result<Self> {
418 Self::open_from_snapshot_with_resolver(config, snapshot_id, db_id, None)
419 }
420
421 pub fn open_from_snapshot_with_resolver(
422 config: Config,
423 snapshot_id: u64,
424 db_id: impl Into<String>,
425 resolver: Option<Arc<dyn MergeOperatorResolver>>,
426 ) -> Result<Self> {
427 let db = Db::open_from_snapshot_with_resolver(
428 config,
429 snapshot_id,
430 db_id,
431 Some(combined_resolver(resolver)),
432 )?;
433 let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
434 Ok(Self {
435 db,
436 structured_schema: Arc::new(structured_schema),
437 })
438 }
439
440 pub fn resume(config: Config, db_id: impl Into<String>) -> Result<Self> {
441 Self::resume_with_resolver(config, db_id, None)
442 }
443
444 pub fn resume_with_resolver(
445 config: Config,
446 db_id: impl Into<String>,
447 resolver: Option<Arc<dyn MergeOperatorResolver>>,
448 ) -> Result<Self> {
449 let db = Db::resume_with_resolver(config, db_id, Some(combined_resolver(resolver)))?;
450 let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
451 Ok(Self {
452 db,
453 structured_schema: Arc::new(structured_schema),
454 })
455 }
456
457 pub fn id(&self) -> &str {
458 self.db.id()
459 }
460
461 pub fn current_schema(&self) -> StructuredSchema {
462 self.structured_schema.as_ref().clone()
463 }
464
465 pub fn update_schema(&mut self) -> StructuredSchemaBuilder<'_, Self> {
466 StructuredSchemaBuilder::new(self)
467 }
468
469 pub fn reload_schema(&mut self) -> Result<()> {
470 let schema = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
471 self.structured_schema = Arc::new(schema);
472 Ok(())
473 }
474
475 pub fn apply_schema(
476 &mut self,
477 structured_schema: StructuredSchema,
478 ) -> Result<StructuredSchema> {
479 persist_structured_schema(&self.db, &structured_schema)?;
480 let reloaded = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
481 self.structured_schema = Arc::new(reloaded.clone());
482 Ok(reloaded)
483 }
484
485 pub fn put<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
486 where
487 K: AsRef<[u8]>,
488 V: Into<StructuredColumnValue>,
489 {
490 self.put_with_options(bucket, key, column, value, &WriteOptions::default())
491 }
492
493 pub fn put_with_options<K, V>(
494 &self,
495 bucket: u16,
496 key: K,
497 column: u16,
498 value: V,
499 options: &WriteOptions,
500 ) -> Result<()>
501 where
502 K: AsRef<[u8]>,
503 V: Into<StructuredColumnValue>,
504 {
505 let encoded = encode_for_write(
506 &self.structured_schema,
507 self.db.now_seconds(),
508 column,
509 value.into(),
510 options.ttl_seconds,
511 )?;
512 self.db
513 .put_with_options(bucket, key, column, encoded, options)
514 }
515
516 pub fn merge<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
517 where
518 K: AsRef<[u8]>,
519 V: Into<StructuredColumnValue>,
520 {
521 self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
522 }
523
524 pub fn merge_with_options<K, V>(
525 &self,
526 bucket: u16,
527 key: K,
528 column: u16,
529 value: V,
530 options: &WriteOptions,
531 ) -> Result<()>
532 where
533 K: AsRef<[u8]>,
534 V: Into<StructuredColumnValue>,
535 {
536 let encoded = encode_for_write(
537 &self.structured_schema,
538 self.db.now_seconds(),
539 column,
540 value.into(),
541 options.ttl_seconds,
542 )?;
543 self.db
544 .merge_with_options(bucket, key, column, encoded, options)
545 }
546
547 pub fn delete<K>(&self, bucket: u16, key: K, column: u16) -> Result<()>
548 where
549 K: AsRef<[u8]>,
550 {
551 self.db.delete(bucket, key, column)
552 }
553
554 pub fn new_write_batch(&self) -> StructuredWriteBatch {
555 StructuredWriteBatch::new(Arc::clone(&self.structured_schema), self.db.now_seconds())
556 }
557
558 pub fn write_batch(&self, batch: StructuredWriteBatch) -> Result<()> {
559 self.db.write_batch(batch.into_inner())
560 }
561
562 pub fn get<K>(&self, bucket: u16, key: K) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
563 where
564 K: AsRef<[u8]>,
565 {
566 let raw = self.db.get(bucket, key.as_ref())?;
567 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
568 .transpose()
569 }
570
571 pub fn get_with_options<K>(
572 &self,
573 bucket: u16,
574 key: K,
575 options: &ReadOptions,
576 ) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
577 where
578 K: AsRef<[u8]>,
579 {
580 let raw = if options.column_indices.is_some() {
581 self.db.get(bucket, key.as_ref())?
582 } else {
583 self.db.get_with_options(bucket, key.as_ref(), options)?
584 };
585 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
586 .transpose()
587 .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
588 }
589
590 pub fn scan<'a>(
591 &'a self,
592 bucket: u16,
593 range: Range<&[u8]>,
594 ) -> Result<StructuredDbIterator<'a>> {
595 self.scan_with_options(bucket, range, &ScanOptions::default())
596 }
597
598 pub fn scan_with_options<'a>(
599 &'a self,
600 bucket: u16,
601 range: Range<&[u8]>,
602 options: &ScanOptions,
603 ) -> Result<StructuredDbIterator<'a>> {
604 let inner = self.db.scan_with_options(bucket, range, options)?;
605 let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
606 Ok(StructuredDbIterator::new(inner, projected_schema, 0))
607 }
608
609 pub fn snapshot(&self) -> Result<u64> {
610 self.db.snapshot()
611 }
612
613 pub fn snapshot_with_callback<F>(&self, callback: F) -> Result<u64>
614 where
615 F: Fn(Result<ShardSnapshotInput>) + Send + Sync + 'static,
616 {
617 self.db.snapshot_with_callback(callback)
618 }
619
620 pub fn expire_snapshot(&self, snapshot_id: u64) -> Result<bool> {
621 self.db.expire_snapshot(snapshot_id)
622 }
623
624 pub fn retain_snapshot(&self, snapshot_id: u64) -> bool {
625 self.db.retain_snapshot(snapshot_id)
626 }
627
628 pub fn shard_snapshot_input(&self, snapshot_id: u64) -> Result<ShardSnapshotInput> {
629 self.db.shard_snapshot_input(snapshot_id)
630 }
631
632 pub fn set_time(&self, next: u32) {
633 self.db.set_time(next);
634 }
635
636 pub fn now_seconds(&self) -> u32 {
637 self.db.now_seconds()
638 }
639
640 pub fn get_raw_with_options(
641 &self,
642 bucket: u16,
643 key: &[u8],
644 options: &ReadOptions,
645 ) -> Result<Option<Vec<Option<Bytes>>>> {
646 self.db.get_with_options(bucket, key, options)
647 }
648
649 pub fn scan_raw<'a>(
650 &'a self,
651 bucket: u16,
652 range: Range<&[u8]>,
653 options: &ScanOptions,
654 ) -> Result<DbIterator<'a>> {
655 self.db.scan_with_options(bucket, range, options)
656 }
657
658 pub fn close(&self) -> Result<()> {
659 self.db.close()
660 }
661}
662
663impl StructuredSchemaOwner for StructuredDb {
664 fn current_structured_schema(&self) -> StructuredSchema {
665 self.current_schema()
666 }
667
668 fn begin_core_schema_update(&self) -> SchemaBuilder {
669 self.db.update_schema()
670 }
671
672 fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema> {
673 self.reload_schema()?;
674 Ok(self.current_schema())
675 }
676}
677
/// Alias for [`StructuredDb`].
pub type DataStructureDb = StructuredDb;
680
/// Iterator adapter that decodes each raw row of a [`DbIterator`] into structured values.
pub struct StructuredDbIterator<'a> {
    /// The raw iterator producing `(key, columns)` items.
    inner: DbIterator<'a>,
    /// Schema used to decode each row; column positions are those of the yielded rows.
    structured_schema: Arc<StructuredSchema>,
    /// Timestamp passed to the row decoder for every item.
    now_seconds: u32,
}
688
689impl<'a> StructuredDbIterator<'a> {
690 pub(crate) fn new(
691 inner: DbIterator<'a>,
692 structured_schema: Arc<StructuredSchema>,
693 now_seconds: u32,
694 ) -> Self {
695 Self {
696 inner,
697 structured_schema,
698 now_seconds,
699 }
700 }
701}
702
703impl Iterator for StructuredDbIterator<'_> {
704 type Item = Result<(Bytes, Vec<Option<StructuredColumnValue>>)>;
705
706 fn next(&mut self) -> Option<Self::Item> {
707 self.inner.next().map(|item| {
708 let (key, columns) = item?;
709 let decoded = decode_row(&self.structured_schema, self.now_seconds, columns)?;
710 Ok((key, decoded))
711 })
712 }
713}
714
715pub(crate) fn persist_structured_schema_on_db(
718 db: &Db,
719 structured_schema: &StructuredSchema,
720) -> Result<()> {
721 let mut schema = db.update_schema();
722 apply_structured_schema(&mut schema, structured_schema)?;
723 schema.commit();
724 Ok(())
725}
726
/// Module-private shorthand for `persist_structured_schema_on_db`.
fn persist_structured_schema(db: &Db, structured_schema: &StructuredSchema) -> Result<()> {
    persist_structured_schema_on_db(db, structured_schema)
}
730
731fn apply_structured_schema(
732 schema: &mut SchemaBuilder,
733 structured_schema: &StructuredSchema,
734) -> Result<()> {
735 for (column, column_type) in &structured_schema.columns {
736 match column_type {
737 StructuredColumnType::Bytes => {}
738 StructuredColumnType::List(config) => {
739 schema.set_column_operator(*column as usize, list_operator(config.clone()))?;
740 schema.set_column_metadata(
741 *column as usize,
742 serde_json::to_value(config).map_err(|err| {
743 Error::FileFormatError(format!(
744 "failed to encode list config metadata: {}",
745 err
746 ))
747 })?,
748 )?;
749 }
750 }
751 }
752 Ok(())
753}
754
#[cfg(test)]
mod tests {
    use super::*;
    use crate::list::{ListConfig, ListRetainMode};
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Two-column config storing its data in a single file volume under `root`.
    fn two_column_config(root: &str) -> Config {
        Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
            num_columns: 2,
            ..Config::default()
        }
    }

    /// Schema declaring column 1 as a list column with the given config.
    fn list_schema(config: ListConfig) -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(1, StructuredColumnType::List(config))]),
        }
    }

    /// Opens a single-bucket database and applies `schema` to it.
    fn open_with_schema(config: Config, schema: StructuredSchema) -> StructuredDb {
        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
        db.apply_schema(schema).unwrap();
        db
    }

    /// Shorthand for a static byte buffer.
    fn bytes(raw: &'static [u8]) -> Bytes {
        Bytes::from_static(raw)
    }

    #[test]
    fn test_structured_db_resume_loads_structured_schema() {
        let root = format!("/tmp/ds_structured_resume_{}", Uuid::new_v4());
        let config = Config {
            snapshot_on_flush: true,
            ..two_column_config(&root)
        };
        let structured_schema = list_schema(ListConfig {
            max_elements: Some(2),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: true,
        });
        let db = open_with_schema(config.clone(), structured_schema.clone());
        db.merge(0, b"k", 1, vec![bytes(b"a")]).unwrap();
        let _ = db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(200));
        let db_id = db.id().to_string();
        db.close().unwrap();

        let resumed = StructuredDb::resume(config, db_id).unwrap();
        assert_eq!(resumed.current_schema(), structured_schema);
        resumed.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn test_structured_db_get_and_scan_return_structured_values() {
        let root = format!("/tmp/ds_structured_get_scan_{}", Uuid::new_v4());
        let structured_schema = list_schema(ListConfig {
            max_elements: Some(2),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        db.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"c")]).unwrap();

        let expected_bytes = Some(StructuredColumnValue::Bytes(bytes(b"v0")));
        let expected_list = Some(StructuredColumnValue::List(vec![bytes(b"b"), bytes(b"c")]));

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(row[0], expected_bytes);
        assert_eq!(row[1], expected_list);

        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert_eq!(first.1.len(), 2, "scan row should have 2 columns");
        assert_eq!(first.1[0], expected_bytes);
        assert_eq!(first.1[1], expected_list);
        assert!(iter.next().is_none());

        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn test_structured_write_batch_round_trip() {
        let root = format!("/tmp/ds_structured_write_batch_{}", Uuid::new_v4());
        let structured_schema = list_schema(ListConfig {
            max_elements: Some(3),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        let mut batch = db.new_write_batch();
        batch.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        batch.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        batch.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();
        batch.merge(0, b"k1", 1, vec![bytes(b"c")]).unwrap();
        batch.put(0, b"k2", 0, bytes(b"v2")).unwrap();
        db.write_batch(batch).unwrap();

        let row = db.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(row[0], Some(StructuredColumnValue::Bytes(bytes(b"v0"))));
        assert_eq!(
            row[1],
            Some(StructuredColumnValue::List(vec![
                bytes(b"a"),
                bytes(b"b"),
                bytes(b"c")
            ]))
        );
        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("first row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        let second = iter.next().expect("second row").unwrap();
        assert_eq!(second.0.as_ref(), b"k2");
        assert!(iter.next().is_none());

        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn test_structured_write_batch_rejects_type_mismatch() {
        let root = format!("/tmp/ds_structured_write_batch_mismatch_{}", Uuid::new_v4());
        let structured_schema = list_schema(ListConfig {
            max_elements: None,
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        let mut batch = db.new_write_batch();
        let err = batch
            .put(0, b"k1", 1, bytes(b"not-a-list"))
            .expect_err("type mismatch should fail");
        match err {
            Error::InputError(msg) => assert!(msg.contains("column 1 expects")),
            other => panic!("unexpected error: {other:?}"),
        }
        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn test_structured_scan_with_projection_reindexes_schema() {
        let root = format!("/tmp/ds_structured_scan_projection_{}", Uuid::new_v4());
        let structured_schema = list_schema(ListConfig {
            max_elements: Some(8),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        db.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();

        let mut iter = db
            .scan_with_options(
                0,
                b"k0".as_ref()..b"k9".as_ref(),
                &ScanOptions::for_column(1),
            )
            .unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert_eq!(first.1.len(), 1);
        assert_eq!(
            first.1[0],
            Some(StructuredColumnValue::List(vec![bytes(b"a"), bytes(b"b")]))
        );

        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn test_structured_get_with_projection_reindexes_schema() {
        let root = format!("/tmp/ds_structured_get_projection_{}", Uuid::new_v4());
        let structured_schema = list_schema(ListConfig {
            max_elements: Some(8),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        });
        let db = open_with_schema(two_column_config(&root), structured_schema);
        db.put(0, b"k1", 0, bytes(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"a")]).unwrap();
        db.merge(0, b"k1", 1, vec![bytes(b"b")]).unwrap();

        let row = db
            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
            .unwrap()
            .expect("row exists");
        assert_eq!(row.len(), 1);
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::List(vec![bytes(b"a"), bytes(b"b")]))
        );

        db.close().unwrap();
        let _ = std::fs::remove_dir_all(root);
    }
}