1use crate::error::{DbxError, DbxResult};
7use crate::storage::{StorageBackend, wos::WosBackend};
8use arrow::datatypes::{DataType, Field, Schema};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct SchemaMetadata {
20 pub table_name: String,
21 pub fields: Vec<FieldMetadata>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct FieldMetadata {
27 pub name: String,
28 pub data_type: String,
29 pub nullable: bool,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct IndexMetadata {
35 pub index_name: String,
36 pub table_name: String,
37 pub column_name: String,
38}
39
40impl From<&Schema> for SchemaMetadata {
45 fn from(schema: &Schema) -> Self {
46 let fields = schema
47 .fields()
48 .iter()
49 .map(|field| FieldMetadata {
50 name: field.name().clone(),
51 data_type: datatype_to_string(field.data_type()),
52 nullable: field.is_nullable(),
53 })
54 .collect();
55
56 SchemaMetadata {
57 table_name: String::new(), fields,
59 }
60 }
61}
62
63impl TryFrom<SchemaMetadata> for Schema {
64 type Error = DbxError;
65
66 fn try_from(metadata: SchemaMetadata) -> Result<Self, Self::Error> {
67 let fields: Result<Vec<Field>, DbxError> = metadata
68 .fields
69 .iter()
70 .map(|field_meta| {
71 let data_type = string_to_datatype(&field_meta.data_type)?;
72 Ok(Field::new(&field_meta.name, data_type, field_meta.nullable))
73 })
74 .collect();
75
76 Ok(Schema::new(fields?))
77 }
78}
79
80fn datatype_to_string(data_type: &DataType) -> String {
86 match data_type {
87 DataType::Int8 => "Int8".to_string(),
88 DataType::Int16 => "Int16".to_string(),
89 DataType::Int32 => "Int32".to_string(),
90 DataType::Int64 => "Int64".to_string(),
91 DataType::UInt8 => "UInt8".to_string(),
92 DataType::UInt16 => "UInt16".to_string(),
93 DataType::UInt32 => "UInt32".to_string(),
94 DataType::UInt64 => "UInt64".to_string(),
95 DataType::Float32 => "Float32".to_string(),
96 DataType::Float64 => "Float64".to_string(),
97 DataType::Utf8 => "Utf8".to_string(),
98 DataType::Boolean => "Boolean".to_string(),
99 DataType::Binary => "Binary".to_string(),
100 DataType::Date32 => "Date32".to_string(),
101 DataType::Date64 => "Date64".to_string(),
102 DataType::Timestamp(unit, tz) => {
103 format!("Timestamp({:?}, {:?})", unit, tz)
104 }
105 _ => format!("{:?}", data_type), }
107}
108
109fn string_to_datatype(s: &str) -> DbxResult<DataType> {
111 match s {
112 "Int8" => Ok(DataType::Int8),
113 "Int16" => Ok(DataType::Int16),
114 "Int32" => Ok(DataType::Int32),
115 "Int64" => Ok(DataType::Int64),
116 "UInt8" => Ok(DataType::UInt8),
117 "UInt16" => Ok(DataType::UInt16),
118 "UInt32" => Ok(DataType::UInt32),
119 "UInt64" => Ok(DataType::UInt64),
120 "Float32" => Ok(DataType::Float32),
121 "Float64" => Ok(DataType::Float64),
122 "Utf8" => Ok(DataType::Utf8),
123 "Boolean" => Ok(DataType::Boolean),
124 "Binary" => Ok(DataType::Binary),
125 "Date32" => Ok(DataType::Date32),
126 "Date64" => Ok(DataType::Date64),
127 _ => Err(DbxError::Schema(format!("Unsupported data type: {}", s))),
128 }
129}
130
131pub fn save_schema(wos: &WosBackend, table: &str, schema: &Schema) -> DbxResult<()> {
137 let mut metadata = SchemaMetadata::from(schema);
138 metadata.table_name = table.to_string();
139
140 let json_bytes =
141 serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
142
143 wos.insert("__meta__/schemas", table.as_bytes(), &json_bytes)?;
144 Ok(())
145}
146
147pub fn load_schema(wos: &WosBackend, table: &str) -> DbxResult<Option<Arc<Schema>>> {
149 match wos.get("__meta__/schemas", table.as_bytes())? {
150 Some(json_bytes) => {
151 let metadata: SchemaMetadata = serde_json::from_slice(&json_bytes)
152 .map_err(|e| DbxError::Serialization(e.to_string()))?;
153 let schema = Schema::try_from(metadata)?;
154 Ok(Some(Arc::new(schema)))
155 }
156 None => Ok(None),
157 }
158}
159
160pub fn delete_schema(wos: &WosBackend, table: &str) -> DbxResult<()> {
162 wos.delete("__meta__/schemas", table.as_bytes())?;
163 Ok(())
164}
165
166pub fn load_all_schemas(wos: &WosBackend) -> DbxResult<HashMap<String, Arc<Schema>>> {
168 let mut schemas = HashMap::new();
169 let all_records = wos.scan("__meta__/schemas", ..)?;
170
171 for (key_vec, value_vec) in all_records {
172 let table_name =
173 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
174 let metadata: SchemaMetadata = serde_json::from_slice(&value_vec)
175 .map_err(|e| DbxError::Serialization(e.to_string()))?;
176 let schema = Schema::try_from(metadata)?;
177 schemas.insert(table_name, Arc::new(schema));
178 }
179
180 Ok(schemas)
181}
182
183pub fn save_index(wos: &WosBackend, index_name: &str, table: &str, column: &str) -> DbxResult<()> {
189 let metadata = IndexMetadata {
190 index_name: index_name.to_string(),
191 table_name: table.to_string(),
192 column_name: column.to_string(),
193 };
194
195 let json_bytes =
196 serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
197
198 wos.insert("__meta__/indexes", index_name.as_bytes(), &json_bytes)?;
199 Ok(())
200}
201
202pub fn delete_index(wos: &WosBackend, index_name: &str) -> DbxResult<()> {
204 wos.delete("__meta__/indexes", index_name.as_bytes())?;
205 Ok(())
206}
207
208pub fn load_all_indexes(wos: &WosBackend) -> DbxResult<HashMap<String, (String, String)>> {
210 let mut indexes = HashMap::new();
211 let all_records = wos.scan("__meta__/indexes", ..)?;
212
213 for (key_vec, value_vec) in all_records {
214 let index_name =
215 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
216 let metadata: IndexMetadata = serde_json::from_slice(&value_vec)
217 .map_err(|e| DbxError::Serialization(e.to_string()))?;
218 indexes.insert(index_name, (metadata.table_name, metadata.column_name));
219 }
220
221 Ok(indexes)
222}
223
224pub fn save_trigger(wos: &WosBackend, trigger: &crate::automation::Trigger) -> DbxResult<()> {
230 let json = trigger.to_json()?;
231 wos.insert(
232 "__meta__/triggers",
233 trigger.name.as_bytes(),
234 json.as_bytes(),
235 )?;
236 Ok(())
237}
238
239pub fn load_trigger(wos: &WosBackend, name: &str) -> DbxResult<Option<crate::automation::Trigger>> {
241 match wos.get("__meta__/triggers", name.as_bytes())? {
242 Some(json_bytes) => {
243 let json = String::from_utf8(json_bytes)
244 .map_err(|e| DbxError::Serialization(e.to_string()))?;
245 let trigger = crate::automation::Trigger::from_json(&json)?;
246 Ok(Some(trigger))
247 }
248 None => Ok(None),
249 }
250}
251
252pub fn delete_trigger(wos: &WosBackend, name: &str) -> DbxResult<()> {
254 wos.delete("__meta__/triggers", name.as_bytes())?;
255 Ok(())
256}
257
258pub fn load_all_triggers(wos: &WosBackend) -> DbxResult<Vec<crate::automation::Trigger>> {
260 let mut triggers = Vec::new();
261 let all_records = wos.scan("__meta__/triggers", ..)?;
262
263 for (_key_vec, value_vec) in all_records {
264 let json =
265 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
266 let trigger = crate::automation::Trigger::from_json(&json)?;
267 triggers.push(trigger);
268 }
269
270 Ok(triggers)
271}
272
273pub fn save_procedure(
279 wos: &WosBackend,
280 procedure: &crate::automation::StoredProcedure,
281) -> DbxResult<()> {
282 let json = procedure.to_json()?;
283 wos.insert(
284 "__meta__/procedures",
285 procedure.name.as_bytes(),
286 json.as_bytes(),
287 )?;
288 Ok(())
289}
290
291pub fn load_procedure(
293 wos: &WosBackend,
294 name: &str,
295) -> DbxResult<Option<crate::automation::StoredProcedure>> {
296 match wos.get("__meta__/procedures", name.as_bytes())? {
297 Some(json_bytes) => {
298 let json = String::from_utf8(json_bytes)
299 .map_err(|e| DbxError::Serialization(e.to_string()))?;
300 let procedure = crate::automation::StoredProcedure::from_json(&json)?;
301 Ok(Some(procedure))
302 }
303 None => Ok(None),
304 }
305}
306
307pub fn delete_procedure(wos: &WosBackend, name: &str) -> DbxResult<()> {
309 wos.delete("__meta__/procedures", name.as_bytes())?;
310 Ok(())
311}
312
313pub fn load_all_procedures(wos: &WosBackend) -> DbxResult<Vec<crate::automation::StoredProcedure>> {
315 let mut procedures = Vec::new();
316 let all_records = wos.scan("__meta__/procedures", ..)?;
317
318 for (_key_vec, value_vec) in all_records {
319 let json =
320 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
321 let procedure = crate::automation::StoredProcedure::from_json(&json)?;
322 procedures.push(procedure);
323 }
324
325 Ok(procedures)
326}
327
328pub fn save_udf(wos: &WosBackend, udf: &crate::automation::UdfMetadata) -> DbxResult<()> {
334 let json = udf.to_json()?;
335 wos.insert("__meta__/udfs", udf.name.as_bytes(), json.as_bytes())?;
336 Ok(())
337}
338
339pub fn load_udf(wos: &WosBackend, name: &str) -> DbxResult<Option<crate::automation::UdfMetadata>> {
341 match wos.get("__meta__/udfs", name.as_bytes())? {
342 Some(json_bytes) => {
343 let json = String::from_utf8(json_bytes)
344 .map_err(|e| DbxError::Serialization(e.to_string()))?;
345 let udf = crate::automation::UdfMetadata::from_json(&json)?;
346 Ok(Some(udf))
347 }
348 None => Ok(None),
349 }
350}
351
352pub fn delete_udf(wos: &WosBackend, name: &str) -> DbxResult<()> {
354 wos.delete("__meta__/udfs", name.as_bytes())?;
355 Ok(())
356}
357
358pub fn load_all_udfs(wos: &WosBackend) -> DbxResult<Vec<crate::automation::UdfMetadata>> {
360 let mut udfs = Vec::new();
361 let all_records = wos.scan("__meta__/udfs", ..)?;
362
363 for (_key_vec, value_vec) in all_records {
364 let json =
365 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
366 let udf = crate::automation::UdfMetadata::from_json(&json)?;
367 udfs.push(udf);
368 }
369
370 Ok(udfs)
371}
372
373pub fn save_schedule(wos: &WosBackend, schedule: &crate::automation::Schedule) -> DbxResult<()> {
379 let json = schedule.to_json()?;
380 wos.insert(
381 "__meta__/schedules",
382 schedule.name.as_bytes(),
383 json.as_bytes(),
384 )?;
385 Ok(())
386}
387
388pub fn load_schedule(
390 wos: &WosBackend,
391 name: &str,
392) -> DbxResult<Option<crate::automation::Schedule>> {
393 match wos.get("__meta__/schedules", name.as_bytes())? {
394 Some(bytes) => {
395 let json = String::from_utf8(bytes.to_vec()).map_err(|e| {
396 DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
397 })?;
398 let schedule = crate::automation::Schedule::from_json(&json)?;
399 Ok(Some(schedule))
400 }
401 None => Ok(None),
402 }
403}
404
405pub fn delete_schedule(wos: &WosBackend, name: &str) -> DbxResult<()> {
407 wos.delete("__meta__/schedules", name.as_bytes())?;
408 Ok(())
409}
410
411pub fn load_all_schedules(
413 wos: &WosBackend,
414) -> DbxResult<HashMap<String, crate::automation::Schedule>> {
415 let mut schedules = HashMap::new();
416 let all_records = wos.scan("__meta__/schedules", ..)?;
417
418 for (key_vec, value_vec) in all_records {
419 let name =
420 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
421
422 let json = String::from_utf8(value_vec).map_err(|e| {
423 DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
424 })?;
425
426 let schedule = crate::automation::Schedule::from_json(&json)?;
427 schedules.insert(name, schedule);
428 }
429
430 Ok(schedules)
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    /// Wraps a field list in the `Arc<Schema>` shape the persistence tests use.
    fn schema_of(fields: Vec<Field>) -> Arc<Schema> {
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_schema_metadata_conversion() {
        let original = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
        ]);

        let meta = SchemaMetadata::from(&original);
        assert_eq!(meta.fields.len(), 3);
        let id_meta = &meta.fields[0];
        assert_eq!(id_meta.name, "id");
        assert_eq!(id_meta.data_type, "Int64");
        assert!(!id_meta.nullable);

        let round_tripped = Schema::try_from(meta).expect("metadata should convert back");
        assert_eq!(round_tripped.fields().len(), 3);
        let id_field = round_tripped.field(0);
        assert_eq!(id_field.name(), "id");
        assert_eq!(id_field.data_type(), &DataType::Int64);
    }

    #[test]
    fn test_schema_persistence() {
        let wos = WosBackend::open_temporary().expect("temporary WOS backend");
        let users = schema_of(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]);

        save_schema(&wos, "users", &users).unwrap();

        let stored = load_schema(&wos, "users")
            .unwrap()
            .expect("users schema should be present after save");
        assert_eq!(stored.fields().len(), 2);
        assert_eq!(stored.field(0).name(), "id");
        assert_eq!(stored.field(1).name(), "name");

        delete_schema(&wos, "users").unwrap();
        assert!(load_schema(&wos, "users").unwrap().is_none());
    }

    #[test]
    fn test_load_all_schemas() {
        let wos = WosBackend::open_temporary().expect("temporary WOS backend");
        let users = schema_of(vec![Field::new("id", DataType::Int64, false)]);
        let products = schema_of(vec![Field::new("name", DataType::Utf8, true)]);

        save_schema(&wos, "users", &users).unwrap();
        save_schema(&wos, "products", &products).unwrap();

        let catalog = load_all_schemas(&wos).unwrap();
        assert_eq!(catalog.len(), 2);
        for table in ["users", "products"] {
            assert!(catalog.contains_key(table), "missing schema for {}", table);
        }
    }

    #[test]
    fn test_index_persistence() {
        let wos = WosBackend::open_temporary().expect("temporary WOS backend");

        save_index(&wos, "idx_name", "users", "name").unwrap();

        let indexes = load_all_indexes(&wos).unwrap();
        assert_eq!(indexes.len(), 1);
        let expected = ("users".to_string(), "name".to_string());
        assert_eq!(indexes.get("idx_name"), Some(&expected));

        delete_index(&wos, "idx_name").unwrap();
        assert!(load_all_indexes(&wos).unwrap().is_empty());
    }
}