use crate::TursoStorage;
use async_trait::async_trait;
use do_memory_core::embeddings::{EmbeddingStorageBackend, SimilaritySearchResult};
use do_memory_core::{Episode, Pattern, Result, episode::PatternId};
use tracing::{debug, info};
use uuid::Uuid;
pub mod batch;
pub mod capacity;
mod embedding_tables;
pub mod episodes;
pub mod heuristics;
pub mod monitoring;
pub mod patterns;
pub mod recommendations;
pub mod search;
pub mod tag_operations;
pub use batch::episode_batch::BatchConfig;
pub use episodes::EpisodeQuery;
#[allow(unused)]
pub use patterns::PatternMetadata;
pub use patterns::PatternQuery;
pub use tag_operations::TagStats;
#[async_trait]
impl EmbeddingStorageBackend for TursoStorage {
    /// Stores an embedding vector for an episode, keyed by the episode UUID.
    async fn store_episode_embedding(&self, episode_id: Uuid, embedding: Vec<f32>) -> Result<()> {
        debug!("Storing episode embedding: {}", episode_id);
        self._store_embedding_internal(&episode_id.to_string(), "episode", &embedding)
            .await
    }

    /// Stores an embedding vector for a pattern, keyed by the pattern id.
    async fn store_pattern_embedding(
        &self,
        pattern_id: PatternId,
        embedding: Vec<f32>,
    ) -> Result<()> {
        debug!("Storing pattern embedding: {}", pattern_id);
        self._store_embedding_internal(&pattern_id.to_string(), "pattern", &embedding)
            .await
    }

    /// Fetches the stored embedding for an episode, or `None` if absent.
    async fn get_episode_embedding(&self, episode_id: Uuid) -> Result<Option<Vec<f32>>> {
        debug!("Retrieving episode embedding: {}", episode_id);
        self._get_embedding_internal(&episode_id.to_string(), "episode")
            .await
    }

    /// Fetches the stored embedding for a pattern, or `None` if absent.
    async fn get_pattern_embedding(&self, pattern_id: PatternId) -> Result<Option<Vec<f32>>> {
        debug!("Retrieving pattern embedding: {}", pattern_id);
        self._get_embedding_internal(&pattern_id.to_string(), "pattern")
            .await
    }

