Skip to main content

dbx_core/engine/
metadata.rs

//! Metadata Persistence — serialization and storage of catalog metadata
//!
//! This module persists table schemas, index definitions, triggers, stored
//! procedures, UDFs and schedules to the sled backend (via `NativeWosBackend`),
//! each under its own `__meta__/*` tree, so they can be restored automatically
//! when the database is reopened.
5
6use crate::error::{DbxError, DbxResult};
7use crate::storage::StorageBackend;
8use crate::storage::native_wos::NativeWosBackend;
9use arrow::datatypes::{DataType, Field, Schema};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13
14// ════════════════════════════════════════════
15// Metadata Structures
16// ════════════════════════════════════════════
17
18/// Serializable schema metadata for persistence
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct SchemaMetadata {
21    pub table_name: String,
22    pub fields: Vec<FieldMetadata>,
23}
24
25/// Serializable field metadata
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct FieldMetadata {
28    pub name: String,
29    pub data_type: String,
30    pub nullable: bool,
31}
32
33/// Serializable index metadata
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct IndexMetadata {
36    pub index_name: String,
37    pub table_name: String,
38    pub column_name: String,
39}
40
41// ════════════════════════════════════════════
42// Schema Conversion
43// ════════════════════════════════════════════
44
45impl From<&Schema> for SchemaMetadata {
46    fn from(schema: &Schema) -> Self {
47        let fields = schema
48            .fields()
49            .iter()
50            .map(|field| FieldMetadata {
51                name: field.name().clone(),
52                data_type: datatype_to_string(field.data_type()),
53                nullable: field.is_nullable(),
54            })
55            .collect();
56
57        SchemaMetadata {
58            table_name: String::new(), // Will be set by caller
59            fields,
60        }
61    }
62}
63
64impl TryFrom<SchemaMetadata> for Schema {
65    type Error = DbxError;
66
67    fn try_from(metadata: SchemaMetadata) -> Result<Self, Self::Error> {
68        let fields: Result<Vec<Field>, DbxError> = metadata
69            .fields
70            .iter()
71            .map(|field_meta| {
72                let data_type = string_to_datatype(&field_meta.data_type)?;
73                Ok(Field::new(&field_meta.name, data_type, field_meta.nullable))
74            })
75            .collect();
76
77        Ok(Schema::new(fields?))
78    }
79}
80
81// ════════════════════════════════════════════
82// DataType Conversion Helpers
83// ════════════════════════════════════════════
84
85/// Convert Arrow DataType to string representation
86fn datatype_to_string(data_type: &DataType) -> String {
87    match data_type {
88        DataType::Int8 => "Int8".to_string(),
89        DataType::Int16 => "Int16".to_string(),
90        DataType::Int32 => "Int32".to_string(),
91        DataType::Int64 => "Int64".to_string(),
92        DataType::UInt8 => "UInt8".to_string(),
93        DataType::UInt16 => "UInt16".to_string(),
94        DataType::UInt32 => "UInt32".to_string(),
95        DataType::UInt64 => "UInt64".to_string(),
96        DataType::Float32 => "Float32".to_string(),
97        DataType::Float64 => "Float64".to_string(),
98        DataType::Utf8 => "Utf8".to_string(),
99        DataType::Boolean => "Boolean".to_string(),
100        DataType::Binary => "Binary".to_string(),
101        DataType::Date32 => "Date32".to_string(),
102        DataType::Date64 => "Date64".to_string(),
103        DataType::Timestamp(unit, tz) => {
104            format!("Timestamp({:?}, {:?})", unit, tz)
105        }
106        _ => format!("{:?}", data_type), // Fallback for complex types
107    }
108}
109
110/// Convert string representation to Arrow DataType
111fn string_to_datatype(s: &str) -> DbxResult<DataType> {
112    match s {
113        "Int8" => Ok(DataType::Int8),
114        "Int16" => Ok(DataType::Int16),
115        "Int32" => Ok(DataType::Int32),
116        "Int64" => Ok(DataType::Int64),
117        "UInt8" => Ok(DataType::UInt8),
118        "UInt16" => Ok(DataType::UInt16),
119        "UInt32" => Ok(DataType::UInt32),
120        "UInt64" => Ok(DataType::UInt64),
121        "Float32" => Ok(DataType::Float32),
122        "Float64" => Ok(DataType::Float64),
123        "Utf8" => Ok(DataType::Utf8),
124        "Boolean" => Ok(DataType::Boolean),
125        "Binary" => Ok(DataType::Binary),
126        "Date32" => Ok(DataType::Date32),
127        "Date64" => Ok(DataType::Date64),
128        _ => Err(DbxError::Schema(format!("Unsupported data type: {}", s))),
129    }
130}
131
132// ════════════════════════════════════════════
133// Schema Persistence Functions
134// ════════════════════════════════════════════
135
136/// Save table schema to persistent storage
137pub fn save_schema(wos: &NativeWosBackend, table: &str, schema: &Schema) -> DbxResult<()> {
138    let mut metadata = SchemaMetadata::from(schema);
139    metadata.table_name = table.to_string();
140
141    let json_bytes =
142        serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
143
144    wos.insert("__meta__/schemas", table.as_bytes(), &json_bytes)?;
145    Ok(())
146}
147
148/// Load table schema from persistent storage
149pub fn load_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<Option<Arc<Schema>>> {
150    match wos.get("__meta__/schemas", table.as_bytes())? {
151        Some(json_bytes) => {
152            let metadata: SchemaMetadata = serde_json::from_slice(&json_bytes)
153                .map_err(|e| DbxError::Serialization(e.to_string()))?;
154            let schema = Schema::try_from(metadata)?;
155            Ok(Some(Arc::new(schema)))
156        }
157        None => Ok(None),
158    }
159}
160
161/// Delete table schema from persistent storage
162pub fn delete_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<()> {
163    wos.delete("__meta__/schemas", table.as_bytes())?;
164    Ok(())
165}
166
167/// Load all schemas from persistent storage
168pub fn load_all_schemas(wos: &NativeWosBackend) -> DbxResult<HashMap<String, Arc<Schema>>> {
169    let mut schemas = HashMap::new();
170    let all_records = wos.scan("__meta__/schemas", ..)?;
171
172    for (key_vec, value_vec) in all_records {
173        let table_name =
174            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
175        let metadata: SchemaMetadata = serde_json::from_slice(&value_vec)
176            .map_err(|e| DbxError::Serialization(e.to_string()))?;
177        let schema = Schema::try_from(metadata)?;
178        schemas.insert(table_name, Arc::new(schema));
179    }
180
181    Ok(schemas)
182}
183
184// ════════════════════════════════════════════
185// Index Persistence Functions
186// ════════════════════════════════════════════
187
188/// Save index metadata to persistent storage
189pub fn save_index(
190    wos: &NativeWosBackend,
191    index_name: &str,
192    table: &str,
193    column: &str,
194) -> DbxResult<()> {
195    let metadata = IndexMetadata {
196        index_name: index_name.to_string(),
197        table_name: table.to_string(),
198        column_name: column.to_string(),
199    };
200
201    let json_bytes =
202        serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
203
204    wos.insert("__meta__/indexes", index_name.as_bytes(), &json_bytes)?;
205    Ok(())
206}
207
208/// Delete index metadata from persistent storage
209pub fn delete_index(wos: &NativeWosBackend, index_name: &str) -> DbxResult<()> {
210    wos.delete("__meta__/indexes", index_name.as_bytes())?;
211    Ok(())
212}
213
214/// Load all index metadata from persistent storage
215pub fn load_all_indexes(wos: &NativeWosBackend) -> DbxResult<HashMap<String, (String, String)>> {
216    let mut indexes = HashMap::new();
217    let all_records = wos.scan("__meta__/indexes", ..)?;
218
219    for (key_vec, value_vec) in all_records {
220        let index_name =
221            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
222        let metadata: IndexMetadata = serde_json::from_slice(&value_vec)
223            .map_err(|e| DbxError::Serialization(e.to_string()))?;
224        indexes.insert(index_name, (metadata.table_name, metadata.column_name));
225    }
226
227    Ok(indexes)
228}
229
230// ════════════════════════════════════════════
231// Trigger Metadata Persistence Functions
232// ════════════════════════════════════════════
233
234/// Save trigger metadata to persistent storage
235pub fn save_trigger(wos: &NativeWosBackend, trigger: &crate::automation::Trigger) -> DbxResult<()> {
236    let json = trigger.to_json()?;
237    wos.insert(
238        "__meta__/triggers",
239        trigger.name.as_bytes(),
240        json.as_bytes(),
241    )?;
242    Ok(())
243}
244
245/// Load trigger metadata from persistent storage
246pub fn load_trigger(
247    wos: &NativeWosBackend,
248    name: &str,
249) -> DbxResult<Option<crate::automation::Trigger>> {
250    match wos.get("__meta__/triggers", name.as_bytes())? {
251        Some(json_bytes) => {
252            let json = String::from_utf8(json_bytes)
253                .map_err(|e| DbxError::Serialization(e.to_string()))?;
254            let trigger = crate::automation::Trigger::from_json(&json)?;
255            Ok(Some(trigger))
256        }
257        None => Ok(None),
258    }
259}
260
261/// Delete trigger metadata from persistent storage
262pub fn delete_trigger(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
263    wos.delete("__meta__/triggers", name.as_bytes())?;
264    Ok(())
265}
266
267/// Load all trigger metadata from persistent storage
268pub fn load_all_triggers(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::Trigger>> {
269    let mut triggers = Vec::new();
270    let all_records = wos.scan("__meta__/triggers", ..)?;
271
272    for (_key_vec, value_vec) in all_records {
273        let json =
274            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
275        let trigger = crate::automation::Trigger::from_json(&json)?;
276        triggers.push(trigger);
277    }
278
279    Ok(triggers)
280}
281
282// ════════════════════════════════════════════
283// Stored Procedure Metadata Persistence Functions
284// ════════════════════════════════════════════
285
286/// Save stored procedure metadata to persistent storage
287pub fn save_procedure(
288    wos: &NativeWosBackend,
289    procedure: &crate::automation::StoredProcedure,
290) -> DbxResult<()> {
291    let json = procedure.to_json()?;
292    wos.insert(
293        "__meta__/procedures",
294        procedure.name.as_bytes(),
295        json.as_bytes(),
296    )?;
297    Ok(())
298}
299
300/// Load stored procedure metadata from persistent storage
301pub fn load_procedure(
302    wos: &NativeWosBackend,
303    name: &str,
304) -> DbxResult<Option<crate::automation::StoredProcedure>> {
305    match wos.get("__meta__/procedures", name.as_bytes())? {
306        Some(json_bytes) => {
307            let json = String::from_utf8(json_bytes)
308                .map_err(|e| DbxError::Serialization(e.to_string()))?;
309            let procedure = crate::automation::StoredProcedure::from_json(&json)?;
310            Ok(Some(procedure))
311        }
312        None => Ok(None),
313    }
314}
315
316/// Delete stored procedure metadata from persistent storage
317pub fn delete_procedure(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
318    wos.delete("__meta__/procedures", name.as_bytes())?;
319    Ok(())
320}
321
322/// Load all stored procedure metadata from persistent storage
323pub fn load_all_procedures(
324    wos: &NativeWosBackend,
325) -> DbxResult<Vec<crate::automation::StoredProcedure>> {
326    let mut procedures = Vec::new();
327    let all_records = wos.scan("__meta__/procedures", ..)?;
328
329    for (_key_vec, value_vec) in all_records {
330        let json =
331            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
332        let procedure = crate::automation::StoredProcedure::from_json(&json)?;
333        procedures.push(procedure);
334    }
335
336    Ok(procedures)
337}
338
339// ════════════════════════════════════════════
340// UDF Metadata Persistence Functions
341// ════════════════════════════════════════════
342
343/// Save UDF metadata to persistent storage
344pub fn save_udf(wos: &NativeWosBackend, udf: &crate::automation::UdfMetadata) -> DbxResult<()> {
345    let json = udf.to_json()?;
346    wos.insert("__meta__/udfs", udf.name.as_bytes(), json.as_bytes())?;
347    Ok(())
348}
349
350/// Load UDF metadata from persistent storage
351pub fn load_udf(
352    wos: &NativeWosBackend,
353    name: &str,
354) -> DbxResult<Option<crate::automation::UdfMetadata>> {
355    match wos.get("__meta__/udfs", name.as_bytes())? {
356        Some(json_bytes) => {
357            let json = String::from_utf8(json_bytes)
358                .map_err(|e| DbxError::Serialization(e.to_string()))?;
359            let udf = crate::automation::UdfMetadata::from_json(&json)?;
360            Ok(Some(udf))
361        }
362        None => Ok(None),
363    }
364}
365
366/// Delete UDF metadata from persistent storage
367pub fn delete_udf(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
368    wos.delete("__meta__/udfs", name.as_bytes())?;
369    Ok(())
370}
371
372/// Load all UDF metadata from persistent storage
373pub fn load_all_udfs(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::UdfMetadata>> {
374    let mut udfs = Vec::new();
375    let all_records = wos.scan("__meta__/udfs", ..)?;
376
377    for (_key_vec, value_vec) in all_records {
378        let json =
379            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
380        let udf = crate::automation::UdfMetadata::from_json(&json)?;
381        udfs.push(udf);
382    }
383
384    Ok(udfs)
385}
386
387// ════════════════════════════════════════════
388// Schedule Metadata Persistence
389// ════════════════════════════════════════════
390
391/// Save schedule metadata
392pub fn save_schedule(
393    wos: &NativeWosBackend,
394    schedule: &crate::automation::Schedule,
395) -> DbxResult<()> {
396    let json = schedule.to_json()?;
397    wos.insert(
398        "__meta__/schedules",
399        schedule.name.as_bytes(),
400        json.as_bytes(),
401    )?;
402    Ok(())
403}
404
405/// Load schedule metadata by name
406pub fn load_schedule(
407    wos: &NativeWosBackend,
408    name: &str,
409) -> DbxResult<Option<crate::automation::Schedule>> {
410    match wos.get("__meta__/schedules", name.as_bytes())? {
411        Some(bytes) => {
412            let json = String::from_utf8(bytes.to_vec()).map_err(|e| {
413                DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
414            })?;
415            let schedule = crate::automation::Schedule::from_json(&json)?;
416            Ok(Some(schedule))
417        }
418        None => Ok(None),
419    }
420}
421
422/// Delete schedule metadata
423pub fn delete_schedule(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
424    wos.delete("__meta__/schedules", name.as_bytes())?;
425    Ok(())
426}
427
428/// Load all schedules
429pub fn load_all_schedules(
430    wos: &NativeWosBackend,
431) -> DbxResult<HashMap<String, crate::automation::Schedule>> {
432    let mut schedules = HashMap::new();
433    let all_records = wos.scan("__meta__/schedules", ..)?;
434
435    for (key_vec, value_vec) in all_records {
436        let name =
437            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
438
439        let json = String::from_utf8(value_vec).map_err(|e| {
440            DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
441        })?;
442
443        let schedule = crate::automation::Schedule::from_json(&json)?;
444        schedules.insert(name, schedule);
445    }
446
447    Ok(schedules)
448}
449
450// ════════════════════════════════════════════
451// Tests
452// ════════════════════════════════════════════
453
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn test_schema_metadata_conversion() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
        ]);

        let metadata = SchemaMetadata::from(&schema);
        assert_eq!(metadata.fields.len(), 3);
        assert_eq!(metadata.fields[0].name, "id");
        assert_eq!(metadata.fields[0].data_type, "Int64");
        assert!(!metadata.fields[0].nullable);

        let restored_schema = Schema::try_from(metadata).unwrap();
        assert_eq!(restored_schema.fields().len(), 3);
        assert_eq!(restored_schema.field(0).name(), "id");
        assert_eq!(restored_schema.field(0).data_type(), &DataType::Int64);
    }

    #[test]
    fn test_builtin_datatypes_roundtrip() {
        // Every primitive type the loader has always accepted must survive a
        // round-trip through the string encoding unchanged.
        let types = [
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
            DataType::Float32,
            DataType::Float64,
            DataType::Utf8,
            DataType::Boolean,
            DataType::Binary,
            DataType::Date32,
            DataType::Date64,
        ];
        for data_type in types.iter() {
            let encoded = datatype_to_string(data_type);
            let decoded = string_to_datatype(&encoded).unwrap();
            assert_eq!(&decoded, data_type, "round-trip failed for {}", encoded);
        }
    }

    #[test]
    fn test_unknown_datatype_is_rejected() {
        assert!(string_to_datatype("NotARealType").is_err());
    }

    #[test]
    fn test_schema_persistence() {
        let wos = NativeWosBackend::open_temporary().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        // Save schema
        save_schema(&wos, "users", &schema).unwrap();

        // Load schema
        let loaded = load_schema(&wos, "users").unwrap();
        assert!(loaded.is_some());
        let loaded_schema = loaded.unwrap();
        assert_eq!(loaded_schema.fields().len(), 2);
        assert_eq!(loaded_schema.field(0).name(), "id");
        assert_eq!(loaded_schema.field(1).name(), "name");

        // Delete schema
        delete_schema(&wos, "users").unwrap();
        let deleted = load_schema(&wos, "users").unwrap();
        assert!(deleted.is_none());
    }

    #[test]
    fn test_load_missing_schema_returns_none() {
        let wos = NativeWosBackend::open_temporary().unwrap();
        assert!(load_schema(&wos, "missing").unwrap().is_none());
    }

    #[test]
    fn test_load_all_schemas() {
        let wos = NativeWosBackend::open_temporary().unwrap();

        // Save multiple schemas
        let schema1 = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let schema2 = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));

        save_schema(&wos, "users", &schema1).unwrap();
        save_schema(&wos, "products", &schema2).unwrap();

        // Load all
        let all_schemas = load_all_schemas(&wos).unwrap();
        assert_eq!(all_schemas.len(), 2);
        assert!(all_schemas.contains_key("users"));
        assert!(all_schemas.contains_key("products"));
    }

    #[test]
    fn test_index_persistence() {
        let wos = NativeWosBackend::open_temporary().unwrap();

        // Save index
        save_index(&wos, "idx_name", "users", "name").unwrap();

        // Load all indexes
        let indexes = load_all_indexes(&wos).unwrap();
        assert_eq!(indexes.len(), 1);
        assert_eq!(
            indexes.get("idx_name"),
            Some(&("users".to_string(), "name".to_string()))
        );

        // Delete index
        delete_index(&wos, "idx_name").unwrap();
        let deleted = load_all_indexes(&wos).unwrap();
        assert!(deleted.is_empty());
    }
}