Skip to main content

tensorlogic_adapters/
database.rs

1//! Database integration for schema persistence.
2//!
3//! This module provides functionality to store and retrieve symbol tables
4//! from relational databases. Supported databases:
5//! - SQLite (via rusqlite) - embedded, file-based
6//! - PostgreSQL (via tokio-postgres) - server-based, multi-user
7//!
8//! The database schema includes tables for:
9//! - Domains (with cardinality and metadata)
10//! - Predicates (with arity, argument domains, and constraints)
11//! - Variables (with domain bindings)
12//! - Schema versioning and change history
13
14use std::collections::HashMap;
15
16use serde::{Deserialize, Serialize};
17
18use crate::{AdapterError, SymbolTable};
19
/// Database storage trait for symbol tables.
///
/// Implementations handle the specifics of different database backends.
/// Every operation reports failure through [`AdapterError`].
pub trait SchemaDatabase {
    /// Store a complete symbol table in the database under `name`.
    ///
    /// Returns the identifier assigned to the stored table.
    ///
    /// Storing under a name that already exists does not overwrite anything:
    /// the implementations in this file keep the earlier entries and record
    /// the table as a new, higher version of that name.
    fn store_schema(&mut self, name: &str, table: &SymbolTable) -> Result<SchemaId, AdapterError>;

    /// Load a symbol table by schema ID.
    ///
    /// Fails when no schema is stored under `id`.
    fn load_schema(&self, id: SchemaId) -> Result<SymbolTable, AdapterError>;

    /// Load a symbol table by name (returns most recent version).
    ///
    /// Fails when no schema is stored under `name`.
    fn load_schema_by_name(&self, name: &str) -> Result<SymbolTable, AdapterError>;

    /// List metadata for all available schemas (one entry per stored version).
    fn list_schemas(&self) -> Result<Vec<SchemaMetadata>, AdapterError>;

    /// Delete a single stored schema version by ID.
    fn delete_schema(&mut self, id: SchemaId) -> Result<(), AdapterError>;

    /// Search schemas by name pattern.
    ///
    /// NOTE(review): the pattern syntax is left to the backend. The in-memory
    /// backend treats `pattern` as a case-insensitive substring.
    fn search_schemas(&self, pattern: &str) -> Result<Vec<SchemaMetadata>, AdapterError>;