    /// Finds up to `limit` episodes whose embedding similarity to
    /// `query_embedding` meets `threshold`.
    ///
    /// Tries the native (database-side) vector search first; on failure it
    /// falls back to a brute-force scan. The native error is logged at debug
    /// level rather than propagated, since the fallback can still answer the
    /// query.
    async fn find_similar_episodes(
        &self,
        query_embedding: Vec<f32>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<SimilaritySearchResult<Episode>>> {
        debug!(
            "Finding similar episodes (limit: {}, threshold: {})",
            limit, threshold
        );
        // Connection is only needed for the native-search attempt; the
        // brute-force path acquires its own.
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        match self
            .find_similar_episodes_native(&conn, &query_embedding, limit, threshold)
            .await
        {
            Ok(results) => {
                info!(
                    "Found {} similar episodes using native vector search",
                    results.len()
                );
                Ok(results)
            }
            // Previously the error was discarded and the log asserted the
            // migration was missing; surface the real cause so fallbacks are
            // diagnosable.
            Err(e) => {
                debug!(
                    "Native vector search failed ({}); falling back to brute-force search",
                    e
                );
                self.find_similar_episodes_brute_force(&query_embedding, limit, threshold)
                    .await
            }
        }
    }

    /// Finds up to `limit` patterns whose embedding similarity to
    /// `query_embedding` meets `threshold`.
    ///
    /// Same native-first / brute-force-fallback strategy as
    /// [`Self::find_similar_episodes`].
    async fn find_similar_patterns(
        &self,
        query_embedding: Vec<f32>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<SimilaritySearchResult<Pattern>>> {
        debug!(
            "Finding similar patterns (limit: {}, threshold: {})",
            limit, threshold
        );
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        match self
            .find_similar_patterns_native(&conn, &query_embedding, limit, threshold)
            .await
        {
            Ok(results) => {
                info!(
                    "Found {} similar patterns using native vector search",
                    results.len()
                );
                Ok(results)
            }
            // Log the swallowed native error before falling back (see
            // find_similar_episodes).
            Err(e) => {
                debug!(
                    "Native vector search failed ({}); falling back to brute-force search",
                    e
                );
                self.find_similar_patterns_brute_force(&query_embedding, limit, threshold)
                    .await
            }
        }
    }
}
impl TursoStorage {
    /// Serializes and upserts one embedding row keyed by (`item_id`, `item_type`).
    ///
    /// With the `compression` feature enabled and `config.compress_embeddings`
    /// set, the vector is packed as little-endian `f32` bytes, compressed, and
    /// stored as `__compressed__:<algorithm>:<original_size>` followed by a
    /// newline and the base64-encoded compressed bytes. Otherwise (or when the
    /// compressor declines) the vector is stored as a plain JSON array.
    /// Uses `INSERT OR REPLACE`, so storing again for the same key overwrites.
    pub async fn _store_embedding_internal(
        &self,
        item_id: &str,
        item_type: &str,
        embedding: &[f32],
    ) -> Result<()> {
        debug!(
            "Storing embedding: item_id={}, item_type={}, dimension={}",
            item_id,
            item_type,
            embedding.len()
        );
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        // Feature-gated config reads; the underscore-prefixed fallbacks keep
        // the non-compression build free of unused-variable warnings.
        #[cfg(feature = "compression")]
        let compression_threshold = self.config.compression_threshold;
        #[cfg(not(feature = "compression"))]
        let _compression_threshold = 0;
        #[cfg(feature = "compression")]
        let should_compress = self.config.compress_embeddings;
        #[cfg(not(feature = "compression"))]
        let _should_compress = false;
        #[cfg(feature = "compression")]
        let embedding_data: String = if should_compress {
            // Pack the f32s as little-endian bytes before handing them to the
            // compressor; the reader reverses this with f32::from_le_bytes.
            let bytes: Vec<u8> = embedding.iter().flat_map(|&f| f.to_le_bytes()).collect();
            use crate::compression::CompressedPayload;
            let compression_start = std::time::Instant::now();
            let compressed = match CompressedPayload::compress(&bytes, compression_threshold) {
                Ok(payload) => payload,
                Err(e) => {
                    // Record the failure in the stats before propagating.
                    // A poisoned stats lock is deliberately ignored here.
                    if let Ok(mut stats) = self.compression_stats.lock() {
                        stats.record_failed();
                    }
                    return Err(e);
                }
            };
            let compression_time_us = compression_start.elapsed().as_micros() as u64;
            if compressed.algorithm == crate::CompressionAlgorithm::None {
                // Compressor decided not to compress (e.g. below threshold):
                // count it as skipped and fall back to plain JSON storage.
                if let Ok(mut stats) = self.compression_stats.lock() {
                    stats.record_skipped();
                }
                serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?
            } else {
                if let Ok(mut stats) = self.compression_stats.lock() {
                    stats.record_compression(
                        bytes.len(),
                        compressed.data.len(),
                        compression_time_us,
                    );
                }
                use base64::Engine;
                // Self-describing wire format parsed by _get_embedding_internal:
                // "__compressed__:<algorithm>:<original_size>\n<base64 data>".
                format!(
                    "__compressed__:{}:{}\n{}",
                    compressed.algorithm,
                    compressed.original_size,
                    base64::engine::general_purpose::STANDARD.encode(&compressed.data)
                )
            }
        } else {
            serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?
        };
        // Non-compression build: always plain JSON.
        #[cfg(not(feature = "compression"))]
        let embedding_data: String =
            serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?;
        const SQL: &str = r#"
            INSERT OR REPLACE INTO embeddings (embedding_id, item_id, item_type, embedding_data, dimension, model) VALUES (?, ?, ?, ?, ?, ?)
        "#;
        let embedding_id = self.generate_embedding_id(item_id, item_type);
        let stmt = self
            .prepared_cache
            .get_or_prepare(&conn, SQL)
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
            })?;
        stmt.execute(libsql::params![
            embedding_id,
            item_id.to_string(),
            item_type.to_string(),
            embedding_data,
            embedding.len() as i64,
            "default"
        ])
        .await
        .map_err(|e| do_memory_core::Error::Storage(format!("Failed to store embedding: {}", e)))?;
        info!("Successfully stored embedding: {}", item_id);
        Ok(())
    }

