hanzo_database/
lib.rs

1//! # Hanzo DB - Multi-Backend Database Abstraction
2//!
3//! Production-ready database abstraction for Hanzo Node, supporting:
4//! - LanceDB for vector search and multimodal storage
5//! - DuckDB for analytics and OLAP queries
6//! - PostgreSQL for relational data
7//! - Redis for caching
8//! - SQLite for lightweight deployments
9//!
10//! Features:
11//! - Unified interface across all backends
12//! - Automatic backend selection based on workload
13//! - Connection pooling and transaction support
14//! - Migration between backends
15
16use anyhow::{Context, Result};
17use async_trait::async_trait;
18use log::{debug, error, info, warn};
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use thiserror::Error;
23
/// Error type for the Hanzo database layer.
///
/// Every variant carries a free-form `String` message. The `#[error(...)]`
/// attribute on each variant fixes its `Display` output: a category prefix
/// followed by that message.
#[derive(Debug, Error)]
pub enum HanzoDbError {
    /// Connection-category failure; displays as `Connection error: <message>`.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// Query-category failure; displays as `Query error: <message>`.
    #[error("Query error: {0}")]
    QueryError(String),

    /// Schema-category failure; displays as `Schema error: <message>`.
    // NOTE(review): overlaps in name with `InvalidSchema` below; this file does
    // not show when each one is meant to be used — confirm against the backends.
    #[error("Schema error: {0}")]
    SchemaError(String),

    /// Displays as `Invalid schema: <message>`.
    #[error("Invalid schema: {0}")]
    InvalidSchema(String),

    /// Displays as `Invalid data: <message>`.
    #[error("Invalid data: {0}")]
    InvalidData(String),

    /// Transaction-category failure; displays as `Transaction error: <message>`.
    #[error("Transaction error: {0}")]
    TransactionError(String),

    /// Migration-category failure; displays as `Migration error: <message>`.
    #[error("Migration error: {0}")]
    MigrationError(String),

