use crate::{error::Result, schema::SCHEMA};
use agentdb::{
AgentDB, BackendFamily, Capabilities, DefaultCapabilities, QueryResult, Row, ScanResult,
Transaction, Value,
};
use async_trait::async_trait;
use rusqlite::{params, Connection, OptionalExtension};
use std::{path::Path, sync::Arc};
use tokio::sync::Mutex;
/// SQLite-backed implementation of the [`AgentDB`] trait.
///
/// Holds one `rusqlite` connection behind an async mutex; every operation in
/// this file locks that mutex before touching the database.
pub struct SqliteBackend {
/// Shared handle to the single SQLite connection.
conn: Arc<Mutex<Connection>>,
/// Capability flags returned by `capabilities()`; filled in by `new`.
capabilities: DefaultCapabilities,
}
impl SqliteBackend {
/// Opens (creating if necessary) the SQLite database at `path`, configures
/// the connection, and applies the schema.
///
/// The connection is configured with foreign-key enforcement enabled and the
/// write-ahead-log journal mode requested.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, if either pragma
/// fails, or if applying the schema fails.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
    let conn = Connection::open(path.as_ref())?;

    // `PRAGMA journal_mode = ...` reports the resulting mode as a result row,
    // and `Connection::execute` rejects any statement that produces rows
    // (`ExecuteReturnedResults`). `execute_batch` steps through and discards
    // result rows, so both pragmas are issued through it.
    conn.execute_batch("PRAGMA foreign_keys = ON; PRAGMA journal_mode = WAL;")?;

    let backend = Self {
        conn: Arc::new(Mutex::new(conn)),
        capabilities: DefaultCapabilities {
            // `begin()` on this backend returns `Unsupported`, so the
            // capability must not be advertised until it is implemented.
            transactions: false,
            directories: true,
            graph_queries: false,
            sql_queries: true,
            indexes: true,
            ttl: false,
            // 1 MiB
            max_key_size: Some(1024 * 1024),
            // 1 GiB
            max_value_size: Some(1024 * 1024 * 1024),
        },
    };