    /// Loads the embedding stored for (`item_id`, `item_type`), decoding the
    /// `__compressed__:` wire format written by `_store_embedding_internal`
    /// when the `compression` feature is enabled, or plain JSON otherwise.
    /// Returns `Ok(None)` when no row exists. Only the first matching row is
    /// read — presumably (`item_id`, `item_type`) is unique in the table.
    pub async fn _get_embedding_internal(
        &self,
        item_id: &str,
        item_type: &str,
    ) -> Result<Option<Vec<f32>>> {
        debug!(
            "Retrieving embedding: item_id={}, item_type={}",
            item_id, item_type
        );
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        const SQL: &str =
            "SELECT embedding_data FROM embeddings WHERE item_id = ? AND item_type = ?";
        let stmt = self
            .prepared_cache
            .get_or_prepare(&conn, SQL)
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
            })?;
        let mut rows = stmt
            .query(libsql::params![item_id.to_string(), item_type.to_string()])
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to query embedding: {}", e))
            })?;
        if let Some(row) = rows.next().await.map_err(|e| {
            do_memory_core::Error::Storage(format!("Failed to fetch embedding row: {}", e))
        })? {
            let embedding_data: String = row
                .get(0)
                .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
            // Compressed payload: "__compressed__:<algo>:<orig_size>\n<base64>".
            #[cfg(feature = "compression")]
            let embedding: Vec<f32> = if let Some(remainder) =
                embedding_data.strip_prefix("__compressed__:")
            {
                // Header runs up to the first newline; base64 body follows it.
                let newline_pos = remainder.find('\n').ok_or_else(|| {
                    do_memory_core::Error::Storage(
                        "Invalid compressed data format: missing newline".to_string(),
                    )
                })?;
                let header = &remainder[..newline_pos];
                let encoded_data = &remainder[newline_pos + 1..];
                // Header itself is "<algorithm>:<original_size>".
                let colon_pos = header.find(':').ok_or_else(|| {
                    do_memory_core::Error::Storage("Invalid compressed header format".to_string())
                })?;
                let algorithm_str = &header[..colon_pos];
                let original_size: usize = header[colon_pos + 1..].parse().map_err(|_| {
                    do_memory_core::Error::Storage(
                        "Invalid original size in compressed header".to_string(),
                    )
                })?;
                let algorithm = match algorithm_str {
                    "lz4" => crate::CompressionAlgorithm::Lz4,
                    "zstd" => crate::CompressionAlgorithm::Zstd,
                    "gzip" => crate::CompressionAlgorithm::Gzip,
                    _ => {
                        return Err(do_memory_core::Error::Storage(format!(
                            "Unknown compression algorithm: {}",
                            algorithm_str
                        )));
                    }
                };
                let compressed_data = base64::Engine::decode(
                    &base64::engine::general_purpose::STANDARD,
                    encoded_data,
                )
                .map_err(|e| {
                    do_memory_core::Error::Storage(format!(
                        "Failed to decode base64 compressed data: {}",
                        e
                    ))
                })?;
                let payload = crate::CompressedPayload {
                    original_size,
                    compressed_size: compressed_data.len(),
                    // NOTE(review): if original_size is 0 this ratio is
                    // NaN/inf — presumably informational only; confirm nothing
                    // downstream arithmetics on it.
                    compression_ratio: compressed_data.len() as f64 / original_size as f64,
                    data: compressed_data,
                    algorithm,
                };
                let bytes = payload.decompress()?;
                // Reassemble little-endian f32s written by the store path.
                // NOTE(review): chunks_exact(4) silently drops a trailing
                // partial chunk — a truncated payload would shrink the vector
                // instead of erroring; confirm that is acceptable.
                bytes
                    .chunks_exact(4)
                    .map(|chunk| {
                        let mut arr = [0u8; 4];
                        arr.copy_from_slice(chunk);
                        f32::from_le_bytes(arr)
                    })
                    .collect()
            } else {
                // Uncompressed rows are plain JSON arrays.
                serde_json::from_str(&embedding_data).map_err(|e| {
                    do_memory_core::Error::Storage(format!("Failed to parse embedding: {}", e))
                })?
            };
            #[cfg(not(feature = "compression"))]
            let embedding: Vec<f32> = serde_json::from_str(&embedding_data).map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to parse embedding: {}", e))
            })?;
            Ok(Some(embedding))
        } else {
            Ok(None)
        }
    }

    /// Deletes every embedding row for `item_id` (any item_type).
    /// Returns `true` if at least one row was removed.
    pub async fn _delete_embedding_internal(&self, item_id: &str) -> Result<bool> {
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        const SQL: &str = "DELETE FROM embeddings WHERE item_id = ?";
        let stmt = self
            .prepared_cache
            .get_or_prepare(&conn, SQL)
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
            })?;
        let rows_affected = stmt
            .execute(libsql::params![item_id.to_string()])
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to delete embedding: {}", e))
            })?;
        Ok(rows_affected > 0)
    }

    /// Upserts a batch of embeddings under the generic item_type "embedding".
    ///
    /// NOTE(review): each row is serialized and executed individually with no
    /// explicit transaction, and — unlike `_store_embedding_internal` — this
    /// path never compresses, so batch-written rows are always plain JSON.
    /// Confirm both are intentional. Fails fast on the first error, leaving
    /// earlier rows committed.
    pub async fn _store_embeddings_batch_internal(
        &self,
        embeddings: Vec<(String, Vec<f32>)>,
    ) -> Result<()> {
        debug!("Storing embedding batch: {} items", embeddings.len());
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        const SQL: &str = r#"
            INSERT OR REPLACE INTO embeddings (embedding_id, item_id, item_type, embedding_data, dimension, model) VALUES (?, ?, ?, ?, ?, ?)
        "#;
        for (item_id, embedding) in embeddings {
            let embedding_json =
                serde_json::to_string(&embedding).map_err(do_memory_core::Error::Serialization)?;
            let embedding_id = self.generate_embedding_id(&item_id, "embedding");
            // Looked up per iteration, but the prepared-statement cache should
            // make repeat lookups cheap.
            let stmt = self
                .prepared_cache
                .get_or_prepare(&conn, SQL)
                .await
                .map_err(|e| {
                    do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
                })?;
            stmt.execute(libsql::params![
                embedding_id,
                item_id,
                "embedding",
                embedding_json,
                embedding.len() as i64,
                "default"
            ])
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to store batch embedding: {}", e))
            })?;
        }
        info!("Successfully stored embedding batch");
        Ok(())
    }

    /// Loads a batch of embeddings stored under item_type "embedding",
    /// one sequential query per id; result order matches `item_ids`.
    pub async fn _get_embeddings_batch_internal(
        &self,
        item_ids: &[String],
    ) -> Result<Vec<Option<Vec<f32>>>> {
        debug!("Getting embedding batch: {} items", item_ids.len());
        let mut results = Vec::with_capacity(item_ids.len());
        for item_id in item_ids {
            let embedding = self._get_embedding_internal(item_id, "embedding").await?;
            results.push(embedding);
        }
        Ok(results)
    }

    /// Derives the primary-key `embedding_id` as the hex hash of
    /// "<item_id>:<item_type>".
    ///
    /// NOTE(review): DefaultHasher's algorithm is not guaranteed stable across
    /// Rust releases, yet these ids are persisted; a toolchain upgrade could
    /// make new ids stop matching existing rows (INSERT OR REPLACE would then
    /// duplicate instead of replace). Confirm this risk is acceptable or pin a
    /// stable hash.
    fn generate_embedding_id(&self, item_id: &str, item_type: &str) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        format!("{}:{}", item_id, item_type).hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

    /// Public wrapper: store one embedding under item_type "embedding".
    pub async fn store_embedding_backend(&self, id: &str, embedding: Vec<f32>) -> Result<()> {
        self._store_embedding_internal(id, "embedding", &embedding)
            .await
    }

    /// Public wrapper: fetch one embedding stored under item_type "embedding".
    pub async fn get_embedding_backend(&self, id: &str) -> Result<Option<Vec<f32>>> {
        self._get_embedding_internal(id, "embedding").await
    }

    /// Public wrapper: delete all embedding rows for `id`.
    pub async fn delete_embedding_backend(&self, id: &str) -> Result<bool> {
        self._delete_embedding_internal(id).await
    }

    /// Public wrapper: store a batch of embeddings under item_type "embedding".
    pub async fn store_embeddings_batch_backend(
        &self,
        embeddings: Vec<(String, Vec<f32>)>,
    ) -> Result<()> {
        self._store_embeddings_batch_internal(embeddings).await
    }

    /// Public wrapper: fetch a batch of embeddings stored under item_type
    /// "embedding"; result order matches `ids`.
    pub async fn get_embeddings_batch_backend(
        &self,
        ids: &[String],
    ) -> Result<Vec<Option<Vec<f32>>>> {
        self._get_embeddings_batch_internal(ids).await
    }
}