    /// Displays as `Not implemented: <message>`.
    #[error("Not implemented: {0}")]
    NotImplemented(String),
}
51
52pub mod models;
53
54#[cfg(feature = "backend-lancedb")]
55pub mod vector_search;
56
57// Migration module (only with migration feature and SQLite backend)
58#[cfg(all(feature = "migration", feature = "backend-sqlite"))]
59pub mod migration;
60
61// Backend modules
62pub mod backends;
63
64// Re-exports
65pub use models::*;
66
67#[cfg(feature = "backend-lancedb")]
68pub use vector_search::*;
69
/// Storage engine selector.
///
/// Stored in [`HanzoDbConfig::backend`] and matched on by [`connect`] to
/// decide which backend implementation to construct. Use
/// [`DatabaseBackend::for_workload`] to pick a variant from a
/// [`WorkloadType`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatabaseBackend {
    /// LanceDB for vector operations and multimodal data
    /// (chosen for [`WorkloadType::VectorSearch`]).
    LanceDB,
    /// DuckDB for analytics and OLAP
    /// (chosen for [`WorkloadType::Analytics`]).
    DuckDB,
    /// PostgreSQL for relational data
    /// (chosen for [`WorkloadType::Transactional`]).
    PostgreSQL,
    /// Redis for caching
    /// (chosen for [`WorkloadType::Cache`]).
    Redis,
    /// SQLite for lightweight deployments
    /// (chosen for [`WorkloadType::Embedded`]).
    SQLite,
}
84
85impl DatabaseBackend {
86    /// Select optimal backend for workload type
87    pub fn for_workload(workload: WorkloadType) -> Self {
88        match workload {
89            WorkloadType::VectorSearch => Self::LanceDB,
90            WorkloadType::Analytics => Self::DuckDB,
91            WorkloadType::Transactional => Self::PostgreSQL,
92            WorkloadType::Cache => Self::Redis,
93            WorkloadType::Embedded => Self::SQLite,
94        }
95    }
96}
97
/// Workload category used to pick a backend.
///
/// Consumed by [`DatabaseBackend::for_workload`], which maps each variant to
/// exactly one [`DatabaseBackend`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Vector similarity search (maps to [`DatabaseBackend::LanceDB`]).
    VectorSearch,
    /// Analytical queries (OLAP) (maps to [`DatabaseBackend::DuckDB`]).
    Analytics,
    /// Transactional operations (OLTP) (maps to [`DatabaseBackend::PostgreSQL`]).
    Transactional,
    /// High-speed caching (maps to [`DatabaseBackend::Redis`]).
    Cache,
    /// Embedded/lightweight operations (maps to [`DatabaseBackend::SQLite`]).
    Embedded,
}
112
/// Unified database configuration.
///
/// One struct is shared by all backends and handed to [`connect`], which
/// dispatches on [`backend`](Self::backend) and passes the whole value to the
/// backend's constructor. Which of the remaining fields a given backend
/// actually honours is not visible in this file.
///
/// `Default` yields a LanceDB configuration; the per-field defaults are noted
/// below.
#[derive(Debug, Clone)]
pub struct HanzoDbConfig {
    /// Selected backend. Default: [`DatabaseBackend::LanceDB`].
    pub backend: DatabaseBackend,
    /// Database path (for file-based backends).
    /// Default: `Some("./storage/hanzo-db")`.
    pub path: Option<PathBuf>,
    /// Connection URL (for network backends). Default: `None`.
    pub url: Option<String>,
    /// Connection pool size. Default: `16`.
    pub pool_size: usize,
    /// Enable write-ahead logging. Default: `true`.
    pub enable_wal: bool,
    /// Cache size in bytes. Default: `Some(64 * 1024 * 1024)`.
    pub cache_size: Option<usize>,
    /// Enable compression. Default: `true`.
    pub enable_compression: bool,
}
131
132impl Default for HanzoDbConfig {
133    fn default() -> Self {
134        Self {
135            backend: DatabaseBackend::LanceDB,
136            path: Some(PathBuf::from("./storage/hanzo-db")),
137            url: None,
138            pool_size: 16,
139            enable_wal: true,
140            cache_size: Some(64 * 1024 * 1024), // 64MB
141            enable_compression: true,
142        }
143    }
144}
145
/// Backend-agnostic database interface.
///
/// [`connect`] hands out implementations of this trait as
/// `Arc<dyn HanzoDatabase>`; the `Send + Sync` supertraits allow that handle
/// to be shared across threads. Every method is `async` (through
/// `#[async_trait]`), takes `&self`, and reports failure through
/// `anyhow::Result`.
#[async_trait]
pub trait HanzoDatabase: Send + Sync {
    /// Initialize the database.
    ///
    // NOTE(review): `connect` as written in this file does not call `init`
    // itself; whether each backend's constructor does is not visible here.
    async fn init(&self) -> Result<()>;

    /// Create a table called `name` with the layout described by `schema`.
    async fn create_table(&self, name: &str, schema: TableSchema) -> Result<()>;

    /// Insert the records in `data` into `table`.
    async fn insert(&self, table: &str, data: &[Record]) -> Result<()>;

    /// Run `query` and return the selected columns and rows.
    async fn query(&self, query: Query) -> Result<QueryResult>;

    /// Run the vector search described by `query`, returning each matching
    /// record together with its score.
    async fn vector_search(&self, query: VectorQuery) -> Result<Vec<SearchResult>>;

    /// Begin a transaction and return its handle.
    async fn begin_transaction(&self) -> Result<Transaction>;

    /// Optimize the database. What this entails is backend-specific and not
    /// defined in this file.
    async fn optimize(&self) -> Result<()>;

