1use crate::error::{DbxError, DbxResult};
7use crate::storage::StorageBackend;
8use crate::storage::native_wos::NativeWosBackend;
9use arrow::datatypes::{DataType, Field, Schema};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13
/// Serializable description of a table schema.
///
/// Built from an Arrow `Schema` via `From<&Schema>` and converted back via
/// `TryFrom<SchemaMetadata> for Schema`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaMetadata {
    /// Name of the table this schema belongs to. The `From<&Schema>`
    /// conversion leaves it empty; `save_schema` fills it in before storing.
    pub table_name: String,
    /// One entry per schema field, in schema order.
    pub fields: Vec<FieldMetadata>,
    /// Schema-level key/value metadata. Deserializes to an empty map when the
    /// key is absent from the input (`#[serde(default)]`).
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
26
/// Serializable description of a single schema field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldMetadata {
    /// Field (column) name.
    pub name: String,
    /// Data type in text form, as written by `datatype_to_string`.
    pub data_type: String,
    /// Whether the field accepts null values.
    pub nullable: bool,
}
34
/// Serializable description of an index: its name and the table/column it refers to.
///
/// `save_index` stores this as JSON; `load_all_indexes` reads it back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMetadata {
    /// Name of the index; `save_index` also uses it as the storage key.
    pub index_name: String,
    /// Name of the table the index refers to.
    pub table_name: String,
    /// Name of the column the index refers to.
    pub column_name: String,
}
42
43impl From<&Schema> for SchemaMetadata {
48 fn from(schema: &Schema) -> Self {
49 let fields = schema
50 .fields()
51 .iter()
52 .map(|field| FieldMetadata {
53 name: field.name().clone(),
54 data_type: datatype_to_string(field.data_type()),
55 nullable: field.is_nullable(),
56 })
57 .collect();
58
59 SchemaMetadata {
60 table_name: String::new(), fields,
62 metadata: schema.metadata().clone(),
63 }
64 }
65}
66
67impl TryFrom<SchemaMetadata> for Schema {
68 type Error = DbxError;
69
70 fn try_from(metadata: SchemaMetadata) -> Result<Self, Self::Error> {
71 let fields: Result<Vec<Field>, DbxError> = metadata
72 .fields
73 .iter()
74 .map(|field_meta| {
75 let data_type = string_to_datatype(&field_meta.data_type)?;
76 Ok(Field::new(&field_meta.name, data_type, field_meta.nullable))
77 })
78 .collect();
79
80 Ok(Schema::new(fields?).with_metadata(metadata.metadata))
81 }
82}
83
84fn datatype_to_string(data_type: &DataType) -> String {
90 match data_type {
91 DataType::Int8 => "Int8".to_string(),
92 DataType::Int16 => "Int16".to_string(),
93 DataType::Int32 => "Int32".to_string(),
94 DataType::Int64 => "Int64".to_string(),
95 DataType::UInt8 => "UInt8".to_string(),
96 DataType::UInt16 => "UInt16".to_string(),
97 DataType::UInt32 => "UInt32".to_string(),
98 DataType::UInt64 => "UInt64".to_string(),
99 DataType::Float32 => "Float32".to_string(),
100 DataType::Float64 => "Float64".to_string(),
101 DataType::Utf8 => "Utf8".to_string(),
102 DataType::Boolean => "Boolean".to_string(),
103 DataType::Binary => "Binary".to_string(),
104 DataType::Date32 => "Date32".to_string(),
105 DataType::Date64 => "Date64".to_string(),
106 DataType::Timestamp(unit, tz) => {
107 format!("Timestamp({:?}, {:?})", unit, tz)
108 }
109 _ => format!("{:?}", data_type), }
111}
112
113fn string_to_datatype(s: &str) -> DbxResult<DataType> {
115 match s {
116 "Int8" => Ok(DataType::Int8),
117 "Int16" => Ok(DataType::Int16),
118 "Int32" => Ok(DataType::Int32),
119 "Int64" => Ok(DataType::Int64),
120 "UInt8" => Ok(DataType::UInt8),
121 "UInt16" => Ok(DataType::UInt16),
122 "UInt32" => Ok(DataType::UInt32),
123 "UInt64" => Ok(DataType::UInt64),
124 "Float32" => Ok(DataType::Float32),
125 "Float64" => Ok(DataType::Float64),
126 "Utf8" => Ok(DataType::Utf8),
127 "Boolean" => Ok(DataType::Boolean),
128 "Binary" => Ok(DataType::Binary),
129 "Date32" => Ok(DataType::Date32),
130 "Date64" => Ok(DataType::Date64),
131 _ => Err(DbxError::Schema(format!("Unsupported data type: {}", s))),
132 }
133}
134
135pub fn save_schema(wos: &NativeWosBackend, table: &str, schema: &Schema) -> DbxResult<()> {
141 let mut metadata = SchemaMetadata::from(schema);
142 metadata.table_name = table.to_string();
143
144 let json_bytes =
145 serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
146
147 wos.insert("__meta__/schemas", table.as_bytes(), &json_bytes)?;
148 Ok(())
149}
150
151pub fn load_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<Option<Arc<Schema>>> {
153 match wos.get("__meta__/schemas", table.as_bytes())? {
154 Some(json_bytes) => {
155 let metadata: SchemaMetadata = serde_json::from_slice(&json_bytes)
156 .map_err(|e| DbxError::Serialization(e.to_string()))?;
157 let schema = Schema::try_from(metadata)?;
158 Ok(Some(Arc::new(schema)))
159 }
160 None => Ok(None),
161 }
162}
163
164pub fn delete_schema(wos: &NativeWosBackend, table: &str) -> DbxResult<()> {
166 wos.delete("__meta__/schemas", table.as_bytes())?;
167 Ok(())
168}
169
170pub fn load_all_schemas(wos: &NativeWosBackend) -> DbxResult<HashMap<String, Arc<Schema>>> {
172 let mut schemas = HashMap::new();
173 let all_records = wos.scan("__meta__/schemas", ..)?;
174
175 for (key_vec, value_vec) in all_records {
176 let table_name =
177 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
178 let metadata: SchemaMetadata = serde_json::from_slice(&value_vec)
179 .map_err(|e| DbxError::Serialization(e.to_string()))?;
180 let schema = Schema::try_from(metadata)?;
181 schemas.insert(table_name, Arc::new(schema));
182 }
183
184 Ok(schemas)
185}
186
187pub fn save_index(
193 wos: &NativeWosBackend,
194 index_name: &str,
195 table: &str,
196 column: &str,
197) -> DbxResult<()> {
198 let metadata = IndexMetadata {
199 index_name: index_name.to_string(),
200 table_name: table.to_string(),
201 column_name: column.to_string(),
202 };
203
204 let json_bytes =
205 serde_json::to_vec(&metadata).map_err(|e| DbxError::Serialization(e.to_string()))?;
206
207 wos.insert("__meta__/indexes", index_name.as_bytes(), &json_bytes)?;
208 Ok(())
209}
210
211pub fn delete_index(wos: &NativeWosBackend, index_name: &str) -> DbxResult<()> {
213 wos.delete("__meta__/indexes", index_name.as_bytes())?;
214 Ok(())
215}
216
217pub fn load_all_indexes(wos: &NativeWosBackend) -> DbxResult<HashMap<String, (String, String)>> {
219 let mut indexes = HashMap::new();
220 let all_records = wos.scan("__meta__/indexes", ..)?;
221
222 for (key_vec, value_vec) in all_records {
223 let index_name =
224 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
225 let metadata: IndexMetadata = serde_json::from_slice(&value_vec)
226 .map_err(|e| DbxError::Serialization(e.to_string()))?;
227 indexes.insert(index_name, (metadata.table_name, metadata.column_name));
228 }
229
230 Ok(indexes)
231}
232
233pub fn save_trigger(wos: &NativeWosBackend, trigger: &crate::automation::Trigger) -> DbxResult<()> {
239 let json = trigger.to_json()?;
240 wos.insert(
241 "__meta__/triggers",
242 trigger.name.as_bytes(),
243 json.as_bytes(),
244 )?;
245 Ok(())
246}
247
248pub fn load_trigger(
250 wos: &NativeWosBackend,
251 name: &str,
252) -> DbxResult<Option<crate::automation::Trigger>> {
253 match wos.get("__meta__/triggers", name.as_bytes())? {
254 Some(json_bytes) => {
255 let json = String::from_utf8(json_bytes)
256 .map_err(|e| DbxError::Serialization(e.to_string()))?;
257 let trigger = crate::automation::Trigger::from_json(&json)?;
258 Ok(Some(trigger))
259 }
260 None => Ok(None),
261 }
262}
263
264pub fn delete_trigger(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
266 wos.delete("__meta__/triggers", name.as_bytes())?;
267 Ok(())
268}
269
270pub fn load_all_triggers(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::Trigger>> {
272 let mut triggers = Vec::new();
273 let all_records = wos.scan("__meta__/triggers", ..)?;
274
275 for (_key_vec, value_vec) in all_records {
276 let json =
277 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
278 let trigger = crate::automation::Trigger::from_json(&json)?;
279 triggers.push(trigger);
280 }
281
282 Ok(triggers)
283}
284
285pub fn save_procedure(
291 wos: &NativeWosBackend,
292 procedure: &crate::automation::StoredProcedure,
293) -> DbxResult<()> {
294 let json = procedure.to_json()?;
295 wos.insert(
296 "__meta__/procedures",
297 procedure.name.as_bytes(),
298 json.as_bytes(),
299 )?;
300 Ok(())
301}
302
303pub fn load_procedure(
305 wos: &NativeWosBackend,
306 name: &str,
307) -> DbxResult<Option<crate::automation::StoredProcedure>> {
308 match wos.get("__meta__/procedures", name.as_bytes())? {
309 Some(json_bytes) => {
310 let json = String::from_utf8(json_bytes)
311 .map_err(|e| DbxError::Serialization(e.to_string()))?;
312 let procedure = crate::automation::StoredProcedure::from_json(&json)?;
313 Ok(Some(procedure))
314 }
315 None => Ok(None),
316 }
317}
318
319pub fn delete_procedure(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
321 wos.delete("__meta__/procedures", name.as_bytes())?;
322 Ok(())
323}
324
325pub fn load_all_procedures(
327 wos: &NativeWosBackend,
328) -> DbxResult<Vec<crate::automation::StoredProcedure>> {
329 let mut procedures = Vec::new();
330 let all_records = wos.scan("__meta__/procedures", ..)?;
331
332 for (_key_vec, value_vec) in all_records {
333 let json =
334 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
335 let procedure = crate::automation::StoredProcedure::from_json(&json)?;
336 procedures.push(procedure);
337 }
338
339 Ok(procedures)
340}
341
342pub fn save_udf(wos: &NativeWosBackend, udf: &crate::automation::UdfMetadata) -> DbxResult<()> {
348 let json = udf.to_json()?;
349 wos.insert("__meta__/udfs", udf.name.as_bytes(), json.as_bytes())?;
350 Ok(())
351}
352
353pub fn load_udf(
355 wos: &NativeWosBackend,
356 name: &str,
357) -> DbxResult<Option<crate::automation::UdfMetadata>> {
358 match wos.get("__meta__/udfs", name.as_bytes())? {
359 Some(json_bytes) => {
360 let json = String::from_utf8(json_bytes)
361 .map_err(|e| DbxError::Serialization(e.to_string()))?;
362 let udf = crate::automation::UdfMetadata::from_json(&json)?;
363 Ok(Some(udf))
364 }
365 None => Ok(None),
366 }
367}
368
369pub fn delete_udf(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
371 wos.delete("__meta__/udfs", name.as_bytes())?;
372 Ok(())
373}
374
375pub fn load_all_udfs(wos: &NativeWosBackend) -> DbxResult<Vec<crate::automation::UdfMetadata>> {
377 let mut udfs = Vec::new();
378 let all_records = wos.scan("__meta__/udfs", ..)?;
379
380 for (_key_vec, value_vec) in all_records {
381 let json =
382 String::from_utf8(value_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
383 let udf = crate::automation::UdfMetadata::from_json(&json)?;
384 udfs.push(udf);
385 }
386
387 Ok(udfs)
388}
389
390pub fn save_schedule(
396 wos: &NativeWosBackend,
397 schedule: &crate::automation::Schedule,
398) -> DbxResult<()> {
399 let json = schedule.to_json()?;
400 wos.insert(
401 "__meta__/schedules",
402 schedule.name.as_bytes(),
403 json.as_bytes(),
404 )?;
405 Ok(())
406}
407
408pub fn load_schedule(
410 wos: &NativeWosBackend,
411 name: &str,
412) -> DbxResult<Option<crate::automation::Schedule>> {
413 match wos.get("__meta__/schedules", name.as_bytes())? {
414 Some(bytes) => {
415 let json = String::from_utf8(bytes.to_vec()).map_err(|e| {
416 DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
417 })?;
418 let schedule = crate::automation::Schedule::from_json(&json)?;
419 Ok(Some(schedule))
420 }
421 None => Ok(None),
422 }
423}
424
425pub fn delete_schedule(wos: &NativeWosBackend, name: &str) -> DbxResult<()> {
427 wos.delete("__meta__/schedules", name.as_bytes())?;
428 Ok(())
429}
430
431pub fn load_all_schedules(
433 wos: &NativeWosBackend,
434) -> DbxResult<HashMap<String, crate::automation::Schedule>> {
435 let mut schedules = HashMap::new();
436 let all_records = wos.scan("__meta__/schedules", ..)?;
437
438 for (key_vec, value_vec) in all_records {
439 let name =
440 String::from_utf8(key_vec).map_err(|e| DbxError::Serialization(e.to_string()))?;
441
442 let json = String::from_utf8(value_vec).map_err(|e| {
443 DbxError::Serialization(format!("Failed to decode schedule JSON: {}", e))
444 })?;
445
446 let schedule = crate::automation::Schedule::from_json(&json)?;
447 schedules.insert(name, schedule);
448 }
449
450 Ok(schedules)
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    /// A schema converted to `SchemaMetadata` and back keeps its fields.
    #[test]
    fn test_schema_metadata_conversion() {
        let columns = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
        ];
        let schema = Schema::new(columns);

        let metadata = SchemaMetadata::from(&schema);
        assert_eq!(metadata.fields.len(), 3);
        let first = &metadata.fields[0];
        assert_eq!(first.name, "id");
        assert_eq!(first.data_type, "Int64");
        assert!(!first.nullable);

        let restored_schema = Schema::try_from(metadata).unwrap();
        assert_eq!(restored_schema.fields().len(), 3);
        let restored_first = restored_schema.field(0);
        assert_eq!(restored_first.name(), "id");
        assert_eq!(restored_first.data_type(), &DataType::Int64);
    }

    /// A saved schema can be loaded again, and is gone after deletion.
    #[test]
    fn test_schema_persistence() {
        let wos = NativeWosBackend::open_temporary().unwrap();
        let columns = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        let schema = Arc::new(Schema::new(columns));

        save_schema(&wos, "users", &schema).unwrap();

        let loaded_schema = load_schema(&wos, "users")
            .unwrap()
            .expect("schema for `users` should be present after save_schema");
        assert_eq!(loaded_schema.fields().len(), 2);
        assert_eq!(loaded_schema.field(0).name(), "id");
        assert_eq!(loaded_schema.field(1).name(), "name");

        delete_schema(&wos, "users").unwrap();
        assert!(load_schema(&wos, "users").unwrap().is_none());
    }

    /// `load_all_schemas` returns one entry per saved table.
    #[test]
    fn test_load_all_schemas() {
        let wos = NativeWosBackend::open_temporary().unwrap();

        let users = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let products = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));

        save_schema(&wos, "users", &users).unwrap();
        save_schema(&wos, "products", &products).unwrap();

        let all_schemas = load_all_schemas(&wos).unwrap();
        assert_eq!(all_schemas.len(), 2);
        for table in ["users", "products"] {
            assert!(all_schemas.contains_key(table));
        }
    }

    /// A saved index is listed by `load_all_indexes` and disappears after deletion.
    #[test]
    fn test_index_persistence() {
        let wos = NativeWosBackend::open_temporary().unwrap();

        save_index(&wos, "idx_name", "users", "name").unwrap();

        let indexes = load_all_indexes(&wos).unwrap();
        assert_eq!(indexes.len(), 1);
        let expected = ("users".to_string(), "name".to_string());
        assert_eq!(indexes.get("idx_name"), Some(&expected));

        delete_index(&wos, "idx_name").unwrap();
        assert!(load_all_indexes(&wos).unwrap().is_empty());
    }
}