1use anyhow::{Context, Result};
17use async_trait::async_trait;
18use log::{debug, error, info, warn};
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use thiserror::Error;
23
24#[derive(Debug, Error)]
26pub enum HanzoDbError {
27 #[error("Connection error: {0}")]
28 ConnectionError(String),
29
30 #[error("Query error: {0}")]
31 QueryError(String),
32
33 #[error("Schema error: {0}")]
34 SchemaError(String),
35
36 #[error("Invalid schema: {0}")]
37 InvalidSchema(String),
38
39 #[error("Invalid data: {0}")]
40 InvalidData(String),
41
42 #[error("Transaction error: {0}")]
43 TransactionError(String),
44
45 #[error("Migration error: {0}")]
46 MigrationError(String),
47
48 #[error("Not implemented: {0}")]
49 NotImplemented(String),
50}
51
52pub mod models;
53
54#[cfg(feature = "backend-lancedb")]
55pub mod vector_search;
56
57#[cfg(all(feature = "migration", feature = "backend-sqlite"))]
59pub mod migration;
60
61pub mod backends;
63
64pub use models::*;
66
67#[cfg(feature = "backend-lancedb")]
68pub use vector_search::*;
69
/// Storage engines that a [`HanzoDbConfig`] can select.
///
/// The workload notes on each variant reflect the mapping implemented by
/// `DatabaseBackend::for_workload`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseBackend {
    /// Chosen for `WorkloadType::VectorSearch`.
    LanceDB,
    /// Chosen for `WorkloadType::Analytics`.
    DuckDB,
    /// Chosen for `WorkloadType::Transactional`.
    PostgreSQL,
    /// Chosen for `WorkloadType::Cache`.
    Redis,
    /// Chosen for `WorkloadType::Embedded`.
    SQLite,
}
84
85impl DatabaseBackend {
86 pub fn for_workload(workload: WorkloadType) -> Self {
88 match workload {
89 WorkloadType::VectorSearch => Self::LanceDB,
90 WorkloadType::Analytics => Self::DuckDB,
91 WorkloadType::Transactional => Self::PostgreSQL,
92 WorkloadType::Cache => Self::Redis,
93 WorkloadType::Embedded => Self::SQLite,
94 }
95 }
96}
97
/// Workload categories understood by `DatabaseBackend::for_workload`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkloadType {
    /// Mapped to `DatabaseBackend::LanceDB`.
    VectorSearch,
    /// Mapped to `DatabaseBackend::DuckDB`.
    Analytics,
    /// Mapped to `DatabaseBackend::PostgreSQL`.
    Transactional,
    /// Mapped to `DatabaseBackend::Redis`.
    Cache,
    /// Mapped to `DatabaseBackend::SQLite`.
    Embedded,
}
112
113#[derive(Debug, Clone)]
115pub struct HanzoDbConfig {
116 pub backend: DatabaseBackend,
118 pub path: Option<PathBuf>,
120 pub url: Option<String>,
122 pub pool_size: usize,
124 pub enable_wal: bool,
126 pub cache_size: Option<usize>,
128 pub enable_compression: bool,
130}
131
132impl Default for HanzoDbConfig {
133 fn default() -> Self {
134 Self {
135 backend: DatabaseBackend::LanceDB,
136 path: Some(PathBuf::from("./storage/hanzo-db")),
137 url: None,
138 pool_size: 16,
139 enable_wal: true,
140 cache_size: Some(64 * 1024 * 1024), enable_compression: true,
142 }
143 }
144}
145
146#[async_trait]
148pub trait HanzoDatabase: Send + Sync {
149 async fn init(&self) -> Result<()>;
151
152 async fn create_table(&self, name: &str, schema: TableSchema) -> Result<()>;
154
155 async fn insert(&self, table: &str, data: &[Record]) -> Result<()>;
157
158 async fn query(&self, query: Query) -> Result<QueryResult>;
160
161 async fn vector_search(&self, query: VectorQuery) -> Result<Vec<SearchResult>>;
163
164 async fn begin_transaction(&self) -> Result<Transaction>;
166
167 async fn optimize(&self) -> Result<()>;
169
170 async fn stats(&self) -> Result<DatabaseStats>;
172}
173
174#[derive(Debug, Clone)]
176pub struct TableSchema {
177 pub columns: Vec<Column>,
178 pub indexes: Vec<Index>,
179 pub constraints: Vec<Constraint>,
180}
181
182#[derive(Debug, Clone)]
184pub struct Column {
185 pub name: String,
186 pub data_type: DataType,
187 pub nullable: bool,
188 pub default: Option<Value>,
189}
190
/// Declared type of a [`Column`].
///
/// The scalar variant names mirror the variants of the crate's `Value` enum;
/// `Array` and `Struct` nest further `DataType`s.
#[derive(Clone, Debug)]
pub enum DataType {
    Boolean,
    Int32,
    Int64,
    Float32,
    Float64,
    String,
    Binary,
    Timestamp,

