// do_memory_storage_turso/storage/mod.rs
1//! Storage operations for episodes, patterns, and heuristics
2//!
3//! This module is organized into submodules for different storage concerns:
4//! - `episodes`: Episode CRUD operations
5//! - `patterns`: Pattern CRUD operations
6//! - `heuristics`: Heuristic CRUD operations
7//! - `monitoring`: Monitoring and metrics storage
8//! - `embeddings`: Embedding storage and retrieval
9//! - `search`: Vector similarity search
10//! - `capacity`: Capacity-constrained storage
11
12use crate::TursoStorage;
13use do_memory_core::Result;
14use tracing::{debug, info};
15
16// Re-export submodules
17pub mod batch;
18pub mod capacity;
19mod embedding_backend;
20mod embedding_tables;
21pub mod episodes;
22pub mod heuristics;
23pub mod monitoring;
24pub mod patterns;
25pub mod recommendations;
26pub mod search;
27pub mod tag_operations;
28
29// Multi-dimensional embedding storage (feature-gated)
30#[cfg(feature = "turso_multi_dimension")]
31mod embeddings_multi;
32
33pub use batch::episode_batch::BatchConfig;
34pub use episodes::EpisodeQuery;
35pub use episodes::raw_query::EPISODE_SELECT_COLUMNS;
36pub use episodes::raw_query::RawEpisodeQuery;
37pub use patterns::PATTERN_SELECT_COLUMNS;
38#[allow(unused)]
39pub use patterns::PatternMetadata;
40pub use patterns::PatternQuery;
41pub use patterns::RawPatternQuery;
42pub use tag_operations::TagStats;
43
44// Re-export dimension stats when multi-dimension feature is enabled
45#[cfg(feature = "turso_multi_dimension")]
46pub use embeddings_multi::DimensionStats;
47
48impl TursoStorage {
49    // ========== Internal Embedding Methods ==========
50
51    /// Store an embedding (internal implementation)
52    ///
53    /// When compression is enabled, embeddings are compressed using the configured
54    /// algorithm (LZ4, Zstd, or Gzip) to reduce network bandwidth.
55    ///
56    /// When turso_multi_dimension feature is enabled, routes to dimension-specific tables.
57    pub async fn _store_embedding_internal(
58        &self,
59        item_id: &str,
60        item_type: &str,
61        embedding: &[f32],
62    ) -> Result<()> {
63        // Route to dimension-aware storage when multi-dimension feature is enabled
64        #[cfg(feature = "turso_multi_dimension")]
65        {
66            return self
67                .store_embedding_dimension_aware(item_id, item_type, embedding)
68                .await;
69        }
70
71        // Standard single-table storage when multi-dimension is disabled
72        #[cfg(not(feature = "turso_multi_dimension"))]
73        {
74            self._store_embedding_single_table(item_id, item_type, embedding)
75                .await
76        }
77    }
78
    /// Store embedding in single embeddings table (non-multi-dimension mode)
    ///
    /// Persists the vector in two columns of one row:
    /// - `embedding_data`: JSON text, or (with the `compression` feature and
    ///   config enabled) a textual envelope
    ///   `"__compressed__:<algorithm>:<original_size>\n<base64 payload>"`
    ///   that `_get_embedding_internal` decodes on read.
    /// - `embedding_vector`: native F32_BLOB built via `vector32()`, enabling
    ///   `vector_top_k()` (DiskANN) search.
    #[cfg(not(feature = "turso_multi_dimension"))]
    async fn _store_embedding_single_table(
        &self,
        item_id: &str,
        item_type: &str,
        embedding: &[f32],
    ) -> Result<()> {
        debug!(
            "Storing embedding: item_id={}, item_type={}, dimension={}",
            item_id,
            item_type,
            embedding.len()
        );
        let (conn, _conn_id) = self.get_connection_with_id().await?;

        // Get compression threshold from config
        // (underscore-prefixed placeholders keep the cfg arms balanced so the
        // names exist in both feature configurations without unused warnings)
        #[cfg(feature = "compression")]
        let compression_threshold = self.config.compression_threshold;
        #[cfg(not(feature = "compression"))]
        let _compression_threshold = 0;

        #[cfg(feature = "compression")]
        let should_compress = self.config.compress_embeddings;
        #[cfg(not(feature = "compression"))]
        let _should_compress = false;

        #[cfg(feature = "compression")]
        let embedding_data: String = if should_compress {
            // Convert f32 to bytes (little-endian) and compress
            let bytes: Vec<u8> = embedding.iter().flat_map(|&f| f.to_le_bytes()).collect();

            use crate::compression::CompressedPayload;
            let compression_start = std::time::Instant::now();
            let compressed = match CompressedPayload::compress(&bytes, compression_threshold) {
                Ok(payload) => payload,
                Err(e) => {
                    // Record the failure before propagating; a poisoned stats
                    // lock is silently skipped (stats are best-effort).
                    if let Ok(mut stats) = self.compression_stats.lock() {
                        stats.record_failed();
                    }
                    return Err(e);
                }
            };
            let compression_time_us = compression_start.elapsed().as_micros() as u64;

            if compressed.algorithm == crate::CompressionAlgorithm::None {
                // Compressor decided the payload was not worth compressing
                // (e.g. below threshold) — fall back to plain JSON.
                if let Ok(mut stats) = self.compression_stats.lock() {
                    stats.record_skipped();
                }
                // No compression, store as JSON
                serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?
            } else {
                if let Ok(mut stats) = self.compression_stats.lock() {
                    stats.record_compression(
                        bytes.len(),
                        compressed.data.len(),
                        compression_time_us,
                    );
                }
                // Store compressed data with header:
                // "__compressed__:<algorithm>:<original_size>\n<base64>"
                use base64::Engine;
                format!(
                    "__compressed__:{}:{}\n{}",
                    compressed.algorithm,
                    compressed.original_size,
                    base64::engine::general_purpose::STANDARD.encode(&compressed.data)
                )
            }
        } else {
            // No compression, store as JSON
            serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?
        };

        #[cfg(not(feature = "compression"))]
        let embedding_data: String =
            serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?;

        // Always create JSON for vector32() - it must receive a JSON array "[...]"
        // This is separate from embedding_data which may be compressed
        let embedding_json_for_vector: String =
            serde_json::to_string(embedding).map_err(do_memory_core::Error::Serialization)?;

        // Store embedding with native vector column for DiskANN search
        // Uses vector32() to convert JSON array to F32_BLOB format
        // Note: embedding_vector column enables vector_top_k() search
        const SQL: &str = r#"
            INSERT OR REPLACE INTO embeddings (embedding_id, item_id, item_type, embedding_data, embedding_vector, dimension, model)
            VALUES (?, ?, ?, ?, vector32(?), ?, ?)
        "#;

        // Deterministic id derived from (item_id, item_type) so re-storing the
        // same item replaces its row via INSERT OR REPLACE.
        let embedding_id = self.generate_embedding_id(item_id, item_type);

        // Use prepared statement cache for optimal performance
        // The cache is connection-aware and handles all connection types properly
        let stmt = self
            .prepared_cache
            .get_or_prepare(&conn, SQL)
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
            })?;
        stmt.execute(libsql::params![
            embedding_id,
            item_id.to_string(),
            item_type.to_string(),
            embedding_data,            // May be compressed or JSON
            embedding_json_for_vector, // Always JSON array for vector32()
            embedding.len() as i64,
            "default"
        ])
        .await
        .map_err(|e| do_memory_core::Error::Storage(format!("Failed to store embedding: {}", e)))?;

        info!("Successfully stored embedding: {}", item_id);
        Ok(())
    }
