Skip to main content

cobble_data_structure/
structured_db.rs

1use crate::list::{
2    LIST_OPERATOR_ID, ListConfig, decode_list_for_read, encode_list_for_write, list_operator,
3    list_operator_from_metadata,
4};
5use bytes::Bytes;
6use cobble::{
7    BytesMergeOperator, Config, Db, DbIterator, Error, MergeOperatorResolver, ReadOptions, Result,
8    ScanOptions, Schema, SchemaBuilder, ShardSnapshotInput, WriteBatch, WriteOptions,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::Value as JsonValue;
12use std::collections::BTreeMap;
13use std::ops::{Range, RangeInclusive};
14use std::sync::Arc;
15
/// Structured (typed) view of a table's columns.
///
/// Maps a column index to its structured type. Columns without an entry are
/// treated as `StructuredColumnType::Bytes` by the helpers in this module
/// (see `column_type`).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StructuredSchema {
    /// Column index -> structured type; absent columns default to `Bytes`.
    pub columns: BTreeMap<u16, StructuredColumnType>,
}
20
/// Structured type of a single column.
///
/// Serialized by serde as an internally tagged enum: the snake_case variant
/// name is stored under the `"kind"` key.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum StructuredColumnType {
    /// Opaque bytes, written and read back unchanged (the default type).
    #[default]
    Bytes,
    /// List of byte elements, encoded/decoded by the `crate::list` helpers
    /// according to the given `ListConfig`.
    List(ListConfig),
}
28
/// A decoded column value; each variant corresponds to a `StructuredColumnType`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StructuredColumnValue {
    /// Value of a `StructuredColumnType::Bytes` column.
    Bytes(Bytes),
    /// Elements of a `StructuredColumnType::List` column.
    List(Vec<Bytes>),
}
34
35// ── Shared helpers (used by all Structured* wrappers) ───────────────────────
36
37pub(crate) fn column_type(schema: &StructuredSchema, column: u16) -> &StructuredColumnType {
38    schema
39        .columns
40        .get(&column)
41        .unwrap_or(&StructuredColumnType::Bytes)
42}
43
44pub(crate) fn encode_for_write(
45    schema: &StructuredSchema,
46    now_seconds: u32,
47    column: u16,
48    value: StructuredColumnValue,
49    ttl_seconds: Option<u32>,
50) -> Result<Bytes> {
51    match (column_type(schema, column), value) {
52        (StructuredColumnType::Bytes, StructuredColumnValue::Bytes(value)) => Ok(value),
53        (StructuredColumnType::List(config), StructuredColumnValue::List(elements)) => {
54            encode_list_for_write(elements, config, ttl_seconds, now_seconds)
55        }
56        (_, _) => Err(Error::InputError(format!(
57            "column {} expects a different type of value",
58            column,
59        ))),
60    }
61}
62
63pub(crate) fn decode_row(
64    schema: &StructuredSchema,
65    now_seconds: u32,
66    columns: Vec<Option<Bytes>>,
67) -> Result<Vec<Option<StructuredColumnValue>>> {
68    columns
69        .into_iter()
70        .enumerate()
71        .map(|(idx, column)| {
72            let Some(raw) = column else {
73                return Ok(None);
74            };
75            match column_type(schema, idx as u16) {
76                StructuredColumnType::Bytes => Ok(Some(StructuredColumnValue::Bytes(raw))),
77                StructuredColumnType::List(config) => Ok(Some(StructuredColumnValue::List(
78                    decode_list_for_read(&raw, config, now_seconds)?,
79                ))),
80            }
81        })
82        .collect()
83}
84
85pub(crate) fn project_structured_schema_for_indices(
86    schema: &StructuredSchema,
87    column_indices: Option<&[usize]>,
88) -> Arc<StructuredSchema> {
89    let Some(indices) = column_indices else {
90        return Arc::new(schema.clone());
91    };
92    let mut columns = BTreeMap::new();
93    for (projected_idx, original_idx) in indices.iter().enumerate() {
94        if let Some(column_type) = schema.columns.get(&(*original_idx as u16)) {
95            columns.insert(projected_idx as u16, column_type.clone());
96        }
97    }
98    Arc::new(StructuredSchema { columns })
99}
100
101pub(crate) fn project_structured_schema_for_scan(
102    schema: &StructuredSchema,
103    options: &ScanOptions,
104) -> Arc<StructuredSchema> {
105    project_structured_schema_for_indices(schema, options.column_indices.as_deref())
106}
107
108pub(crate) fn project_decoded_row_for_read(
109    row: Vec<Option<StructuredColumnValue>>,
110    options: &ReadOptions,
111) -> Vec<Option<StructuredColumnValue>> {
112    let Some(indices) = options.column_indices.as_deref() else {
113        return row;
114    };
115    indices
116        .iter()
117        .map(|&idx| row.get(idx).cloned().unwrap_or(None))
118        .collect()
119}
120
121pub(crate) fn load_structured_schema_from_cobble_schema(
122    schema: &Schema,
123) -> Result<StructuredSchema> {
124    let operator_ids = schema.all_operator_ids();
125    let mut columns = BTreeMap::new();
126    for column_idx in 0..schema.num_columns() {
127        let operator_id = operator_ids
128            .get(column_idx)
129            .map(|s| s.as_str())
130            .unwrap_or("");
131        if operator_id == LIST_OPERATOR_ID {
132            let metadata_value = schema.column_metadata_at(column_idx).ok_or_else(|| {
133                Error::FileFormatError(format!("list column {} missing metadata", column_idx))
134            })?;
135            let config =
136                serde_json::from_value::<ListConfig>(metadata_value.clone()).map_err(|err| {
137                    Error::FileFormatError(format!(
138                        "failed to decode list config at column {}: {}",
139                        column_idx, err
140                    ))
141                })?;
142            columns.insert(column_idx as u16, StructuredColumnType::List(config));
143        }
144    }
145    Ok(StructuredSchema { columns })
146}
147
148pub(crate) fn combined_resolver(
149    custom: Option<Arc<dyn MergeOperatorResolver>>,
150) -> Arc<dyn MergeOperatorResolver> {
151    Arc::new(move |id: &str, metadata: Option<&JsonValue>| {
152        if let Some(operator) = list_operator_from_metadata(id, metadata) {
153            return Some(operator);
154        }
155        custom
156            .as_ref()
157            .and_then(|resolver| resolver.resolve(id, metadata))
158    })
159}
160
161/// Returns a `MergeOperatorResolver` that can resolve all structured data type
162/// merge operators (e.g. list) from their metadata.
163pub fn structured_merge_operator_resolver() -> Arc<dyn MergeOperatorResolver> {
164    combined_resolver(None)
165}
166
167/// Returns the merge operator IDs that `structured_merge_operator_resolver` can resolve.
168pub fn structured_resolvable_operator_ids() -> Vec<String> {
169    vec![LIST_OPERATOR_ID.to_string()]
170}
171
172// ── StructuredWriteBatch ────────────────────────────────────────────────────
173
/// Structured write batch wrapper.
///
/// Each operation is encoded and written into the inner `cobble::WriteBatch` immediately, so we
/// avoid a second typed-op staging buffer and an extra conversion pass at flush time.
///
/// Values are encoded against the schema and clock value captured when the batch was
/// created, not the ones current when the batch is eventually written.
pub struct StructuredWriteBatch {
    /// Schema used to type-check and encode every value added to the batch.
    structured_schema: Arc<StructuredSchema>,
    /// Clock value (seconds) handed to the list encoder together with each write's TTL.
    now_seconds: u32,
    /// Core batch that receives the already-encoded bytes.
    inner: WriteBatch,
}
183
184impl StructuredWriteBatch {
185    pub(crate) fn new(structured_schema: Arc<StructuredSchema>, now_seconds: u32) -> Self {
186        Self {
187            structured_schema,
188            now_seconds,
189            inner: WriteBatch::new(),
190        }
191    }
192
193    pub fn put<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
194    where
195        K: AsRef<[u8]>,
196        V: Into<StructuredColumnValue>,
197    {
198        self.put_with_options(bucket, key, column, value, &WriteOptions::default())
199    }
200
201    pub fn put_with_options<K, V>(
202        &mut self,
203        bucket: u16,
204        key: K,
205        column: u16,
206        value: V,
207        options: &WriteOptions,
208    ) -> Result<()>
209    where
210        K: AsRef<[u8]>,
211        V: Into<StructuredColumnValue>,
212    {
213        let encoded = encode_for_write(
214            &self.structured_schema,
215            self.now_seconds,
216            column,
217            value.into(),
218            options.ttl_seconds,
219        )?;
220        self.inner
221            .put_with_options(bucket, key, column, encoded, options);
222        Ok(())
223    }
224
225    pub fn delete<K>(&mut self, bucket: u16, key: K, column: u16)
226    where
227        K: AsRef<[u8]>,
228    {
229        self.inner.delete(bucket, key, column);
230    }
231
232    pub fn merge<K, V>(&mut self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
233    where
234        K: AsRef<[u8]>,
235        V: Into<StructuredColumnValue>,
236    {
237        self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
238    }
239
240    pub fn merge_with_options<K, V>(
241        &mut self,
242        bucket: u16,
243        key: K,
244        column: u16,
245        value: V,
246        options: &WriteOptions,
247    ) -> Result<()>
248    where
249        K: AsRef<[u8]>,
250        V: Into<StructuredColumnValue>,
251    {
252        let encoded = encode_for_write(
253            &self.structured_schema,
254            self.now_seconds,
255            column,
256            value.into(),
257            options.ttl_seconds,
258        )?;
259        self.inner
260            .merge_with_options(bucket, key, column, encoded, options);
261        Ok(())
262    }
263
264    pub(crate) fn into_inner(self) -> WriteBatch {
265        self.inner
266    }
267}
268
269// ── StructuredColumnValue conversions ───────────────────────────────────────
270
271impl From<Bytes> for StructuredColumnValue {
272    fn from(value: Bytes) -> Self {
273        Self::Bytes(value)
274    }
275}
276
277impl From<Vec<u8>> for StructuredColumnValue {
278    fn from(value: Vec<u8>) -> Self {
279        Self::Bytes(Bytes::from(value))
280    }
281}
282
283impl From<Vec<Bytes>> for StructuredColumnValue {
284    fn from(value: Vec<Bytes>) -> Self {
285        Self::List(value)
286    }
287}
288
289impl From<Vec<Vec<u8>>> for StructuredColumnValue {
290    fn from(value: Vec<Vec<u8>>) -> Self {
291        Self::List(value.into_iter().map(Bytes::from).collect())
292    }
293}
294
295// ── StructuredDb (formerly DataStructureDb) ─────────────────────────────────
296
/// Typed wrapper around a core `cobble::Db`.
///
/// Values are validated and encoded against the cached structured schema on
/// write and decoded back into `StructuredColumnValue`s on read.
pub struct StructuredDb {
    /// Underlying core database storing the encoded column bytes.
    db: Db,
    /// Cached structured schema, re-derived from the core schema by
    /// `reload_schema` / `apply_schema`.
    structured_schema: Arc<StructuredSchema>,
}
301
/// Implemented by handles whose structured schema can be edited through a
/// `StructuredSchemaBuilder`.
pub trait StructuredSchemaOwner {
    /// Returns the owner's current structured schema as an owned value.
    fn current_structured_schema(&self) -> StructuredSchema;
    /// Starts an update of the underlying core (cobble) schema.
    fn begin_core_schema_update(&self) -> SchemaBuilder;
    /// Re-derives the structured schema from the core schema (called after a
    /// commit) and returns the refreshed schema.
    fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema>;
}
307
/// Fluent builder that stages structured-schema edits into a core
/// `SchemaBuilder` and commits them on the owner.
///
/// Errors from individual edits are recorded and surfaced by `commit`.
pub struct StructuredSchemaBuilder<'a, O: StructuredSchemaOwner> {
    /// Owner whose schema is reloaded after a successful commit.
    owner: &'a mut O,
    /// Locally tracked structured schema reflecting the staged edits.
    schema: StructuredSchema,
    /// Core schema builder; `None` once it has been committed.
    inner: Option<SchemaBuilder>,
    /// First error raised by a staged edit; later edits are skipped while set.
    pending_error: Option<Error>,
}
314
315impl<'a, O: StructuredSchemaOwner> StructuredSchemaBuilder<'a, O> {
316    pub fn new(owner: &'a mut O) -> Self {
317        let schema = owner.current_structured_schema();
318        let inner = owner.begin_core_schema_update();
319        Self {
320            owner,
321            schema,
322            inner: Some(inner),
323            pending_error: None,
324        }
325    }
326
327    pub fn add_bytes_column(&mut self, column: u16) -> &mut Self {
328        self.schema.columns.remove(&column);
329        self.apply_inner(|inner| {
330            inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
331            inner.clear_column_metadata(column as usize)?;
332            Ok(())
333        });
334        self
335    }
336
337    pub fn add_list_column(&mut self, column: u16, config: ListConfig) -> &mut Self {
338        self.schema
339            .columns
340            .insert(column, StructuredColumnType::List(config.clone()));
341        self.apply_inner(|inner| {
342            inner.set_column_operator(column as usize, list_operator(config.clone()))?;
343            inner.set_column_metadata(
344                column as usize,
345                serde_json::to_value(config).map_err(|err| {
346                    Error::FileFormatError(format!(
347                        "failed to encode list config metadata: {}",
348                        err
349                    ))
350                })?,
351            )?;
352            Ok(())
353        });
354        self
355    }
356
357    pub fn delete_column(&mut self, column: u16) -> &mut Self {
358        // Structured delete means dropping structured typing for this column (back to Bytes).
359        self.schema.columns.remove(&column);
360        self.apply_inner(|inner| {
361            inner.set_column_operator(column as usize, Arc::new(BytesMergeOperator))?;
362            inner.clear_column_metadata(column as usize)?;
363            Ok(())
364        });
365        self
366    }
367
368    pub fn current_schema(&self) -> &StructuredSchema {
369        &self.schema
370    }
371
372    pub fn commit(&mut self) -> Result<StructuredSchema> {
373        if let Some(err) = self.pending_error.take() {
374            return Err(err);
375        }
376        let inner = self
377            .inner
378            .take()
379            .ok_or_else(|| Error::InvalidState("schema builder already committed".to_string()))?;
380        inner.commit();
381        self.owner.reload_structured_schema_from_core()
382    }
383
384    fn apply_inner<F>(&mut self, f: F)
385    where
386        F: FnOnce(&mut SchemaBuilder) -> Result<()>,
387    {
388        if self.pending_error.is_some() {
389            return;
390        }
391        let Some(inner) = self.inner.as_mut() else {
392            self.pending_error = Some(Error::InvalidState(
393                "schema builder already committed".to_string(),
394            ));
395            return;
396        };
397        if let Err(err) = f(inner) {
398            self.pending_error = Some(err);
399        }
400    }
401}
402
403impl StructuredDb {
404    pub fn open(config: Config, bucket_ranges: Vec<RangeInclusive<u16>>) -> Result<Self> {
405        let db = Db::open(config, bucket_ranges)?;
406        let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
407        Ok(Self {
408            db,
409            structured_schema: Arc::new(structured_schema),
410        })
411    }
412
413    pub fn open_from_snapshot(config: Config, snapshot_id: u64, db_id: String) -> Result<Self> {
414        Self::open_from_snapshot_with_resolver(config, snapshot_id, db_id, None)
415    }
416
417    pub fn open_from_snapshot_with_resolver(
418        config: Config,
419        snapshot_id: u64,
420        db_id: String,
421        resolver: Option<Arc<dyn MergeOperatorResolver>>,
422    ) -> Result<Self> {
423        let db = Db::open_from_snapshot_with_resolver(
424            config,
425            snapshot_id,
426            db_id,
427            Some(combined_resolver(resolver)),
428        )?;
429        let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
430        Ok(Self {
431            db,
432            structured_schema: Arc::new(structured_schema),
433        })
434    }
435
436    pub fn resume(config: Config, db_id: String) -> Result<Self> {
437        Self::resume_with_resolver(config, db_id, None)
438    }
439
440    pub fn resume_with_resolver(
441        config: Config,
442        db_id: String,
443        resolver: Option<Arc<dyn MergeOperatorResolver>>,
444    ) -> Result<Self> {
445        let db = Db::resume_with_resolver(config, db_id, Some(combined_resolver(resolver)))?;
446        let structured_schema = load_structured_schema_from_cobble_schema(&db.current_schema())?;
447        Ok(Self {
448            db,
449            structured_schema: Arc::new(structured_schema),
450        })
451    }
452
453    pub fn id(&self) -> &str {
454        self.db.id()
455    }
456
457    pub fn current_schema(&self) -> StructuredSchema {
458        self.structured_schema.as_ref().clone()
459    }
460
461    pub fn update_schema(&mut self) -> StructuredSchemaBuilder<'_, Self> {
462        StructuredSchemaBuilder::new(self)
463    }
464
465    pub fn reload_schema(&mut self) -> Result<()> {
466        let schema = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
467        self.structured_schema = Arc::new(schema);
468        Ok(())
469    }
470
471    pub fn apply_schema(
472        &mut self,
473        structured_schema: StructuredSchema,
474    ) -> Result<StructuredSchema> {
475        persist_structured_schema(&self.db, &structured_schema)?;
476        let reloaded = load_structured_schema_from_cobble_schema(&self.db.current_schema())?;
477        self.structured_schema = Arc::new(reloaded.clone());
478        Ok(reloaded)
479    }
480
481    pub fn put<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
482    where
483        K: AsRef<[u8]>,
484        V: Into<StructuredColumnValue>,
485    {
486        self.put_with_options(bucket, key, column, value, &WriteOptions::default())
487    }
488
489    pub fn put_with_options<K, V>(
490        &self,
491        bucket: u16,
492        key: K,
493        column: u16,
494        value: V,
495        options: &WriteOptions,
496    ) -> Result<()>
497    where
498        K: AsRef<[u8]>,
499        V: Into<StructuredColumnValue>,
500    {
501        let encoded = encode_for_write(
502            &self.structured_schema,
503            self.db.now_seconds(),
504            column,
505            value.into(),
506            options.ttl_seconds,
507        )?;
508        self.db
509            .put_with_options(bucket, key, column, encoded, options)
510    }
511
512    pub fn merge<K, V>(&self, bucket: u16, key: K, column: u16, value: V) -> Result<()>
513    where
514        K: AsRef<[u8]>,
515        V: Into<StructuredColumnValue>,
516    {
517        self.merge_with_options(bucket, key, column, value, &WriteOptions::default())
518    }
519
520    pub fn merge_with_options<K, V>(
521        &self,
522        bucket: u16,
523        key: K,
524        column: u16,
525        value: V,
526        options: &WriteOptions,
527    ) -> Result<()>
528    where
529        K: AsRef<[u8]>,
530        V: Into<StructuredColumnValue>,
531    {
532        let encoded = encode_for_write(
533            &self.structured_schema,
534            self.db.now_seconds(),
535            column,
536            value.into(),
537            options.ttl_seconds,
538        )?;
539        self.db
540            .merge_with_options(bucket, key, column, encoded, options)
541    }
542
543    pub fn delete<K>(&self, bucket: u16, key: K, column: u16) -> Result<()>
544    where
545        K: AsRef<[u8]>,
546    {
547        self.db.delete(bucket, key, column)
548    }
549
550    pub fn new_write_batch(&self) -> StructuredWriteBatch {
551        StructuredWriteBatch::new(Arc::clone(&self.structured_schema), self.db.now_seconds())
552    }
553
554    pub fn write_batch(&self, batch: StructuredWriteBatch) -> Result<()> {
555        self.db.write_batch(batch.into_inner())
556    }
557
558    pub fn get<K>(&self, bucket: u16, key: K) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
559    where
560        K: AsRef<[u8]>,
561    {
562        let raw = self.db.get(bucket, key.as_ref())?;
563        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
564            .transpose()
565    }
566
567    pub fn get_with_options<K>(
568        &self,
569        bucket: u16,
570        key: K,
571        options: &ReadOptions,
572    ) -> Result<Option<Vec<Option<StructuredColumnValue>>>>
573    where
574        K: AsRef<[u8]>,
575    {
576        let raw = if options.column_indices.is_some() {
577            self.db.get(bucket, key.as_ref())?
578        } else {
579            self.db.get_with_options(bucket, key.as_ref(), options)?
580        };
581        raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
582            .transpose()
583            .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
584    }
585
586    pub fn scan<'a>(
587        &'a self,
588        bucket: u16,
589        range: Range<&[u8]>,
590    ) -> Result<StructuredDbIterator<'a>> {
591        self.scan_with_options(bucket, range, &ScanOptions::default())
592    }
593
594    pub fn scan_with_options<'a>(
595        &'a self,
596        bucket: u16,
597        range: Range<&[u8]>,
598        options: &ScanOptions,
599    ) -> Result<StructuredDbIterator<'a>> {
600        let inner = self.db.scan_with_options(bucket, range, options)?;
601        let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
602        Ok(StructuredDbIterator::new(inner, projected_schema, 0))
603    }
604
605    pub fn snapshot(&self) -> Result<u64> {
606        self.db.snapshot()
607    }
608
609    pub fn snapshot_with_callback<F>(&self, callback: F) -> Result<u64>
610    where
611        F: Fn(Result<ShardSnapshotInput>) + Send + Sync + 'static,
612    {
613        self.db.snapshot_with_callback(callback)
614    }
615
616    pub fn expire_snapshot(&self, snapshot_id: u64) -> Result<bool> {
617        self.db.expire_snapshot(snapshot_id)
618    }
619
620    pub fn retain_snapshot(&self, snapshot_id: u64) -> bool {
621        self.db.retain_snapshot(snapshot_id)
622    }
623
624    pub fn shard_snapshot_input(&self, snapshot_id: u64) -> Result<ShardSnapshotInput> {
625        self.db.shard_snapshot_input(snapshot_id)
626    }
627
628    pub fn set_time(&self, next: u32) {
629        self.db.set_time(next);
630    }
631
632    pub fn now_seconds(&self) -> u32 {
633        self.db.now_seconds()
634    }
635
636    pub fn get_raw_with_options(
637        &self,
638        bucket: u16,
639        key: &[u8],
640        options: &ReadOptions,
641    ) -> Result<Option<Vec<Option<Bytes>>>> {
642        self.db.get_with_options(bucket, key, options)
643    }
644
645    pub fn scan_raw<'a>(
646        &'a self,
647        bucket: u16,
648        range: Range<&[u8]>,
649        options: &ScanOptions,
650    ) -> Result<DbIterator<'a>> {
651        self.db.scan_with_options(bucket, range, options)
652    }
653
654    pub fn close(&self) -> Result<()> {
655        self.db.close()
656    }
657}
658
659impl StructuredSchemaOwner for StructuredDb {
660    fn current_structured_schema(&self) -> StructuredSchema {
661        self.current_schema()
662    }
663
664    fn begin_core_schema_update(&self) -> SchemaBuilder {
665        self.db.update_schema()
666    }
667
668    fn reload_structured_schema_from_core(&mut self) -> Result<StructuredSchema> {
669        self.reload_schema()?;
670        Ok(self.current_schema())
671    }
672}
673
/// Type alias for backward compatibility.
///
/// `StructuredDb` was formerly named `DataStructureDb`; new code should use `StructuredDb`.
pub type DataStructureDb = StructuredDb;
676
677// ── StructuredDbIterator ────────────────────────────────────────────────────
678
/// Iterator over a core scan that decodes each row into structured column values.
pub struct StructuredDbIterator<'a> {
    /// Core scan iterator yielding raw `(key, columns)` rows.
    inner: DbIterator<'a>,
    /// Schema used to decode rows; it must be indexed like the rows `inner`
    /// yields (i.e. already projected when the scan projects columns).
    structured_schema: Arc<StructuredSchema>,
    /// Clock value passed to `decode_row` for every row.
    now_seconds: u32,
}
684
685impl<'a> StructuredDbIterator<'a> {
686    pub(crate) fn new(
687        inner: DbIterator<'a>,
688        structured_schema: Arc<StructuredSchema>,
689        now_seconds: u32,
690    ) -> Self {
691        Self {
692            inner,
693            structured_schema,
694            now_seconds,
695        }
696    }
697}
698
699impl Iterator for StructuredDbIterator<'_> {
700    type Item = Result<(Bytes, Vec<Option<StructuredColumnValue>>)>;
701
702    fn next(&mut self) -> Option<Self::Item> {
703        self.inner.next().map(|item| {
704            let (key, columns) = item?;
705            let decoded = decode_row(&self.structured_schema, self.now_seconds, columns)?;
706            Ok((key, decoded))
707        })
708    }
709}
710
711// ── Internal helpers ────────────────────────────────────────────────────────
712
713pub(crate) fn persist_structured_schema_on_db(
714    db: &Db,
715    structured_schema: &StructuredSchema,
716) -> Result<()> {
717    let mut schema = db.update_schema();
718    apply_structured_schema(&mut schema, structured_schema)?;
719    schema.commit();
720    Ok(())
721}
722
/// Persists `structured_schema` into `db`'s core schema; a private alias for
/// `persist_structured_schema_on_db`.
fn persist_structured_schema(db: &Db, structured_schema: &StructuredSchema) -> Result<()> {
    persist_structured_schema_on_db(db, structured_schema)
}
726
727fn apply_structured_schema(
728    schema: &mut SchemaBuilder,
729    structured_schema: &StructuredSchema,
730) -> Result<()> {
731    for (column, column_type) in &structured_schema.columns {
732        match column_type {
733            StructuredColumnType::Bytes => {}
734            StructuredColumnType::List(config) => {
735                schema.set_column_operator(*column as usize, list_operator(config.clone()))?;
736                schema.set_column_metadata(
737                    *column as usize,
738                    serde_json::to_value(config).map_err(|err| {
739                        Error::FileFormatError(format!(
740                            "failed to encode list config metadata: {}",
741                            err
742                        ))
743                    })?,
744                )?;
745            }
746        }
747    }
748    Ok(())
749}
750
751#[cfg(test)]
752mod tests {
753    use super::*;
754    use crate::list::{ListConfig, ListRetainMode};
755    use cobble::{ReadOptions, VolumeDescriptor};
756    use std::thread;
757    use std::time::Duration;
758    use uuid::Uuid;
759
760    #[test]
761    fn test_structured_db_resume_loads_structured_schema() {
762        let root = format!("/tmp/ds_structured_resume_{}", Uuid::new_v4());
763        let config = Config {
764            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
765            snapshot_on_flush: true,
766            num_columns: 2,
767            ..Config::default()
768        };
769        let structured_schema = StructuredSchema {
770            columns: BTreeMap::from([(
771                1,
772                StructuredColumnType::List(ListConfig {
773                    max_elements: Some(2),
774                    retain_mode: ListRetainMode::Last,
775                    preserve_element_ttl: true,
776                }),
777            )]),
778        };
779        let mut db = StructuredDb::open(config.clone(), vec![0u16..=0u16]).unwrap();
780        db.apply_schema(structured_schema.clone()).unwrap();
781        db.merge(0, b"k", 1, vec![Bytes::from_static(b"a")])
782            .unwrap();
783        let _ = db.snapshot().unwrap();
784        thread::sleep(Duration::from_millis(200));
785        let db_id = db.id().to_string();
786        db.close().unwrap();
787
788        let resumed = StructuredDb::resume(config, db_id).unwrap();
789        assert_eq!(resumed.current_schema(), structured_schema);
790        resumed.close().unwrap();
791        let _ = std::fs::remove_dir_all(root);
792    }
793
794    #[test]
795    fn test_structured_db_get_and_scan_return_structured_values() {
796        let root = format!("/tmp/ds_structured_get_scan_{}", Uuid::new_v4());
797        let config = Config {
798            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
799            num_columns: 2,
800            ..Config::default()
801        };
802        let structured_schema = StructuredSchema {
803            columns: BTreeMap::from([(
804                1,
805                StructuredColumnType::List(ListConfig {
806                    max_elements: Some(2),
807                    retain_mode: ListRetainMode::Last,
808                    preserve_element_ttl: false,
809                }),
810            )]),
811        };
812        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
813        db.apply_schema(structured_schema).unwrap();
814        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
815        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
816            .unwrap();
817        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
818            .unwrap();
819        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"c")])
820            .unwrap();
821
822        let row = db.get(0, b"k1").unwrap().expect("row exists");
823        assert_eq!(
824            row[0],
825            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
826        );
827        assert_eq!(
828            row[1],
829            Some(StructuredColumnValue::List(vec![
830                Bytes::from_static(b"b"),
831                Bytes::from_static(b"c")
832            ]))
833        );
834
835        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
836        let first = iter.next().expect("one row").unwrap();
837        assert_eq!(first.0.as_ref(), b"k1");
838        assert_eq!(first.1.len(), 2, "scan row should have 2 columns");
839        assert_eq!(
840            first.1[0],
841            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
842        );
843        assert_eq!(
844            first.1[1],
845            Some(StructuredColumnValue::List(vec![
846                Bytes::from_static(b"b"),
847                Bytes::from_static(b"c")
848            ]))
849        );
850        assert!(iter.next().is_none());
851
852        db.close().unwrap();
853        let _ = std::fs::remove_dir_all(root);
854    }
855
856    #[test]
857    fn test_structured_write_batch_round_trip() {
858        let root = format!("/tmp/ds_structured_write_batch_{}", Uuid::new_v4());
859        let config = Config {
860            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
861            num_columns: 2,
862            ..Config::default()
863        };
864        let structured_schema = StructuredSchema {
865            columns: BTreeMap::from([(
866                1,
867                StructuredColumnType::List(ListConfig {
868                    max_elements: Some(3),
869                    retain_mode: ListRetainMode::Last,
870                    preserve_element_ttl: false,
871                }),
872            )]),
873        };
874        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
875        db.apply_schema(structured_schema).unwrap();
876        let mut batch = db.new_write_batch();
877        batch.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
878        batch
879            .merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
880            .unwrap();
881        batch
882            .merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
883            .unwrap();
884        batch
885            .merge(0, b"k1", 1, vec![Bytes::from_static(b"c")])
886            .unwrap();
887        batch.put(0, b"k2", 0, Bytes::from_static(b"v2")).unwrap();
888        db.write_batch(batch).unwrap();
889
890        let row = db.get(0, b"k1").unwrap().expect("row exists");
891        assert_eq!(
892            row[0],
893            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
894        );
895        assert_eq!(
896            row[1],
897            Some(StructuredColumnValue::List(vec![
898                Bytes::from_static(b"a"),
899                Bytes::from_static(b"b"),
900                Bytes::from_static(b"c")
901            ]))
902        );
903        let mut iter = db.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
904        let first = iter.next().expect("first row").unwrap();
905        assert_eq!(first.0.as_ref(), b"k1");
906        let second = iter.next().expect("second row").unwrap();
907        assert_eq!(second.0.as_ref(), b"k2");
908        assert!(iter.next().is_none());
909
910        db.close().unwrap();
911        let _ = std::fs::remove_dir_all(root);
912    }
913
914    #[test]
915    fn test_structured_write_batch_rejects_type_mismatch() {
916        let root = format!("/tmp/ds_structured_write_batch_mismatch_{}", Uuid::new_v4());
917        let config = Config {
918            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
919            num_columns: 2,
920            ..Config::default()
921        };
922        let structured_schema = StructuredSchema {
923            columns: BTreeMap::from([(
924                1,
925                StructuredColumnType::List(ListConfig {
926                    max_elements: None,
927                    retain_mode: ListRetainMode::Last,
928                    preserve_element_ttl: false,
929                }),
930            )]),
931        };
932        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
933        db.apply_schema(structured_schema).unwrap();
934        let mut batch = db.new_write_batch();
935        let err = batch
936            .put(0, b"k1", 1, Bytes::from_static(b"not-a-list"))
937            .expect_err("type mismatch should fail");
938        match err {
939            Error::InputError(msg) => assert!(msg.contains("column 1 expects")),
940            other => panic!("unexpected error: {other:?}"),
941        }
942        db.close().unwrap();
943        let _ = std::fs::remove_dir_all(root);
944    }
945
946    #[test]
947    fn test_structured_scan_with_projection_reindexes_schema() {
948        let root = format!("/tmp/ds_structured_scan_projection_{}", Uuid::new_v4());
949        let config = Config {
950            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
951            num_columns: 2,
952            ..Config::default()
953        };
954        let structured_schema = StructuredSchema {
955            columns: BTreeMap::from([(
956                1,
957                StructuredColumnType::List(ListConfig {
958                    max_elements: Some(8),
959                    retain_mode: ListRetainMode::Last,
960                    preserve_element_ttl: false,
961                }),
962            )]),
963        };
964        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
965        db.apply_schema(structured_schema).unwrap();
966        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
967        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
968            .unwrap();
969        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
970            .unwrap();
971
972        let mut iter = db
973            .scan_with_options(
974                0,
975                b"k0".as_ref()..b"k9".as_ref(),
976                &ScanOptions::for_column(1),
977            )
978            .unwrap();
979        let first = iter.next().expect("one row").unwrap();
980        assert_eq!(first.0.as_ref(), b"k1");
981        assert_eq!(first.1.len(), 1);
982        assert_eq!(
983            first.1[0],
984            Some(StructuredColumnValue::List(vec![
985                Bytes::from_static(b"a"),
986                Bytes::from_static(b"b"),
987            ]))
988        );
989
990        db.close().unwrap();
991        let _ = std::fs::remove_dir_all(root);
992    }
993
994    #[test]
995    fn test_structured_get_with_projection_reindexes_schema() {
996        let root = format!("/tmp/ds_structured_get_projection_{}", Uuid::new_v4());
997        let config = Config {
998            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
999            num_columns: 2,
1000            ..Config::default()
1001        };
1002        let structured_schema = StructuredSchema {
1003            columns: BTreeMap::from([(
1004                1,
1005                StructuredColumnType::List(ListConfig {
1006                    max_elements: Some(8),
1007                    retain_mode: ListRetainMode::Last,
1008                    preserve_element_ttl: false,
1009                }),
1010            )]),
1011        };
1012        let mut db = StructuredDb::open(config, vec![0u16..=0u16]).unwrap();
1013        db.apply_schema(structured_schema).unwrap();
1014        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
1015        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
1016            .unwrap();
1017        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
1018            .unwrap();
1019
1020        let row = db
1021            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
1022            .unwrap()
1023            .expect("row exists");
1024        assert_eq!(row.len(), 1);
1025        assert_eq!(
1026            row[0],
1027            Some(StructuredColumnValue::List(vec![
1028                Bytes::from_static(b"a"),
1029                Bytes::from_static(b"b"),
1030            ]))
1031        );
1032
1033        db.close().unwrap();
1034        let _ = std::fs::remove_dir_all(root);
1035    }
1036}