    /// Get schema history (all versions stored under `name`).
    fn get_schema_history(&self, name: &str) -> Result<Vec<SchemaVersion>, AdapterError>;
}
47
/// Unique identifier for a stored schema.
///
/// Each stored version of a schema gets its own `SchemaId`; the wrapped
/// integer is assigned by the backend that stored it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SchemaId(pub u64);
51
/// Metadata about a stored schema.
///
/// A lightweight summary of one stored schema version: it carries counts
/// rather than the symbol table itself.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemaMetadata {
    /// Unique identifier of this stored version
    pub id: SchemaId,
    /// Schema name (shared by all versions of the same schema)
    pub name: String,
    /// Version number
    pub version: u32,
    /// Creation timestamp (seconds since the Unix epoch)
    pub created_at: u64,
    /// Last modification timestamp (seconds since the Unix epoch)
    pub updated_at: u64,
    /// Number of domains
    pub num_domains: usize,
    /// Number of predicates
    pub num_predicates: usize,
    /// Number of variables
    pub num_variables: usize,
    /// Optional description
    pub description: Option<String>,
}
74
/// Version information for a schema.
///
/// One entry of the history returned by `SchemaDatabase::get_schema_history`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemaVersion {
    /// Version number
    pub version: u32,
    /// Timestamp (seconds since the Unix epoch)
    pub timestamp: u64,
    /// Change description
    pub description: String,
    /// Schema ID for this version; pass it to `load_schema` to fetch the table
    pub schema_id: SchemaId,
}
87
/// In-memory database implementation for testing and development.
///
/// This provides a simple in-memory store that implements the SchemaDatabase trait
/// without requiring external database dependencies. Useful for:
/// - Testing
/// - Development
/// - Small-scale applications
/// - Temporary storage
///
/// Nothing is persisted: all stored schemas live in the maps below.
pub struct MemoryDatabase {
    // Every stored schema version, keyed by its identifier.
    schemas: HashMap<SchemaId, StoredSchema>,
    // Identifier that the next stored schema will receive.
    next_id: u64,
    // Schema name -> identifiers of all versions stored under that name.
    name_index: HashMap<String, Vec<SchemaId>>,
}
101
/// One stored schema version as kept by `MemoryDatabase`: the symbol table
/// plus the bookkeeping needed to build `SchemaMetadata` and `SchemaVersion`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct StoredSchema {
    // Identifier of this stored version.
    id: SchemaId,
    // Name the schema was stored under.
    name: String,
    // Version number within `name`.
    version: u32,
    // The stored symbol table itself.
    table: SymbolTable,
    // Seconds since the Unix epoch at which this version was stored.
    created_at: u64,
    // Seconds since the Unix epoch of the last modification.
    updated_at: u64,
    // Optional free-form description.
    description: Option<String>,
}
112
113impl MemoryDatabase {
114    /// Create a new empty memory database.
115    pub fn new() -> Self {
116        Self {
117            schemas: HashMap::new(),
118            next_id: 1,
119            name_index: HashMap::new(),
120        }
121    }
122
123    /// Get current timestamp (Unix epoch seconds).
124    fn current_timestamp() -> u64 {
125        std::time::SystemTime::now()
126            .duration_since(std::time::UNIX_EPOCH)
127            .unwrap()
128            .as_secs()
129    }
130
131    /// Find latest version for a schema name.
132    fn find_latest_version(&self, name: &str) -> Option<SchemaId> {
133        self.name_index.get(name).and_then(|ids| {
134            ids.iter()
135                .filter_map(|id| self.schemas.get(id))
136                .max_by_key(|s| s.version)
137                .map(|s| s.id)
138        })
139    }
140}
141
142impl Default for MemoryDatabase {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl SchemaDatabase for MemoryDatabase {
149    fn store_schema(&mut self, name: &str, table: &SymbolTable) -> Result<SchemaId, AdapterError> {
150        let now = Self::current_timestamp();
151
152        // Check if schema with this name exists
153        let version = if let Some(existing_id) = self.find_latest_version(name) {
154            if let Some(existing) = self.schemas.get(&existing_id) {
155                existing.version + 1
156            } else {
157                1
158            }
159        } else {
160            1
161        };
162
163        let id = SchemaId(self.next_id);
164        self.next_id += 1;
165
166        let stored = StoredSchema {
167            id,
168            name: name.to_string(),
169            version,
170            table: table.clone(),
171            created_at: now,
172            updated_at: now,
173            description: None,
174        };
175
176        self.schemas.insert(id, stored);
177
178        // Update name index
179        self.name_index
180            .entry(name.to_string())
181            .or_default()
182            .push(id);
183
184        Ok(id)
185    }
186
187    fn load_schema(&self, id: SchemaId) -> Result<SymbolTable, AdapterError> {
188        self.schemas
189            .get(&id)
190            .map(|s| s.table.clone())
191            .ok_or_else(|| {
192                AdapterError::InvalidOperation(format!("Schema with ID {:?} not found", id))
193            })
194    }
195
196    fn load_schema_by_name(&self, name: &str) -> Result<SymbolTable, AdapterError> {
197        let id = self.find_latest_version(name).ok_or_else(|| {
198            AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
199        })?;
200
201        self.load_schema(id)
202    }
203
204    fn list_schemas(&self) -> Result<Vec<SchemaMetadata>, AdapterError> {
205        let mut metadata: Vec<SchemaMetadata> = self
206            .schemas
207            .values()
208            .map(|s| SchemaMetadata {
209                id: s.id,
210                name: s.name.clone(),
211                version: s.version,
212                created_at: s.created_at,
213                updated_at: s.updated_at,
214                num_domains: s.table.domains.len(),
215                num_predicates: s.table.predicates.len(),
216                num_variables: s.table.variables.len(),
217                description: s.description.clone(),
218            })
219            .collect();
220
221        metadata.sort_by_key(|m| m.name.clone());
222        Ok(metadata)
223    }
224
225    fn delete_schema(&mut self, id: SchemaId) -> Result<(), AdapterError> {
226        if let Some(schema) = self.schemas.remove(&id) {
227            // Remove from name index
228            if let Some(ids) = self.name_index.get_mut(&schema.name) {
229                ids.retain(|&i| i != id);
230                if ids.is_empty() {
231                    self.name_index.remove(&schema.name);
232                }
233            }
234            Ok(())
235        } else {
236            Err(AdapterError::InvalidOperation(format!(
237                "Schema with ID {:?} not found",
238                id
239            )))
240        }
241    }
242
243    fn search_schemas(&self, pattern: &str) -> Result<Vec<SchemaMetadata>, AdapterError> {
244        let pattern_lower = pattern.to_lowercase();
245        let mut results: Vec<SchemaMetadata> = self
246            .schemas
247            .values()
248            .filter(|s| s.name.to_lowercase().contains(&pattern_lower))
249            .map(|s| SchemaMetadata {
250                id: s.id,
251                name: s.name.clone(),
252                version: s.version,
253                created_at: s.created_at,
254                updated_at: s.updated_at,
255                num_domains: s.table.domains.len(),
256                num_predicates: s.table.predicates.len(),
257                num_variables: s.table.variables.len(),
258                description: s.description.clone(),
259            })
260            .collect();
261
262        results.sort_by_key(|m| m.name.clone());
263        Ok(results)
264    }
265
266    fn get_schema_history(&self, name: &str) -> Result<Vec<SchemaVersion>, AdapterError> {
267        let ids = self.name_index.get(name).ok_or_else(|| {
268            AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
269        })?;
270
271        let mut versions: Vec<SchemaVersion> = ids
272            .iter()
273            .filter_map(|id| {
274                self.schemas.get(id).map(|s| SchemaVersion {
275                    version: s.version,
276                    timestamp: s.created_at,
277                    description: format!("Version {}", s.version),
278                    schema_id: s.id,
279                })
280            })
281            .collect();
282
283        versions.sort_by_key(|v| v.version);
284        Ok(versions)
285    }
286}
287
/// SQL query generator for schema database operations.
///
/// This utility generates SQL queries for creating tables and CRUD operations
/// on schema databases. Can be used with both SQLite and PostgreSQL with
/// minor dialect adjustments: the statements as written use SQLite spellings
/// (`AUTOINCREMENT` keys and `?` positional placeholders).
pub struct SchemaDatabaseSQL;

impl SchemaDatabaseSQL {
    /// Generate CREATE TABLE statements for schema storage.
    ///
    /// The statements are returned in dependency order (referenced tables
    /// first, indexes last) and are all `IF NOT EXISTS`, so running the whole
    /// list repeatedly is harmless.
    pub fn create_tables_sql() -> Vec<String> {
        vec![
            // Schemas table
            r#"
            CREATE TABLE IF NOT EXISTS schemas (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                description TEXT,
                UNIQUE(name, version)
            )
            "#
            .to_string(),
            // Domains table
            r#"
            CREATE TABLE IF NOT EXISTS domains (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                schema_id INTEGER NOT NULL,
                name TEXT NOT NULL,
                cardinality INTEGER NOT NULL,
                description TEXT,
                metadata TEXT,
                FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
                UNIQUE(schema_id, name)
            )
            "#
            .to_string(),
            // Predicates table
            r#"
            CREATE TABLE IF NOT EXISTS predicates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                schema_id INTEGER NOT NULL,
                name TEXT NOT NULL,
                arity INTEGER NOT NULL,
                description TEXT,
                constraints TEXT,
                metadata TEXT,
                FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
                UNIQUE(schema_id, name)
            )
            "#
            .to_string(),
            // Predicate arguments table
            r#"
            CREATE TABLE IF NOT EXISTS predicate_arguments (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                predicate_id INTEGER NOT NULL,
                position INTEGER NOT NULL,
                domain_name TEXT NOT NULL,
                FOREIGN KEY (predicate_id) REFERENCES predicates(id) ON DELETE CASCADE,
                UNIQUE(predicate_id, position)
            )
            "#
            .to_string(),
            // Variables table
            r#"
            CREATE TABLE IF NOT EXISTS variables (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                schema_id INTEGER NOT NULL,
                name TEXT NOT NULL,
                domain_name TEXT NOT NULL,
                FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
                UNIQUE(schema_id, name)
            )
            "#
            .to_string(),
            // Indexes for performance
            "CREATE INDEX IF NOT EXISTS idx_schemas_name ON schemas(name)".to_string(),
            "CREATE INDEX IF NOT EXISTS idx_domains_schema ON domains(schema_id)".to_string(),
            "CREATE INDEX IF NOT EXISTS idx_predicates_schema ON predicates(schema_id)".to_string(),
        ]
    }

    /// Generate INSERT query for storing a domain.
    ///
    /// Parameters, in order: schema id, name, cardinality, description, metadata.
    pub fn insert_domain_sql() -> &'static str {
        r#"
        INSERT INTO domains (schema_id, name, cardinality, description, metadata)
        VALUES (?, ?, ?, ?, ?)
        "#
    }

    /// Generate INSERT query for storing a predicate.
    ///
    /// Parameters, in order: schema id, name, arity, description, constraints,
    /// metadata.
    pub fn insert_predicate_sql() -> &'static str {
        r#"
        INSERT INTO predicates (schema_id, name, arity, description, constraints, metadata)
        VALUES (?, ?, ?, ?, ?, ?)
        "#
    }

    /// Generate INSERT query for storing a predicate argument.
    ///
    /// Parameters, in order: predicate id, argument position, domain name.
    pub fn insert_predicate_arg_sql() -> &'static str {
        r#"
        INSERT INTO predicate_arguments (predicate_id, position, domain_name)
        VALUES (?, ?, ?)
        "#
    }

    /// Generate INSERT query for storing a variable.
    ///
    /// Parameters, in order: schema id, variable name, domain name.
    pub fn insert_variable_sql() -> &'static str {
        r#"
        INSERT INTO variables (schema_id, name, domain_name)
        VALUES (?, ?, ?)
        "#
    }

    /// Generate SELECT query for loading a schema row by its id.
    pub fn select_schema_sql() -> &'static str {
        "SELECT id, name, version, created_at, updated_at, description FROM schemas WHERE id = ?"
    }

    /// Generate SELECT query for loading the domains of one schema.
    ///
    /// Rows are ordered by primary key so they come back in insertion order;
    /// without an `ORDER BY` the database may return them in any order.
    pub fn select_domains_sql() -> &'static str {
        "SELECT name, cardinality, description, metadata FROM domains WHERE schema_id = ? ORDER BY id"
    }

    /// Generate SELECT query for loading the predicates of one schema.
    ///
    /// Rows are ordered by primary key so they come back in insertion order;
    /// without an `ORDER BY` the database may return them in any order.
    pub fn select_predicates_sql() -> &'static str {
        "SELECT id, name, arity, description, constraints, metadata FROM predicates WHERE schema_id = ? ORDER BY id"
    }

    /// Generate SELECT query for loading predicate arguments, ordered by
    /// argument position.
    pub fn select_predicate_args_sql() -> &'static str {
        "SELECT position, domain_name FROM predicate_arguments WHERE predicate_id = ? ORDER BY position"
    }
}
424
425/// Statistics about database storage.
426#[derive(Clone, Debug)]
427pub struct DatabaseStats {
428    /// Total number of stored schemas
429    pub total_schemas: usize,
430    /// Total number of domains across all schemas
431    pub total_domains: usize,
432    /// Total number of predicates across all schemas
433    pub total_predicates: usize,
434    /// Total database size in bytes (if applicable)
435    pub size_bytes: Option<usize>,
436}
437
438impl DatabaseStats {
439    /// Create empty statistics.
440    pub fn new() -> Self {
441        Self {
442            total_schemas: 0,
443            total_domains: 0,
444            total_predicates: 0,
445            size_bytes: None,
446        }
447    }
448
449    /// Calculate statistics from a database implementation.
450    pub fn from_database<D: SchemaDatabase>(db: &D) -> Result<Self, AdapterError> {
451        let schemas = db.list_schemas()?;
452        let total_schemas = schemas.len();
453        let total_domains: usize = schemas.iter().map(|s| s.num_domains).sum();
454        let total_predicates: usize = schemas.iter().map(|s| s.num_predicates).sum();
455
456        Ok(Self {
457            total_schemas,
458            total_domains,
459            total_predicates,
460            size_bytes: None,
461        })
462    }
463
464    /// Calculate average domains per schema.
465    pub fn avg_domains_per_schema(&self) -> f64 {
466        if self.total_schemas == 0 {
467            0.0
468        } else {
469            self.total_domains as f64 / self.total_schemas as f64
470        }
471    }
472
473    /// Calculate average predicates per schema.
474    pub fn avg_predicates_per_schema(&self) -> f64 {
475        if self.total_schemas == 0 {
476            0.0
477        } else {
478            self.total_predicates as f64 / self.total_schemas as f64
479        }
480    }
481}
482
483impl Default for DatabaseStats {
484    fn default() -> Self {
485        Self::new()
486    }
487}
488
489// ============================================================================
490// SQLite Backend Implementation
491// ============================================================================
492
493#[cfg(feature = "sqlite")]
494mod sqlite_backend {
495    use super::*;
496    use crate::{DomainInfo, PredicateInfo};
497    use rusqlite::{params, Connection, Result as SqliteResult};
498
    /// SQLite database backend for schema storage.
    ///
    /// This implementation provides persistent storage using SQLite.
    /// The database schema is automatically created on first use.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # #[cfg(feature = "sqlite")]
    /// # {
    /// use tensorlogic_adapters::{SQLiteDatabase, SchemaDatabase, SymbolTable, DomainInfo};
    ///
    /// let mut db = SQLiteDatabase::new(":memory:").unwrap();
    /// let mut table = SymbolTable::new();
    /// table.add_domain(DomainInfo::new("Person", 100)).unwrap();
    ///
    /// let id = db.store_schema("test", &table).unwrap();
    /// let loaded = db.load_schema(id).unwrap();
    /// # }
    /// ```
    pub struct SQLiteDatabase {
        // The single connection every query of this backend runs on.
        conn: Connection,
    }
522
523    impl SQLiteDatabase {
524        /// Create a new SQLite database at the given path.
525        ///
526        /// Use `:memory:` for an in-memory database (testing).
527        pub fn new(path: &str) -> Result<Self, AdapterError> {
528            let conn = Connection::open(path).map_err(|e| {
529                AdapterError::InvalidOperation(format!("Failed to open SQLite database: {}", e))
530            })?;
531
532            let mut db = Self { conn };
533            db.initialize_schema()?;
534            Ok(db)
535        }
536
537        /// Initialize the database schema (create tables if they don't exist).
538        fn initialize_schema(&mut self) -> Result<(), AdapterError> {
539            for sql in SchemaDatabaseSQL::create_tables_sql() {
540                self.conn.execute(&sql, []).map_err(|e| {
541                    AdapterError::InvalidOperation(format!("Failed to create tables: {}", e))
542                })?;
543            }
544            Ok(())
545        }
546
547        /// Get current timestamp (Unix epoch seconds).
548        fn current_timestamp() -> u64 {
549            std::time::SystemTime::now()
550                .duration_since(std::time::UNIX_EPOCH)
551                .unwrap()
552                .as_secs()
553        }
554
555        /// Store schema metadata and return schema_id.
556        fn store_schema_metadata(&mut self, name: &str) -> Result<(i64, u32), AdapterError> {
557            let now = Self::current_timestamp() as i64;
558
559            // Check if schema exists
560            let existing_version: Option<u32> = self
561                .conn
562                .query_row(
563                    "SELECT MAX(version) FROM schemas WHERE name = ?1",
564                    params![name],
565                    |row| row.get(0),
566                )
567                .ok()
568                .flatten();
569
570            let version = existing_version.map(|v| v + 1).unwrap_or(1);
571
572            self.conn
573                .execute(
574                    "INSERT INTO schemas (name, version, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
575                    params![name, version, now, now],
576                )
577                .map_err(|e| {
578                    AdapterError::InvalidOperation(format!("Failed to insert schema: {}", e))
579                })?;
580
581            let schema_id = self.conn.last_insert_rowid();
582            Ok((schema_id, version))
583        }
584
585        /// Store domains for a schema.
586        fn store_domains(
587            &mut self,
588            schema_id: i64,
589            table: &SymbolTable,
590        ) -> Result<(), AdapterError> {
591            for (name, domain) in &table.domains {
592                let metadata_json = serde_json::to_string(&domain.metadata).ok();
593                self.conn
594                    .execute(
595                        SchemaDatabaseSQL::insert_domain_sql(),
596                        params![
597                            schema_id,
598                            name,
599                            domain.cardinality as i64,
600                            domain.description.as_ref(),
601                            metadata_json
602                        ],
603                    )
604                    .map_err(|e| {
605                        AdapterError::InvalidOperation(format!("Failed to insert domain: {}", e))
606                    })?;
607            }
608            Ok(())
609        }
610
611        /// Store predicates for a schema.
612        fn store_predicates(
613            &mut self,
614            schema_id: i64,
615            table: &SymbolTable,
616        ) -> Result<(), AdapterError> {
617            for (name, predicate) in &table.predicates {
618                let constraints_json = serde_json::to_string(&predicate.constraints).ok();
619                let metadata_json = serde_json::to_string(&predicate.metadata).ok();
620
621                self.conn
622                    .execute(
623                        SchemaDatabaseSQL::insert_predicate_sql(),
624                        params![
625                            schema_id,
626                            name,
627                            predicate.arg_domains.len() as i64,
628                            predicate.description.as_ref(),
629                            constraints_json,
630                            metadata_json
631                        ],
632                    )
633                    .map_err(|e| {
634                        AdapterError::InvalidOperation(format!("Failed to insert predicate: {}", e))
635                    })?;
636
637                let predicate_id = self.conn.last_insert_rowid();
638
639                // Store argument domains
640                for (position, domain_name) in predicate.arg_domains.iter().enumerate() {
641                    self.conn
642                        .execute(
643                            SchemaDatabaseSQL::insert_predicate_arg_sql(),
644                            params![predicate_id, position as i64, domain_name],
645                        )
646                        .map_err(|e| {
647                            AdapterError::InvalidOperation(format!(
648                                "Failed to insert predicate argument: {}",
649                                e
650                            ))
651                        })?;
652                }
653            }
654            Ok(())
655        }
656
657        /// Store variables for a schema.
658        fn store_variables(
659            &mut self,
660            schema_id: i64,
661            table: &SymbolTable,
662        ) -> Result<(), AdapterError> {
663            for (var_name, domain_name) in &table.variables {
664                self.conn
665                    .execute(
666                        SchemaDatabaseSQL::insert_variable_sql(),
667                        params![schema_id, var_name, domain_name],
668                    )
669                    .map_err(|e| {
670                        AdapterError::InvalidOperation(format!("Failed to insert variable: {}", e))
671                    })?;
672            }
673            Ok(())
674        }
675
676        /// Load domains for a schema.
677        fn load_domains(
678            &self,
679            schema_id: i64,
680        ) -> Result<indexmap::IndexMap<String, DomainInfo>, AdapterError> {
681            let mut stmt = self
682                .conn
683                .prepare(SchemaDatabaseSQL::select_domains_sql())
684                .map_err(|e| {
685                    AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
686                })?;
687
688            let domains = stmt
689                .query_map(params![schema_id], |row| {
690                    let name: String = row.get(0)?;
691                    let cardinality: i64 = row.get(1)?;
692                    let description: Option<String> = row.get(2)?;
693                    let metadata_json: Option<String> = row.get(3)?;
694
695                    let mut domain = DomainInfo::new(&name, cardinality as usize);
696                    if let Some(desc) = description {
697                        domain = domain.with_description(desc);
698                    }
699                    if let Some(meta_str) = metadata_json {
700                        if let Ok(metadata) = serde_json::from_str(&meta_str) {
701                            domain.metadata = metadata;
702                        }
703                    }
704
705                    Ok((name, domain))
706                })
707                .map_err(|e| {
708                    AdapterError::InvalidOperation(format!("Failed to query domains: {}", e))
709                })?
710                .collect::<SqliteResult<_>>()
711                .map_err(|e| {
712                    AdapterError::InvalidOperation(format!("Failed to collect domains: {}", e))
713                })?;
714
715            Ok(domains)
716        }
717
718        /// Load predicates for a schema.
719        fn load_predicates(
720            &self,
721            schema_id: i64,
722        ) -> Result<indexmap::IndexMap<String, PredicateInfo>, AdapterError> {
723            let mut stmt = self
724                .conn
725                .prepare(SchemaDatabaseSQL::select_predicates_sql())
726                .map_err(|e| {
727                    AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
728                })?;
729
730            let predicates = stmt
731                .query_map(params![schema_id], |row| {
732                    let predicate_id: i64 = row.get(0)?;
733                    let name: String = row.get(1)?;
734                    let _arity: i64 = row.get(2)?;
735                    let description: Option<String> = row.get(3)?;
736                    let constraints_json: Option<String> = row.get(4)?;
737                    let metadata_json: Option<String> = row.get(5)?;
738
739                    Ok((
740                        predicate_id,
741                        name,
742                        description,
743                        constraints_json,
744                        metadata_json,
745                    ))
746                })
747                .map_err(|e| {
748                    AdapterError::InvalidOperation(format!("Failed to query predicates: {}", e))
749                })?
750                .collect::<SqliteResult<Vec<_>>>()
751                .map_err(|e| {
752                    AdapterError::InvalidOperation(format!("Failed to collect predicates: {}", e))
753                })?;
754
755            let mut result = indexmap::IndexMap::new();
756
757            for (predicate_id, name, description, constraints_json, metadata_json) in predicates {
758                // Load argument domains
759                let mut arg_stmt = self
760                    .conn
761                    .prepare(SchemaDatabaseSQL::select_predicate_args_sql())
762                    .map_err(|e| {
763                        AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
764                    })?;
765
766                let arg_domains: Vec<String> = arg_stmt
767                    .query_map(params![predicate_id], |row| {
768                        let _position: i64 = row.get(0)?;
769                        let domain_name: String = row.get(1)?;
770                        Ok(domain_name)
771                    })
772                    .map_err(|e| {
773                        AdapterError::InvalidOperation(format!(
774                            "Failed to query predicate args: {}",
775                            e
776                        ))
777                    })?
778                    .collect::<SqliteResult<_>>()
779                    .map_err(|e| {
780                        AdapterError::InvalidOperation(format!(
781                            "Failed to collect predicate args: {}",
782                            e
783                        ))
784                    })?;
785
786                let mut predicate = PredicateInfo::new(&name, arg_domains);
787                if let Some(desc) = description {
788                    predicate = predicate.with_description(desc);
789                }
790                if let Some(constraints_str) = constraints_json {
791                    if let Ok(constraints) = serde_json::from_str(&constraints_str) {
792                        predicate.constraints = constraints;
793                    }
794                }
795                if let Some(meta_str) = metadata_json {
796                    if let Ok(metadata) = serde_json::from_str(&meta_str) {
797                        predicate.metadata = metadata;
798                    }
799                }
800
801                result.insert(name, predicate);
802            }
803
804            Ok(result)
805        }
806
807        /// Load variables for a schema.
808        fn load_variables(
809            &self,
810            schema_id: i64,
811        ) -> Result<indexmap::IndexMap<String, String>, AdapterError> {
812            let mut stmt = self
813                .conn
814                .prepare("SELECT name, domain_name FROM variables WHERE schema_id = ?")
815                .map_err(|e| {
816                    AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
817                })?;
818
819            let variables = stmt
820                .query_map(params![schema_id], |row| {
821                    let name: String = row.get(0)?;
822                    let domain_name: String = row.get(1)?;
823                    Ok((name, domain_name))
824                })
825                .map_err(|e| {
826                    AdapterError::InvalidOperation(format!("Failed to query variables: {}", e))
827                })?
828                .collect::<SqliteResult<_>>()
829                .map_err(|e| {
830                    AdapterError::InvalidOperation(format!("Failed to collect variables: {}", e))
831                })?;
832
833            Ok(variables)
834        }
835    }
836
837    impl SchemaDatabase for SQLiteDatabase {
838        fn store_schema(
839            &mut self,
840            name: &str,
841            table: &SymbolTable,
842        ) -> Result<SchemaId, AdapterError> {
843            let (schema_id, _version) = self.store_schema_metadata(name)?;
844
845            self.store_domains(schema_id, table)?;
846            self.store_predicates(schema_id, table)?;
847            self.store_variables(schema_id, table)?;
848
849            Ok(SchemaId(schema_id as u64))
850        }
851
852        fn load_schema(&self, id: SchemaId) -> Result<SymbolTable, AdapterError> {
853            let schema_id = id.0 as i64;
854
855            // Verify schema exists
856            let _: i64 = self
857                .conn
858                .query_row(
859                    "SELECT id FROM schemas WHERE id = ?",
860                    params![schema_id],
861                    |row| row.get(0),
862                )
863                .map_err(|_| {
864                    AdapterError::InvalidOperation(format!("Schema with ID {:?} not found", id))
865                })?;
866
867            let mut table = SymbolTable::new();
868            table.domains = self.load_domains(schema_id)?;
869            table.predicates = self.load_predicates(schema_id)?;
870            table.variables = self.load_variables(schema_id)?;
871
872            Ok(table)
873        }
874
875        fn load_schema_by_name(&self, name: &str) -> Result<SymbolTable, AdapterError> {
876            let schema_id: i64 = self
877                .conn
878                .query_row(
879                    "SELECT id FROM schemas WHERE name = ? ORDER BY version DESC LIMIT 1",
880                    params![name],
881                    |row| row.get(0),
882                )
883                .map_err(|_| {
884                    AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
885                })?;
886
887            self.load_schema(SchemaId(schema_id as u64))
888        }
889
890        fn list_schemas(&self) -> Result<Vec<SchemaMetadata>, AdapterError> {
891            let mut stmt = self
892                .conn
893                .prepare(
894                    r#"
895                    SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
896                           (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
897                           (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
898                           (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
899                    FROM schemas s
900                    ORDER BY s.name, s.version DESC
901                    "#,
902                )
903                .map_err(|e| {
904                    AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
905                })?;
906
907            let schemas = stmt
908                .query_map([], |row| {
909                    Ok(SchemaMetadata {
910                        id: SchemaId(row.get::<_, i64>(0)? as u64),
911                        name: row.get(1)?,
912                        version: row.get(2)?,
913                        created_at: row.get::<_, i64>(3)? as u64,
914                        updated_at: row.get::<_, i64>(4)? as u64,
915                        num_domains: row.get::<_, i64>(6)? as usize,
916                        num_predicates: row.get::<_, i64>(7)? as usize,
917                        num_variables: row.get::<_, i64>(8)? as usize,
918                        description: row.get(5)?,
919                    })
920                })
921                .map_err(|e| {
922                    AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
923                })?
924                .collect::<SqliteResult<_>>()
925                .map_err(|e| {
926                    AdapterError::InvalidOperation(format!("Failed to collect schemas: {}", e))
927                })?;
928
929            Ok(schemas)
930        }
931
932        fn delete_schema(&mut self, id: SchemaId) -> Result<(), AdapterError> {
933            let schema_id = id.0 as i64;
934            let affected = self
935                .conn
936                .execute("DELETE FROM schemas WHERE id = ?", params![schema_id])
937                .map_err(|e| {
938                    AdapterError::InvalidOperation(format!("Failed to delete schema: {}", e))
939                })?;
940
941            if affected == 0 {
942                return Err(AdapterError::InvalidOperation(format!(
943                    "Schema with ID {:?} not found",
944                    id
945                )));
946            }
947
948            Ok(())
949        }
950
951        fn search_schemas(&self, pattern: &str) -> Result<Vec<SchemaMetadata>, AdapterError> {
952            let search_pattern = format!("%{}%", pattern);
953            let mut stmt = self
954                .conn
955                .prepare(
956                    r#"
957                    SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
958                           (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
959                           (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
960                           (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
961                    FROM schemas s
962                    WHERE s.name LIKE ?
963                    ORDER BY s.name, s.version DESC
964                    "#,
965                )
966                .map_err(|e| {
967                    AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
968                })?;
969
970            let schemas = stmt
971                .query_map(params![search_pattern], |row| {
972                    Ok(SchemaMetadata {
973                        id: SchemaId(row.get::<_, i64>(0)? as u64),
974                        name: row.get(1)?,
975                        version: row.get(2)?,
976                        created_at: row.get::<_, i64>(3)? as u64,
977                        updated_at: row.get::<_, i64>(4)? as u64,
978                        num_domains: row.get::<_, i64>(6)? as usize,
979                        num_predicates: row.get::<_, i64>(7)? as usize,
980                        num_variables: row.get::<_, i64>(8)? as usize,
981                        description: row.get(5)?,
982                    })
983                })
984                .map_err(|e| {
985                    AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
986                })?
987                .collect::<SqliteResult<_>>()
988                .map_err(|e| {
989                    AdapterError::InvalidOperation(format!("Failed to collect schemas: {}", e))
990                })?;
991
992            Ok(schemas)
993        }
994
995        fn get_schema_history(&self, name: &str) -> Result<Vec<SchemaVersion>, AdapterError> {
996            let mut stmt = self
997                .conn
998                .prepare(
999                    r#"
1000                    SELECT version, created_at, id
1001                    FROM schemas
1002                    WHERE name = ?
1003                    ORDER BY version ASC
1004                    "#,
1005                )
1006                .map_err(|e| {
1007                    AdapterError::InvalidOperation(format!("Failed to prepare query: {}", e))
1008                })?;
1009
1010            let versions: Vec<SchemaVersion> = stmt
1011                .query_map(params![name], |row| {
1012                    let version: u32 = row.get(0)?;
1013                    let timestamp: i64 = row.get(1)?;
1014                    let schema_id: i64 = row.get(2)?;
1015
1016                    Ok(SchemaVersion {
1017                        version,
1018                        timestamp: timestamp as u64,
1019                        description: format!("Version {}", version),
1020                        schema_id: SchemaId(schema_id as u64),
1021                    })
1022                })
1023                .map_err(|e| {
1024                    AdapterError::InvalidOperation(format!("Failed to query versions: {}", e))
1025                })?
1026                .collect::<SqliteResult<_>>()
1027                .map_err(|e| {
1028                    AdapterError::InvalidOperation(format!("Failed to collect versions: {}", e))
1029                })?;
1030
1031            if versions.is_empty() {
1032                return Err(AdapterError::InvalidOperation(format!(
1033                    "Schema '{}' not found",
1034                    name
1035                )));
1036            }
1037
1038            Ok(versions)
1039        }
1040    }
1041}
1042
1043#[cfg(feature = "sqlite")]
1044pub use sqlite_backend::SQLiteDatabase;
1045
1046// ============================================================================
1047// PostgreSQL Backend Implementation
1048// ============================================================================
1049
1050#[cfg(feature = "postgres")]
1051mod postgres_backend {
1052    use super::*;
1053    use crate::{DomainInfo, PredicateInfo};
1054    use tokio_postgres::{Client, NoTls};
1055
    /// PostgreSQL database backend for schema storage.
    ///
    /// This implementation provides persistent storage using PostgreSQL
    /// with async support. The database schema is automatically created on first use.
    ///
    /// All operations are exposed as `async` methods with an `_async` suffix
    /// (`store_schema_async`, `load_schema_async`, ...). Constructing a value
    /// with `new` spawns the connection driver via `tokio::spawn`, so it must
    /// be called from within a Tokio runtime.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # #[cfg(feature = "postgres")]
    /// # {
    /// use tensorlogic_adapters::{PostgreSQLDatabase, SymbolTable, DomainInfo};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut db = PostgreSQLDatabase::new("host=localhost user=postgres").await?;
    /// let mut table = SymbolTable::new();
    /// table.add_domain(DomainInfo::new("Person", 100))?;
    ///
    /// let id = db.store_schema_async("test", &table).await?;
    /// let loaded = db.load_schema_async(id).await?;
    /// # Ok(())
    /// # }
    /// # }
    /// ```
    pub struct PostgreSQLDatabase {
        /// Client handle through which every query of this backend is issued.
        client: Client,
    }
1082
1083    impl PostgreSQLDatabase {
1084        /// Create a new PostgreSQL database connection.
1085        ///
1086        /// The connection string should be in the format:
1087        /// `host=localhost user=postgres password=password dbname=tensorlogic`
1088        pub async fn new(connection_string: &str) -> Result<Self, AdapterError> {
1089            let (client, connection) = tokio_postgres::connect(connection_string, NoTls)
1090                .await
1091                .map_err(|e| {
1092                    AdapterError::InvalidOperation(format!(
1093                        "Failed to connect to PostgreSQL: {}",
1094                        e
1095                    ))
1096                })?;
1097
1098            // Spawn connection in background
1099            tokio::spawn(async move {
1100                if let Err(e) = connection.await {
1101                    eprintln!("PostgreSQL connection error: {}", e);
1102                }
1103            });
1104
1105            let mut db = Self { client };
1106            db.initialize_schema_async().await?;
1107            Ok(db)
1108        }
1109
1110        /// Initialize the database schema (create tables if they don't exist).
1111        async fn initialize_schema_async(&mut self) -> Result<(), AdapterError> {
1112            for sql in Self::create_tables_postgres_sql() {
1113                self.client.execute(&sql, &[]).await.map_err(|e| {
1114                    AdapterError::InvalidOperation(format!("Failed to create tables: {}", e))
1115                })?;
1116            }
1117            Ok(())
1118        }
1119
1120        /// Generate CREATE TABLE statements for PostgreSQL.
1121        ///
1122        /// PostgreSQL uses SERIAL instead of AUTOINCREMENT.
1123        fn create_tables_postgres_sql() -> Vec<String> {
1124            vec![
1125                // Schemas table
1126                r#"
1127                CREATE TABLE IF NOT EXISTS schemas (
1128                    id SERIAL PRIMARY KEY,
1129                    name TEXT NOT NULL,
1130                    version INTEGER NOT NULL DEFAULT 1,
1131                    created_at BIGINT NOT NULL,
1132                    updated_at BIGINT NOT NULL,
1133                    description TEXT,
1134                    UNIQUE(name, version)
1135                )
1136                "#
1137                .to_string(),
1138                // Domains table
1139                r#"
1140                CREATE TABLE IF NOT EXISTS domains (
1141                    id SERIAL PRIMARY KEY,
1142                    schema_id INTEGER NOT NULL,
1143                    name TEXT NOT NULL,
1144                    cardinality BIGINT NOT NULL,
1145                    description TEXT,
1146                    metadata TEXT,
1147                    FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
1148                    UNIQUE(schema_id, name)
1149                )
1150                "#
1151                .to_string(),
1152                // Predicates table
1153                r#"
1154                CREATE TABLE IF NOT EXISTS predicates (
1155                    id SERIAL PRIMARY KEY,
1156                    schema_id INTEGER NOT NULL,
1157                    name TEXT NOT NULL,
1158                    arity INTEGER NOT NULL,
1159                    description TEXT,
1160                    constraints TEXT,
1161                    metadata TEXT,
1162                    FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
1163                    UNIQUE(schema_id, name)
1164                )
1165                "#
1166                .to_string(),
1167                // Predicate arguments table
1168                r#"
1169                CREATE TABLE IF NOT EXISTS predicate_arguments (
1170                    id SERIAL PRIMARY KEY,
1171                    predicate_id INTEGER NOT NULL,
1172                    position INTEGER NOT NULL,
1173                    domain_name TEXT NOT NULL,
1174                    FOREIGN KEY (predicate_id) REFERENCES predicates(id) ON DELETE CASCADE,
1175                    UNIQUE(predicate_id, position)
1176                )
1177                "#
1178                .to_string(),
1179                // Variables table
1180                r#"
1181                CREATE TABLE IF NOT EXISTS variables (
1182                    id SERIAL PRIMARY KEY,
1183                    schema_id INTEGER NOT NULL,
1184                    name TEXT NOT NULL,
1185                    domain_name TEXT NOT NULL,
1186                    FOREIGN KEY (schema_id) REFERENCES schemas(id) ON DELETE CASCADE,
1187                    UNIQUE(schema_id, name)
1188                )
1189                "#
1190                .to_string(),
1191                // Indexes
1192                "CREATE INDEX IF NOT EXISTS idx_schemas_name ON schemas(name)".to_string(),
1193                "CREATE INDEX IF NOT EXISTS idx_domains_schema ON domains(schema_id)".to_string(),
1194                "CREATE INDEX IF NOT EXISTS idx_predicates_schema ON predicates(schema_id)"
1195                    .to_string(),
1196            ]
1197        }
1198
1199        /// Get current timestamp (Unix epoch seconds).
1200        fn current_timestamp() -> i64 {
1201            std::time::SystemTime::now()
1202                .duration_since(std::time::UNIX_EPOCH)
1203                .unwrap()
1204                .as_secs() as i64
1205        }
1206
1207        /// Store a schema asynchronously.
1208        pub async fn store_schema_async(
1209            &mut self,
1210            name: &str,
1211            table: &SymbolTable,
1212        ) -> Result<SchemaId, AdapterError> {
1213            let now = Self::current_timestamp();
1214
1215            // Check if schema exists
1216            let existing_version: Option<i32> = self
1217                .client
1218                .query_opt("SELECT MAX(version) FROM schemas WHERE name = $1", &[&name])
1219                .await
1220                .ok()
1221                .flatten()
1222                .and_then(|row| row.get(0));
1223
1224            let version = existing_version.map(|v| v + 1).unwrap_or(1);
1225
1226            // Insert schema
1227            let row = self
1228                .client
1229                .query_one(
1230                    "INSERT INTO schemas (name, version, created_at, updated_at) VALUES ($1, $2, $3, $4) RETURNING id",
1231                    &[&name, &version, &now, &now],
1232                )
1233                .await
1234                .map_err(|e| {
1235                    AdapterError::InvalidOperation(format!("Failed to insert schema: {}", e))
1236                })?;
1237
1238            let schema_id: i32 = row.get(0);
1239
1240            // Store domains
1241            for (domain_name, domain) in &table.domains {
1242                let metadata_json = serde_json::to_string(&domain.metadata).ok();
1243                self.client
1244                    .execute(
1245                        "INSERT INTO domains (schema_id, name, cardinality, description, metadata) VALUES ($1, $2, $3, $4, $5)",
1246                        &[
1247                            &schema_id,
1248                            &domain_name.as_str(),
1249                            &(domain.cardinality as i64),
1250                            &domain.description.as_ref(),
1251                            &metadata_json.as_ref(),
1252                        ],
1253                    )
1254                    .await
1255                    .map_err(|e| {
1256                        AdapterError::InvalidOperation(format!("Failed to insert domain: {}", e))
1257                    })?;
1258            }
1259
1260            // Store predicates
1261            for (predicate_name, predicate) in &table.predicates {
1262                let constraints_json = serde_json::to_string(&predicate.constraints).ok();
1263                let metadata_json = serde_json::to_string(&predicate.metadata).ok();
1264
1265                let pred_row = self
1266                    .client
1267                    .query_one(
1268                        "INSERT INTO predicates (schema_id, name, arity, description, constraints, metadata) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id",
1269                        &[
1270                            &schema_id,
1271                            &predicate_name.as_str(),
1272                            &(predicate.arg_domains.len() as i32),
1273                            &predicate.description.as_ref(),
1274                            &constraints_json.as_ref(),
1275                            &metadata_json.as_ref(),
1276                        ],
1277                    )
1278                    .await
1279                    .map_err(|e| {
1280                        AdapterError::InvalidOperation(format!("Failed to insert predicate: {}", e))
1281                    })?;
1282
1283                let predicate_id: i32 = pred_row.get(0);
1284
1285                // Store argument domains
1286                for (position, domain_name) in predicate.arg_domains.iter().enumerate() {
1287                    self.client
1288                        .execute(
1289                            "INSERT INTO predicate_arguments (predicate_id, position, domain_name) VALUES ($1, $2, $3)",
1290                            &[&predicate_id, &(position as i32), &domain_name.as_str()],
1291                        )
1292                        .await
1293                        .map_err(|e| {
1294                            AdapterError::InvalidOperation(format!(
1295                                "Failed to insert predicate argument: {}",
1296                                e
1297                            ))
1298                        })?;
1299                }
1300            }
1301
1302            // Store variables
1303            for (var_name, domain_name) in &table.variables {
1304                self.client
1305                    .execute(
1306                        "INSERT INTO variables (schema_id, name, domain_name) VALUES ($1, $2, $3)",
1307                        &[&schema_id, &var_name.as_str(), &domain_name.as_str()],
1308                    )
1309                    .await
1310                    .map_err(|e| {
1311                        AdapterError::InvalidOperation(format!("Failed to insert variable: {}", e))
1312                    })?;
1313            }
1314
1315            Ok(SchemaId(schema_id as u64))
1316        }
1317
1318        /// Load a schema by ID asynchronously.
1319        pub async fn load_schema_async(&self, id: SchemaId) -> Result<SymbolTable, AdapterError> {
1320            let schema_id = id.0 as i32;
1321
1322            // Verify schema exists
1323            self.client
1324                .query_opt("SELECT id FROM schemas WHERE id = $1", &[&schema_id])
1325                .await
1326                .map_err(|e| AdapterError::InvalidOperation(format!("Database error: {}", e)))?
1327                .ok_or_else(|| {
1328                    AdapterError::InvalidOperation(format!("Schema with ID {:?} not found", id))
1329                })?;
1330
1331            let mut table = SymbolTable::new();
1332
1333            // Load domains
1334            let domain_rows = self
1335                .client
1336                .query(
1337                    "SELECT name, cardinality, description, metadata FROM domains WHERE schema_id = $1",
1338                    &[&schema_id],
1339                )
1340                .await
1341                .map_err(|e| {
1342                    AdapterError::InvalidOperation(format!("Failed to query domains: {}", e))
1343                })?;
1344
1345            for row in domain_rows {
1346                let name: String = row.get(0);
1347                let cardinality: i64 = row.get(1);
1348                let description: Option<String> = row.get(2);
1349                let metadata_json: Option<String> = row.get(3);
1350
1351                let mut domain = DomainInfo::new(&name, cardinality as usize);
1352                if let Some(desc) = description {
1353                    domain = domain.with_description(desc);
1354                }
1355                if let Some(meta_str) = metadata_json {
1356                    if let Ok(metadata) = serde_json::from_str(&meta_str) {
1357                        domain.metadata = metadata;
1358                    }
1359                }
1360
1361                table.domains.insert(name, domain);
1362            }
1363
1364            // Load predicates
1365            let predicate_rows = self
1366                .client
1367                .query(
1368                    "SELECT id, name, arity, description, constraints, metadata FROM predicates WHERE schema_id = $1",
1369                    &[&schema_id],
1370                )
1371                .await
1372                .map_err(|e| {
1373                    AdapterError::InvalidOperation(format!("Failed to query predicates: {}", e))
1374                })?;
1375
1376            for pred_row in predicate_rows {
1377                let predicate_id: i32 = pred_row.get(0);
1378                let name: String = pred_row.get(1);
1379                let _arity: i32 = pred_row.get(2);
1380                let description: Option<String> = pred_row.get(3);
1381                let constraints_json: Option<String> = pred_row.get(4);
1382                let metadata_json: Option<String> = pred_row.get(5);
1383
1384                // Load argument domains
1385                let arg_rows = self
1386                    .client
1387                    .query(
1388                        "SELECT position, domain_name FROM predicate_arguments WHERE predicate_id = $1 ORDER BY position",
1389                        &[&predicate_id],
1390                    )
1391                    .await
1392                    .map_err(|e| {
1393                        AdapterError::InvalidOperation(format!(
1394                            "Failed to query predicate args: {}",
1395                            e
1396                        ))
1397                    })?;
1398
1399                let arg_domains: Vec<String> = arg_rows.iter().map(|row| row.get(1)).collect();
1400
1401                let mut predicate = PredicateInfo::new(&name, arg_domains);
1402                if let Some(desc) = description {
1403                    predicate = predicate.with_description(desc);
1404                }
1405                if let Some(constraints_str) = constraints_json {
1406                    if let Ok(constraints) = serde_json::from_str(&constraints_str) {
1407                        predicate.constraints = constraints;
1408                    }
1409                }
1410                if let Some(meta_str) = metadata_json {
1411                    if let Ok(metadata) = serde_json::from_str(&meta_str) {
1412                        predicate.metadata = metadata;
1413                    }
1414                }
1415
1416                table.predicates.insert(name, predicate);
1417            }
1418
1419            // Load variables
1420            let var_rows = self
1421                .client
1422                .query(
1423                    "SELECT name, domain_name FROM variables WHERE schema_id = $1",
1424                    &[&schema_id],
1425                )
1426                .await
1427                .map_err(|e| {
1428                    AdapterError::InvalidOperation(format!("Failed to query variables: {}", e))
1429                })?;
1430
1431            for row in var_rows {
1432                let name: String = row.get(0);
1433                let domain_name: String = row.get(1);
1434                table.variables.insert(name, domain_name);
1435            }
1436
1437            Ok(table)
1438        }
1439
1440        /// Load a schema by name (latest version) asynchronously.
1441        pub async fn load_schema_by_name_async(
1442            &self,
1443            name: &str,
1444        ) -> Result<SymbolTable, AdapterError> {
1445            let row = self
1446                .client
1447                .query_opt(
1448                    "SELECT id FROM schemas WHERE name = $1 ORDER BY version DESC LIMIT 1",
1449                    &[&name],
1450                )
1451                .await
1452                .map_err(|e| AdapterError::InvalidOperation(format!("Database error: {}", e)))?
1453                .ok_or_else(|| {
1454                    AdapterError::InvalidOperation(format!("Schema '{}' not found", name))
1455                })?;
1456
1457            let schema_id: i32 = row.get(0);
1458            self.load_schema_async(SchemaId(schema_id as u64)).await
1459        }
1460
1461        /// List all schemas asynchronously.
1462        pub async fn list_schemas_async(&self) -> Result<Vec<SchemaMetadata>, AdapterError> {
1463            let rows = self
1464                .client
1465                .query(
1466                    r#"
1467                    SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
1468                           (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
1469                           (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
1470                           (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
1471                    FROM schemas s
1472                    ORDER BY s.name, s.version DESC
1473                    "#,
1474                    &[],
1475                )
1476                .await
1477                .map_err(|e| {
1478                    AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
1479                })?;
1480
1481            let schemas = rows
1482                .iter()
1483                .map(|row| SchemaMetadata {
1484                    id: SchemaId(row.get::<_, i32>(0) as u64),
1485                    name: row.get(1),
1486                    version: row.get::<_, i32>(2) as u32,
1487                    created_at: row.get::<_, i64>(3) as u64,
1488                    updated_at: row.get::<_, i64>(4) as u64,
1489                    num_domains: row.get::<_, i64>(6) as usize,
1490                    num_predicates: row.get::<_, i64>(7) as usize,
1491                    num_variables: row.get::<_, i64>(8) as usize,
1492                    description: row.get(5),
1493                })
1494                .collect();
1495
1496            Ok(schemas)
1497        }
1498
1499        /// Delete a schema by ID asynchronously.
1500        pub async fn delete_schema_async(&mut self, id: SchemaId) -> Result<(), AdapterError> {
1501            let schema_id = id.0 as i32;
1502            let affected = self
1503                .client
1504                .execute("DELETE FROM schemas WHERE id = $1", &[&schema_id])
1505                .await
1506                .map_err(|e| {
1507                    AdapterError::InvalidOperation(format!("Failed to delete schema: {}", e))
1508                })?;
1509
1510            if affected == 0 {
1511                return Err(AdapterError::InvalidOperation(format!(
1512                    "Schema with ID {:?} not found",
1513                    id
1514                )));
1515            }
1516
1517            Ok(())
1518        }
1519
1520        /// Search schemas by pattern asynchronously.
1521        pub async fn search_schemas_async(
1522            &self,
1523            pattern: &str,
1524        ) -> Result<Vec<SchemaMetadata>, AdapterError> {
1525            let search_pattern = format!("%{}%", pattern);
1526            let rows = self
1527                .client
1528                .query(
1529                    r#"
1530                    SELECT s.id, s.name, s.version, s.created_at, s.updated_at, s.description,
1531                           (SELECT COUNT(*) FROM domains WHERE schema_id = s.id) as num_domains,
1532                           (SELECT COUNT(*) FROM predicates WHERE schema_id = s.id) as num_predicates,
1533                           (SELECT COUNT(*) FROM variables WHERE schema_id = s.id) as num_variables
1534                    FROM schemas s
1535                    WHERE s.name LIKE $1
1536                    ORDER BY s.name, s.version DESC
1537                    "#,
1538                    &[&search_pattern],
1539                )
1540                .await
1541                .map_err(|e| {
1542                    AdapterError::InvalidOperation(format!("Failed to query schemas: {}", e))
1543                })?;
1544
1545            let schemas = rows
1546                .iter()
1547                .map(|row| SchemaMetadata {
1548                    id: SchemaId(row.get::<_, i32>(0) as u64),
1549                    name: row.get(1),
1550                    version: row.get::<_, i32>(2) as u32,
1551                    created_at: row.get::<_, i64>(3) as u64,
1552                    updated_at: row.get::<_, i64>(4) as u64,
1553                    num_domains: row.get::<_, i64>(6) as usize,
1554                    num_predicates: row.get::<_, i64>(7) as usize,
1555                    num_variables: row.get::<_, i64>(8) as usize,
1556                    description: row.get(5),
1557                })
1558                .collect();
1559
1560            Ok(schemas)
1561        }
1562
1563        /// Get schema history asynchronously.
1564        pub async fn get_schema_history_async(
1565            &self,
1566            name: &str,
1567        ) -> Result<Vec<SchemaVersion>, AdapterError> {
1568            let rows = self
1569                .client
1570                .query(
1571                    "SELECT version, created_at, id FROM schemas WHERE name = $1 ORDER BY version ASC",
1572                    &[&name],
1573                )
1574                .await
1575                .map_err(|e| {
1576                    AdapterError::InvalidOperation(format!("Failed to query versions: {}", e))
1577                })?;
1578
1579            if rows.is_empty() {
1580                return Err(AdapterError::InvalidOperation(format!(
1581                    "Schema '{}' not found",
1582                    name
1583                )));
1584            }
1585
1586            let versions = rows
1587                .iter()
1588                .map(|row| {
1589                    let version: i32 = row.get(0);
1590                    let timestamp: i64 = row.get(1);
1591                    let schema_id: i32 = row.get(2);
1592
1593                    SchemaVersion {
1594                        version: version as u32,
1595                        timestamp: timestamp as u64,
1596                        description: format!("Version {}", version),
1597                        schema_id: SchemaId(schema_id as u64),
1598                    }
1599                })
1600                .collect();
1601
1602            Ok(versions)
1603        }
1604    }
1605}
1606
1607#[cfg(feature = "postgres")]
1608pub use postgres_backend::PostgreSQLDatabase;
1609
1610// Tests are in a separate module to keep this file under 2000 lines
1611#[cfg(test)]
1612#[path = "database_tests.rs"]
1613mod tests;