1use std::collections::HashMap;
15
16use serde::{Deserialize, Serialize};
17
18use crate::{AdapterError, SymbolTable};
19
20pub trait SchemaDatabase {
24 fn store_schema(&mut self, name: &str, table: &SymbolTable) -> Result<SchemaId, AdapterError>;
28
29 fn load_schema(&self, id: SchemaId) -> Result<SymbolTable, AdapterError>;
31
32 fn load_schema_by_name(&self, name: &str) -> Result<SymbolTable, AdapterError>;
34
35 fn list_schemas(&self) -> Result<Vec<SchemaMetadata>, AdapterError>;
37
38 fn delete_schema(&mut self, id: SchemaId) -> Result<(), AdapterError>;
40
41 fn search_schemas(&self, pattern: &str) -> Result<Vec<SchemaMetadata>, AdapterError>;
43
44 fn get_schema_history(&self, name: &str) -> Result<Vec<SchemaVersion>, AdapterError>;
46}
47
48#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
50pub struct SchemaId(pub u64);
51
52#[derive(Clone, Debug, Serialize, Deserialize)]
54pub struct SchemaMetadata {
55 pub id: SchemaId,
57 pub name: String,
59 pub version: u32,
61 pub created_at: u64,
63 pub updated_at: u64,
65 pub num_domains: usize,
67 pub num_predicates: usize,
69 pub num_variables: usize,
71 pub description: Option<String>,
73}
74
75#[derive(Clone, Debug, Serialize, Deserialize)]
77pub struct SchemaVersion {
78 pub version: u32,
80 pub timestamp: u64,
82 pub description: String,
84 pub schema_id: SchemaId,
86}
87
88pub struct MemoryDatabase {
97 schemas: HashMap<SchemaId, StoredSchema>,
98 next_id: u64,
99 name_index: HashMap<String, Vec<SchemaId>>,
100}
101
102#[derive(Clone, Debug, Serialize, Deserialize)]
103struct StoredSchema {
104 id: SchemaId,
105 name: String,
106 version: u32,
107 table: SymbolTable,
108 created_at: u64,
109 updated_at: u64,
110 description: Option<String>,
111}
112
113impl MemoryDatabase {
114 pub fn new() -> Self {
116 Self {
117 schemas: HashMap::new(),
118 next_id: 1,
119 name_index: HashMap::new(),
120 }
121 }
122
123 fn current_timestamp() -> u64 {
125 std::time::SystemTime::now()
126 .duration_since(std::time::UNIX_EPOCH)
127 .unwrap()
128 .as_secs()
129 }
130
131 fn find_latest_version(&self, name: &str) -> Option<SchemaId> {
133 self.name_index.get(name).and_then(|ids| {
134 ids.iter()
135 .filter_map(|id| self.schemas.get(id))
136 .max_by_key(|s| s.version)
137 .map(|s| s.id)
138 })
139 }
140}
141
142impl Default for MemoryDatabase {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148impl SchemaDatabase for MemoryDatabase {
149 fn store_schema(&mut self, name: &str, table: &SymbolTable) -> Result<SchemaId, AdapterError> {
150 let now = Self::current_timestamp();
151
152 let version = if let Some(existing_id) = self.find_latest_version(name) {
154 if let Some(existing) = self.schemas.get(&existing_id) {
155 existing.version + 1
156 } else {
157 1
158 }
159 } else {
160 1
161 };
162
163 let id = SchemaId(self.next_id);
164 self.next_id += 1;
165
166 let stored = StoredSchema {
167 id,
168 name: name.to_string(),
169 version,
170 table: table.clone(),
171 created_at: now,
172 updated_at: now,
173 description: None,
174 };
175
176 self.schemas.insert(id, stored);
177
178 self.name_index
180 .entry(name.to_string())
181 .or_default()
182 .push(id);
183
184 Ok(id)
185 }
186
187 fn load_schema(&self, id: SchemaId) -> Result<SymbolTable, AdapterError> {
188 self.schemas
189 .get(&id)
190 .map(|s| s.table.clone())
191 .ok_or_else(|| {
192 AdapterError::InvalidOperation(format!("Schema with ID {:?} not found", id))
193 })
194 }
195
196 fn load_schema_by_name(&self, name: &str) -> Result<SymbolTable, AdapterError> {
197 let id = self.find_latest_version(name).ok_or_else(|| {
198 AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
199 })?;
200
201 self.load_schema(id)
202 }
203
204 fn list_schemas(&self) -> Result<Vec<SchemaMetadata>, AdapterError> {
205 let mut metadata: Vec<SchemaMetadata> = self
206 .schemas
207 .values()
208 .map(|s| SchemaMetadata {
209 id: s.id,
210 name: s.name.clone(),
211 version: s.version,
212 created_at: s.created_at,
213 updated_at: s.updated_at,
214 num_domains: s.table.domains.len(),
215 num_predicates: s.table.predicates.len(),
216 num_variables: s.table.variables.len(),
217 description: s.description.clone(),
218 })
219 .collect();
220
221 metadata.sort_by_key(|m| m.name.clone());
222 Ok(metadata)
223 }
224
225 fn delete_schema(&mut self, id: SchemaId) -> Result<(), AdapterError> {
226 if let Some(schema) = self.schemas.remove(&id) {
227 if let Some(ids) = self.name_index.get_mut(&schema.name) {
229 ids.retain(|&i| i != id);
230 if ids.is_empty() {
231 self.name_index.remove(&schema.name);
232 }
233 }
234 Ok(())
235 } else {
236 Err(AdapterError::InvalidOperation(format!(
237 "Schema with ID {:?} not found",
238 id
239 )))
240 }
241 }
242
243 fn search_schemas(&self, pattern: &str) -> Result<Vec<SchemaMetadata>, AdapterError> {
244 let pattern_lower = pattern.to_lowercase();
245 let mut results: Vec<SchemaMetadata> = self
246 .schemas
247 .values()
248 .filter(|s| s.name.to_lowercase().contains(&pattern_lower))
249 .map(|s| SchemaMetadata {
250 id: s.id,
251 name: s.name.clone(),
252 version: s.version,
253 created_at: s.created_at,
254 updated_at: s.updated_at,
255 num_domains: s.table.domains.len(),
256 num_predicates: s.table.predicates.len(),
257 num_variables: s.table.variables.len(),
258 description: s.description.clone(),
259 })
260 .collect();
261
262 results.sort_by_key(|m| m.name.clone());
263 Ok(results)
264 }
265
266 fn get_schema_history(&self, name: &str) -> Result<Vec<SchemaVersion>, AdapterError> {
267 let ids = self.name_index.get(name).ok_or_else(|| {
268 AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
269 })?;
270
271 let mut versions: Vec<SchemaVersion> = ids
272 .iter()
273 .filter_map(|id| {
274 self.schemas.get(id).map(|s| SchemaVersion {
275 version: s.version,
276 timestamp: s.created_at,
277 description: format!("Version {}", s.version),
278 schema_id: s.id,
279 })
280 })
281 .collect();
282
283 versions.sort_by_key(|v| v.version);
284 Ok(versions)
285 }
286}
287
288pub struct SchemaDatabaseSQL;
294
295impl SchemaDatabaseSQL {
296 pub fn create_tables_sql() -> Vec<String> {
298 vec![
299 r#"
301 CREATE TABLE IF NOT EXISTS schemas (
302 id INTEGER PRIMARY KEY AUTOINCREMENT,
303 name TEXT NOT NULL,
304 version INTEGER NOT NULL DEFAULT 1,
305 created_at INTEGER NOT NULL,
306 updated_at INTEGER NOT NULL,
307 description TEXT,
308 UNIQUE(name, version)
309 )
310 "#
311 .to_string(),
312 r#"
314 CREATE TABLE IF NOT EXISTS domains (
315 id INTEGER PRIMARY KEY AUTOINCREMENT,
316 schema_id INTEGER NOT NULL,
317 name TEXT NOT NULL,
318 cardinality INTEGER NOT NULL,
319 description TEXT,
320 metadata TEXT,
321 FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
322 UNIQUE(schema_id, name)
323 )
324 "#
325 .to_string(),
326 r#"
328 CREATE TABLE IF NOT EXISTS predicates (
329 id INTEGER PRIMARY KEY AUTOINCREMENT,
330 schema_id INTEGER NOT NULL,
331 name TEXT NOT NULL,
332 arity INTEGER NOT NULL,
333 description TEXT,
334 constraints TEXT,
335 metadata TEXT,
336 FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
337 UNIQUE(schema_id, name)
338 )
339 "#
340 .to_string(),
341 r#"
343 CREATE TABLE IF NOT EXISTS predicate_arguments (
344 id INTEGER PRIMARY KEY AUTOINCREMENT,
345 predicate_id INTEGER NOT NULL,
346 position INTEGER NOT NULL,
347 domain_name TEXT NOT NULL,
348 FOREIGN KEY (predicate_id) REFERENCES predicates(id) ON DELETE CASCADE,
349 UNIQUE(predicate_id, position)
350 )
351 "#
352 .to_string(),
353 r#"
355 CREATE TABLE IF NOT EXISTS variables (
356 id INTEGER PRIMARY KEY AUTOINCREMENT,
357 schema_id INTEGER NOT NULL,
358 name TEXT NOT NULL,
359 domain_name TEXT NOT NULL,
360 FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
361 UNIQUE(schema_id, name)
362 )
363 "#
364 .to_string(),
365 "CREATE INDEX IF NOT EXISTS idx_schemas_name ON schemas(name)".to_string(),
367 "CREATE INDEX IF NOT EXISTS idx_domains_schema ON domains(schema_id)".to_string(),
368 "CREATE INDEX IF NOT EXISTS idx_predicates_schema ON predicates(schema_id)".to_string(),
369 ]
370 }
371
372 pub fn insert_domain_sql() -> &'static str {
374 r#"
375 INSERT INTO domains (schema_id, name, cardinality, description, metadata)
376 VALUES (?, ?, ?, ?, ?)
377 "#
378 }
379
380 pub fn insert_predicate_sql() -> &'static str {
382 r#"
383 INSERT INTO predicates (schema_id, name, arity, description, constraints, metadata)
384 VALUES (?, ?, ?, ?, ?, ?)
385 "#
386 }
387
388 pub fn insert_predicate_arg_sql() -> &'static str {
390 r#"
391 INSERT INTO predicate_arguments (predicate_id, position, domain_name)
392 VALUES (?, ?, ?)
393 "#
394 }
395
396 pub fn insert_variable_sql() -> &'static str {
398 r#"
399 INSERT INTO variables (schema_id, name, domain_name)
400 VALUES (?, ?, ?)
401 "#
402 }
403
404 pub fn select_schema_sql() -> &'static str {
406 "SELECT id, name, version, created_at, updated_at, description FROM schemas WHERE id = ?"
407 }
408
409 pub fn select_domains_sql() -> &'static str {
411 "SELECT name, cardinality, description, metadata FROM domains WHERE schema_id = ?"
412 }
413
414 pub fn select_predicates_sql() -> &'static str {
416 "SELECT id, name, arity, description, constraints, metadata FROM predicates WHERE schema_id = ?"
417 }
418
419 pub fn select_predicate_args_sql() -> &'static str {
421 "SELECT position, domain_name FROM predicate_arguments WHERE predicate_id = ? ORDER BY position"
422 }
423}
424
425#[derive(Clone, Debug)]
427pub struct DatabaseStats {
428 pub total_schemas: usize,
430 pub total_domains: usize,
432 pub total_predicates: usize,
434 pub size_bytes: Option<usize>,
436}
437
438impl DatabaseStats {
439 pub fn new() -> Self {
441 Self {
442 total_schemas: 0,
443 total_domains: 0,
444 total_predicates: 0,
445 size_bytes: None,
446 }
447 }
448
449 pub fn from_database<D: SchemaDatabase>(db: &D) -> Result<Self, AdapterError> {
451 let schemas = db.list_schemas()?;
452 let total_schemas = schemas.len();
453 let total_domains: usize = schemas.iter().map(|s| s.num_domains).sum();
454 let total_predicates: usize = schemas.iter().map(|s| s.num_predicates).sum();
455
456 Ok(Self {
457 total_schemas,
458 total_domains,
459 total_predicates,
460 size_bytes: None,
461 })
462 }
463
464 pub fn avg_domains_per_schema(&self) -> f64 {
466 if self.total_schemas == 0 {
467 0.0
468 } else {
469 self.total_domains as f64 / self.total_schemas as f64
470 }
471 }
472
473 pub fn avg_predicates_per_schema(&self) -> f64 {
475 if self.total_schemas == 0 {
476 0.0
477 } else {
478 self.total_predicates as f64 / self.total_schemas as f64
479 }
480 }
481}
482
483impl Default for DatabaseStats {
484 fn default() -> Self {
485 Self::new()
486 }
487}
488
489#[cfg(feature = "sqlite")]
494mod sqlite_backend {
495 use super::*;
496 use crate::{DomainInfo, PredicateInfo};
497 use rusqlite::{params, Connection, Result as SqliteResult};
498
499 pub struct SQLiteDatabase {
520 conn: Connection,
521 }
522
523 impl SQLiteDatabase {
524 pub fn new(path: &str) -> Result<Self, AdapterError> {
528 let conn = Connection::open(path).map_err(|e| {
529 AdapterError::InvalidOperation(format!("Failed to open SQLite database: {}", e))
530 })?;
531
532 let mut db = Self { conn };
533 db.initialize_schema()?;
534 Ok(db)
535 }
536
537 fn initialize_schema(&mut self) -> Result<(), AdapterError> {
539 for sql in SchemaDatabaseSQL::create_tables_sql() {
540 self.conn.execute(&sql, []).map_err(|e| {
541 AdapterError::InvalidOperation(format!("Failed to create tables: {}", e))
542 })?;
543 }
544 Ok(())
545 }
546
547 fn current_timestamp() -> u64 {
549 std::time::SystemTime::now()
550 .duration_since(std::time::UNIX_EPOCH)
551 .unwrap()
552 .as_secs()
553 }
554
555 fn store_schema_metadata(&mut self, name: &str) -> Result<(i64, u32), AdapterError> {
557 let now = Self::current_timestamp() as i64;
558
559 let existing_version: Option<u32> = self
561 .conn
562 .query_row(
563 "SELECT MAX(version) FROM schemas WHERE name = ?1",
564 params![name],
565 |row| row.get(0),
566 )
567 .ok()
568 .flatten();
569
570 let version = existing_version.map(|v| v + 1).unwrap_or(1);
571
572 self.conn
573 .execute(
574 "INSERT INTO schemas (name, version, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
575 params![name, version, now, now],
576 )
577 .map_err(|e| {
578 AdapterError::InvalidOperation(format!("Failed to insert schema: {}", e))
579 })?;
580
581 let schema_id = self.conn.last_insert_rowid();
582 Ok((schema_id, version))
583 }
584
585 fn store_domains(
587 &mut self,
588 schema_id: i64,
589 table: &SymbolTable,
590 ) -> Result<(), AdapterError> {
591 for (name, domain) in &table.domains {
592 let metadata_json = serde_json::to_string(&domain.metadata).ok();
593 self.conn
594 .execute(
595 SchemaDatabaseSQL::insert_domain_sql(),
596 params![
597 schema_id,
598 name,
599 domain.cardinality as i64,
600 domain.description.as_ref(),
601 metadata_json
602 ],
603 )
604 .map_err(|e| {
605 AdapterError::InvalidOperation(format!("Failed to insert domain: {}", e))
606 })?;
607 }
608 Ok(())
609 }
610
611 fn store_predicates(
613 &mut self,
614 schema_id: i64,
615 table: &SymbolTable,
616 ) -> Result<(), AdapterError> {
617 for (name, predicate) in &table.predicates {
618 let constraints_json = serde_json::to_string(&predicate.constraints).ok();
619 let metadata_json = serde_json::to_string(&predicate.metadata).ok();
620
621 self.conn
622 .execute(
623 SchemaDatabaseSQL::insert_predicate_sql(),
624 params![
625 schema_id,
626 name,
627 predicate.arg_domains.len() as i64,
628 predicate.description.as_ref(),
629 constraints_json,
630 metadata_json
631 ],
632 )
633 .map_err(|e| {
634 AdapterError::InvalidOperation(format!("Failed to insert predicate: {}", e))
635 })?;
636
637 let predicate_id = self.conn.last_insert_rowid();
638
639 for (position, domain_name) in predicate.arg_domains.iter().enumerate() {
641 self.conn
642 .execute(
643 SchemaDatabaseSQL::insert_predicate_arg_sql(),
644 params![predicate_id, position as i64, domain_name],
645 )
646 .map_err(|e| {
647 AdapterError::InvalidOperation(format!(
648 "Failed to insert predicate argument: {}",
649 e
650 ))
651 })?;
652 }
653 }
654 Ok(())
655 }
656
657 fn store_variables(
659 &mut self,
660 schema_id: i64,
661 table: &SymbolTable,
662 ) -> Result<(), AdapterError> {
663 for (var_name, domain_name) in &table.variables {
664 self.conn
665 .execute(
666 SchemaDatabaseSQL::insert_variable_sql(),
667 params![schema_id, var_name, domain_name],
668 )
669 .map_err(|e| {
670 AdapterError::InvalidOperation(format!("Failed to insert variable: {}", e))
671 })?;
672 }
673 Ok(())
674 }
675
676 fn load_domains(
678 &self,
679 schema_id: i64,
680 ) -> Result<indexmap::IndexMap<String, DomainInfo>, AdapterError> {
681 let mut stmt = self
682 .conn
683 .prepare(SchemaDatabaseSQL::select_domains_sql())
684 .map_err(|e| {
685 AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
686 })?;
687
688 let domains = stmt
689 .query_map(params![schema_id], |row| {
690 let name: String = row.get(0)?;
691 let cardinality: i64 = row.get(1)?;
692 let description: Option<String> = row.get(2)?;
693 let metadata_json: Option<String> = row.get(3)?;
694
695 let mut domain = DomainInfo::new(&name, cardinality as usize);
696 if let Some(desc) = description {
697 domain = domain.with_description(desc);
698 }
699 if let Some(meta_str) = metadata_json {
700 if let Ok(metadata) = serde_json::from_str(&meta_str) {
701 domain.metadata = metadata;
702 }
703 }
704
705 Ok((name, domain))
706 })
707 .map_err(|e| {
708 AdapterError::InvalidOperation(format!("Failed to query domains: {}", e))
709 })?
710 .collect::<SqliteResult<_>>()
711 .map_err(|e| {
712 AdapterError::InvalidOperation(format!("Failed to collect domains: {}", e))
713 })?;
714
715 Ok(domains)
716 }
717
718 fn load_predicates(
720 &self,
721 schema_id: i64,
722 ) -> Result<indexmap::IndexMap<String, PredicateInfo>, AdapterError> {
723 let mut stmt = self
724 .conn
725 .prepare(SchemaDatabaseSQL::select_predicates_sql())
726 .map_err(|e| {
727 AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
728 })?;
729
730 let predicates = stmt
731 .query_map(params![schema_id], |row| {
732 let predicate_id: i64 = row.get(0)?;
733 let name: String = row.get(1)?;
734 let _arity: i64 = row.get(2)?;
735 let description: Option<String> = row.get(3)?;
736 let constraints_json: Option<String> = row.get(4)?;
737 let metadata_json: Option<String> = row.get(5)?;
738
739 Ok((
740 predicate_id,
741 name,
742 description,
743 constraints_json,
744 metadata_json,
745 ))
746 })
747 .map_err(|e| {
748 AdapterError::InvalidOperation(format!("Failed to query predicates: {}", e))
749 })?
750 .collect::<SqliteResult<Vec<_>>>()
751 .map_err(|e| {
752 AdapterError::InvalidOperation(format!("Failed to collect predicates: {}", e))
753 })?;
754
755 let mut result = indexmap::IndexMap::new();
756
757 for (predicate_id, name, description, constraints_json, metadata_json) in predicates {
758 let mut arg_stmt = self
760 .conn
761 .prepare(SchemaDatabaseSQL::select_predicate_args_sql())
762 .map_err(|e| {
763 AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
764 })?;
765
766 let arg_domains: Vec<String> = arg_stmt
767 .query_map(params![predicate_id], |row| {
768 let _position: i64 = row.get(0)?;
769 let domain_name: String = row.get(1)?;
770 Ok(domain_name)
771 })
772 .map_err(|e| {
773 AdapterError::InvalidOperation(format!(
774 "Failed to query predicate args: {}",
775 e
776 ))
777 })?
778 .collect::<SqliteResult<_>>()
779 .map_err(|e| {
780 AdapterError::InvalidOperation(format!(
781 "Failed to collect predicate args: {}",
782 e
783 ))
784 })?;
785
786 let mut predicate = PredicateInfo::new(&name, arg_domains);
787 if let Some(desc) = description {
788 predicate = predicate.with_description(desc);
789 }
790 if let Some(constraints_str) = constraints_json {
791 if let Ok(constraints) = serde_json::from_str(&constraints_str) {
792 predicate.constraints = constraints;
793 }
794 }
795 if let Some(meta_str) = metadata_json {
796 if let Ok(metadata) = serde_json::from_str(&meta_str) {
797 predicate.metadata = metadata;
798 }
799 }
800
801 result.insert(name, predicate);
802 }
803
804 Ok(result)
805 }
806
807 fn load_variables(
809 &self,
810 schema_id: i64,
811 ) -> Result<indexmap::IndexMap<String, String>, AdapterError> {
812 let mut stmt = self
813 .conn
814 .prepare("SELECT name, domain_name FROM variables WHERE schema_id = ?")
815 .map_err(|e| {
816 AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
817 })?;
818
819 let variables = stmt
820 .query_map(params![schema_id], |row| {
821 let name: String = row.get(0)?;
822 let domain_name: String = row.get(1)?;
823 Ok((name, domain_name))
824 })
825 .map_err(|e| {
826 AdapterError::InvalidOperation(format!("Failed to query variables: {}", e))
827 })?
828 .collect::<SqliteResult<_>>()
829 .map_err(|e| {
830 AdapterError::InvalidOperation(format!("Failed to collect variables: {}", e))
831 })?;
832
833 Ok(variables)
834 }
835 }
836
837 impl SchemaDatabase for SQLiteDatabase {
838 fn store_schema(
839 &mut self,
840 name: &str,
841 table: &SymbolTable,
842 ) -> Result<SchemaId, AdapterError> {
843 let (schema_id, _version) = self.store_schema_metadata(name)?;
844
845 self.store_domains(schema_id, table)?;
846 self.store_predicates(schema_id, table)?;
847 self.store_variables(schema_id, table)?;
848
849 Ok(SchemaId(schema_id as u64))
850 }
851
852 fn load_schema(&self, id: SchemaId) -> Result<SymbolTable, AdapterError> {
853 let schema_id = id.0 as i64;
854
855 let _: i64 = self
857 .conn
858 .query_row(
859 "SELECT id FROM schemas WHERE id = ?",
860 params![schema_id],
861 |row| row.get(0),
862 )
863 .map_err(|_| {
864 AdapterError::InvalidOperation(format!("Schema with ID {:?} not found", id))
865 })?;
866
867 let mut table = SymbolTable::new();
868 table.domains = self.load_domains(schema_id)?;
869 table.predicates = self.load_predicates(schema_id)?;
870 table.variables = self.load_variables(schema_id)?;
871
872 Ok(table)
873 }
874
875 fn load_schema_by_name(&self, name: &str) -> Result<SymbolTable, AdapterError> {
876 let schema_id: i64 = self
877 .conn
878 .query_row(
879 "SELECT id FROM schemas WHERE name = ? ORDER BY version DESC LIMIT 1",
880 params![name],
881 |row| row.get(0),
882 )
883 .map_err(|_| {
884 AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
885 })?;
886
887 self.load_schema(SchemaId(schema_id as u64))
888 }
889
890 fn list_schemas(&self) -> Result<Vec<SchemaMetadata>, AdapterError> {
891 let mut stmt = self
892 .conn
893 .prepare(
894 r#"
895 SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
896 (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
897 (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
898 (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
899 FROM schemas s
900 ORDER BY s.name, s.version DESC
901 "#,
902 )
903 .map_err(|e| {
904 AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
905 })?;
906
907 let schemas = stmt
908 .query_map([], |row| {
909 Ok(SchemaMetadata {
910 id: SchemaId(row.get::<_, i64>(0)? as u64),
911 name: row.get(1)?,
912 version: row.get(2)?,
913 created_at: row.get::<_, i64>(3)? as u64,
914 updated_at: row.get::<_, i64>(4)? as u64,
915 num_domains: row.get::<_, i64>(6)? as usize,
916 num_predicates: row.get::<_, i64>(7)? as usize,
917 num_variables: row.get::<_, i64>(8)? as usize,
918 description: row.get(5)?,
919 })
920 })
921 .map_err(|e| {
922 AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
923 })?
924 .collect::<SqliteResult<_>>()
925 .map_err(|e| {
926 AdapterError::InvalidOperation(format!("Failed to collect schemas: {}", e))
927 })?;
928
929 Ok(schemas)
930 }
931
932 fn delete_schema(&mut self, id: SchemaId) -> Result<(), AdapterError> {
933 let schema_id = id.0 as i64;
934 let affected = self
935 .conn
936 .execute("DELETE FROM schemas WHERE id = ?", params![schema_id])
937 .map_err(|e| {
938 AdapterError::InvalidOperation(format!("Failed to delete schema: {}", e))
939 })?;
940
941 if affected == 0 {
942 return Err(AdapterError::InvalidOperation(format!(
943 "Schema with ID {:?} not found",
944 id
945 )));
946 }
947
948 Ok(())
949 }
950
951 fn search_schemas(&self, pattern: &str) -> Result<Vec<SchemaMetadata>, AdapterError> {
952 let search_pattern = format!("%{}%", pattern);
953 let mut stmt = self
954 .conn
955 .prepare(
956 r#"
957 SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
958 (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
959 (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
960 (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
961 FROM schemas s
962 WHERE s.name LIKE ?
963 ORDER BY s.name, s.version DESC
964 "#,
965 )
966 .map_err(|e| {
967 AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
968 })?;
969
970 let schemas = stmt
971 .query_map(params![search_pattern], |row| {
972 Ok(SchemaMetadata {
973 id: SchemaId(row.get::<_, i64>(0)? as u64),
974 name: row.get(1)?,
975 version: row.get(2)?,
976 created_at: row.get::<_, i64>(3)? as u64,
977 updated_at: row.get::<_, i64>(4)? as u64,
978 num_domains: row.get::<_, i64>(6)? as usize,
979 num_predicates: row.get::<_, i64>(7)? as usize,
980 num_variables: row.get::<_, i64>(8)? as usize,
981 description: row.get(5)?,
982 })
983 })
984 .map_err(|e| {
985 AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
986 })?
987 .collect::<SqliteResult<_>>()
988 .map_err(|e| {
989 AdapterError::InvalidOperation(format!("Failed to collect schemas: {}", e))
990 })?;
991
992 Ok(schemas)
993 }
994
995 fn get_schema_history(&self, name: &str) -> Result<Vec<SchemaVersion>, AdapterError> {
996 let mut stmt = self
997 .conn
998 .prepare(
999 r#"
1000 SELECT version, created_at, id
1001 FROM schemas
1002 WHERE name = ?
1003 ORDER BY version ASC
1004 "#,
1005 )
1006 .map_err(|e| {
1007 AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
1008 })?;
1009
1010 let versions: Vec<SchemaVersion> = stmt
1011 .query_map(params![name], |row| {
1012 let version: u32 = row.get(0)?;
1013 let timestamp: i64 = row.get(1)?;
1014 let schema_id: i64 = row.get(2)?;
1015
1016 Ok(SchemaVersion {
1017 version,
1018 timestamp: timestamp as u64,
1019 description: format!("Version {}", version),
1020 schema_id: SchemaId(schema_id as u64),
1021 })
1022 })
1023 .map_err(|e| {
1024 AdapterError::InvalidOperation(format!("Failed to query versions: {}", e))
1025 })?
1026 .collect::<SqliteResult<_>>()
1027 .map_err(|e| {
1028 AdapterError::InvalidOperation(format!("Failed to collect versions: {}", e))
1029 })?;
1030
1031 if versions.is_empty() {
1032 return Err(AdapterError::InvalidOperation(format!(
1033 "Schema '{}' not found",
1034 name
1035 )));
1036 }
1037
1038 Ok(versions)
1039 }
1040 }
1041}
1042
1043#[cfg(feature = "sqlite")]
1044pub use sqlite_backend::SQLiteDatabase;
1045
1046#[cfg(feature = "postgres")]
1051mod postgres_backend {
1052 use super::*;
1053 use crate::{DomainInfo, PredicateInfo};
1054 use tokio_postgres::{Client, NoTls};
1055
1056 pub struct PostgreSQLDatabase {
1080 client: Client,
1081 }
1082
1083 impl PostgreSQLDatabase {
1084 pub async fn new(connection_string: &str) -> Result<Self, AdapterError> {
1089 let (client, connection) = tokio_postgres::connect(connection_string, NoTls)
1090 .await
1091 .map_err(|e| {
1092 AdapterError::InvalidOperation(format!(
1093 "Failed to connect to PostgreSQL: {}",
1094 e
1095 ))
1096 })?;
1097
1098 tokio::spawn(async move {
1100 if let Err(e) = connection.await {
1101 eprintln!("PostgreSQL connection error: {}", e);
1102 }
1103 });
1104
1105 let mut db = Self { client };
1106 db.initialize_schema_async().await?;
1107 Ok(db)
1108 }
1109
1110 async fn initialize_schema_async(&mut self) -> Result<(), AdapterError> {
1112 for sql in Self::create_tables_postgres_sql() {
1113 self.client.execute(&sql, &[]).await.map_err(|e| {
1114 AdapterError::InvalidOperation(format!("Failed to create tables: {}", e))
1115 })?;
1116 }
1117 Ok(())
1118 }
1119
1120 fn create_tables_postgres_sql() -> Vec<String> {
1124 vec![
1125 r#"
1127 CREATE TABLE IF NOT EXISTS schemas (
1128 id SERIAL PRIMARY KEY,
1129 name TEXT NOT NULL,
1130 version INTEGER NOT NULL DEFAULT 1,
1131 created_at BIGINT NOT NULL,
1132 updated_at BIGINT NOT NULL,
1133 description TEXT,
1134 UNIQUE(name, version)
1135 )
1136 "#
1137 .to_string(),
1138 r#"
1140 CREATE TABLE IF NOT EXISTS domains (
1141 id SERIAL PRIMARY KEY,
1142 schema_id INTEGER NOT NULL,
1143 name TEXT NOT NULL,
1144 cardinality BIGINT NOT NULL,
1145 description TEXT,
1146 metadata TEXT,
1147 FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
1148 UNIQUE(schema_id, name)
1149 )
1150 "#
1151 .to_string(),
1152 r#"
1154 CREATE TABLE IF NOT EXISTS predicates (
1155 id SERIAL PRIMARY KEY,
1156 schema_id INTEGER NOT NULL,
1157 name TEXT NOT NULL,
1158 arity INTEGER NOT NULL,
1159 description TEXT,
1160 constraints TEXT,
1161 metadata TEXT,
1162 FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
1163 UNIQUE(schema_id, name)
1164 )
1165 "#
1166 .to_string(),
1167 r#"
1169 CREATE TABLE IF NOT EXISTS predicate_arguments (
1170 id SERIAL PRIMARY KEY,
1171 predicate_id INTEGER NOT NULL,
1172 position INTEGER NOT NULL,
1173 domain_name TEXT NOT NULL,
1174 FOREIGN KEY (predicate_id) REFERENCES predicates(id) ON DELETE CASCADE,
1175 UNIQUE(predicate_id, position)
1176 )
1177 "#
1178 .to_string(),
1179 r#"
1181 CREATE TABLE IF NOT EXISTS variables (
1182 id SERIAL PRIMARY KEY,
1183 schema_id INTEGER NOT NULL,
1184 name TEXT NOT NULL,
1185 domain_name TEXT NOT NULL,
1186 FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
1187 UNIQUE(schema_id, name)
1188 )
1189 "#
1190 .to_string(),
1191 "CREATE INDEX IF NOT EXISTS idx_schemas_name ON schemas(name)".to_string(),
1193 "CREATE INDEX IF NOT EXISTS idx_domains_schema ON domains(schema_id)".to_string(),
1194 "CREATE INDEX IF NOT EXISTS idx_predicates_schema ON predicates(schema_id)"
1195 .to_string(),
1196 ]
1197 }
1198
1199 fn current_timestamp() -> i64 {
1201 std::time::SystemTime::now()
1202 .duration_since(std::time::UNIX_EPOCH)
1203 .unwrap()
1204 .as_secs() as i64
1205 }
1206
1207 pub async fn store_schema_async(
1209 &mut self,
1210 name: &str,
1211 table: &SymbolTable,
1212 ) -> Result<SchemaId, AdapterError> {
1213 let now = Self::current_timestamp();
1214
1215 let existing_version: Option<i32> = self
1217 .client
1218 .query_opt("SELECT MAX(version) FROM schemas WHERE name = $1", &[&name])
1219 .await
1220 .ok()
1221 .flatten()
1222 .and_then(|row| row.get(0));
1223
1224 let version = existing_version.map(|v| v + 1).unwrap_or(1);
1225
1226 let row = self
1228 .client
1229 .query_one(
1230 "INSERT INTO schemas (name, version, created_at, updated_at) VALUES ($1, $2, $3, $4) RETURNING id",
1231 &[&name, &version, &now, &now],
1232 )
1233 .await
1234 .map_err(|e| {
1235 AdapterError::InvalidOperation(format!("Failed to insert schema: {}", e))
1236 })?;
1237
1238 let schema_id: i32 = row.get(0);
1239
1240 for (domain_name, domain) in &table.domains {
1242 let metadata_json = serde_json::to_string(&domain.metadata).ok();
1243 self.client
1244 .execute(
1245 "INSERT INTO domains (schema_id, name, cardinality, description, metadata) VALUES ($1, $2, $3, $4, $5)",
1246 &[
1247 &schema_id,
1248 &domain_name.as_str(),
1249 &(domain.cardinality as i64),
1250 &domain.description.as_ref(),
1251 &metadata_json.as_ref(),
1252 ],
1253 )
1254 .await
1255 .map_err(|e| {
1256 AdapterError::InvalidOperation(format!("Failed to insert domain: {}", e))
1257 })?;
1258 }
1259
1260 for (predicate_name, predicate) in &table.predicates {
1262 let constraints_json = serde_json::to_string(&predicate.constraints).ok();
1263 let metadata_json = serde_json::to_string(&predicate.metadata).ok();
1264
1265 let pred_row = self
1266 .client
1267 .query_one(
1268 "INSERT INTO predicates (schema_id, name, arity, description, constraints, metadata) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id",
1269 &[
1270 &schema_id,
1271 &predicate_name.as_str(),
1272 &(predicate.arg_domains.len() as i32),
1273 &predicate.description.as_ref(),
1274 &constraints_json.as_ref(),
1275 &metadata_json.as_ref(),
1276 ],
1277 )
1278 .await
1279 .map_err(|e| {
1280 AdapterError::InvalidOperation(format!("Failed to insert predicate: {}", e))
1281 })?;
1282
1283 let predicate_id: i32 = pred_row.get(0);
1284
1285 for (position, domain_name) in predicate.arg_domains.iter().enumerate() {
1287 self.client
1288 .execute(
1289 "INSERT INTO predicate_arguments (predicate_id, position, domain_name) VALUES ($1, $2, $3)",
1290 &[&predicate_id, &(position as i32), &domain_name.as_str()],
1291 )
1292 .await
1293 .map_err(|e| {
1294 AdapterError::InvalidOperation(format!(
1295 "Failed to insert predicate argument: {}",
1296 e
1297 ))
1298 })?;
1299 }
1300 }
1301
1302 for (var_name, domain_name) in &table.variables {
1304 self.client
1305 .execute(
1306 "INSERT INTO variables (schema_id, name, domain_name) VALUES ($1, $2, $3)",
1307 &[&schema_id, &var_name.as_str(), &domain_name.as_str()],
1308 )
1309 .await
1310 .map_err(|e| {
1311 AdapterError::InvalidOperation(format!("Failed to insert variable: {}", e))
1312 })?;
1313 }
1314
1315 Ok(SchemaId(schema_id as u64))
1316 }
1317
1318 pub async fn load_schema_async(&self, id: SchemaId) -> Result<SymbolTable, AdapterError> {
1320 let schema_id = id.0 as i32;
1321
1322 self.client
1324 .query_opt("SELECT id FROM schemas WHERE id = $1", &[&schema_id])
1325 .await
1326 .map_err(|e| AdapterError::InvalidOperation(format!("Database error: {}", e)))?
1327 .ok_or_else(|| {
1328 AdapterError::InvalidOperation(format!("Schema with ID {:?} not found", id))
1329 })?;
1330
1331 let mut table = SymbolTable::new();
1332
1333 let domain_rows = self
1335 .client
1336 .query(
1337 "SELECT name, cardinality, description, metadata FROM domains WHERE schema_id = $1",
1338 &[&schema_id],
1339 )
1340 .await
1341 .map_err(|e| {
1342 AdapterError::InvalidOperation(format!("Failed to query domains: {}", e))
1343 })?;
1344
1345 for row in domain_rows {
1346 let name: String = row.get(0);
1347 let cardinality: i64 = row.get(1);
1348 let description: Option<String> = row.get(2);
1349 let metadata_json: Option<String> = row.get(3);
1350
1351 let mut domain = DomainInfo::new(&name, cardinality as usize);
1352 if let Some(desc) = description {
1353 domain = domain.with_description(desc);
1354 }
1355 if let Some(meta_str) = metadata_json {
1356 if let Ok(metadata) = serde_json::from_str(&meta_str) {
1357 domain.metadata = metadata;
1358 }
1359 }
1360
1361 table.domains.insert(name, domain);
1362 }
1363
1364 let predicate_rows = self
1366 .client
1367 .query(
1368 "SELECT id, name, arity, description, constraints, metadata FROM predicates WHERE schema_id = $1",
1369 &[&schema_id],
1370 )
1371 .await
1372 .map_err(|e| {
1373 AdapterError::InvalidOperation(format!("Failed to query predicates: {}", e))
1374 })?;
1375
1376 for pred_row in predicate_rows {
1377 let predicate_id: i32 = pred_row.get(0);
1378 let name: String = pred_row.get(1);
1379 let _arity: i32 = pred_row.get(2);
1380 let description: Option<String> = pred_row.get(3);
1381 let constraints_json: Option<String> = pred_row.get(4);
1382 let metadata_json: Option<String> = pred_row.get(5);
1383
1384 let arg_rows = self
1386 .client
1387 .query(
1388 "SELECT position, domain_name FROM predicate_arguments WHERE predicate_id = $1 ORDER BY position",
1389 &[&predicate_id],
1390 )
1391 .await
1392 .map_err(|e| {
1393 AdapterError::InvalidOperation(format!(
1394 "Failed to query predicate args: {}",
1395 e
1396 ))
1397 })?;
1398
1399 let arg_domains: Vec<String> = arg_rows.iter().map(|row| row.get(1)).collect();
1400
1401 let mut predicate = PredicateInfo::new(&name, arg_domains);
1402 if let Some(desc) = description {
1403 predicate = predicate.with_description(desc);
1404 }
1405 if let Some(constraints_str) = constraints_json {
1406 if let Ok(constraints) = serde_json::from_str(&constraints_str) {
1407 predicate.constraints = constraints;
1408 }
1409 }
1410 if let Some(meta_str) = metadata_json {
1411 if let Ok(metadata) = serde_json::from_str(&meta_str) {
1412 predicate.metadata = metadata;
1413 }
1414 }
1415
1416 table.predicates.insert(name, predicate);
1417 }
1418
1419 let var_rows = self
1421 .client
1422 .query(
1423 "SELECT name, domain_name FROM variables WHERE schema_id = $1",
1424 &[&schema_id],
1425 )
1426 .await
1427 .map_err(|e| {
1428 AdapterError::InvalidOperation(format!("Failed to query variables: {}", e))
1429 })?;
1430
1431 for row in var_rows {
1432 let name: String = row.get(0);
1433 let domain_name: String = row.get(1);
1434 table.variables.insert(name, domain_name);
1435 }
1436
1437 Ok(table)
1438 }
1439
1440 pub async fn load_schema_by_name_async(
1442 &self,
1443 name: &str,
1444 ) -> Result<SymbolTable, AdapterError> {
1445 let row = self
1446 .client
1447 .query_opt(
1448 "SELECT id FROM schemas WHERE name = $1 ORDER BY version DESC LIMIT 1",
1449 &[&name],
1450 )
1451 .await
1452 .map_err(|e| AdapterError::InvalidOperation(format!("Database error: {}", e)))?
1453 .ok_or_else(|| {
1454 AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
1455 })?;
1456
1457 let schema_id: i32 = row.get(0);
1458 self.load_schema_async(SchemaId(schema_id as u64)).await
1459 }
1460
1461 pub async fn list_schemas_async(&self) -> Result<Vec<SchemaMetadata>, AdapterError> {
1463 let rows = self
1464 .client
1465 .query(
1466 r#"
1467 SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
1468 (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
1469 (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
1470 (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
1471 FROM schemas s
1472 ORDER BY s.name, s.version DESC
1473 "#,
1474 &[],
1475 )
1476 .await
1477 .map_err(|e| {
1478 AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
1479 })?;
1480
1481 let schemas = rows
1482 .iter()
1483 .map(|row| SchemaMetadata {
1484 id: SchemaId(row.get::<_, i32>(0) as u64),
1485 name: row.get(1),
1486 version: row.get::<_, i32>(2) as u32,
1487 created_at: row.get::<_, i64>(3) as u64,
1488 updated_at: row.get::<_, i64>(4) as u64,
1489 num_domains: row.get::<_, i64>(6) as usize,
1490 num_predicates: row.get::<_, i64>(7) as usize,
1491 num_variables: row.get::<_, i64>(8) as usize,
1492 description: row.get(5),
1493 })
1494 .collect();
1495
1496 Ok(schemas)
1497 }
1498
1499 pub async fn delete_schema_async(&mut self, id: SchemaId) -> Result<(), AdapterError> {
1501 let schema_id = id.0 as i32;
1502 let affected = self
1503 .client
1504 .execute("DELETE FROM schemas WHERE id = $1", &[&schema_id])
1505 .await
1506 .map_err(|e| {
1507 AdapterError::InvalidOperation(format!("Failed to delete schema: {}", e))
1508 })?;
1509
1510 if affected == 0 {
1511 return Err(AdapterError::InvalidOperation(format!(
1512 "Schema with ID {:?} not found",
1513 id
1514 )));
1515 }
1516
1517 Ok(())
1518 }
1519
1520 pub async fn search_schemas_async(
1522 &self,
1523 pattern: &str,
1524 ) -> Result<Vec<SchemaMetadata>, AdapterError> {
1525 let search_pattern = format!("%{}%", pattern);
1526 let rows = self
1527 .client
1528 .query(
1529 r#"
1530 SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
1531 (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
1532 (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
1533 (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
1534 FROM schemas s
1535 WHERE s.name LIKE $1
1536 ORDER BY s.name, s.version DESC
1537 "#,
1538 &[&search_pattern],
1539 )
1540 .await
1541 .map_err(|e| {
1542 AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
1543 })?;
1544
1545 let schemas = rows
1546 .iter()
1547 .map(|row| SchemaMetadata {
1548 id: SchemaId(row.get::<_, i32>(0) as u64),
1549 name: row.get(1),
1550 version: row.get::<_, i32>(2) as u32,
1551 created_at: row.get::<_, i64>(3) as u64,
1552 updated_at: row.get::<_, i64>(4) as u64,
1553 num_domains: row.get::<_, i64>(6) as usize,
1554 num_predicates: row.get::<_, i64>(7) as usize,
1555 num_variables: row.get::<_, i64>(8) as usize,
1556 description: row.get(5),
1557 })
1558 .collect();
1559
1560 Ok(schemas)
1561 }
1562
1563 pub async fn get_schema_history_async(
1565 &self,
1566 name: &str,
1567 ) -> Result<Vec<SchemaVersion>, AdapterError> {
1568 let rows = self
1569 .client
1570 .query(
1571 "SELECT version, created_at, id FROM schemas WHERE name = $1 ORDER BY version ASC",
1572 &[&name],
1573 )
1574 .await
1575 .map_err(|e| {
1576 AdapterError::InvalidOperation(format!("Failed to query versions: {}", e))
1577 })?;
1578
1579 if rows.is_empty() {
1580 return Err(AdapterError::InvalidOperation(format!(
1581 "Schema '{}' not found",
1582 name
1583 )));
1584 }
1585
1586 let versions = rows
1587 .iter()
1588 .map(|row| {
1589 let version: i32 = row.get(0);
1590 let timestamp: i64 = row.get(1);
1591 let schema_id: i32 = row.get(2);
1592
1593 SchemaVersion {
1594 version: version as u32,
1595 timestamp: timestamp as u64,
1596 description: format!("Version {}", version),
1597 schema_id: SchemaId(schema_id as u64),
1598 }
1599 })
1600 .collect();
1601
1602 Ok(versions)
1603 }
1604 }
1605}
1606
1607#[cfg(feature = "postgres")]
1608pub use postgres_backend::PostgreSQLDatabase;
1609
1610#[cfg(test)]
1612#[path = "database_tests.rs"]
1613mod tests;