Skip to main content

cobble_data_structure/
structured_db.rs

1use crate::list::{
2    LIST_OPERATOR_ID, ListConfig, decode_list_for_read, encode_list_for_write, list_operator,
3    list_operator_from_metadata,
4};
5use bytes::Bytes;
6use cobble::{
7    BytesMergeOperator, Config, Db, DbIterator, Error, MergeOperatorResolver, ReadOptions, Result,
8    ScanOptions, Schema, SchemaBuilder, ShardSnapshotInput, WriteBatch, WriteOptions,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value as JsonValue;
12use std::collections::BTreeMap;
13use std::ops::{Range, RangeInclusive};
14use std::sync::Arc;
15
16#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
17pub struct StructuredSchema {
18    pub columns: BTreeMap<u16, StructuredColumnType>,
19}
20
21#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
22#[serde(tag = "kind", rename_all = "snake_case")]
23pub enum StructuredColumnType {
24    #[default]
25    Bytes,
26    List(ListConfig),
27}
28
29#[derive(Clone, Debug, PartialEq, Eq)]
30pub enum StructuredColumnValue {
31    Bytes(Bytes),
32    List(Vec<Bytes>),
33}
34
35// ── Shared helpers (used by all Structured* wrappers) ───────────────────────
36
37pub(crate) fn column_type(schema: &StructuredSchema, column: u16) -> &StructuredColumnType {
38    schema
39        .columns
40        .get(&column)
41        .unwrap_or(&StructuredColumnType::Bytes)
42}
43
44pub(crate) fn encode_for_write(
45    schema: &StructuredSchema,
46    now_seconds: u32,
47    column: u16,
48    value: StructuredColumnValue,
49    ttl_seconds: Option<u32>,
50) -> Result<Bytes> {
51    match (column_type(schema, column), value) {
52        (StructuredColumnType::Bytes, StructuredColumnValue::Bytes(value)) => Ok(value),
53        (StructuredColumnType::List(config), StructuredColumnValue::List(elements)) => {
54            encode_list_for_write(elements, config, ttl_seconds, now_seconds)
55        }
56        (_, _) => Err(Error::InputError(format!(
57            "column {} expects a different type of value",
58            column,
59        ))),
60    }
61}
62
63pub(crate) fn decode_row(
64    schema: &StructuredSchema,
65    now_seconds: u32,
66    columns: Vec<Option<Bytes>>,
67) -> Result<Vec<Option<StructuredColumnValue>>> {
68    columns
69        .into_iter()
70        .enumerate()
71        .map(|(idx, column)| {
72            let Some(raw) = column else {
73                return Ok(None);
74            };
75            match column_type(schema, idx as u16) {
76                StructuredColumnType::Bytes => Ok(Some(StructuredColumnValue::Bytes(raw))),
77                StructuredColumnType::List(config) => Ok(Some(StructuredColumnValue::List(
78                    decode_list_for_read(&raw, config, now_seconds)?,
79                ))),
80            }
81        })
82        .collect()
83}
84
85pub(crate) fn project_structured_schema_for_indices(
86    schema: &StructuredSchema,
87    column_indices: Option<&[usize]>,
88) -> Arc<StructuredSchema> {
89    let Some(indices) = column_indices else {
90        return Arc::new(schema.clone());
91    };
92    let mut columns = BTreeMap::new();
93    for (projected_idx, original_idx) in indices.iter().enumerate() {
94        if let Some(column_type) = schema.columns.get(&(*original_idx as u16)) {
95            columns.insert(projected_idx as u16, column_type.clone());
96        }
97    }
98    Arc::new(StructuredSchema { columns })
99}
100
101pub(crate) fn project_structured_schema_for_scan(
102    schema: &StructuredSchema,
103    options: &ScanOptions,
104) -> Arc<StructuredSchema> {
105    project_structured_schema_for_indices(schema, options.column_indices.as_deref())
106}
107
108pub(crate) fn project_decoded_row_for_read(
109    row: Vec<Option<StructuredColumnValue>>,
110    options: &ReadOptions,
111) -> Vec<Option<StructuredColumnValue>> {
112    let Some(indices) = options.column_indices.as_deref() else {
113        return row;
114    };
115    indices
116        .iter()
117        .map(|&idx| row.get(idx).cloned().unwrap_or(None))
118        .collect()
119}
120
121pub(crate) fn load_structured_schema_from_cobble_schema(
122    schema: &Schema,
123) -> Result<StructuredSchema> {
124    let operator_ids = schema.all_operator_ids();
125    let mut columns = BTreeMap::new();
126    for column_idx in 0..schema.num_columns() {
127        let operator_id = operator_ids
128            .get(column_idx)
129            .map(|s| s.as_str())
130            .unwrap_or("");
131        if operator_id == LIST_OPERATOR_ID {
132            let metadata_value = schema.column_metadata_at(column_idx).ok_or_else(|| {
133                Error::FileFormatError(format!("list column {} missing metadata", column_idx))
134            })?;
135            let config =
136                serde_json::from_value::<ListConfig>(metadata_value.clone()).map_err(|err| {
137                    Error::FileFormatError(format!(
138                        "failed to decode list config at column {}: {}",
139                        column_idx, err
140                    ))
141                })?;
142            columns.insert(column_idx as u16, StructuredColumnType::List(config));
143        }
144    }
145    Ok(StructuredSchema { columns })
146}
147
148pub(crate) fn combined_resolver(
149    custom: Option<Arc<dyn MergeOperatorResolver>>,
150) -> Arc<dyn MergeOperatorResolver> {
151    Arc::new(move |id: &str, metadata: Option<&JsonValue>| {
152        if let Some(operator) = list_operator_from_metadata(id, metadata) {
153            return Some(operator);
154        }
155        custom
156            .as_ref()
157            .and_then(|resolver| resolver.resolve(id, metadata))
158    })
159}
160
161/// Returns a `MergeOperatorResolver` that can resolve all structured data type
162/// merge operators (e.g. list) from their metadata.
163pub fn structured_merge_operator_resolver() -> Arc<dyn MergeOperatorResolver> {
164    combined_resolver(None)
165}
166
167/// Returns the merge operator IDs that `structured_merge_operator_resolver` can resolve.
168pub fn structured_resolvable_operator_ids() -> Vec<String> {
169    vec![LIST_OPERATOR_ID.to_string()]
170}
171
172// ── StructuredWriteBatch ────────────────────────────────────────────────────
173
174/// Structured write batch wrapper.
175///
176/// Each operation is encoded and written into the inner `cobble::WriteBatch` immediately, so we
177/// avoid a second typed-op staging buffer and an extra conversion pass at flush time.
178pub struct StructuredWriteBatch {
179    structured_schema: Arc<StructuredSchema>,
180    now_seconds: u32,
181    inner: WriteBatch,
182}
183
184impl StructuredWriteBatch {
185    pub(crate) fn new(structured_schema: Arc<StructuredSchema>, now_seconds: u32) -> Self {
186        Self {
187            structured_schema,
188            now_seconds,
189            inner: WriteBatch::new(),
190        }
191    }
192
193    pub fn put<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
194    where
195        K: AsRef<[u8]>,
196        V: Into<StructuredColumnValue>,
197    {
198        self.put_with_options(bucket, key, column, value, &WriteOptions::default())
199    }
200
201    pub fn put_with_options<K, V>(
202        &mut self,
203        bucket: u16,
204        key: K,
205        column: u16,
206        value: V,
207        options: &WriteOptions,
208    ) -> Result<()>
209    where
210        K: AsRef<[u8]>,
211        V: Into<StructuredColumnValue>,
212    {
213        let encoded = encode_for_write(
214            &self.structured_schema,
215            self.now_seconds,
216            column,
217            value.into(),
218            options.ttl_seconds,
219        )?;
220        self.inner
221            .put_with_options(bucket, key, column, encoded, options);
222        Ok(())
223    }
224
225    pub fn delete<K>(&mut self, bucket: u16, key: K, column: u16)
226    where
227        K: AsRef<[u8]>,
228    {
229        self.inner.delete(bucket, key, column);
230    }
231
232    pub fn merge<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
233    where
234        K: AsRef<[u8]>,
235        V: Into<StructuredColumnValue>,
236    {
237        self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
238    }
239
240    pub fn merge_with_options<K, V>(
241        &mut self,
242        bucket: u16,
243        key: K,
244        column: u16,
245        value: V,
246        options: &WriteOptions,
247    ) -> Result<()>
248    where
249        K: AsRef<[u8]>,
250        V: Into<StructuredColumnValue>,
251    {
252        let encoded = encode_for_write(
253            &self.structured_schema,
254            self.now_seconds,
255            column,
256            value.into(),
257            options.ttl_seconds,
258        )?;
259        self.inner
260            .merge_with_options(bucket, key, column, encoded, options);
261        Ok(())
262    }
263
264    pub(crate) fn into_inner(self) -> WriteBatch {
265        self.inner
266    }
267}
268
269// ── StructuredColumnValue conversions ───────────────────────────────────────
270
271impl From<Bytes> for StructuredColumnValue {
272    fn from(value: Bytes) -> Self {
273        Self::Bytes(value)
274    }
275}
276
277impl From<Vec<u8>> for StructuredColumnValue {
278    fn from(value: Vec<u8>) -> Self {
279        Self::Bytes(Bytes::from(value))
280    }
281}
282
283impl From<Vec<Bytes>> for StructuredColumnValue {
284    fn from(value: Vec<Bytes>) -> Self {
285        Self::List(value)
286    }
287}
288
289impl From<Vec<Vec<u8>>> for StructuredColumnValue {
290    fn from(value: Vec<Vec<u8>>) -> Self {
291        Self::List(value.into_iter().map(Bytes::from).collect())
292    }
293}
294
295// ── StructuredDb (formerly DataStructureDb) ─────────────────────────────────
296
297pub struct StructuredDb {
298    db: Db,
299    structured_schema: Arc<StructuredSchema>,
300}
301
302pub trait StructuredSchemaOwner {
303    fn current_structured_schema(&self) -> StructuredSchema;
304    fn begin_core_schema_update(&self) -> SchemaBuilder;
305    fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema>;
306}
307
308pub struct StructuredSchemaBuilder<'a, O: StructuredSchemaOwner> {
309    owner: &'a mut O,
310    schema: StructuredSchema,
311    inner: Option<SchemaBuilder>,
312    pending_error: Option<Error>,
313}
314
315impl<'a, O: StructuredSchemaOwner> StructuredSchemaBuilder<'a, O> {
316    pub fn new(owner: &'a mut O) -> Self {
317        let schema = owner.current_structured_schema();
318        let inner = owner.begin_core_schema_update();
319        Self {
320            owner,
321            schema,
322            inner: Some(inner),
323            pending_error: None,
324        }
325    }
326
327    pub fn add_bytes_column(&mut self, column: u16) -> &mut Self {
328        self.schema.columns.remove(&column);
329        self.apply_inner(|inner| {
330            inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
331            inner.clear_column_metadata(column as usize)?;
332            Ok(())
333        });
334        self
335    }
336
337    pub fn add_list_column(&mut self, column: u16, config: ListConfig) -> &mut Self {
338        self.schema
339            .columns
340            .insert(column, StructuredColumnType::List(config.clone()));
341        self.apply_inner(|inner| {
342            inner.set_column_operator(column as usize, list_operator(config.clone()))?;
343            inner.set_column_metadata(
344                column as usize,
345                serde_json::to_value(config).map_err(|err| {
346                    Error::FileFormatError(format!(
347                        "failed to encode list config metadata: {}",
348                        err
349                    ))
350                })?,
351            )?;
352            Ok(())
353        });
354        self
355    }
356
357    pub fn delete_column(&mut self, column: u16) -> &mut Self {
358        // Structured delete means dropping structured typing for this column (back to Bytes).
359        self.schema.columns.remove(&column);
360        self.apply_inner(|inner| {
361            inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
362            inner.clear_column_metadata(column as usize)?;
363            Ok(())
364        });
365        self
366    }
367
368    pub fn current_schema(&self) -> &StructuredSchema {
369        &self.schema
370    }
371
372    pub fn commit(&mut self) -> Result<StructuredSchema> {
373        if let Some(err) = self.pending_error.take() {
374            return Err(err);
375        }
376        let inner = self
377            .inner
378            .take()
379            .ok_or_else(|| Error::InvalidState("schema builder already committed".to_string()))?;
380        inner.commit();
381        self.owner.reload_structured_schema_from_core()
382    }
383
384    fn apply_inner<F>(&mut self, f: F)
385    where
386        F: FnOnce(&mut SchemaBuilder) -> Result<()>,
387    {
388        if self.pending_error.is_some() {
389            return;
390        }
391        let Some(inner) = self.inner.as_mut() else {
392            self.pending_error = Some(Error::InvalidState(
393                "schema builder already committed".to_string(),
394            ));
395            return;
396        };
397        if let Err(err) = f(inner) {
398            self.pending_error = Some(err);
399        }
400    }
401}
402
403impl StructuredDb {
404    pub fn open(config: Config, bucket_ranges: Vec<RangeInclusive<u16>>) -> Result<Self> {
405        let db = Db::open(config, bucket_ranges)?;
406        let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
407        Ok(Self {
408            db,
409            structured_schema: Arc::new(structured_schema),
410        })
411    }
412
413    pub fn open_from_snapshot(
414        config: Config,
415        snapshot_id: u64,
416        db_id: impl Into<String>,
417    ) -> Result<Self> {
418        Self::open_from_snapshot_with_resolver(config, snapshot_id, db_id, None)
419    }
420
421    pub fn open_from_snapshot_with_resolver(
422        config: Config,
423        snapshot_id: u64,
424        db_id: impl Into<String>,
425        resolver: Option<Arc<dyn MergeOperatorResolver>>,
426    ) -> Result<Self> {
427        let db = Db::open_from_snapshot_with_resolver(
428            config,
429            snapshot_id,
430            db_id,
431            Some(combined_resolver(resolver)),
432        )?;
433        let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
434        Ok(Self {
435            db,
436            structured_schema: Arc::new(structured_schema),
437        })
438    }
439
440    pub fn resume(config: Config, db_id: impl Into<String>) -> Result<Self> {
441        Self::resume_with_resolver(config, db_id, None)
442    }
443
444    pub fn resume_with_resolver(
445        config: Config,
446        db_id: impl Into<String>,
447        resolver: Option<Arc<dyn MergeOperatorResolver>>,
448    ) -> Result<Self> {
449        let db = Db::resume_with_resolver(config, db_id, Some(combined_resolver(resolver)))?;
450        let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
451        Ok(Self {
452            db,
453            structured_schema: Arc::new(structured_schema),
454        })
455    }
456
457    pub fn id(&self) -> &str {
458        self.db.id()
459    }
460
461    pub fn current_schema(&self) -> StructuredSchema {
462        self.structured_schema.as_ref().clone()
463    }
464
465    pub fn update_schema(&mut self) -> StructuredSchemaBuilder<'_, Self> {
466        StructuredSchemaBuilder::new(self)
467    }
468
469    pub fn reload_schema(&mut self) -> Result<()> {
470        let schema = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
471        self.structured_schema = Arc::new(schema);
472        Ok(())
473    }
474
475    pub fn apply_schema(
476        &mut self,
477        structured_schema: StructuredSchema,
478    ) -> Result<StructuredSchema> {
479        persist_structured_schema(&self.db, &structured_schema)?;
480        let reloaded = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
481        self.structured_schema = Arc::new(reloaded.clone());
482        Ok(reloaded)
483    }
484
485    pub fn put<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
486    where
487        K: AsRef<[u8]>,
488        V: Into<StructuredColumnValue>,
489    {
490        self.put_with_options(bucket, key, column, value, &WriteOptions::default())
491    }
492
493    pub fn put_with_options<K, V>(
494        &self,
495        bucket: u16,
496        key: K,
497        column: u16,
498        value: V,
499        options: &WriteOptions,
500    ) -> Result<()>
501    where
502        K: AsRef<[u8]>,
503        V: Into<StructuredColumnValue>,
504    {
505        let encoded = encode_for_write(
506            &self.structured_schema,
507            self.db.now_seconds(),
508            column,
509            value.into(),
510            options.ttl_seconds,
511        )?;
512        self.db
513            .put_with_options(bucket, key, column, encoded, options)
514    }
515
516    pub fn merge<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
517    where
518        K: AsRef<[u8]>,
519        V: Into<StructuredColumnValue>,
520    {
521        self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
522    }
523
524    pub fn merge_with_options<K, V>(
525        &self,
526        bucket: u16,
527        key: K,
528        column: u16,
529        value: V,
530        options: &WriteOptions,
531    ) -> Result<()>
532    where
533        K: AsRef<[u8]>,
534        V: Into<StructuredColumnValue>,
535    {
536        let encoded = encode_for_write(
537            &self.structured_schema,
538            self.db.now_seconds(),
539            column,
540            value.into(),
541            options.ttl_seconds,
542        )?;
543        self.db
544            .merge_with_options(bucket, key, column, encoded, options)
545    }
546
547    pub fn delete<K>(&self, bucket: u16, key: K, column: u16) -> Result<()>
548    where
549        K: AsRef<[u8]>,
550    {
551        self.db.delete(bucket, key, column)
552    }
553
554    pub fn new_write_batch(&self) -> StructuredWriteBatch {
555        StructuredWriteBatch::new(Arc::clone(&self.structured_schema), self.db.now_seconds())
556    }
557
558    pub fn write_batch(&self, batch: StructuredWriteBatch) -> Result<()> {
559        self.db.write_batch(batch.into_inner())
560    }
561
562    pub fn get<K>(&self, bucket: u16, key: K) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
563    where
564        K: AsRef<[u8]>,
565    {
566        let raw = self.db.get(bucket, key.as_ref())?;
567        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
568            .transpose()
569    }
570
571    pub fn get_with_options<K>(
572        &self,
573        bucket: u16,
574        key: K,
575        options: &ReadOptions,
576    ) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
577    where
578        K: AsRef<[u8]>,
579    {
580        let raw = if options.column_indices.is_some() {
581            self.db.get(bucket, key.as_ref())?
582        } else {
583            self.db.get_with_options(bucket, key.as_ref(), options)?
584        };
585        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
586            .transpose()
587            .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
588    }
589
590    pub fn scan<'a>(
591        &'a self,
592        bucket: u16,
593        range: Range<&[u8]>,
594    ) -> Result<StructuredDbIterator<'a>> {
595        self.scan_with_options(bucket, range, &ScanOptions::default())
596    }
597
598    pub fn scan_with_options<'a>(
599        &'a self,
600        bucket: u16,
601        range: Range<&[u8]>,
602        options: &ScanOptions,
603    ) -> Result<StructuredDbIterator<'a>> {
604        let inner = self.db.scan_with_options(bucket, range, options)?;
605        let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
606        Ok(StructuredDbIterator::new(inner, projected_schema, 0))
607    }
608
609    pub fn snapshot(&self) -> Result<u64> {
610        self.db.snapshot()
611    }
612
613    pub fn snapshot_with_callback<F>(&self, callback: F) -> Result<u64>
614    where
615        F: Fn(Result<ShardSnapshotInput>) + Send + Sync + 'static,
616    {
617        self.db.snapshot_with_callback(callback)
618    }
619
620    pub fn expire_snapshot(&self, snapshot_id: u64) -> Result<bool> {
621        self.db.expire_snapshot(snapshot_id)
622    }
623
624    pub fn retain_snapshot(&self, snapshot_id: u64) -> bool {
625        self.db.retain_snapshot(snapshot_id)
626    }
627
628    pub fn shard_snapshot_input(&self, snapshot_id: u64) -> Result<ShardSnapshotInput> {
629        self.db.shard_snapshot_input(snapshot_id)
630    }
631
632    pub fn set_time(&self, next: u32) {
633        self.db.set_time(next);
634    }
635
636    pub fn now_seconds(&self) -> u32 {
637        self.db.now_seconds()
638    }
639
640    pub fn get_raw_with_options(
641        &self,
642        bucket: u16,
643        key: &[u8],
644        options: &ReadOptions,
645    ) -> Result<Option<Vec<Option<Bytes>>>> {
646        self.db.get_with_options(bucket, key, options)
647    }
648
649    pub fn scan_raw<'a>(
650        &'a self,
651        bucket: u16,
652        range: Range<&[u8]>,
653        options: &ScanOptions,
654    ) -> Result<DbIterator<'a>> {
655        self.db.scan_with_options(bucket, range, options)
656    }
657
658    pub fn close(&self) -> Result<()> {
659        self.db.close()
660    }
661}
662
663impl StructuredSchemaOwner for StructuredDb {
664    fn current_structured_schema(&self) -> StructuredSchema {
665        self.current_schema()
666    }
667
668    fn begin_core_schema_update(&self) -> SchemaBuilder {
669        self.db.update_schema()
670    }
671
672    fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema> {
673        self.reload_schema()?;
674        Ok(self.current_schema())
675    }
676}
677
678/// Type alias for backward compatibility.
679pub type DataStructureDb = StructuredDb;
680
681// ── StructuredDbIterator ────────────────────────────────────────────────────
682
683pub struct StructuredDbIterator<'a> {
684    inner: DbIterator<'a>,
685    structured_schema: Arc<StructuredSchema>,
686    now_seconds: u32,
687}
688
689impl<'a> StructuredDbIterator<'a> {
690    pub(crate) fn new(
691        inner: DbIterator<'a>,
692        structured_schema: Arc<StructuredSchema>,
693        now_seconds: u32,
694    ) -> Self {
695        Self {
696            inner,
697            structured_schema,
698            now_seconds,
699        }
700    }
701}
702
703impl Iterator for StructuredDbIterator<'_> {
704    type Item = Result<(Bytes, Vec<Option<StructuredColumnValue>>)>;
705
706    fn next(&mut self) -> Option<Self::Item> {
707        self.inner.next().map(|item| {
708            let (key, columns) = item?;
709            let decoded = decode_row(&self.structured_schema, self.now_seconds, columns)?;
710            Ok((key, decoded))
711        })
712    }
713}
714
715// ── Internal helpers ────────────────────────────────────────────────────────
716
717pub(crate) fn persist_structured_schema_on_db(
718    db: &Db,
719    structured_schema: &StructuredSchema,
720) -> Result<()> {
721    let mut schema = db.update_schema();
722    apply_structured_schema(&mut schema, structured_schema)?;
723    schema.commit();
724    Ok(())
725}
726
727fn persist_structured_schema(db: &Db, structured_schema: &StructuredSchema) -> Result<()> {
728    persist_structured_schema_on_db(db, structured_schema)
729}
730
731fn apply_structured_schema(
732    schema: &mut SchemaBuilder,
733    structured_schema: &StructuredSchema,
734) -> Result<()> {
735    for (column, column_type) in &structured_schema.columns {
736        match column_type {
737            StructuredColumnType::Bytes => {}
738            StructuredColumnType::List(config) => {
739                schema.set_column_operator(*column as usize, list_operator(config.clone()))?;
740                schema.set_column_metadata(
741                    *column as usize,
742                    serde_json::to_value(config).map_err(|err| {
743                        Error::FileFormatError(format!(
744                            "failed to encode list config metadata: {}",
745                            err
746                        ))
747                    })?,
748                )?;
749            }
750        }
751    }
752    Ok(())
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758    use crate::list::{ListConfig, ListRetainMode};
759    use cobble::{ReadOptions, VolumeDescriptor};
760    use std::thread;
761    use std::time::Duration;
762    use uuid::Uuid;
763
764    #[test]
765    fn test_structured_db_resume_loads_structured_schema() {
766        let root = format!("/tmp/ds_structured_resume_{}", Uuid::new_v4());
767        let config = Config {
768            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
769            snapshot_on_flush: true,
770            num_columns: 2,
771            ..Config::default()
772        };
773        let structured_schema = StructuredSchema {
774            columns: BTreeMap::from([(
775                1,
776                StructuredColumnType::List(ListConfig {
777                    max_elements: Some(2),
778                    retain_mode: ListRetainMode::Last,
779                    preserve_element_ttl: true,
780                }),
781            )]),
782        };
783        let mut db = StructuredDb::open(config.clone(), vec![0u16..=0u16]).unwrap();
784        db.apply_schema(structured_schema.clone()).unwrap();
785        db.merge(0, b"k", 1, vec![Bytes::from_static(b"a")])
786            .unwrap();
787        let _ = db.snapshot().unwrap();
788        thread::sleep(Duration::from_millis(200));
789        let db_id = db.id().to_string();
790        db.close().unwrap();
791
792        let resumed = StructuredDb::resume(config, db_id).unwrap();
793        assert_eq!(resumed.current_schema(), structured_schema);
794        resumed.close().unwrap();
795        let _ = std::fs::remove_dir_all(root);
796    }
797
798    #[test]
799    fn test_structured_db_get_and_scan_return_structured_values() {
800        let root = format!("/tmp/ds_structured_get_scan_{}", Uuid::new_v4());
801        let config = Config {
802            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
803            num_columns: 2,
804            ..Config::default()
805        };
806        let structured_schema = StructuredSchema {
807            columns: BTreeMap::from([(
808                1,
809                StructuredColumnType::List(ListConfig {
810                    max_elements: Some(2),
811                    retain_mode: ListRetainMode::Last,
812                    preserve_element_ttl: false,
813                }),
814            )]),
815        };
816        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
817        db.apply_schema(structured_schema).unwrap();
818        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
819        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
820            .unwrap();
821        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
822            .unwrap();
823        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"c")])
824            .unwrap();
825
826        let row = db.get(0, b"k1").unwrap().expect("row exists");
827        assert_eq!(
828            row[0],
829            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
830        );
831        assert_eq!(
832            row[1],
833            Some(StructuredColumnValue::List(vec![
834                Bytes::from_static(b"b"),
835                Bytes::from_static(b"c")
836            ]))
837        );
838
839        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
840        let first = iter.next().expect("one row").unwrap();
841        assert_eq!(first.0.as_ref(), b"k1");
842        assert_eq!(first.1.len(), 2, "scan row should have 2 columns");
843        assert_eq!(
844            first.1[0],
845            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
846        );
847        assert_eq!(
848            first.1[1],
849            Some(StructuredColumnValue::List(vec![
850                Bytes::from_static(b"b"),
851                Bytes::from_static(b"c")
852            ]))
853        );
854        assert!(iter.next().is_none());
855
856        db.close().unwrap();
857        let _ = std::fs::remove_dir_all(root);
858    }
859
860    #[test]
861    fn test_structured_write_batch_round_trip() {
862        let root = format!("/tmp/ds_structured_write_batch_{}", Uuid::new_v4());
863        let config = Config {
864            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
865            num_columns: 2,
866            ..Config::default()
867        };
868        let structured_schema = StructuredSchema {
869            columns: BTreeMap::from([(
870                1,
871                StructuredColumnType::List(ListConfig {
872                    max_elements: Some(3),
873                    retain_mode: ListRetainMode::Last,
874                    preserve_element_ttl: false,
875                }),
876            )]),
877        };
878        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
879        db.apply_schema(structured_schema).unwrap();
880        let mut batch = db.new_write_batch();
881        batch.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
882        batch
883            .merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
884            .unwrap();
885        batch
886            .merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
887            .unwrap();
888        batch
889            .merge(0, b"k1", 1, vec![Bytes::from_static(b"c")])
890            .unwrap();
891        batch.put(0, b"k2", 0, Bytes::from_static(b"v2")).unwrap();
892        db.write_batch(batch).unwrap();
893
894        let row = db.get(0, b"k1").unwrap().expect("row exists");
895        assert_eq!(
896            row[0],
897            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
898        );
899        assert_eq!(
900            row[1],
901            Some(StructuredColumnValue::List(vec![
902                Bytes::from_static(b"a"),
903                Bytes::from_static(b"b"),
904                Bytes::from_static(b"c")
905            ]))
906        );
907        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
908        let first = iter.next().expect("first row").unwrap();
909        assert_eq!(first.0.as_ref(), b"k1");
910        let second = iter.next().expect("second row").unwrap();
911        assert_eq!(second.0.as_ref(), b"k2");
912        assert!(iter.next().is_none());
913
914        db.close().unwrap();
915        let _ = std::fs::remove_dir_all(root);
916    }
917
918    #[test]
919    fn test_structured_write_batch_rejects_type_mismatch() {
920        let root = format!("/tmp/ds_structured_write_batch_mismatch_{}", Uuid::new_v4());
921        let config = Config {
922            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
923            num_columns: 2,
924            ..Config::default()
925        };
926        let structured_schema = StructuredSchema {
927            columns: BTreeMap::from([(
928                1,
929                StructuredColumnType::List(ListConfig {
930                    max_elements: None,
931                    retain_mode: ListRetainMode::Last,
932                    preserve_element_ttl: false,
933                }),
934            )]),
935        };
936        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
937        db.apply_schema(structured_schema).unwrap();
938        let mut batch = db.new_write_batch();
939        let err = batch
940            .put(0, b"k1", 1, Bytes::from_static(b"not-a-list"))
941            .expect_err("type mismatch should fail");
942        match err {
943            Error::InputError(msg) => assert!(msg.contains("column 1 expects")),
944            other => panic!("unexpected error: {other:?}"),
945        }
946        db.close().unwrap();
947        let _ = std::fs::remove_dir_all(root);
948    }
949
950    #[test]
951    fn test_structured_scan_with_projection_reindexes_schema() {
952        let root = format!("/tmp/ds_structured_scan_projection_{}", Uuid::new_v4());
953        let config = Config {
954            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
955            num_columns: 2,
956            ..Config::default()
957        };
958        let structured_schema = StructuredSchema {
959            columns: BTreeMap::from([(
960                1,
961                StructuredColumnType::List(ListConfig {
962                    max_elements: Some(8),
963                    retain_mode: ListRetainMode::Last,
964                    preserve_element_ttl: false,
965                }),
966            )]),
967        };
968        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
969        db.apply_schema(structured_schema).unwrap();
970        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
971        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
972            .unwrap();
973        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
974            .unwrap();
975
976        let mut iter = db
977            .scan_with_options(
978                0,
979                b"k0".as_ref()..b"k9".as_ref(),
980                &ScanOptions::for_column(1),
981            )
982            .unwrap();
983        let first = iter.next().expect("one row").unwrap();
984        assert_eq!(first.0.as_ref(), b"k1");
985        assert_eq!(first.1.len(), 1);
986        assert_eq!(
987            first.1[0],
988            Some(StructuredColumnValue::List(vec![
989                Bytes::from_static(b"a"),
990                Bytes::from_static(b"b"),
991            ]))
992        );
993
994        db.close().unwrap();
995        let _ = std::fs::remove_dir_all(root);
996    }
997
998    #[test]
999    fn test_structured_get_with_projection_reindexes_schema() {
1000        let root = format!("/tmp/ds_structured_get_projection_{}", Uuid::new_v4());
1001        let config = Config {
1002            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
1003            num_columns: 2,
1004            ..Config::default()
1005        };
1006        let structured_schema = StructuredSchema {
1007            columns: BTreeMap::from([(
1008                1,
1009                StructuredColumnType::List(ListConfig {
1010                    max_elements: Some(8),
1011                    retain_mode: ListRetainMode::Last,
1012                    preserve_element_ttl: false,
1013                }),
1014            )]),
1015        };
1016        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
1017        db.apply_schema(structured_schema).unwrap();
1018        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
1019        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
1020            .unwrap();
1021        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
1022            .unwrap();
1023
1024        let row = db
1025            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
1026            .unwrap()
1027            .expect("row exists");
1028        assert_eq!(row.len(), 1);
1029        assert_eq!(
1030            row[0],
1031            Some(StructuredColumnValue::List(vec![
1032                Bytes::from_static(b"a"),
1033                Bytes::from_static(b"b"),
1034            ]))
1035        );
1036
1037        db.close().unwrap();
1038        let _ = std::fs::remove_dir_all(root);
1039    }
1040}