    /// Vector type carrying a `usize`, presumably the dimension — confirm
    /// against the backends.
    Vector(usize),
    Json,
    /// Array whose elements all have the boxed element type.
    Array(Box<DataType>),
    /// Ordered list of `(field name, field type)` pairs.
    Struct(Vec<(String, DataType)>),
}
212
213#[derive(Debug, Clone)]
215pub struct Index {
216 pub name: String,
217 pub columns: Vec<String>,
218 pub index_type: IndexType,
219}
220
/// Kind of index requested by an [`Index`] definition.
///
/// Derives `PartialEq`/`Eq` so index definitions can be compared by value
/// (every payload field is a `usize`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexType {
    BTree,
    Hash,
    /// Presumably an inverted-file index with product quantisation — confirm
    /// against the vector backend. How `nlist` and `nprobe` are interpreted
    /// is backend-defined.
    // The variant name is public API, so it is kept as-is and the naming lint
    // is silenced here instead of renaming (which would break callers).
    #[allow(non_camel_case_types)]
    IVF_PQ { nlist: usize, nprobe: usize },
    /// Presumably a hierarchical navigable small-world graph index — confirm
    /// against the vector backend. How `max_elements` and `m` are interpreted
    /// is backend-defined.
    HNSW { max_elements: usize, m: usize },
}
228
/// Table-level constraint inside a [`TableSchema`].
#[derive(Clone, Debug)]
pub enum Constraint {
    /// Primary key over the listed columns.
    PrimaryKey(Vec<String>),
    /// Foreign key from `columns` to `references`. What `references` names
    /// (a table, or a table plus columns) is not established in this file.
    ForeignKey {
        columns: Vec<String>,
        references: String,
    },
    /// Uniqueness constraint over the listed columns.
    Unique(Vec<String>),
    /// Check constraint carried as a string; its expression syntax is
    /// backend-defined.
    Check(String),
}
237
238#[derive(Debug, Clone)]
240pub struct Query {
241 pub table: String,
242 pub select: Vec<String>,
243 pub filter: Option<Filter>,
244 pub order_by: Vec<OrderBy>,
245 pub limit: Option<usize>,
246 pub offset: Option<usize>,
247}
248
249#[derive(Debug, Clone)]
251pub enum Filter {
252 Eq(String, Value),
253 Ne(String, Value),
254 Gt(String, Value),
255 Gte(String, Value),
256 Lt(String, Value),
257 Lte(String, Value),
258 In(String, Vec<Value>),
259 Like(String, String),
260 And(Box<Filter>, Box<Filter>),
261 Or(Box<Filter>, Box<Filter>),
262 Not(Box<Filter>),
263}
264
/// One sort key of a [`Query`].
#[derive(Clone, Debug)]
pub struct OrderBy {
    /// Column to sort on.
    pub column: String,
    /// Sort direction flag: `true` requests ascending order.
    pub ascending: bool,
}
271
272#[derive(Debug, Clone)]
274pub enum Value {
275 Null,
276 Bool(bool),
277 Int32(i32),
278 Int64(i64),
279 Float32(f32),
280 Float64(f64),
281 String(String),
282 Binary(Vec<u8>),
283 Timestamp(i64),
284 Vector(Vec<f32>),
285 Json(serde_json::Value),
286}
287
288#[derive(Debug, Clone)]
290pub struct Record {
291 pub values: Vec<(String, Value)>,
292}
293
294#[derive(Debug)]
296pub struct QueryResult {
297 pub columns: Vec<String>,
298 pub rows: Vec<Record>,
299 pub row_count: usize,
300}
301
302#[derive(Debug, Clone)]
304pub struct VectorQuery {
305 pub table: String,
306 pub vector: Vec<f32>,
307 pub k: usize,
308 pub filter: Option<Filter>,
309 pub metric: DistanceMetric,
310}
311
/// Distance metric selected by a [`VectorQuery`].
///
/// Derives `PartialEq`/`Eq`, like the crate's other fieldless enums
/// (`DatabaseBackend`, `WorkloadType`), so metrics can be compared with `==`.
/// The formula behind each variant is implemented by the backends and is not
/// defined in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DistanceMetric {
    L2,
    Cosine,
    InnerProduct,
}
318
319#[derive(Debug, Clone)]
321pub struct SearchResult {
322 pub record: Record,
323 pub score: f32,
324}
325
326pub struct Transaction {
328 inner: Arc<RwLock<TransactionInner>>,
329}
330
331struct TransactionInner {
332 backend: DatabaseBackend,
333 handle: Box<dyn std::any::Any + Send + Sync>,
335}
336
337impl Transaction {
338 pub async fn commit(self) -> Result<()> {
340 let inner = self.inner.write().await;
341 Ok(())
343 }
344
345 pub async fn rollback(self) -> Result<()> {
347 let inner = self.inner.write().await;
348 Ok(())
350 }
351}
352
353#[derive(Debug, Clone)]
355pub struct DatabaseStats {
356 pub backend: DatabaseBackend,
357 pub table_count: usize,
358 pub total_rows: usize,
359 pub total_size_bytes: usize,
360 pub index_count: usize,
361 pub cache_hit_rate: f64,
362}
363
364pub async fn connect(config: HanzoDbConfig) -> Result<Arc<dyn HanzoDatabase>> {
366 match config.backend {
367 #[cfg(feature = "backend-lancedb")]
368 DatabaseBackend::LanceDB => {
369 let db = backends::lancedb::LanceDbBackend::new(config).await?;
370 Ok(Arc::new(db))
371 }
372 #[cfg(not(feature = "backend-lancedb"))]
373 DatabaseBackend::LanceDB => {
374 anyhow::bail!("LanceDB backend not enabled. Compile with --features backend-lancedb")
375 }
376 #[cfg(feature = "backend-duckdb")]
377 DatabaseBackend::DuckDB => {
378 let db = backends::duckdb::DuckDbBackend::new(config).await?;
379 Ok(Arc::new(db))
380 }
381 #[cfg(not(feature = "backend-duckdb"))]
382 DatabaseBackend::DuckDB => {
383 anyhow::bail!("DuckDB backend not enabled. Compile with --features backend-duckdb")
384 }
385 #[cfg(feature = "backend-postgres")]
386 DatabaseBackend::PostgreSQL => {
387 let db = backends::postgres::PostgresBackend::new(config).await?;
388 Ok(Arc::new(db))
389 }
390 #[cfg(not(feature = "backend-postgres"))]
391 DatabaseBackend::PostgreSQL => {
392 anyhow::bail!("PostgreSQL backend not enabled. Compile with --features backend-postgres")
393 }
394 #[cfg(feature = "backend-redis")]
395 DatabaseBackend::Redis => {
396 let db = backends::redis::RedisBackend::new(config).await?;
397 Ok(Arc::new(db))
398 }
399 #[cfg(not(feature = "backend-redis"))]
400 DatabaseBackend::Redis => {
401 anyhow::bail!("Redis backend not enabled. Compile with --features backend-redis")
402 }
403 #[cfg(feature = "backend-sqlite")]
404 DatabaseBackend::SQLite => {
405 let db = backends::sqlite::SqliteBackend::new(config).await?;
406 Ok(Arc::new(db))
407 }
408 #[cfg(not(feature = "backend-sqlite"))]
409 DatabaseBackend::SQLite => {
410 anyhow::bail!("SQLite backend not enabled. Compile with --features backend-sqlite")
411 }
412 }
413}
414
415
#[cfg(test)]
mod tests {
    use super::*;