    backend.migrate().await?;
    Ok(backend)
}
/// Runs the `SCHEMA` script against the connection as a single batch.
///
/// # Errors
///
/// Propagates any error reported while executing the batch.
async fn migrate(&self) -> Result<()> {
    // The guard is a temporary: the lock is released as soon as the batch
    // has finished executing.
    self.conn.lock().await.execute_batch(SCHEMA)?;
    Ok(())
}
}
#[async_trait]
impl AgentDB for SqliteBackend {
/// Reports the backend family; this backend always returns `BackendFamily::Sql`.
fn family(&self) -> BackendFamily {
BackendFamily::Sql
}
/// Returns the capability flags that were set when the backend was constructed.
fn capabilities(&self) -> &dyn Capabilities {
&self.capabilities
}
/// Stores `value` under `key` in `kv_store`, replacing any existing row for
/// that key and stamping `updated_at` with the current timestamp.
///
/// # Errors
///
/// Returns `AgentDbError::Backend` carrying the SQLite error text if the
/// statement fails.
async fn put(&self, key: &str, value: Value) -> agentdb::Result<()> {
    const UPSERT_SQL: &str = "INSERT OR REPLACE INTO kv_store (key, value, updated_at) VALUES (?1, ?2, CURRENT_TIMESTAMP)";

    let db = self.conn.lock().await;
    match db.execute(UPSERT_SQL, params![key, value.as_bytes()]) {
        // The affected-row count is not reported to the caller.
        Ok(_) => Ok(()),
        Err(err) => Err(agentdb::AgentDbError::Backend(err.to_string())),
    }
}
/// Fetches the value stored under `key`.
///
/// Returns `Ok(None)` when no row matches the key.
///
/// # Errors
///
/// Returns `AgentDbError::Backend` carrying the SQLite error text if the
/// statement cannot be prepared or the row cannot be read.
async fn get(&self, key: &str) -> agentdb::Result<Option<Value>> {
    let db = self.conn.lock().await;

    let lookup = db
        .prepare("SELECT value FROM kv_store WHERE key = ?1")
        .and_then(|mut stmt| {
            stmt.query_row(params![key], |row| {
                row.get::<_, Vec<u8>>(0).map(|raw| Value::new(raw))
            })
        })
        // Turns the "query returned no rows" error into `Ok(None)`.
        .optional();

    lookup.map_err(|err| agentdb::AgentDbError::Backend(err.to_string()))
}
/// Removes the row stored under `key`.
///
/// # Errors
///
/// Returns `AgentDbError::NotFound` (carrying the key) when no row was
/// deleted, and `AgentDbError::Backend` if the statement itself fails.
async fn delete(&self, key: &str) -> agentdb::Result<()> {
    let db = self.conn.lock().await;

    let removed = db
        .execute("DELETE FROM kv_store WHERE key = ?1", params![key])
        .map_err(|err| agentdb::AgentDbError::Backend(err.to_string()))?;

    match removed {
        0 => Err(agentdb::AgentDbError::NotFound(key.to_string())),
        _ => Ok(()),
    }
}
/// Reports whether a row exists for `key`.
///
/// # Errors
///
/// Returns `AgentDbError::Backend` carrying the SQLite error text if the
/// statement cannot be prepared or executed.
async fn exists(&self, key: &str) -> agentdb::Result<bool> {
    let db = self.conn.lock().await;

    db.prepare("SELECT 1 FROM kv_store WHERE key = ?1")
        .and_then(|mut probe| probe.exists(params![key]))
        .map_err(|err| agentdb::AgentDbError::Backend(err.to_string()))
}
async fn query(&self, query: &str, _params: Vec<Value>) -> agentdb::Result<QueryResult> {
let conn = self.conn.lock().await;
let query_upper = query.trim().to_uppercase();
if query_upper.starts_with("SELECT") {
let mut stmt = conn
.prepare(query)
.map_err(|e| agentdb::AgentDbError::Backend(e.to_string()))?;
let column_names: Vec<String> = stmt
.column_names()
.iter()
.map(|s| s.to_string())
.collect();
let rows = stmt
.query_map([], |sql_row| {
let mut row = Row::new();
for (i, col_name) in column_names.iter().enumerate() {
let value: Vec<u8> = sql_row.get(i).or_else(|_: rusqlite::Error| {
let text: String = sql_row.get(i)?;
Ok::<Vec<u8>, rusqlite::Error>(text.into_bytes())
})?;
row = row.with_column(col_name.clone(), Value::new(value));
}
Ok(row)
})
.map_err(|e| agentdb::AgentDbError::Backend(e.to_string()))?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e: rusqlite::Error| agentdb::AgentDbError::Backend(e.to_string()))?;
Ok(QueryResult::new(rows, 0))
} else {
let affected = conn
.execute(query, [])
.map_err(|e| agentdb::AgentDbError::Backend(e.to_string()))?;
Ok(QueryResult::new(Vec::new(), affected))
}
}
async fn scan(&self, prefix: &str) -> agentdb::Result<ScanResult> {
let conn = self.conn.lock().await;
let mut stmt = conn
.prepare("SELECT key FROM kv_store WHERE key LIKE ?1 || '%' ORDER BY key")
.map_err(|e| agentdb::AgentDbError::Backend(e.to_string()))?;
let keys = stmt
.query_map(params![prefix], |row| row.get(0))
.map_err(|e| agentdb::AgentDbError::Backend(e.to_string()))?
.collect::<std::result::Result<Vec<String>, _>>()
.map_err(|e: rusqlite::Error| agentdb::AgentDbError::Backend(e.to_string()))?;
Ok(ScanResult::new(keys))
}
/// Transactions are not implemented for this backend yet.
///
/// # Errors
///
/// Always returns `AgentDbError::Unsupported`.
async fn begin(&self) -> agentdb::Result<Box<dyn Transaction>> {
    let reason = String::from("Transactions not yet implemented for SQLite backend");
    Err(agentdb::AgentDbError::Unsupported(reason))
}
/// No-op: performs no work and always returns `Ok(())`.
async fn close(&self) -> agentdb::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises put/get/exists/scan/delete against an in-memory database.
    #[tokio::test]
    async fn test_sqlite_basic_operations() {
        let backend = SqliteBackend::new(":memory:").await.unwrap();

        // A stored value reads back byte-for-byte.
        backend.put("test_key", b"test_value".into()).await.unwrap();
        let stored = backend.get("test_key").await.unwrap().unwrap();
        assert_eq!(stored.as_bytes(), b"test_value");

        // `exists` distinguishes present keys from absent ones.
        assert!(backend.exists("test_key").await.unwrap());
        assert!(!backend.exists("nonexistent").await.unwrap());

        // Only the two keys sharing the prefix are returned by the scan.
        backend.put("prefix_1", b"v1".into()).await.unwrap();
        backend.put("prefix_2", b"v2".into()).await.unwrap();
        let scanned = backend.scan("prefix").await.unwrap();
        assert_eq!(scanned.keys.len(), 2);

        // After deletion the key is no longer reported as present.
        backend.delete("test_key").await.unwrap();
        assert!(!backend.exists("test_key").await.unwrap());
    }
}