195
    /// Get an embedding (internal implementation)
    ///
    /// Automatically decompresses embeddings if they were stored compressed.
    ///
    /// Returns `Ok(None)` when no row matches `(item_id, item_type)`.
    /// Compressed rows carry the envelope written by the store path:
    /// `"__compressed__:<algorithm>:<original_size>\n<base64 payload>"`;
    /// anything without that prefix is parsed as a JSON array of f32.
    pub async fn _get_embedding_internal(
        &self,
        item_id: &str,
        item_type: &str,
    ) -> Result<Option<Vec<f32>>> {
        debug!(
            "Retrieving embedding: item_id={}, item_type={}",
            item_id, item_type
        );
        let (conn, _conn_id) = self.get_connection_with_id().await?;

        const SQL: &str =
            "SELECT embedding_data FROM embeddings WHERE item_id = ? AND item_type = ?";

        // Use prepared statement cache for optimal performance
        // The cache is connection-aware and handles all connection types properly
        let stmt = self
            .prepared_cache
            .get_or_prepare(&conn, SQL)
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
            })?;
        let mut rows = stmt
            .query(libsql::params![item_id.to_string(), item_type.to_string()])
            .await
            .map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to query embedding: {}", e))
            })?;

        // At most one row is expected; only the first is consumed.
        if let Some(row) = rows.next().await.map_err(|e| {
            do_memory_core::Error::Storage(format!("Failed to fetch embedding row: {}", e))
        })? {
            let embedding_data: String = row
                .get(0)
                .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;

            // Check if data is compressed (only when compression is enabled)
            #[cfg(feature = "compression")]
            let embedding: Vec<f32> = if let Some(remainder) =
                embedding_data.strip_prefix("__compressed__:")
            {
                // Parse compressed format: "<algorithm>:<original_size>\n<base64>"
                let newline_pos = remainder.find('\n').ok_or_else(|| {
                    do_memory_core::Error::Storage(
                        "Invalid compressed data format: missing newline".to_string(),
                    )
                })?;
                let header = &remainder[..newline_pos];
                let encoded_data = &remainder[newline_pos + 1..];

                // Parse header: algorithm name, then original (uncompressed) size
                let colon_pos = header.find(':').ok_or_else(|| {
                    do_memory_core::Error::Storage("Invalid compressed header format".to_string())
                })?;
                let algorithm_str = &header[..colon_pos];
                let original_size: usize = header[colon_pos + 1..].parse().map_err(|_| {
                    do_memory_core::Error::Storage(
                        "Invalid original size in compressed header".to_string(),
                    )
                })?;

                // These string tags must stay in sync with how the store path
                // Display-formats CompressionAlgorithm.
                let algorithm = match algorithm_str {
                    "lz4" => crate::CompressionAlgorithm::Lz4,
                    "zstd" => crate::CompressionAlgorithm::Zstd,
                    "gzip" => crate::CompressionAlgorithm::Gzip,
                    _ => {
                        return Err(do_memory_core::Error::Storage(format!(
                            "Unknown compression algorithm: {}",
                            algorithm_str
                        )));
                    }
                };

                let compressed_data = base64::Engine::decode(
                    &base64::engine::general_purpose::STANDARD,
                    encoded_data,
                )
                .map_err(|e| {
                    do_memory_core::Error::Storage(format!(
                        "Failed to decode base64 compressed data: {}",
                        e
                    ))
                })?;

                let payload = crate::CompressedPayload {
                    original_size,
                    compressed_size: compressed_data.len(),
                    compression_ratio: compressed_data.len() as f64 / original_size as f64,
                    data: compressed_data,
                    algorithm,
                };

                // Decompress, then reassemble f32s from little-endian bytes.
                // NOTE(review): chunks_exact(4) silently drops up to 3 trailing
                // bytes if the decompressed length is not a multiple of 4 —
                // confirm whether that should be an error instead.
                let bytes = payload.decompress()?;
                bytes
                    .chunks_exact(4)
                    .map(|chunk| {
                        let mut arr = [0u8; 4];
                        arr.copy_from_slice(chunk);
                        f32::from_le_bytes(arr)
                    })
                    .collect()
            } else {
                // Not compressed, parse as JSON
                serde_json::from_str(&embedding_data).map_err(|e| {
                    do_memory_core::Error::Storage(format!("Failed to parse embedding: {}", e))
                })?
            };

            #[cfg(not(feature = "compression"))]
            let embedding: Vec<f32> = serde_json::from_str(&embedding_data).map_err(|e| {
                do_memory_core::Error::Storage(format!("Failed to parse embedding: {}", e))
            })?;

            Ok(Some(embedding))
        } else {
            Ok(None)
        }
    }