    /// Get database statistics.
    async fn stats(&self) -> Result<DatabaseStats>;
}
173
/// Table schema definition, passed to [`HanzoDatabase::create_table`].
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Column definitions of the table.
    pub columns: Vec<Column>,
    /// Indexes to define on the table.
    pub indexes: Vec<Index>,
    /// Table-level constraints (keys, uniqueness, checks).
    pub constraints: Vec<Constraint>,
}
181
/// Column definition inside a [`TableSchema`].
#[derive(Debug, Clone)]
pub struct Column {
    /// Column name.
    pub name: String,
    /// Declared type of the column's values.
    pub data_type: DataType,
    /// Whether the column is declared nullable.
    pub nullable: bool,
    /// Default value for the column, if one is declared.
    pub default: Option<Value>,
}
190
/// Column data type used in a [`Column`] definition.
///
/// Types compare structurally (`PartialEq`/`Eq`): two `Vector`s are equal
/// only when their dimensions match, and `Array`/`Struct` compare their
/// nested types recursively (for `Struct`, field names and order matter).
/// This lets schemas be diffed and asserted on directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataType {
    // Scalar types
    /// Boolean value.
    Boolean,
    /// 32-bit integer.
    Int32,
    /// 64-bit integer.
    Int64,
    /// 32-bit float.
    Float32,
    /// 64-bit float.
    Float64,
    /// Text value.
    String,
    /// Raw bytes.
    Binary,
    /// Timestamp. The unit and epoch are not defined in this file.
    Timestamp,

    // Vector types
    /// Vector whose payload is its dimension.
    Vector(usize), // dimension

    // Complex types
    /// JSON document.
    Json,
    /// Array whose elements all have the boxed type.
    Array(Box<DataType>),
    /// Struct given as an ordered list of `(field name, field type)` pairs.
    Struct(Vec<(String, DataType)>),
}
212
/// Index definition inside a [`TableSchema`].
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name.
    pub name: String,
    /// Names of the columns the index covers.
    pub columns: Vec<String>,
    /// Kind of index to build, including any kind-specific parameters.
    pub index_type: IndexType,
}
220
/// Kind of index requested by an [`Index`] definition.
///
/// The struct-like variants carry tuning parameters. How a backend
/// interprets them is not defined in this file.
// `IVF_PQ` is not UpperCamelCase and would trigger `non_camel_case_types`.
// The name is part of the public API, so it is kept and the lint is silenced
// here instead of renaming the variant and breaking callers.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone)]
pub enum IndexType {
    /// B-tree index.
    BTree,
    /// Hash index.
    Hash,
    /// IVF-PQ index, parameterised by `nlist` and `nprobe`.
    // NOTE(review): presumably the usual IVF-PQ partition count and probe
    // count for vector search — confirm against the LanceDB backend.
    IVF_PQ { nlist: usize, nprobe: usize },
    /// HNSW index, parameterised by `max_elements` and `m`.
    // NOTE(review): presumably graph capacity and per-node link count —
    // confirm against the backend that consumes them.
    HNSW { max_elements: usize, m: usize },
}
228
/// Table-level constraint inside a [`TableSchema`].
#[derive(Debug, Clone)]
pub enum Constraint {
    /// Primary key over the listed columns.
    PrimaryKey(Vec<String>),
    /// Foreign key from the listed `columns` to `references`.
    // NOTE(review): `references` is a plain string; presumably the referenced
    // table (and possibly its columns) — the expected format is not shown here.
    ForeignKey { columns: Vec<String>, references: String },
    /// Uniqueness constraint over the listed columns.
    Unique(Vec<String>),
    /// Check constraint carried as a string.
    // NOTE(review): presumably a backend-specific SQL expression — confirm.
    Check(String),
}
237
/// Structured query passed to [`HanzoDatabase::query`].
#[derive(Debug, Clone)]
pub struct Query {
    /// Table to read from.
    pub table: String,
    /// Names of the columns to return.
    // NOTE(review): the meaning of an empty list (all columns vs. none) is
    // not defined in this file.
    pub select: Vec<String>,
    /// Optional row filter.
    pub filter: Option<Filter>,
    /// Sort keys, in order.
    pub order_by: Vec<OrderBy>,
    /// Optional maximum number of rows to return.
    pub limit: Option<usize>,
    /// Optional number of rows to skip.
    pub offset: Option<usize>,
}
248
/// Filter expression tree used by [`Query`] and [`VectorQuery`].
///
/// Leaf variants pair a `String` (presumably the column name — the field is
/// unnamed) with the operand(s) to compare against; `And`, `Or` and `Not`
/// combine boxed sub-filters.
#[derive(Debug, Clone)]
pub enum Filter {
    /// Equal to the value.
    Eq(String, Value),
    /// Not equal to the value.
    Ne(String, Value),
    /// Greater than the value.
    Gt(String, Value),
    /// Greater than or equal to the value.
    Gte(String, Value),
    /// Less than the value.
    Lt(String, Value),
    /// Less than or equal to the value.
    Lte(String, Value),
    /// Member of the listed values.
    In(String, Vec<Value>),
    /// Pattern match; the second string is the pattern.
    // NOTE(review): pattern syntax (e.g. SQL `LIKE` wildcards) is not defined here.
    Like(String, String),
    /// Both sub-filters must hold.
    And(Box<Filter>, Box<Filter>),
    /// At least one sub-filter must hold.
    Or(Box<Filter>, Box<Filter>),
    /// Negation of the sub-filter.
    Not(Box<Filter>),
}
264
/// Order by clause: one sort key of a [`Query`].
#[derive(Debug, Clone)]
pub struct OrderBy {
    /// Column to sort by.
    pub column: String,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
}
271
/// Dynamically typed cell value.
///
/// Used as column defaults ([`Column::default`]), filter operands
/// ([`Filter`]) and record fields ([`Record`]). The variants parallel the
/// scalar, vector and JSON members of [`DataType`]; there is no variant for
/// `DataType::Array` or `DataType::Struct`.
#[derive(Debug, Clone)]
pub enum Value {
    /// Absent value.
    Null,
    /// Boolean value.
    Bool(bool),
    /// 32-bit integer.
    Int32(i32),
    /// 64-bit integer.
    Int64(i64),
    /// 32-bit float.
    Float32(f32),
    /// 64-bit float.
    Float64(f64),
    /// Text value.
    String(String),
    /// Raw bytes.
    Binary(Vec<u8>),
    /// Timestamp stored as an `i64`.
    // NOTE(review): unit and epoch are not defined in this file — confirm.
    Timestamp(i64),
    /// Vector of `f32` components.
    Vector(Vec<f32>),
    /// JSON document.
    Json(serde_json::Value),
}
287
/// A single record (row), as inserted via [`HanzoDatabase::insert`] and
/// returned in [`QueryResult`] and [`SearchResult`].
#[derive(Debug, Clone)]
pub struct Record {
    /// Ordered `(String, Value)` pairs.
    // NOTE(review): presumably (column name, value); nothing here enforces
    // unique names or a match with the table schema.
    pub values: Vec<(String, Value)>,
}
293
/// Query result returned by [`HanzoDatabase::query`].
#[derive(Debug)]
pub struct QueryResult {
    /// Names of the returned columns.
    pub columns: Vec<String>,
    /// Returned rows.
    pub rows: Vec<Record>,
    /// Row count reported by the backend.
    // NOTE(review): presumably equals `rows.len()`, but the type does not
    // enforce that — confirm against the backends.
    pub row_count: usize,
}
301
/// Vector query passed to [`HanzoDatabase::vector_search`].
#[derive(Debug, Clone)]
pub struct VectorQuery {
    /// Table to search.
    pub table: String,
    /// Query vector.
    pub vector: Vec<f32>,
    /// Number of results requested.
    // NOTE(review): presumably an upper bound (top-k) — confirm.
    pub k: usize,
    /// Optional filter restricting candidate rows.
    pub filter: Option<Filter>,
    /// Distance metric to rank by.
    pub metric: DistanceMetric,
}
311
/// Distance metric selected by a [`VectorQuery`].
///
/// Derives `PartialEq`, `Eq` and `Hash` in addition to `Copy`, matching the
/// crate's other fieldless enums, so metrics can be compared and used as map
/// or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DistanceMetric {
    /// L2 (Euclidean) distance.
    L2,
    /// Cosine metric.
    Cosine,
    /// Inner (dot) product.
    InnerProduct,
}
318
/// One hit returned by [`HanzoDatabase::vector_search`].
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// The matching record.
    pub record: Record,
    /// Score assigned to the match.
    // NOTE(review): whether higher or lower is better presumably depends on
    // the `DistanceMetric` and the backend — not defined in this file.
    pub score: f32,
}
325
/// Transaction handle returned by [`HanzoDatabase::begin_transaction`].
///
/// Finish it with [`Transaction::commit`] or [`Transaction::rollback`]; both
/// take `self` by value, so a handle can be finished at most once.
pub struct Transaction {
    /// Shared transaction state behind an async read-write lock. The field is
    /// private, so only code inside this crate can construct a `Transaction`.
    inner: Arc<RwLock<TransactionInner>>,
}
330
/// Private state behind a [`Transaction`].
struct TransactionInner {
    /// Backend that created the transaction.
    backend: DatabaseBackend,
    /// Backend-specific transaction handle, type-erased as `dyn Any` so one
    /// struct can hold any backend's handle.
    // NOTE(review): neither field is read anywhere in this file; presumably
    // the backends downcast `handle` once commit/rollback are implemented.
    handle: Box<dyn std::any::Any + Send + Sync>,
}
336
337impl Transaction {
338    /// Commit the transaction
339    pub async fn commit(self) -> Result<()> {
340        let inner = self.inner.write().await;
341        // Backend-specific commit logic
342        Ok(())
343    }
344    
345    /// Rollback the transaction
346    pub async fn rollback(self) -> Result<()> {
347        let inner = self.inner.write().await;
348        // Backend-specific rollback logic
349        Ok(())
350    }
351}
352
/// Database statistics returned by [`HanzoDatabase::stats`].
#[derive(Debug, Clone)]
pub struct DatabaseStats {
    /// Backend the statistics describe.
    pub backend: DatabaseBackend,
    /// Number of tables.
    pub table_count: usize,
    /// Total number of rows.
    pub total_rows: usize,
    /// Total size in bytes.
    pub total_size_bytes: usize,
    /// Number of indexes.
    pub index_count: usize,
    /// Cache hit rate.
    // NOTE(review): presumably a fraction in 0.0..=1.0 rather than a
    // percentage — the scale is not defined in this file.
    pub cache_hit_rate: f64,
}
363
364/// Create a Hanzo database instance
365pub async fn connect(config: HanzoDbConfig) -> Result<Arc<dyn HanzoDatabase>> {
366    match config.backend {
367        #[cfg(feature = "backend-lancedb")]
368        DatabaseBackend::LanceDB => {
369            let db = backends::lancedb::LanceDbBackend::new(config).await?;
370            Ok(Arc::new(db))
371        }
372        #[cfg(not(feature = "backend-lancedb"))]
373        DatabaseBackend::LanceDB => {
374            anyhow::bail!("LanceDB backend not enabled. Compile with --features backend-lancedb")
375        }
376        #[cfg(feature = "backend-duckdb")]
377        DatabaseBackend::DuckDB => {
378            let db = backends::duckdb::DuckDbBackend::new(config).await?;
379            Ok(Arc::new(db))
380        }
381        #[cfg(not(feature = "backend-duckdb"))]
382        DatabaseBackend::DuckDB => {
383            anyhow::bail!("DuckDB backend not enabled. Compile with --features backend-duckdb")
384        }
385        #[cfg(feature = "backend-postgres")]
386        DatabaseBackend::PostgreSQL => {
387            let db = backends::postgres::PostgresBackend::new(config).await?;
388            Ok(Arc::new(db))
389        }
390        #[cfg(not(feature = "backend-postgres"))]
391        DatabaseBackend::PostgreSQL => {
392            anyhow::bail!("PostgreSQL backend not enabled. Compile with --features backend-postgres")
393        }
394        #[cfg(feature = "backend-redis")]
395        DatabaseBackend::Redis => {
396            let db = backends::redis::RedisBackend::new(config).await?;
397            Ok(Arc::new(db))
398        }
399        #[cfg(not(feature = "backend-redis"))]
400        DatabaseBackend::Redis => {
401            anyhow::bail!("Redis backend not enabled. Compile with --features backend-redis")
402        }
403        #[cfg(feature = "backend-sqlite")]
404        DatabaseBackend::SQLite => {
405            let db = backends::sqlite::SqliteBackend::new(config).await?;
406            Ok(Arc::new(db))
407        }
408        #[cfg(not(feature = "backend-sqlite"))]
409        DatabaseBackend::SQLite => {
410            anyhow::bail!("SQLite backend not enabled. Compile with --features backend-sqlite")
411        }
412    }
413}
414
415
#[cfg(test)]
mod tests {
    use super::*;

    /// `for_workload` must map every workload to its documented backend.
    ///
    /// The table lists all five `WorkloadType` variants, so the `Cache` and
    /// `Embedded` mappings are covered too.
    #[test]
    fn test_backend_selection() {
        let expected = [
            (WorkloadType::VectorSearch, DatabaseBackend::LanceDB),
            (WorkloadType::Analytics, DatabaseBackend::DuckDB),
            (WorkloadType::Transactional, DatabaseBackend::PostgreSQL),
            (WorkloadType::Cache, DatabaseBackend::Redis),
            (WorkloadType::Embedded, DatabaseBackend::SQLite),
        ];

        for &(workload, backend) in expected.iter() {
            assert_eq!(
                DatabaseBackend::for_workload(workload),
                backend,
                "unexpected backend for workload {:?}",
                workload
            );
        }
    }
}