    /// Every workload must map to its documented backend. The table covers all
    /// five `WorkloadType` variants, so a changed mapping fails here.
    #[test]
    fn test_backend_selection() {
        let expectations = [
            (WorkloadType::VectorSearch, DatabaseBackend::LanceDB),
            (WorkloadType::Analytics, DatabaseBackend::DuckDB),
            (WorkloadType::Transactional, DatabaseBackend::PostgreSQL),
            (WorkloadType::Cache, DatabaseBackend::Redis),
            (WorkloadType::Embedded, DatabaseBackend::SQLite),
        ];

        for &(workload, expected) in expectations.iter() {
            assert_eq!(
                DatabaseBackend::for_workload(workload),
                expected,
                "unexpected backend for {:?}",
                workload
            );
        }
    }

    /// Pins the values produced by `HanzoDbConfig::default()`.
    #[test]
    fn test_default_config() {
        let config = HanzoDbConfig::default();

        assert_eq!(config.backend, DatabaseBackend::LanceDB);
        assert_eq!(
            config.path.as_deref(),
            Some(std::path::Path::new("./storage/hanzo-db"))
        );
        assert!(config.url.is_none());
        assert_eq!(config.pool_size, 16);
        assert!(config.enable_wal);
        assert_eq!(config.cache_size, Some(64 * 1024 * 1024));
        assert!(config.enable_compression);
    }
}