318
319    /// Delete an embedding (internal implementation)
320    pub async fn _delete_embedding_internal(&self, item_id: &str) -> Result<bool> {
321        let (conn, _conn_id) = self.get_connection_with_id().await?;
322
323        const SQL: &str = "DELETE FROM embeddings WHERE item_id = ?";
324
325        // Use prepared statement cache for optimal performance
326        // The cache is connection-aware and handles all connection types properly
327        let stmt = self
328            .prepared_cache
329            .get_or_prepare(&conn, SQL)
330            .await
331            .map_err(|e| {
332                do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
333            })?;
334        let rows_affected = stmt
335            .execute(libsql::params![item_id.to_string()])
336            .await
337            .map_err(|e| {
338                do_memory_core::Error::Storage(format!("Failed to delete embedding: {}", e))
339            })?;
340
341        Ok(rows_affected > 0)
342    }
343
344    /// Store embeddings in batch (internal implementation)
345    pub async fn _store_embeddings_batch_internal(
346        &self,
347        embeddings: Vec<(String, Vec<f32>)>,
348    ) -> Result<()> {
349        debug!("Storing embedding batch: {} items", embeddings.len());
350        let (conn, _conn_id) = self.get_connection_with_id().await?;
351
352        const SQL: &str = r#"
353            INSERT OR REPLACE INTO embeddings (embedding_id, item_id, item_type, embedding_data, embedding_vector, dimension, model)
354            VALUES (?, ?, ?, ?, vector32(?), ?, ?)
355        "#;
356
357        for (item_id, embedding) in embeddings {
358            let embedding_json =
359                serde_json::to_string(&embedding).map_err(do_memory_core::Error::Serialization)?;
360
361            let embedding_id = self.generate_embedding_id(&item_id, "embedding");
362
363            // Prepare statement for each iteration to avoid statement reuse issues
364            let stmt = self
365                .prepared_cache
366                .get_or_prepare(&conn, SQL)
367                .await
368                .map_err(|e| {
369                    do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
370                })?;
371
372            stmt.execute(libsql::params![
373                embedding_id,
374                item_id,
375                "embedding",
376                embedding_json.clone(),
377                embedding_json, // JSON array passed to vector32() for native vector storage
378                embedding.len() as i64,
379                "default"
380            ])
381            .await
382            .map_err(|e| {
383                do_memory_core::Error::Storage(format!("Failed to store batch embedding: {}", e))
384            })?;
385        }
386
387        info!("Successfully stored embedding batch");
388        Ok(())
389    }
390
391    /// Get embeddings in batch (internal implementation)
392    pub async fn _get_embeddings_batch_internal(
393        &self,
394        item_ids: &[String],
395    ) -> Result<Vec<Option<Vec<f32>>>> {
396        debug!("Getting embedding batch: {} items", item_ids.len());
397
398        let mut results = Vec::with_capacity(item_ids.len());
399
400        for item_id in item_ids {
401            let embedding = self._get_embedding_internal(item_id, "embedding").await?;
402            results.push(embedding);
403        }
404
405        Ok(results)
406    }
407
408    /// Generate a deterministic embedding_id from item_id and item_type
409    fn generate_embedding_id(&self, item_id: &str, item_type: &str) -> String {
410        use std::collections::hash_map::DefaultHasher;
411        use std::hash::{Hash, Hasher};
412
413        let mut hasher = DefaultHasher::new();
414        format!("{}:{}", item_id, item_type).hash(&mut hasher);
415        format!("{:x}", hasher.finish())
416    }
417
418    /// Migrate existing embeddings to populate embedding_vector column
419    ///
420    /// This migration populates the `embedding_vector` F32_BLOB column for
421    /// embeddings that were stored before native vector support was added.
422    /// The vector column enables DiskANN-accelerated vector_top_k search.
423    ///
424    /// Returns the number of embeddings migrated.
425    pub async fn migrate_embeddings_to_vector_format(&self) -> Result<usize> {
426        info!("Starting embedding vector migration...");
427        let (conn, _conn_id) = self.get_connection_with_id().await?;
428
429        // Update all embeddings where embedding_vector is NULL
430        // Uses vector32() to convert JSON embedding_data to F32_BLOB format
431        let sql = r#"
432            UPDATE embeddings
433            SET embedding_vector = vector32(embedding_data)
434            WHERE embedding_vector IS NULL AND embedding_data IS NOT NULL
435        "#;
436
437        let result = conn.execute(sql, ()).await.map_err(|e| {
438            do_memory_core::Error::Storage(format!("Failed to migrate embeddings: {}", e))
439        })?;
440
441        info!("Migrated {} embeddings to vector format", result);
442        Ok(result as usize)
443    }
444
445    /// Check if embedding vector column is populated for vector_top_k search
446    ///
447    /// Returns true if at least one embedding has the vector column populated.
448    pub async fn has_vector_embeddings(&self) -> Result<bool> {
449        let (conn, _conn_id) = self.get_connection_with_id().await?;
450
451        let sql = "SELECT COUNT(*) FROM embeddings WHERE embedding_vector IS NOT NULL LIMIT 1";
452
453        let mut rows = conn.query(sql, ()).await.map_err(|e| {
454            do_memory_core::Error::Storage(format!("Failed to check vector embeddings: {}", e))
455        })?;
456
457        if let Some(row) = rows
458            .next()
459            .await
460            .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
461        {
462            let count: i64 = row
463                .get(0)
464                .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
465            return Ok(count > 0);
466        }
467
468        Ok(false)
469    }
470
471    // ========== Backend-compatible embedding methods ==========
472
    /// Store an embedding (backend API)
    ///
    /// Thin wrapper over `_store_embedding_internal` using the generic
    /// "embedding" item type.
    pub async fn store_embedding_backend(&self, id: &str, embedding: Vec<f32>) -> Result<()> {
        self._store_embedding_internal(id, "embedding", &embedding)
            .await
    }
