Skip to main content

dbx_core/engine/
metadata.rs

//! Metadata Persistence — serialization and storage of catalog metadata
//!
//! This module persists table schemas, index definitions, triggers, stored procedures,
//! UDFs, and schedules to the sled-backed WOS storage, enabling automatic restoration
//! when the database is reopened.
5
6use crate::error::{DbxError, DbxResult};
7use crate::storage::{StorageBackend, wos::WosBackend};
8use arrow::datatypes::{DataType, Field, Schema};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12
13// ════════════════════════════════════════════
14// Metadata Structures
15// ════════════════════════════════════════════
16
17/// Serializable schema metadata for persistence
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct SchemaMetadata {
20    pub table_name: String,
21    pub fields: Vec<FieldMetadata>,
22}
23
24/// Serializable field metadata
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct FieldMetadata {
27    pub name: String,
28    pub data_type: String,
29    pub nullable: bool,
30}
31
32/// Serializable index metadata
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct IndexMetadata {
35    pub index_name: String,
36    pub table_name: String,
37    pub column_name: String,
38}
39
40// ════════════════════════════════════════════
41// Schema Conversion
42// ════════════════════════════════════════════
43
44impl From<&Schema> for SchemaMetadata {
45    fn from(schema: &Schema) -> Self {
46        let fields = schema
47            .fields()
48            .iter()
49            .map(|field| FieldMetadata {
50                name: field.name().clone(),
51                data_type: datatype_to_string(field.data_type()),
52                nullable: field.is_nullable(),
53            })
54            .collect();
55
56        SchemaMetadata {
57            table_name: String::new(), // Will be set by caller
58            fields,
59        }
60    }
61}
62
63impl TryFrom<SchemaMetadata> for Schema {
64    type Error = DbxError;
65
66    fn try_from(metadata: SchemaMetadata) -> Result<Self, Self::Error> {
67        let fields: Result<Vec<Field>, DbxError> = metadata
68            .fields
69            .iter()
70            .map(|field_meta| {
71                let data_type = string_to_datatype(&field_meta.data_type)?;
72                Ok(Field::new(&field_meta.name, data_type, field_meta.nullable))
73            })
74            .collect();
75
76        Ok(Schema::new(fields?))
77    }
78}
79
80// ════════════════════════════════════════════
81// DataType Conversion Helpers
82// ════════════════════════════════════════════
83
84/// Convert Arrow DataType to string representation
85fn datatype_to_string(data_type: &DataType) -> String {
86    match data_type {
87        DataType::Int8 => "Int8".to_string(),
88        DataType::Int16 => "Int16".to_string(),
89        DataType::Int32 => "Int32".to_string(),
90        DataType::Int64 => "Int64".to_string(),
91        DataType::UInt8 => "UInt8".to_string(),
92        DataType::UInt16 => "UInt16".to_string(),
93        DataType::UInt32 => "UInt32".to_string(),
94        DataType::UInt64 => "UInt64".to_string(),
95        DataType::Float32 => "Float32".to_string(),
96        DataType::Float64 => "Float64".to_string(),
97        DataType::Utf8 => "Utf8".to_string(),
98        DataType::Boolean => "Boolean".to_string(),
99        DataType::Binary => "Binary".to_string(),
100        DataType::Date32 => "Date32".to_string(),
101        DataType::Date64 => "Date64".to_string(),
102        DataType::Timestamp(unit, tz) => {
103            format!("Timestamp({:?}, {:?})", unit, tz)
104        }
105        _ => format!("{:?}", data_type), // Fallback for complex types
106    }
107}
108
109/// Convert string representation to Arrow DataType
110fn string_to_datatype(s: &str) -> DbxResult<DataType> {
111    match s {
112        "Int8" => Ok(DataType::Int8),
113        "Int16" => Ok(DataType::Int16),
114        "Int32" => Ok(DataType::Int32),
115        "Int64" => Ok(DataType::Int64),
116        "UInt8" => Ok(DataType::UInt8),
117        "UInt16" => Ok(DataType::UInt16),
118        "UInt32" => Ok(DataType::UInt32),
119        "UInt64" => Ok(DataType::UInt64),
120        "Float32" => Ok(DataType::Float32),
121        "Float64" => Ok(DataType::Float64),
122        "Utf8" => Ok(DataType::Utf8),
123        "Boolean" => Ok(DataType::Boolean),
124        "Binary" => Ok(DataType::Binary),
125        "Date32" => Ok(DataType::Date32),
126        "Date64" => Ok(DataType::Date64),
127        _ => Err(DbxError::Schema(format!("Unsupported data type: {}", s))),
128    }
129}
130
131// ════════════════════════════════════════════
132// Schema Persistence Functions
133// ════════════════════════════════════════════
134
135/// Save table schema to persistent storage
136pub fn save_schema(wos: &WosBackend, table: &str, schema: &Schema) -> DbxResult<()> {
137    let mut metadata = SchemaMetadata::from(schema);
138    metadata.table_name = table.to_string();
139
140    let json_bytes =
141        serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
142
143    wos.insert("__meta__/schemas", table.as_bytes(), &json_bytes)?;
144    Ok(())
145}
146
147/// Load table schema from persistent storage
148pub fn load_schema(wos: &WosBackend, table: &str) -> DbxResult<Option<Arc<Schema>>> {
149    match wos.get("__meta__/schemas", table.as_bytes())? {
150        Some(json_bytes) => {
151            let metadata: SchemaMetadata = serde_json::from_slice(&json_bytes)
152                .map_err(|e| DbxError::Serialization(e.to_string()))?;
153            let schema = Schema::try_from(metadata)?;
154            Ok(Some(Arc::new(schema)))
155        }
156        None => Ok(None),
157    }
158}
159
160/// Delete table schema from persistent storage
161pub fn delete_schema(wos: &WosBackend, table: &str) -> DbxResult<()> {
162    wos.delete("__meta__/schemas", table.as_bytes())?;
163    Ok(())
164}
165
166/// Load all schemas from persistent storage
167pub fn load_all_schemas(wos: &WosBackend) -> DbxResult<HashMap<String, Arc<Schema>>> {
168    let mut schemas = HashMap::new();
169    let all_records = wos.scan("__meta__/schemas", ..)?;
170
171    for (key_vec, value_vec) in all_records {
172        let table_name =
173            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
174        let metadata: SchemaMetadata = serde_json::from_slice(&value_vec)
175            .map_err(|e| DbxError::Serialization(e.to_string()))?;
176        let schema = Schema::try_from(metadata)?;
177        schemas.insert(table_name, Arc::new(schema));
178    }
179
180    Ok(schemas)
181}
182
183// ════════════════════════════════════════════
184// Index Persistence Functions
185// ════════════════════════════════════════════
186
187/// Save index metadata to persistent storage
188pub fn save_index(wos: &WosBackend, index_name: &str, table: &str, column: &str) -> DbxResult<()> {
189    let metadata = IndexMetadata {
190        index_name: index_name.to_string(),
191        table_name: table.to_string(),
192        column_name: column.to_string(),
193    };
194
195    let json_bytes =
196        serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
197
198    wos.insert("__meta__/indexes", index_name.as_bytes(), &json_bytes)?;
199    Ok(())
200}
201
202/// Delete index metadata from persistent storage
203pub fn delete_index(wos: &WosBackend, index_name: &str) -> DbxResult<()> {
204    wos.delete("__meta__/indexes", index_name.as_bytes())?;
205    Ok(())
206}
207
208/// Load all index metadata from persistent storage
209pub fn load_all_indexes(wos: &WosBackend) -> DbxResult<HashMap<String, (String, String)>> {
210    let mut indexes = HashMap::new();
211    let all_records = wos.scan("__meta__/indexes", ..)?;
212
213    for (key_vec, value_vec) in all_records {
214        let index_name =
215            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
216        let metadata: IndexMetadata = serde_json::from_slice(&value_vec)
217            .map_err(|e| DbxError::Serialization(e.to_string()))?;
218        indexes.insert(index_name, (metadata.table_name, metadata.column_name));
219    }
220
221    Ok(indexes)
222}
223
224// ════════════════════════════════════════════
225// Trigger Metadata Persistence Functions
226// ════════════════════════════════════════════
227
228/// Save trigger metadata to persistent storage
229pub fn save_trigger(wos: &WosBackend, trigger: &crate::automation::Trigger) -> DbxResult<()> {
230    let json = trigger.to_json()?;
231    wos.insert(
232        "__meta__/triggers",
233        trigger.name.as_bytes(),
234        json.as_bytes(),
235    )?;
236    Ok(())
237}
238
239/// Load trigger metadata from persistent storage
240pub fn load_trigger(wos: &WosBackend, name: &str) -> DbxResult<Option<crate::automation::Trigger>> {
241    match wos.get("__meta__/triggers", name.as_bytes())? {
242        Some(json_bytes) => {
243            let json = String::from_utf8(json_bytes)
244                .map_err(|e| DbxError::Serialization(e.to_string()))?;
245            let trigger = crate::automation::Trigger::from_json(&json)?;
246            Ok(Some(trigger))
247        }
248        None => Ok(None),
249    }
250}
251
252/// Delete trigger metadata from persistent storage
253pub fn delete_trigger(wos: &WosBackend, name: &str) -> DbxResult<()> {
254    wos.delete("__meta__/triggers", name.as_bytes())?;
255    Ok(())
256}
257
258/// Load all trigger metadata from persistent storage
259pub fn load_all_triggers(wos: &WosBackend) -> DbxResult<Vec<crate::automation::Trigger>> {
260    let mut triggers = Vec::new();
261    let all_records = wos.scan("__meta__/triggers", ..)?;
262
263    for (_key_vec, value_vec) in all_records {
264        let json =
265            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
266        let trigger = crate::automation::Trigger::from_json(&json)?;
267        triggers.push(trigger);
268    }
269
270    Ok(triggers)
271}
272
273// ════════════════════════════════════════════
274// Stored Procedure Metadata Persistence Functions
275// ════════════════════════════════════════════
276
277/// Save stored procedure metadata to persistent storage
278pub fn save_procedure(
279    wos: &WosBackend,
280    procedure: &crate::automation::StoredProcedure,
281) -> DbxResult<()> {
282    let json = procedure.to_json()?;
283    wos.insert(
284        "__meta__/procedures",
285        procedure.name.as_bytes(),
286        json.as_bytes(),
287    )?;
288    Ok(())
289}
290
291/// Load stored procedure metadata from persistent storage
292pub fn load_procedure(
293    wos: &WosBackend,
294    name: &str,
295) -> DbxResult<Option<crate::automation::StoredProcedure>> {
296    match wos.get("__meta__/procedures", name.as_bytes())? {
297        Some(json_bytes) => {
298            let json = String::from_utf8(json_bytes)
299                .map_err(|e| DbxError::Serialization(e.to_string()))?;
300            let procedure = crate::automation::StoredProcedure::from_json(&json)?;
301            Ok(Some(procedure))
302        }
303        None => Ok(None),
304    }
305}
306
307/// Delete stored procedure metadata from persistent storage
308pub fn delete_procedure(wos: &WosBackend, name: &str) -> DbxResult<()> {
309    wos.delete("__meta__/procedures", name.as_bytes())?;
310    Ok(())
311}
312
313/// Load all stored procedure metadata from persistent storage
314pub fn load_all_procedures(wos: &WosBackend) -> DbxResult<Vec<crate::automation::StoredProcedure>> {
315    let mut procedures = Vec::new();
316    let all_records = wos.scan("__meta__/procedures", ..)?;
317
318    for (_key_vec, value_vec) in all_records {
319        let json =
320            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
321        let procedure = crate::automation::StoredProcedure::from_json(&json)?;
322        procedures.push(procedure);
323    }
324
325    Ok(procedures)
326}
327
328// ════════════════════════════════════════════
329// UDF Metadata Persistence Functions
330// ════════════════════════════════════════════
331
332/// Save UDF metadata to persistent storage
333pub fn save_udf(wos: &WosBackend, udf: &crate::automation::UdfMetadata) -> DbxResult<()> {
334    let json = udf.to_json()?;
335    wos.insert("__meta__/udfs", udf.name.as_bytes(), json.as_bytes())?;
336    Ok(())
337}
338
339/// Load UDF metadata from persistent storage
340pub fn load_udf(wos: &WosBackend, name: &str) -> DbxResult<Option<crate::automation::UdfMetadata>> {
341    match wos.get("__meta__/udfs", name.as_bytes())? {
342        Some(json_bytes) => {
343            let json = String::from_utf8(json_bytes)
344                .map_err(|e| DbxError::Serialization(e.to_string()))?;
345            let udf = crate::automation::UdfMetadata::from_json(&json)?;
346            Ok(Some(udf))
347        }
348        None => Ok(None),
349    }
350}
351
352/// Delete UDF metadata from persistent storage
353pub fn delete_udf(wos: &WosBackend, name: &str) -> DbxResult<()> {
354    wos.delete("__meta__/udfs", name.as_bytes())?;
355    Ok(())
356}
357
358/// Load all UDF metadata from persistent storage
359pub fn load_all_udfs(wos: &WosBackend) -> DbxResult<Vec<crate::automation::UdfMetadata>> {
360    let mut udfs = Vec::new();
361    let all_records = wos.scan("__meta__/udfs", ..)?;
362
363    for (_key_vec, value_vec) in all_records {
364        let json =
365            String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
366        let udf = crate::automation::UdfMetadata::from_json(&json)?;
367        udfs.push(udf);
368    }
369
370    Ok(udfs)
371}
372
373// ════════════════════════════════════════════
374// Schedule Metadata Persistence
375// ════════════════════════════════════════════
376
377/// Save schedule metadata
378pub fn save_schedule(wos: &WosBackend, schedule: &crate::automation::Schedule) -> DbxResult<()> {
379    let json = schedule.to_json()?;
380    wos.insert(
381        "__meta__/schedules",
382        schedule.name.as_bytes(),
383        json.as_bytes(),
384    )?;
385    Ok(())
386}
387
388/// Load schedule metadata by name
389pub fn load_schedule(
390    wos: &WosBackend,
391    name: &str,
392) -> DbxResult<Option<crate::automation::Schedule>> {
393    match wos.get("__meta__/schedules", name.as_bytes())? {
394        Some(bytes) => {
395            let json = String::from_utf8(bytes.to_vec()).map_err(|e| {
396                DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
397            })?;
398            let schedule = crate::automation::Schedule::from_json(&json)?;
399            Ok(Some(schedule))
400        }
401        None => Ok(None),
402    }
403}
404
405/// Delete schedule metadata
406pub fn delete_schedule(wos: &WosBackend, name: &str) -> DbxResult<()> {
407    wos.delete("__meta__/schedules", name.as_bytes())?;
408    Ok(())
409}
410
411/// Load all schedules
412pub fn load_all_schedules(
413    wos: &WosBackend,
414) -> DbxResult<HashMap<String, crate::automation::Schedule>> {
415    let mut schedules = HashMap::new();
416    let all_records = wos.scan("__meta__/schedules", ..)?;
417
418    for (key_vec, value_vec) in all_records {
419        let name =
420            String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
421
422        let json = String::from_utf8(value_vec).map_err(|e| {
423            DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
424        })?;
425
426        let schedule = crate::automation::Schedule::from_json(&json)?;
427        schedules.insert(name, schedule);
428    }
429
430    Ok(schedules)
431}
432
433// ════════════════════════════════════════════
434// Tests
435// ════════════════════════════════════════════
436
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    /// Schema -> metadata -> schema keeps column names, types and nullability.
    #[test]
    fn test_schema_metadata_conversion() {
        let original = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
        ]);

        let metadata = SchemaMetadata::from(&original);
        assert_eq!(metadata.fields.len(), 3);
        let id = &metadata.fields[0];
        assert_eq!(
            (id.name.as_str(), id.data_type.as_str(), id.nullable),
            ("id", "Int64", false)
        );

        let restored = Schema::try_from(metadata).expect("metadata should convert back");
        assert_eq!(restored.fields().len(), 3);
        let first = restored.field(0);
        assert_eq!(first.name(), "id");
        assert_eq!(first.data_type(), &DataType::Int64);
    }

    /// Save, load and delete a single schema.
    #[test]
    fn test_schema_persistence() {
        let wos = WosBackend::open_temporary().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        save_schema(&wos, "users", &schema).unwrap();

        let loaded = load_schema(&wos, "users")
            .unwrap()
            .expect("schema for `users` should have been saved");
        let column_names: Vec<&str> = loaded
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(column_names, ["id", "name"]);

        delete_schema(&wos, "users").unwrap();
        assert!(load_schema(&wos, "users").unwrap().is_none());
    }

    /// Every saved schema shows up in `load_all_schemas`.
    #[test]
    fn test_load_all_schemas() {
        let wos = WosBackend::open_temporary().unwrap();

        let users = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let products = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
        save_schema(&wos, "users", &users).unwrap();
        save_schema(&wos, "products", &products).unwrap();

        let all_schemas = load_all_schemas(&wos).unwrap();
        assert_eq!(all_schemas.len(), 2);
        for table in &["users", "products"] {
            assert!(all_schemas.contains_key(*table), "missing schema for {}", table);
        }
    }

    /// Save, list and delete an index definition.
    #[test]
    fn test_index_persistence() {
        let wos = WosBackend::open_temporary().unwrap();

        save_index(&wos, "idx_name", "users", "name").unwrap();

        let indexes = load_all_indexes(&wos).unwrap();
        let expected = ("users".to_string(), "name".to_string());
        assert_eq!(indexes.len(), 1);
        assert_eq!(indexes.get("idx_name"), Some(&expected));

        delete_index(&wos, "idx_name").unwrap();
        assert!(load_all_indexes(&wos).unwrap().is_empty());
    }
}