// do_memory_storage_turso/storage/mod.rs
use crate::TursoStorage;
13use do_memory_core::Result;
14use tracing::{debug, info};
15
16pub mod batch;
18pub mod capacity;
19mod embedding_backend;
20mod embedding_tables;
21pub mod episodes;
22pub mod heuristics;
23pub mod monitoring;
24pub mod patterns;
25pub mod recommendations;
26pub mod search;
27pub mod tag_operations;
28
29#[cfg(feature = "turso_multi_dimension")]
31mod embeddings_multi;
32
33pub use batch::episode_batch::BatchConfig;
34pub use episodes::EpisodeQuery;
35pub use episodes::raw_query::EPISODE_SELECT_COLUMNS;
36pub use episodes::raw_query::RawEpisodeQuery;
37pub use patterns::PATTERN_SELECT_COLUMNS;
38#[allow(unused)]
39pub use patterns::PatternMetadata;
40pub use patterns::PatternQuery;
41pub use patterns::RawPatternQuery;
42pub use tag_operations::TagStats;
43
44#[cfg(feature = "turso_multi_dimension")]
46pub use embeddings_multi::DimensionStats;
47
48impl TursoStorage {
49 pub async fn _store_embedding_internal(
58 &self,
59 item_id: &str,
60 item_type: &str,
61 embedding: &[f32],
62 ) -> Result<()> {
63 #[cfg(feature = "turso_multi_dimension")]
65 {
66 return self
67 .store_embedding_dimension_aware(item_id, item_type, embedding)
68 .await;
69 }
70
71 #[cfg(not(feature = "turso_multi_dimension"))]
73 {
74 self._store_embedding_single_table(item_id, item_type, embedding)
75 .await
76 }
77 }
78
79 #[cfg(not(feature = "turso_multi_dimension"))]
81 async fn _store_embedding_single_table(
82 &self,
83 item_id: &str,
84 item_type: &str,
85 embedding: &[f32],
86 ) -> Result<()> {
87 debug!(
88 "Storing embedding: item_id={}, item_type={}, dimension={}",
89 item_id,
90 item_type,
91 embedding.len()
92 );
93 let (conn, _conn_id) = self.get_connection_with_id().await?;
94
95 #[cfg(feature = "compression")]
97 let compression_threshold = self.config.compression_threshold;
98 #[cfg(not(feature = "compression"))]
99 let _compression_threshold = 0;
100
101 #[cfg(feature = "compression")]
102 let should_compress = self.config.compress_embeddings;
103 #[cfg(not(feature = "compression"))]
104 let _should_compress = false;
105
106 #[cfg(feature = "compression")]
107 let embedding_data: String = if should_compress {
108 let bytes: Vec<u8> = embedding.iter().flat_map(|&f| f.to_le_bytes()).collect();
110
111 use crate::compression::CompressedPayload;
112 let compression_start = std::time::Instant::now();
113 let compressed = match CompressedPayload::compress(&bytes, compression_threshold) {
114 Ok(payload) => payload,
115 Err(e) => {
116 if let Ok(mut stats) = self.compression_stats.lock() {
117 stats.record_failed();
118 }
119 return Err(e);
120 }
121 };
122 let compression_time_us = compression_start.elapsed().as_micros() as u64;
123
124 if compressed.algorithm == crate::CompressionAlgorithm::None {
125 if let Ok(mut stats) = self.compression_stats.lock() {
126 stats.record_skipped();
127 }
128 serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?
130 } else {
131 if let Ok(mut stats) = self.compression_stats.lock() {
132 stats.record_compression(
133 bytes.len(),
134 compressed.data.len(),
135 compression_time_us,
136 );
137 }
138 use base64::Engine;
140 format!(
141 "__compressed__:{}:{}\n{}",
142 compressed.algorithm,
143 compressed.original_size,
144 base64::engine::general_purpose::STANDARD.encode(&compressed.data)
145 )
146 }
147 } else {
148 serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?
150 };
151
152 #[cfg(not(feature = "compression"))]
153 let embedding_data: String =
154 serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?;
155
156 let embedding_json_for_vector: String =
159 serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?;
160
161 const SQL: &str = r#"
165 INSERT OR REPLACE INTO embeddings (embedding_id, item_id, item_type, embedding_data, embedding_vector, dimension, model)
166 VALUES (?, ?, ?, ?, vector32(?), ?, ?)
167 "#;
168
169 let embedding_id = self.generate_embedding_id(item_id, item_type);
170
171 let stmt = self
174 .prepared_cache
175 .get_or_prepare(&conn, SQL)
176 .await
177 .map_err(|e| {
178 do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
179 })?;
180 stmt.execute(libsql::params![
181 embedding_id,
182 item_id.to_string(),
183 item_type.to_string(),
184 embedding_data, embedding_json_for_vector, embedding.len() as i64,
187 "default"
188 ])
189 .await
190 .map_err(|e| do_memory_core::Error::Storage(format!("Failed to store embedding: {}", e)))?;
191
192 info!("Successfully stored embedding: {}", item_id);
193 Ok(())
194 }
195
196 pub async fn _get_embedding_internal(
200 &self,
201 item_id: &str,
202 item_type: &str,
203 ) -> Result<Option<Vec<f32>>> {
204 debug!(
205 "Retrieving embedding: item_id={}, item_type={}",
206 item_id, item_type
207 );
208 let (conn, _conn_id) = self.get_connection_with_id().await?;
209
210 const SQL: &str =
211 "SELECT embedding_data FROM embeddings WHERE item_id = ? AND item_type = ?";
212
213 let stmt = self
216 .prepared_cache
217 .get_or_prepare(&conn, SQL)
218 .await
219 .map_err(|e| {
220 do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
221 })?;
222 let mut rows = stmt
223 .query(libsql::params![item_id.to_string(), item_type.to_string()])
224 .await
225 .map_err(|e| {
226 do_memory_core::Error::Storage(format!("Failed to query embedding: {}", e))
227 })?;
228
229 if let Some(row) = rows.next().await.map_err(|e| {
230 do_memory_core::Error::Storage(format!("Failed to fetch embedding row: {}", e))
231 })? {
232 let embedding_data: String = row
233 .get(0)
234 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
235
236 #[cfg(feature = "compression")]
238 let embedding: Vec<f32> = if let Some(remainder) =
239 embedding_data.strip_prefix("__compressed__:")
240 {
241 let newline_pos = remainder.find('\n').ok_or_else(|| {
243 do_memory_core::Error::Storage(
244 "Invalid compressed data format: missing newline".to_string(),
245 )
246 })?;
247 let header = &remainder[..newline_pos];
248 let encoded_data = &remainder[newline_pos + 1..];
249
250 let colon_pos = header.find(':').ok_or_else(|| {
252 do_memory_core::Error::Storage("Invalid compressed header format".to_string())
253 })?;
254 let algorithm_str = &header[..colon_pos];
255 let original_size: usize = header[colon_pos + 1..].parse().map_err(|_| {
256 do_memory_core::Error::Storage(
257 "Invalid original size in compressed header".to_string(),
258 )
259 })?;
260
261 let algorithm = match algorithm_str {
262 "lz4" => crate::CompressionAlgorithm::Lz4,
263 "zstd" => crate::CompressionAlgorithm::Zstd,
264 "gzip" => crate::CompressionAlgorithm::Gzip,
265 _ => {
266 return Err(do_memory_core::Error::Storage(format!(
267 "Unknown compression algorithm: {}",
268 algorithm_str
269 )));
270 }
271 };
272
273 let compressed_data = base64::Engine::decode(
274 &base64::engine::general_purpose::STANDARD,
275 encoded_data,
276 )
277 .map_err(|e| {
278 do_memory_core::Error::Storage(format!(
279 "Failed to decode base64 compressed data: {}",
280 e
281 ))
282 })?;
283
284 let payload = crate::CompressedPayload {
285 original_size,
286 compressed_size: compressed_data.len(),
287 compression_ratio: compressed_data.len() as f64 / original_size as f64,
288 data: compressed_data,
289 algorithm,
290 };
291
292 let bytes = payload.decompress()?;
293 bytes
294 .chunks_exact(4)
295 .map(|chunk| {
296 let mut arr = [0u8; 4];
297 arr.copy_from_slice(chunk);
298 f32::from_le_bytes(arr)
299 })
300 .collect()
301 } else {
302 serde_json::from_str(&embedding_data).map_err(|e| {
304 do_memory_core::Error::Storage(format!("Failed to parse embedding: {}", e))
305 })?
306 };
307
308 #[cfg(not(feature = "compression"))]
309 let embedding: Vec<f32> = serde_json::from_str(&embedding_data).map_err(|e| {
310 do_memory_core::Error::Storage(format!("Failed to parse embedding: {}", e))
311 })?;
312
313 Ok(Some(embedding))
314 } else {
315 Ok(None)
316 }
317 }
318
319 pub async fn _delete_embedding_internal(&self, item_id: &str) -> Result<bool> {
321 let (conn, _conn_id) = self.get_connection_with_id().await?;
322
323 const SQL: &str = "DELETE FROM embeddings WHERE item_id = ?";
324
325 let stmt = self
328 .prepared_cache
329 .get_or_prepare(&conn, SQL)
330 .await
331 .map_err(|e| {
332 do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
333 })?;
334 let rows_affected = stmt
335 .execute(libsql::params![item_id.to_string()])
336 .await
337 .map_err(|e| {
338 do_memory_core::Error::Storage(format!("Failed to delete embedding: {}", e))
339 })?;
340
341 Ok(rows_affected > 0)
342 }
343
344 pub async fn _store_embeddings_batch_internal(
346 &self,
347 embeddings: Vec<(String, Vec<f32>)>,
348 ) -> Result<()> {
349 debug!("Storing embedding batch: {} items", embeddings.len());
350 let (conn, _conn_id) = self.get_connection_with_id().await?;
351
352 const SQL: &str = r#"
353 INSERT OR REPLACE INTO embeddings (embedding_id, item_id, item_type, embedding_data, embedding_vector, dimension, model)
354 VALUES (?, ?, ?, ?, vector32(?), ?, ?)
355 "#;
356
357 for (item_id, embedding) in embeddings {
358 let embedding_json =
359 serde_json::to_string(&embedding).map_err(do_memory_core::Error::Serialization)?;
360
361 let embedding_id = self.generate_embedding_id(&item_id, "embedding");
362
363 let stmt = self
365 .prepared_cache
366 .get_or_prepare(&conn, SQL)
367 .await
368 .map_err(|e| {
369 do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
370 })?;
371
372 stmt.execute(libsql::params![
373 embedding_id,
374 item_id,
375 "embedding",
376 embedding_json.clone(),
377 embedding_json, embedding.len() as i64,
379 "default"
380 ])
381 .await
382 .map_err(|e| {
383 do_memory_core::Error::Storage(format!("Failed to store batch embedding: {}", e))
384 })?;
385 }
386
387 info!("Successfully stored embedding batch");
388 Ok(())
389 }
390
391 pub async fn _get_embeddings_batch_internal(
393 &self,
394 item_ids: &[String],
395 ) -> Result<Vec<Option<Vec<f32>>>> {
396 debug!("Getting embedding batch: {} items", item_ids.len());
397
398 let mut results = Vec::with_capacity(item_ids.len());
399
400 for item_id in item_ids {
401 let embedding = self._get_embedding_internal(item_id, "embedding").await?;
402 results.push(embedding);
403 }
404
405 Ok(results)
406 }
407
408 fn generate_embedding_id(&self, item_id: &str, item_type: &str) -> String {
410 use std::collections::hash_map::DefaultHasher;
411 use std::hash::{Hash, Hasher};
412
413 let mut hasher = DefaultHasher::new();
414 format!("{}:{}", item_id, item_type).hash(&mut hasher);
415 format!("{:x}", hasher.finish())
416 }
417
418 pub async fn migrate_embeddings_to_vector_format(&self) -> Result<usize> {
426 info!("Starting embedding vector migration...");
427 let (conn, _conn_id) = self.get_connection_with_id().await?;
428
429 let sql = r#"
432 UPDATE embeddings
433 SET embedding_vector = vector32(embedding_data)
434 WHERE embedding_vector IS NULL AND embedding_data IS NOT NULL
435 "#;
436
437 let result = conn.execute(sql, ()).await.map_err(|e| {
438 do_memory_core::Error::Storage(format!("Failed to migrate embeddings: {}", e))
439 })?;
440
441 info!("Migrated {} embeddings to vector format", result);
442 Ok(result as usize)
443 }
444
445 pub async fn has_vector_embeddings(&self) -> Result<bool> {
449 let (conn, _conn_id) = self.get_connection_with_id().await?;
450
451 let sql = "SELECT COUNT(*) FROM embeddings WHERE embedding_vector IS NOT NULL LIMIT 1";
452
453 let mut rows = conn.query(sql, ()).await.map_err(|e| {
454 do_memory_core::Error::Storage(format!("Failed to check vector embeddings: {}", e))
455 })?;
456
457 if let Some(row) = rows
458 .next()
459 .await
460 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
461 {
462 let count: i64 = row
463 .get(0)
464 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
465 return Ok(count > 0);
466 }
467
468 Ok(false)
469 }
470
471 pub async fn store_embedding_backend(&self, id: &str, embedding: Vec<f32>) -> Result<()> {
475 self._store_embedding_internal(id, "embedding", &embedding)
476 .await
477 }
478
479 pub async fn get_embedding_backend(&self, id: &str) -> Result<Option<Vec<f32>>> {
481 self._get_embedding_internal(id, "embedding").await
482 }
483
484 pub async fn delete_embedding_backend(&self, id: &str) -> Result<bool> {
486 self._delete_embedding_internal(id).await
487 }
488
489 pub async fn store_embeddings_batch_backend(
491 &self,
492 embeddings: Vec<(String, Vec<f32>)>,
493 ) -> Result<()> {
494 self._store_embeddings_batch_internal(embeddings).await
495 }
496
497 pub async fn get_embeddings_batch_backend(
499 &self,
500 ids: &[String],
501 ) -> Result<Vec<Option<Vec<f32>>>> {
502 self._get_embeddings_batch_internal(ids).await
503 }
504}