478
    /// Get an embedding (backend API)
    ///
    /// Thin wrapper over `_get_embedding_internal` using the generic
    /// "embedding" item type; `None` when no embedding exists for `id`.
    pub async fn get_embedding_backend(&self, id: &str) -> Result<Option<Vec<f32>>> {
        self._get_embedding_internal(id, "embedding").await
    }
483
    /// Delete an embedding (backend API)
    ///
    /// Thin wrapper over `_delete_embedding_internal`; returns whether a
    /// row was actually removed.
    pub async fn delete_embedding_backend(&self, id: &str) -> Result<bool> {
        self._delete_embedding_internal(id).await
    }
488
    /// Store embeddings in batch (backend API)
    ///
    /// Thin wrapper over `_store_embeddings_batch_internal`.
    pub async fn store_embeddings_batch_backend(
        &self,
        embeddings: Vec<(String, Vec<f32>)>,
    ) -> Result<()> {
        self._store_embeddings_batch_internal(embeddings).await
    }
496
    /// Get embeddings in batch (backend API)
    ///
    /// Thin wrapper over `_get_embeddings_batch_internal`; results are
    /// positionally aligned with `ids`.
    pub async fn get_embeddings_batch_backend(
        &self,
        ids: &[String],
    ) -> Result<Vec<Option<Vec<f32>>>> {
        self._get_embeddings_batch_internal(ids).await
    }
504}