Skip to main content

do_memory_storage_turso/storage/
capacity.rs

//! Capacity-constrained storage operations for Turso

3use crate::{Result, TursoStorage};
4use do_memory_core::Episode;
5use std::collections::HashMap;
6use tracing::{debug, info, warn};
7
8impl TursoStorage {
9    /// Store an episode with capacity management
10    ///
11    /// When the episode limit is reached, the least relevant episodes are evicted
12    /// based on the configured eviction policy.
13    pub async fn store_episode_with_capacity(
14        &self,
15        episode: &Episode,
16        max_episodes: usize,
17    ) -> Result<()> {
18        debug!(
19            "Storing episode with capacity management: {}, max_episodes={}",
20            episode.episode_id, max_episodes
21        );
22
23        // First, store the episode
24        self.store_episode(episode).await?;
25
26        // Then, check if we need to evict episodes
27        self.enforce_capacity(max_episodes).await?;
28
29        Ok(())
30    }
31
32    /// Enforce the maximum episode capacity
33    ///
34    /// Uses the configured eviction policy to determine which episodes to remove
35    /// when the capacity is exceeded.
36    async fn enforce_capacity(&self, max_episodes: usize) -> Result<()> {
37        let (conn, _conn_id) = self.get_connection_with_id().await?;
38
39        // Count current episodes
40        const COUNT_SQL: &str = "SELECT COUNT(*) as count FROM episodes";
41
42        let mut count_rows = conn.query(COUNT_SQL, ()).await.map_err(|e| {
43            do_memory_core::Error::Storage(format!("Failed to count episodes: {}", e))
44        })?;
45
46        let current_count = if let Some(row) = count_rows
47            .next()
48            .await
49            .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
50        {
51            let count: i64 = row
52                .get(0)
53                .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
54            count as usize
55        } else {
56            0
57        };
58
59        if current_count <= max_episodes {
60            return Ok(());
61        }
62
63        // Episodes exceed capacity - need to evict
64        let to_remove = current_count - max_episodes;
65        warn!(
66            "Capacity exceeded: {} > {}, removing {} episodes",
67            current_count, max_episodes, to_remove
68        );
69
70        // Get episodes to evict (oldest first, using LRU)
71        // Order by start_time first, then by episode_id for deterministic tie-breaking
72        let evict_sql = format!(
73            r#"
74            SELECT episode_id FROM episodes
75            ORDER BY start_time ASC, episode_id ASC
76            LIMIT {}
77        "#,
78            to_remove
79        );
80
81        let mut evict_rows = conn.query(&evict_sql, ()).await.map_err(|e| {
82            do_memory_core::Error::Storage(format!("Failed to query episodes to evict: {}", e))
83        })?;
84
85        let mut evicted = Vec::new();
86        while let Some(row) = evict_rows
87            .next()
88            .await
89            .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
90        {
91            let episode_id: String = row
92                .get(0)
93                .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
94            evicted.push(episode_id);
95        }
96
97        // Drop the connection and query results before starting deletions
98        // to avoid "database locked" errors when running tests in parallel
99        drop(evict_rows);
100        drop(conn);
101
102        // Delete evicted episodes
103        const DELETE_SQL: &str = "DELETE FROM episodes WHERE episode_id = ?";
104
105        for episode_id in &evicted {
106            // Delete associated embeddings first
107            let _ = self._delete_embedding_internal(episode_id).await;
108
109            // Then delete the episode
110            let (conn, _conn_id) = self.get_connection_with_id().await?;
111
112            // Use prepared statement cache
113            let stmt = self
114                .prepared_cache
115                .get_or_prepare(&conn, DELETE_SQL)
116                .await
117                .map_err(|e| {
118                    do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
119                })?;
120
121            stmt.execute(libsql::params![episode_id.clone()])
122                .await
123                .map_err(|e| {
124                    do_memory_core::Error::Storage(format!("Failed to delete episode: {}", e))
125                })?;
126            drop(conn);
127        }
128
129        info!(
130            "Evicted {} episodes to enforce capacity limit of {}",
131            evicted.len(),
132            max_episodes
133        );
134
135        Ok(())
136    }
137
138    /// Get storage statistics including capacity info
139    pub async fn get_capacity_statistics(&self) -> Result<CapacityStatistics> {
140        let (conn, _conn_id) = self.get_connection_with_id().await?;
141
142        // Count records in each table
143        let tables = [
144            "episodes",
145            "patterns",
146            "heuristics",
147            "embeddings",
148            "execution_records",
149            "agent_metrics",
150            "task_metrics",
151        ];
152
153        let mut table_counts = HashMap::new();
154        for table in tables {
155            // SAFETY: Table names are hardcoded in the local `tables` array above.
156            // These are not user-controlled values, preventing SQL injection.
157            // CodeQL may flag this as a potential SQL injection, but it is a false positive.
158            #[allow(clippy::literal_string_with_formatting_args)]
159            let sql = format!("SELECT COUNT(*) FROM {}", table);
160            let mut rows = conn.query(&sql, ()).await.map_err(|e| {
161                do_memory_core::Error::Storage(format!("Failed to count {}: {}", table, e))
162            })?;
163
164            if let Some(row) = rows
165                .next()
166                .await
167                .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
168            {
169                let count: i64 = row
170                    .get(0)
171                    .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
172                table_counts.insert(table.to_string(), count as usize);
173            }
174        }
175
176        Ok(CapacityStatistics {
177            episode_count: table_counts.get("episodes").copied().unwrap_or(0),
178            pattern_count: table_counts.get("patterns").copied().unwrap_or(0),
179            heuristic_count: table_counts.get("heuristics").copied().unwrap_or(0),
180            embedding_count: table_counts.get("embeddings").copied().unwrap_or(0),
181            execution_record_count: table_counts.get("execution_records").copied().unwrap_or(0),
182            agent_metrics_count: table_counts.get("agent_metrics").copied().unwrap_or(0),
183            task_metrics_count: table_counts.get("task_metrics").copied().unwrap_or(0),
184        })
185    }
186}
187
/// Storage statistics for capacity monitoring.
///
/// Each field holds a table's row count, as produced by
/// `TursoStorage::get_capacity_statistics`. `Default` yields all-zero counts, and values
/// can be compared with `==`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CapacityStatistics {
    /// Rows in the `episodes` table.
    pub episode_count: usize,
    /// Rows in the `patterns` table.
    pub pattern_count: usize,
    /// Rows in the `heuristics` table.
    pub heuristic_count: usize,
    /// Rows in the `embeddings` table.
    pub embedding_count: usize,
    /// Rows in the `execution_records` table.
    pub execution_record_count: usize,
    /// Rows in the `agent_metrics` table.
    pub agent_metrics_count: usize,
    /// Rows in the `task_metrics` table.
    pub task_metrics_count: usize,
}
199
200impl std::fmt::Display for CapacityStatistics {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(
203            f,
204            "CapacityStatistics(episodes={}, patterns={}, heuristics={}, embeddings={})",
205            self.episode_count, self.pattern_count, self.heuristic_count, self.embedding_count
206        )
207    }
208}