1use crate::error::{DbxError, DbxResult};
7use crate::storage::StorageBackend;
8use crate::storage::native_wos::NativeWosBackend;
9use arrow::datatypes::{DataType, Field, Schema};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct SchemaMetadata {
21 pub table_name: String,
22 pub fields: Vec<FieldMetadata>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct FieldMetadata {
28 pub name: String,
29 pub data_type: String,
30 pub nullable: bool,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct IndexMetadata {
36 pub index_name: String,
37 pub table_name: String,
38 pub column_name: String,
39}
40
41impl From<&Schema> for SchemaMetadata {
46 fn from(schema: &Schema) -> Self {
47 let fields = schema
48 .fields()
49 .iter()
50 .map(|field| FieldMetadata {
51 name: field.name().clone(),
52 data_type: datatype_to_string(field.data_type()),
53 nullable: field.is_nullable(),
54 })
55 .collect();
56
57 SchemaMetadata {
58 table_name: String::new(), fields,
60 }
61 }
62}
63
64impl TryFrom<SchemaMetadata> for Schema {
65 type Error = DbxError;
66
67 fn try_from(metadata: SchemaMetadata) -> Result<Self, Self::Error> {
68 let fields: Result<Vec<Field>, DbxError> = metadata
69 .fields
70 .iter()
71 .map(|field_meta| {
72 let data_type = string_to_datatype(&field_meta.data_type)?;
73 Ok(Field::new(&field_meta.name, data_type, field_meta.nullable))
74 })
75 .collect();
76
77 Ok(Schema::new(fields?))
78 }
79}
80
81fn datatype_to_string(data_type: &DataType) -> String {
87 match data_type {
88 DataType::Int8 => "Int8".to_string(),
89 DataType::Int16 => "Int16".to_string(),
90 DataType::Int32 => "Int32".to_string(),
91 DataType::Int64 => "Int64".to_string(),
92 DataType::UInt8 => "UInt8".to_string(),
93 DataType::UInt16 => "UInt16".to_string(),
94 DataType::UInt32 => "UInt32".to_string(),
95 DataType::UInt64 => "UInt64".to_string(),
96 DataType::Float32 => "Float32".to_string(),
97 DataType::Float64 => "Float64".to_string(),
98 DataType::Utf8 => "Utf8".to_string(),
99 DataType::Boolean => "Boolean".to_string(),
100 DataType::Binary => "Binary".to_string(),
101 DataType::Date32 => "Date32".to_string(),
102 DataType::Date64 => "Date64".to_string(),
103 DataType::Timestamp(unit, tz) => {
104 format!("Timestamp({:?}, {:?})", unit, tz)
105 }
106 _ => format!("{:?}", data_type), }
108}
109
110fn string_to_datatype(s: &str) -> DbxResult<DataType> {
112 match s {
113 "Int8" => Ok(DataType::Int8),
114 "Int16" => Ok(DataType::Int16),
115 "Int32" => Ok(DataType::Int32),
116 "Int64" => Ok(DataType::Int64),
117 "UInt8" => Ok(DataType::UInt8),
118 "UInt16" => Ok(DataType::UInt16),
119 "UInt32" => Ok(DataType::UInt32),
120 "UInt64" => Ok(DataType::UInt64),
121 "Float32" => Ok(DataType::Float32),
122 "Float64" => Ok(DataType::Float64),
123 "Utf8" => Ok(DataType::Utf8),
124 "Boolean" => Ok(DataType::Boolean),
125 "Binary" => Ok(DataType::Binary),
126 "Date32" => Ok(DataType::Date32),
127 "Date64" => Ok(DataType::Date64),
128 _ => Err(DbxError::Schema(format!("Unsupported data type: {}", s))),
129 }
130}
131
132pub fn save_schema(wos: &NativeWosBackend, table: &str, schema: &Schema) -> DbxResult<()> {
138 let mut metadata = SchemaMetadata::from(schema);
139 metadata.table_name = table.to_string();
140
141 let json_bytes =
142 serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
143
144 wos.insert("__meta__/schemas", table.as_bytes(), &json_bytes)?;
145 Ok(())
146}
147
148pub fn load_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<Option<Arc<Schema>>> {
150 match wos.get("__meta__/schemas", table.as_bytes())? {
151 Some(json_bytes) => {
152 let metadata: SchemaMetadata = serde_json::from_slice(&json_bytes)
153 .map_err(|e| DbxError::Serialization(e.to_string()))?;
154 let schema = Schema::try_from(metadata)?;
155 Ok(Some(Arc::new(schema)))
156 }
157 None => Ok(None),
158 }
159}
160
161pub fn delete_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<()> {
163 wos.delete("__meta__/schemas", table.as_bytes())?;
164 Ok(())
165}
166
167pub fn load_all_schemas(wos: &NativeWosBackend) -> DbxResult<HashMap<String, Arc<Schema>>> {
169 let mut schemas = HashMap::new();
170 let all_records = wos.scan("__meta__/schemas", ..)?;
171
172 for (key_vec, value_vec) in all_records {
173 let table_name =
174 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
175 let metadata: SchemaMetadata = serde_json::from_slice(&value_vec)
176 .map_err(|e| DbxError::Serialization(e.to_string()))?;
177 let schema = Schema::try_from(metadata)?;
178 schemas.insert(table_name, Arc::new(schema));
179 }
180
181 Ok(schemas)
182}
183
184pub fn save_index(
190 wos: &NativeWosBackend,
191 index_name: &str,
192 table: &str,
193 column: &str,
194) -> DbxResult<()> {
195 let metadata = IndexMetadata {
196 index_name: index_name.to_string(),
197 table_name: table.to_string(),
198 column_name: column.to_string(),
199 };
200
201 let json_bytes =
202 serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
203
204 wos.insert("__meta__/indexes", index_name.as_bytes(), &json_bytes)?;
205 Ok(())
206}
207
208pub fn delete_index(wos: &NativeWosBackend, index_name: &str) -> DbxResult<()> {
210 wos.delete("__meta__/indexes", index_name.as_bytes())?;
211 Ok(())
212}
213
214pub fn load_all_indexes(wos: &NativeWosBackend) -> DbxResult<HashMap<String, (String, String)>> {
216 let mut indexes = HashMap::new();
217 let all_records = wos.scan("__meta__/indexes", ..)?;
218
219 for (key_vec, value_vec) in all_records {
220 let index_name =
221 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
222 let metadata: IndexMetadata = serde_json::from_slice(&value_vec)
223 .map_err(|e| DbxError::Serialization(e.to_string()))?;
224 indexes.insert(index_name, (metadata.table_name, metadata.column_name));
225 }
226
227 Ok(indexes)
228}
229
230pub fn save_trigger(wos: &NativeWosBackend, trigger: &crate::automation::Trigger) -> DbxResult<()> {
236 let json = trigger.to_json()?;
237 wos.insert(
238 "__meta__/triggers",
239 trigger.name.as_bytes(),
240 json.as_bytes(),
241 )?;
242 Ok(())
243}
244
245pub fn load_trigger(
247 wos: &NativeWosBackend,
248 name: &str,
249) -> DbxResult<Option<crate::automation::Trigger>> {
250 match wos.get("__meta__/triggers", name.as_bytes())? {
251 Some(json_bytes) => {
252 let json = String::from_utf8(json_bytes)
253 .map_err(|e| DbxError::Serialization(e.to_string()))?;
254 let trigger = crate::automation::Trigger::from_json(&json)?;
255 Ok(Some(trigger))
256 }
257 None => Ok(None),
258 }
259}
260
261pub fn delete_trigger(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
263 wos.delete("__meta__/triggers", name.as_bytes())?;
264 Ok(())
265}
266
267pub fn load_all_triggers(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::Trigger>> {
269 let mut triggers = Vec::new();
270 let all_records = wos.scan("__meta__/triggers", ..)?;
271
272 for (_key_vec, value_vec) in all_records {
273 let json =
274 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
275 let trigger = crate::automation::Trigger::from_json(&json)?;
276 triggers.push(trigger);
277 }
278
279 Ok(triggers)
280}
281
282pub fn save_procedure(
288 wos: &NativeWosBackend,
289 procedure: &crate::automation::StoredProcedure,
290) -> DbxResult<()> {
291 let json = procedure.to_json()?;
292 wos.insert(
293 "__meta__/procedures",
294 procedure.name.as_bytes(),
295 json.as_bytes(),
296 )?;
297 Ok(())
298}
299
300pub fn load_procedure(
302 wos: &NativeWosBackend,
303 name: &str,
304) -> DbxResult<Option<crate::automation::StoredProcedure>> {
305 match wos.get("__meta__/procedures", name.as_bytes())? {
306 Some(json_bytes) => {
307 let json = String::from_utf8(json_bytes)
308 .map_err(|e| DbxError::Serialization(e.to_string()))?;
309 let procedure = crate::automation::StoredProcedure::from_json(&json)?;
310 Ok(Some(procedure))
311 }
312 None => Ok(None),
313 }
314}
315
316pub fn delete_procedure(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
318 wos.delete("__meta__/procedures", name.as_bytes())?;
319 Ok(())
320}
321
322pub fn load_all_procedures(
324 wos: &NativeWosBackend,
325) -> DbxResult<Vec<crate::automation::StoredProcedure>> {
326 let mut procedures = Vec::new();
327 let all_records = wos.scan("__meta__/procedures", ..)?;
328
329 for (_key_vec, value_vec) in all_records {
330 let json =
331 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
332 let procedure = crate::automation::StoredProcedure::from_json(&json)?;
333 procedures.push(procedure);
334 }
335
336 Ok(procedures)
337}
338
339pub fn save_udf(wos: &NativeWosBackend, udf: &crate::automation::UdfMetadata) -> DbxResult<()> {
345 let json = udf.to_json()?;
346 wos.insert("__meta__/udfs", udf.name.as_bytes(), json.as_bytes())?;
347 Ok(())
348}
349
350pub fn load_udf(
352 wos: &NativeWosBackend,
353 name: &str,
354) -> DbxResult<Option<crate::automation::UdfMetadata>> {
355 match wos.get("__meta__/udfs", name.as_bytes())? {
356 Some(json_bytes) => {
357 let json = String::from_utf8(json_bytes)
358 .map_err(|e| DbxError::Serialization(e.to_string()))?;
359 let udf = crate::automation::UdfMetadata::from_json(&json)?;
360 Ok(Some(udf))
361 }
362 None => Ok(None),
363 }
364}
365
366pub fn delete_udf(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
368 wos.delete("__meta__/udfs", name.as_bytes())?;
369 Ok(())
370}
371
372pub fn load_all_udfs(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::UdfMetadata>> {
374 let mut udfs = Vec::new();
375 let all_records = wos.scan("__meta__/udfs", ..)?;
376
377 for (_key_vec, value_vec) in all_records {
378 let json =
379 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
380 let udf = crate::automation::UdfMetadata::from_json(&json)?;
381 udfs.push(udf);
382 }
383
384 Ok(udfs)
385}
386
387pub fn save_schedule(
393 wos: &NativeWosBackend,
394 schedule: &crate::automation::Schedule,
395) -> DbxResult<()> {
396 let json = schedule.to_json()?;
397 wos.insert(
398 "__meta__/schedules",
399 schedule.name.as_bytes(),
400 json.as_bytes(),
401 )?;
402 Ok(())
403}
404
405pub fn load_schedule(
407 wos: &NativeWosBackend,
408 name: &str,
409) -> DbxResult<Option<crate::automation::Schedule>> {
410 match wos.get("__meta__/schedules", name.as_bytes())? {
411 Some(bytes) => {
412 let json = String::from_utf8(bytes.to_vec()).map_err(|e| {
413 DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
414 })?;
415 let schedule = crate::automation::Schedule::from_json(&json)?;
416 Ok(Some(schedule))
417 }
418 None => Ok(None),
419 }
420}
421
422pub fn delete_schedule(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
424 wos.delete("__meta__/schedules", name.as_bytes())?;
425 Ok(())
426}
427
428pub fn load_all_schedules(
430 wos: &NativeWosBackend,
431) -> DbxResult<HashMap<String, crate::automation::Schedule>> {
432 let mut schedules = HashMap::new();
433 let all_records = wos.scan("__meta__/schedules", ..)?;
434
435 for (key_vec, value_vec) in all_records {
436 let name =
437 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
438
439 let json = String::from_utf8(value_vec).map_err(|e| {
440 DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
441 })?;
442
443 let schedule = crate::automation::Schedule::from_json(&json)?;
444 schedules.insert(name, schedule);
445 }
446
447 Ok(schedules)
448}
449
450#[cfg(test)]
455mod tests {
456 use super::*;
457 use arrow::datatypes::{DataType, Field, Schema};
458
459 #[test]
460 fn test_schema_metadata_conversion() {
461 let schema = Schema::new(vec![
462 Field::new("id", DataType::Int64, false),
463 Field::new("name", DataType::Utf8, true),
464 Field::new("age", DataType::Int32, true),
465 ]);
466
467 let metadata = SchemaMetadata::from(&schema);
468 assert_eq!(metadata.fields.len(), 3);
469 assert_eq!(metadata.fields[0].name, "id");
470 assert_eq!(metadata.fields[0].data_type, "Int64");
471 assert!(!metadata.fields[0].nullable);
472
473 let restored_schema = Schema::try_from(metadata).unwrap();
474 assert_eq!(restored_schema.fields().len(), 3);
475 assert_eq!(restored_schema.field(0).name(), "id");
476 assert_eq!(restored_schema.field(0).data_type(), &DataType::Int64);
477 }
478
479 #[test]
480 fn test_schema_persistence() {
481 let wos = NativeWosBackend::open_temporary().unwrap();
482 let schema = Arc::new(Schema::new(vec![
483 Field::new("id", DataType::Int64, false),
484 Field::new("name", DataType::Utf8, true),
485 ]));
486
487 save_schema(&wos, "users", &schema).unwrap();
489
490 let loaded = load_schema(&wos, "users").unwrap();
492 assert!(loaded.is_some());
493 let loaded_schema = loaded.unwrap();
494 assert_eq!(loaded_schema.fields().len(), 2);
495 assert_eq!(loaded_schema.field(0).name(), "id");
496 assert_eq!(loaded_schema.field(1).name(), "name");
497
498 delete_schema(&wos, "users").unwrap();
500 let deleted = load_schema(&wos, "users").unwrap();
501 assert!(deleted.is_none());
502 }
503
504 #[test]
505 fn test_load_all_schemas() {
506 let wos = NativeWosBackend::open_temporary().unwrap();
507
508 let schema1 = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
510 let schema2 = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
511
512 save_schema(&wos, "users", &schema1).unwrap();
513 save_schema(&wos, "products", &schema2).unwrap();
514
515 let all_schemas = load_all_schemas(&wos).unwrap();
517 assert_eq!(all_schemas.len(), 2);
518 assert!(all_schemas.contains_key("users"));
519 assert!(all_schemas.contains_key("products"));
520 }
521
522 #[test]
523 fn test_index_persistence() {
524 let wos = NativeWosBackend::open_temporary().unwrap();
525
526 save_index(&wos, "idx_name", "users", "name").unwrap();
528
529 let indexes = load_all_indexes(&wos).unwrap();
531 assert_eq!(indexes.len(), 1);
532 assert_eq!(
533 indexes.get("idx_name"),
534 Some(&("users".to_string(), "name".to_string()))
535 );
536
537 delete_index(&wos, "idx_name").unwrap();
539 let deleted = load_all_indexes(&wos).unwrap();
540 assert!(deleted.is_empty());
541 }
542}