do_memory_storage_turso/storage/
capacity.rs1use crate::{Result, TursoStorage};
4use do_memory_core::Episode;
5use std::collections::HashMap;
6use tracing::{debug, info, warn};
7
8impl TursoStorage {
9 pub async fn store_episode_with_capacity(
14 &self,
15 episode: &Episode,
16 max_episodes: usize,
17 ) -> Result<()> {
18 debug!(
19 "Storing episode with capacity management: {}, max_episodes={}",
20 episode.episode_id, max_episodes
21 );
22
23 self.store_episode(episode).await?;
25
26 self.enforce_capacity(max_episodes).await?;
28
29 Ok(())
30 }
31
32 async fn enforce_capacity(&self, max_episodes: usize) -> Result<()> {
37 let (conn, _conn_id) = self.get_connection_with_id().await?;
38
39 const COUNT_SQL: &str = "SELECT COUNT(*) as count FROM episodes";
41
42 let mut count_rows = conn.query(COUNT_SQL, ()).await.map_err(|e| {
43 do_memory_core::Error::Storage(format!("Failed to count episodes: {}", e))
44 })?;
45
46 let current_count = if let Some(row) = count_rows
47 .next()
48 .await
49 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
50 {
51 let count: i64 = row
52 .get(0)
53 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
54 count as usize
55 } else {
56 0
57 };
58
59 if current_count <= max_episodes {
60 return Ok(());
61 }
62
63 let to_remove = current_count - max_episodes;
65 warn!(
66 "Capacity exceeded: {} > {}, removing {} episodes",
67 current_count, max_episodes, to_remove
68 );
69
70 let evict_sql = format!(
73 r#"
74 SELECT episode_id FROM episodes
75 ORDER BY start_time ASC, episode_id ASC
76 LIMIT {}
77 "#,
78 to_remove
79 );
80
81 let mut evict_rows = conn.query(&evict_sql, ()).await.map_err(|e| {
82 do_memory_core::Error::Storage(format!("Failed to query episodes to evict: {}", e))
83 })?;
84
85 let mut evicted = Vec::new();
86 while let Some(row) = evict_rows
87 .next()
88 .await
89 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
90 {
91 let episode_id: String = row
92 .get(0)
93 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
94 evicted.push(episode_id);
95 }
96
97 drop(evict_rows);
100 drop(conn);
101
102 const DELETE_SQL: &str = "DELETE FROM episodes WHERE episode_id = ?";
104
105 for episode_id in &evicted {
106 let _ = self._delete_embedding_internal(episode_id).await;
108
109 let (conn, _conn_id) = self.get_connection_with_id().await?;
111
112 let stmt = self
114 .prepared_cache
115 .get_or_prepare(&conn, DELETE_SQL)
116 .await
117 .map_err(|e| {
118 do_memory_core::Error::Storage(format!("Failed to prepare statement: {}", e))
119 })?;
120
121 stmt.execute(libsql::params![episode_id.clone()])
122 .await
123 .map_err(|e| {
124 do_memory_core::Error::Storage(format!("Failed to delete episode: {}", e))
125 })?;
126 drop(conn);
127 }
128
129 info!(
130 "Evicted {} episodes to enforce capacity limit of {}",
131 evicted.len(),
132 max_episodes
133 );
134
135 Ok(())
136 }
137
138 pub async fn get_capacity_statistics(&self) -> Result<CapacityStatistics> {
140 let (conn, _conn_id) = self.get_connection_with_id().await?;
141
142 let tables = [
144 "episodes",
145 "patterns",
146 "heuristics",
147 "embeddings",
148 "execution_records",
149 "agent_metrics",
150 "task_metrics",
151 ];
152
153 let mut table_counts = HashMap::new();
154 for table in tables {
155 #[allow(clippy::literal_string_with_formatting_args)]
159 let sql = format!("SELECT COUNT(*) FROM {}", table);
160 let mut rows = conn.query(&sql, ()).await.map_err(|e| {
161 do_memory_core::Error::Storage(format!("Failed to count {}: {}", table, e))
162 })?;
163
164 if let Some(row) = rows
165 .next()
166 .await
167 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?
168 {
169 let count: i64 = row
170 .get(0)
171 .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
172 table_counts.insert(table.to_string(), count as usize);
173 }
174 }
175
176 Ok(CapacityStatistics {
177 episode_count: table_counts.get("episodes").copied().unwrap_or(0),
178 pattern_count: table_counts.get("patterns").copied().unwrap_or(0),
179 heuristic_count: table_counts.get("heuristics").copied().unwrap_or(0),
180 embedding_count: table_counts.get("embeddings").copied().unwrap_or(0),
181 execution_record_count: table_counts.get("execution_records").copied().unwrap_or(0),
182 agent_metrics_count: table_counts.get("agent_metrics").copied().unwrap_or(0),
183 task_metrics_count: table_counts.get("task_metrics").copied().unwrap_or(0),
184 })
185 }
186}
187
188#[derive(Debug, Clone)]
190pub struct CapacityStatistics {
191 pub episode_count: usize,
192 pub pattern_count: usize,
193 pub heuristic_count: usize,
194 pub embedding_count: usize,
195 pub execution_record_count: usize,
196 pub agent_metrics_count: usize,
197 pub task_metrics_count: usize,
198}
199
200impl std::fmt::Display for CapacityStatistics {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 write!(
203 f,
204 "CapacityStatistics(episodes={}, patterns={}, heuristics={}, embeddings={})",
205 self.episode_count, self.pattern_count, self.heuristic_count, self.embedding_count
206 )
207 }
208}