Skip to main content

dbx_core/engine/
metadata.rs

//! Metadata Persistence — serialization and storage of catalog metadata
//!
//! This module persists table schemas, index definitions, triggers, stored
//! procedures, UDFs and schedules to the storage backend (`NativeWosBackend`),
//! so they can be restored automatically when the database is reopened.
5
6use crate::error::{DbxError, DbxResult};
7use crate::storage::StorageBackend;
8use crate::storage::native_wos::NativeWosBackend;
9use arrow::datatypes::{DataType, Field, Schema};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13
14// ════════════════════════════════════════════
15// Metadata Structures
16// ════════════════════════════════════════════
17
18/// Serializable schema metadata for persistence
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct SchemaMetadata {
21    pub table_name: String,
22    pub fields: Vec<FieldMetadata>,
23    #[serde(default)]
24    pub metadata: HashMap<String, String>,
25}
26
27/// Serializable field metadata
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct FieldMetadata {
30    pub name: String,
31    pub data_type: String,
32    pub nullable: bool,
33}
34
35/// Serializable index metadata
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct IndexMetadata {
38    pub index_name: String,
39    pub table_name: String,
40    pub column_name: String,
41}
42
43// ════════════════════════════════════════════
44// Schema Conversion
45// ════════════════════════════════════════════
46
47impl From<&Schema> for SchemaMetadata {
48    fn from(schema: &Schema) -> Self {
49        let fields = schema
50            .fields()
51            .iter()
52            .map(|field| FieldMetadata {
53                name: field.name().clone(),
54                data_type: datatype_to_string(field.data_type()),
55                nullable: field.is_nullable(),
56            })
57            .collect();
58
59        SchemaMetadata {
60            table_name: String::new(), // Will be set by caller
61            fields,
62            metadata: schema.metadata().clone(),
63        }
64    }
65}
66
67impl TryFrom<SchemaMetadata> for Schema {
68    type Error = DbxError;
69
70    fn try_from(metadata: SchemaMetadata) -> Result<Self, Self::Error> {
71        let fields: Result<Vec<Field>, DbxError> = metadata
72            .fields
73            .iter()
74            .map(|field_meta| {
75                let data_type = string_to_datatype(&field_meta.data_type)?;
76                Ok(Field::new(&field_meta.name, data_type, field_meta.nullable))
77            })
78            .collect();
79
80        Ok(Schema::new(fields?).with_metadata(metadata.metadata))
81    }
82}
83
84// ════════════════════════════════════════════
85// DataType Conversion Helpers
86// ════════════════════════════════════════════
87
88/// Convert Arrow DataType to string representation
89fn datatype_to_string(data_type: &DataType) -> String {
90    match data_type {
91        DataType::Int8 => "Int8".to_string(),
92        DataType::Int16 => "Int16".to_string(),
93        DataType::Int32 => "Int32".to_string(),
94        DataType::Int64 => "Int64".to_string(),
95        DataType::UInt8 => "UInt8".to_string(),
96        DataType::UInt16 => "UInt16".to_string(),
97        DataType::UInt32 => "UInt32".to_string(),
98        DataType::UInt64 => "UInt64".to_string(),
99        DataType::Float32 => "Float32".to_string(),
100        DataType::Float64 => "Float64".to_string(),
101        DataType::Utf8 => "Utf8".to_string(),
102        DataType::Boolean => "Boolean".to_string(),
103        DataType::Binary => "Binary".to_string(),
104        DataType::Date32 => "Date32".to_string(),
105        DataType::Date64 => "Date64".to_string(),
106        DataType::Timestamp(unit, tz) => {
107            format!("Timestamp({:?}, {:?})", unit, tz)
108        }
109        _ => format!("{:?}", data_type), // Fallback for complex types
110    }
111}
112
113/// Convert string representation to Arrow DataType
114fn string_to_datatype(s: &str) -> DbxResult<DataType> {
115    match s {
116        "Int8" => Ok(DataType::Int8),
117        "Int16" => Ok(DataType::Int16),
118        "Int32" => Ok(DataType::Int32),
119        "Int64" => Ok(DataType::Int64),
120        "UInt8" => Ok(DataType::UInt8),
121        "UInt16" => Ok(DataType::UInt16),
122        "UInt32" => Ok(DataType::UInt32),
123        "UInt64" => Ok(DataType::UInt64),
124        "Float32" => Ok(DataType::Float32),
125        "Float64" => Ok(DataType::Float64),
126        "Utf8" => Ok(DataType::Utf8),
127        "Boolean" => Ok(DataType::Boolean),
128        "Binary" => Ok(DataType::Binary),
129        "Date32" => Ok(DataType::Date32),
130        "Date64" => Ok(DataType::Date64),
131        _ => Err(DbxError::Schema(format!("Unsupported data type: {}", s))),
132    }
133}
134
135// ════════════════════════════════════════════
136// Schema Persistence Functions
137// ════════════════════════════════════════════
138
139/// Save table schema to persistent storage
140pub fn save_schema(wos: &NativeWosBackend, table: &str, schema: &Schema) -> DbxResult<()> {
141    let mut metadata = SchemaMetadata::from(schema);
142    metadata.table_name = table.to_string();
143
144    let json_bytes =
145        serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
146
147    wos.insert("__meta__/schemas", table.as_bytes(), &json_bytes)?;
148    Ok(())
149}
150
151/// Load table schema from persistent storage
152pub fn load_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<Option<Arc<Schema>>> {
153    match wos.get("__meta__/schemas", table.as_bytes())? {
154        Some(json_bytes) => {
155            let metadata: SchemaMetadata = serde_json::from_slice(&json_bytes)
156                .map_err(|e| DbxError::Serialization(e.to_string()))?;
157            let schema = Schema::try_from(metadata)?;
158            Ok(Some(Arc::new(schema)))
159        }
160        None => Ok(None),
161    }
162}
163
164/// Delete table schema from persistent storage
165pub fn delete_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<()> {
166    wos.delete("__meta__/schemas", table.as_bytes())?;
167    Ok(())
168}
169
170/// Load all schemas from persistent storage
171pub fn load_all_schemas(wos: &NativeWosBackend) -> DbxResult<HashMap<String, Arc<Schema>>> {
172    let mut schemas = HashMap::new();
173    let all_records = wos.scan("__meta__/schemas", ..)?;
174
175    for (key_vec, value_vec) in all_records {
176        let table_name =
177            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
178        let metadata: SchemaMetadata = serde_json::from_slice(&value_vec)
179            .map_err(|e| DbxError::Serialization(e.to_string()))?;
180        let schema = Schema::try_from(metadata)?;
181        schemas.insert(table_name, Arc::new(schema));
182    }
183
184    Ok(schemas)
185}
186
187// ════════════════════════════════════════════
188// Index Persistence Functions
189// ════════════════════════════════════════════
190
191/// Save index metadata to persistent storage
192pub fn save_index(
193    wos: &NativeWosBackend,
194    index_name: &str,
195    table: &str,
196    column: &str,
197) -> DbxResult<()> {
198    let metadata = IndexMetadata {
199        index_name: index_name.to_string(),
200        table_name: table.to_string(),
201        column_name: column.to_string(),
202    };
203
204    let json_bytes =
205        serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
206
207    wos.insert("__meta__/indexes", index_name.as_bytes(), &json_bytes)?;
208    Ok(())
209}
210
211/// Delete index metadata from persistent storage
212pub fn delete_index(wos: &NativeWosBackend, index_name: &str) -> DbxResult<()> {
213    wos.delete("__meta__/indexes", index_name.as_bytes())?;
214    Ok(())
215}
216
217/// Load all index metadata from persistent storage
218pub fn load_all_indexes(wos: &NativeWosBackend) -> DbxResult<HashMap<String, (String, String)>> {
219    let mut indexes = HashMap::new();
220    let all_records = wos.scan("__meta__/indexes", ..)?;
221
222    for (key_vec, value_vec) in all_records {
223        let index_name =
224            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
225        let metadata: IndexMetadata = serde_json::from_slice(&value_vec)
226            .map_err(|e| DbxError::Serialization(e.to_string()))?;
227        indexes.insert(index_name, (metadata.table_name, metadata.column_name));
228    }
229
230    Ok(indexes)
231}
232
233// ════════════════════════════════════════════
234// Trigger Metadata Persistence Functions
235// ════════════════════════════════════════════
236
237/// Save trigger metadata to persistent storage
238pub fn save_trigger(wos: &NativeWosBackend, trigger: &crate::automation::Trigger) -> DbxResult<()> {
239    let json = trigger.to_json()?;
240    wos.insert(
241        "__meta__/triggers",
242        trigger.name.as_bytes(),
243        json.as_bytes(),
244    )?;
245    Ok(())
246}
247
248/// Load trigger metadata from persistent storage
249pub fn load_trigger(
250    wos: &NativeWosBackend,
251    name: &str,
252) -> DbxResult<Option<crate::automation::Trigger>> {
253    match wos.get("__meta__/triggers", name.as_bytes())? {
254        Some(json_bytes) => {
255            let json = String::from_utf8(json_bytes)
256                .map_err(|e| DbxError::Serialization(e.to_string()))?;
257            let trigger = crate::automation::Trigger::from_json(&json)?;
258            Ok(Some(trigger))
259        }
260        None => Ok(None),
261    }
262}
263
264/// Delete trigger metadata from persistent storage
265pub fn delete_trigger(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
266    wos.delete("__meta__/triggers", name.as_bytes())?;
267    Ok(())
268}
269
270/// Load all trigger metadata from persistent storage
271pub fn load_all_triggers(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::Trigger>> {
272    let mut triggers = Vec::new();
273    let all_records = wos.scan("__meta__/triggers", ..)?;
274
275    for (_key_vec, value_vec) in all_records {
276        let json =
277            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
278        let trigger = crate::automation::Trigger::from_json(&json)?;
279        triggers.push(trigger);
280    }
281
282    Ok(triggers)
283}
284
285// ════════════════════════════════════════════
286// Stored Procedure Metadata Persistence Functions
287// ════════════════════════════════════════════
288
289/// Save stored procedure metadata to persistent storage
290pub fn save_procedure(
291    wos: &NativeWosBackend,
292    procedure: &crate::automation::StoredProcedure,
293) -> DbxResult<()> {
294    let json = procedure.to_json()?;
295    wos.insert(
296        "__meta__/procedures",
297        procedure.name.as_bytes(),
298        json.as_bytes(),
299    )?;
300    Ok(())
301}
302
303/// Load stored procedure metadata from persistent storage
304pub fn load_procedure(
305    wos: &NativeWosBackend,
306    name: &str,
307) -> DbxResult<Option<crate::automation::StoredProcedure>> {
308    match wos.get("__meta__/procedures", name.as_bytes())? {
309        Some(json_bytes) => {
310            let json = String::from_utf8(json_bytes)
311                .map_err(|e| DbxError::Serialization(e.to_string()))?;
312            let procedure = crate::automation::StoredProcedure::from_json(&json)?;
313            Ok(Some(procedure))
314        }
315        None => Ok(None),
316    }
317}
318
319/// Delete stored procedure metadata from persistent storage
320pub fn delete_procedure(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
321    wos.delete("__meta__/procedures", name.as_bytes())?;
322    Ok(())
323}
324
325/// Load all stored procedure metadata from persistent storage
326pub fn load_all_procedures(
327    wos: &NativeWosBackend,
328) -> DbxResult<Vec<crate::automation::StoredProcedure>> {
329    let mut procedures = Vec::new();
330    let all_records = wos.scan("__meta__/procedures", ..)?;
331
332    for (_key_vec, value_vec) in all_records {
333        let json =
334            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
335        let procedure = crate::automation::StoredProcedure::from_json(&json)?;
336        procedures.push(procedure);
337    }
338
339    Ok(procedures)
340}
341
342// ════════════════════════════════════════════
343// UDF Metadata Persistence Functions
344// ════════════════════════════════════════════
345
346/// Save UDF metadata to persistent storage
347pub fn save_udf(wos: &NativeWosBackend, udf: &crate::automation::UdfMetadata) -> DbxResult<()> {
348    let json = udf.to_json()?;
349    wos.insert("__meta__/udfs", udf.name.as_bytes(), json.as_bytes())?;
350    Ok(())
351}
352
353/// Load UDF metadata from persistent storage
354pub fn load_udf(
355    wos: &NativeWosBackend,
356    name: &str,
357) -> DbxResult<Option<crate::automation::UdfMetadata>> {
358    match wos.get("__meta__/udfs", name.as_bytes())? {
359        Some(json_bytes) => {
360            let json = String::from_utf8(json_bytes)
361                .map_err(|e| DbxError::Serialization(e.to_string()))?;
362            let udf = crate::automation::UdfMetadata::from_json(&json)?;
363            Ok(Some(udf))
364        }
365        None => Ok(None),
366    }
367}
368
369/// Delete UDF metadata from persistent storage
370pub fn delete_udf(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
371    wos.delete("__meta__/udfs", name.as_bytes())?;
372    Ok(())
373}
374
375/// Load all UDF metadata from persistent storage
376pub fn load_all_udfs(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::UdfMetadata>> {
377    let mut udfs = Vec::new();
378    let all_records = wos.scan("__meta__/udfs", ..)?;
379
380    for (_key_vec, value_vec) in all_records {
381        let json =
382            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
383        let udf = crate::automation::UdfMetadata::from_json(&json)?;
384        udfs.push(udf);
385    }
386
387    Ok(udfs)
388}
389
390// ════════════════════════════════════════════
391// Schedule Metadata Persistence
392// ════════════════════════════════════════════
393
394/// Save schedule metadata
395pub fn save_schedule(
396    wos: &NativeWosBackend,
397    schedule: &crate::automation::Schedule,
398) -> DbxResult<()> {
399    let json = schedule.to_json()?;
400    wos.insert(
401        "__meta__/schedules",
402        schedule.name.as_bytes(),
403        json.as_bytes(),
404    )?;
405    Ok(())
406}
407
408/// Load schedule metadata by name
409pub fn load_schedule(
410    wos: &NativeWosBackend,
411    name: &str,
412) -> DbxResult<Option<crate::automation::Schedule>> {
413    match wos.get("__meta__/schedules", name.as_bytes())? {
414        Some(bytes) => {
415            let json = String::from_utf8(bytes.to_vec()).map_err(|e| {
416                DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
417            })?;
418            let schedule = crate::automation::Schedule::from_json(&json)?;
419            Ok(Some(schedule))
420        }
421        None => Ok(None),
422    }
423}
424
425/// Delete schedule metadata
426pub fn delete_schedule(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
427    wos.delete("__meta__/schedules", name.as_bytes())?;
428    Ok(())
429}
430
431/// Load all schedules
432pub fn load_all_schedules(
433    wos: &NativeWosBackend,
434) -> DbxResult<HashMap<String, crate::automation::Schedule>> {
435    let mut schedules = HashMap::new();
436    let all_records = wos.scan("__meta__/schedules", ..)?;
437
438    for (key_vec, value_vec) in all_records {
439        let name =
440            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
441
442        let json = String::from_utf8(value_vec).map_err(|e| {
443            DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
444        })?;
445
446        let schedule = crate::automation::Schedule::from_json(&json)?;
447        schedules.insert(name, schedule);
448    }
449
450    Ok(schedules)
451}
452
453// ════════════════════════════════════════════
454// Tests
455// ════════════════════════════════════════════
456
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    /// Round-trips a schema through `SchemaMetadata` without touching storage.
    #[test]
    fn test_schema_metadata_conversion() {
        let original = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
        ]);

        let meta = SchemaMetadata::from(&original);
        assert_eq!(meta.fields.len(), 3);
        let first = &meta.fields[0];
        assert_eq!(first.name, "id");
        assert_eq!(first.data_type, "Int64");
        assert!(!first.nullable);

        let rebuilt = Schema::try_from(meta).unwrap();
        assert_eq!(rebuilt.fields().len(), 3);
        assert_eq!(rebuilt.field(0).name(), "id");
        assert_eq!(rebuilt.field(0).data_type(), &DataType::Int64);
    }

    /// Save, load and delete a single schema in a temporary backend.
    #[test]
    fn test_schema_persistence() {
        let wos = NativeWosBackend::open_temporary().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        save_schema(&wos, "users", &schema).unwrap();

        let loaded = load_schema(&wos, "users")
            .unwrap()
            .expect("schema for `users` should have been persisted");
        assert_eq!(loaded.fields().len(), 2);
        assert_eq!(loaded.field(0).name(), "id");
        assert_eq!(loaded.field(1).name(), "name");

        delete_schema(&wos, "users").unwrap();
        assert!(load_schema(&wos, "users").unwrap().is_none());
    }

    /// `load_all_schemas` returns every saved table keyed by name.
    #[test]
    fn test_load_all_schemas() {
        let wos = NativeWosBackend::open_temporary().unwrap();

        let tables = vec![
            ("users", Field::new("id", DataType::Int64, false)),
            ("products", Field::new("name", DataType::Utf8, true)),
        ];
        for (table, column) in tables {
            let schema = Arc::new(Schema::new(vec![column]));
            save_schema(&wos, table, &schema).unwrap();
        }

        let all_schemas = load_all_schemas(&wos).unwrap();
        assert_eq!(all_schemas.len(), 2);
        assert!(all_schemas.contains_key("users"));
        assert!(all_schemas.contains_key("products"));
    }

    /// Save, list and delete a single index definition.
    #[test]
    fn test_index_persistence() {
        let wos = NativeWosBackend::open_temporary().unwrap();

        save_index(&wos, "idx_name", "users", "name").unwrap();

        let indexes = load_all_indexes(&wos).unwrap();
        assert_eq!(indexes.len(), 1);
        let expected = ("users".to_string(), "name".to_string());
        assert_eq!(indexes.get("idx_name"), Some(&expected));

        delete_index(&wos, "idx_name").unwrap();
        assert!(load_all_indexes(&wos).unwrap().is_empty());
    }
}