do_memory_storage_redb/
episodes_queries.rs1use crate::{EPISODES_TABLE, RedbStorage};
4use do_memory_core::{Episode, Error, Result, apply_query_limit};
5use redb::{ReadableDatabase, ReadableTable};
6use std::sync::Arc;
7use tracing::{debug, info};
8
9impl RedbStorage {
10 pub async fn query_episodes_since(
24 &self,
25 since: chrono::DateTime<chrono::Utc>,
26 limit: Option<usize>,
27 ) -> Result<Vec<Episode>> {
28 let effective_limit = apply_query_limit(limit);
30 debug!(
31 "Querying episodes since {} from cache (limit: {})",
32 since, effective_limit
33 );
34 let db = Arc::clone(&self.db);
35
36 tokio::task::spawn_blocking(move || {
37 let read_txn = db
38 .begin_read()
39 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
40
41 let table = read_txn
42 .open_table(EPISODES_TABLE)
43 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
44
45 let mut episodes = Vec::new();
46 let iter = table
47 .iter()
48 .map_err(|e| Error::Storage(format!("Failed to iterate episodes: {}", e)))?;
49
50 for result in iter {
51 if episodes.len() >= effective_limit {
53 break;
54 }
55
56 let (_, bytes_guard) = result
57 .map_err(|e| Error::Storage(format!("Failed to read episode entry: {}", e)))?;
58
59 let episode: Episode = postcard::from_bytes(bytes_guard.value())
60 .map_err(|e| Error::Storage(format!("Failed to deserialize episode: {}", e)))?;
61
62 if episode.start_time >= since {
64 episodes.push(episode);
65 }
66 }
67
68 episodes.sort_by_key(|b| std::cmp::Reverse(b.start_time));
70
71 episodes.truncate(effective_limit);
73
74 info!(
75 "Found {} episodes since {} in cache (limit: {})",
76 episodes.len(),
77 since,
78 effective_limit
79 );
80 Ok(episodes)
81 })
82 .await
83 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
84 }
85
86 pub async fn query_episodes_by_metadata(
102 &self,
103 key: &str,
104 value: &str,
105 limit: Option<usize>,
106 ) -> Result<Vec<Episode>> {
107 let effective_limit = apply_query_limit(limit);
109 debug!(
110 "Querying episodes by metadata: {} = {} (limit: {})",
111 key, value, effective_limit
112 );
113 let db = Arc::clone(&self.db);
114 let key_str = key.to_string();
115 let value_str = value.to_string();
116
117 tokio::task::spawn_blocking(move || {
118 let read_txn = db
119 .begin_read()
120 .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
121
122 let table = read_txn
123 .open_table(EPISODES_TABLE)
124 .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
125
126 let mut episodes = Vec::new();
127 let iter = table
128 .iter()
129 .map_err(|e| Error::Storage(format!("Failed to iterate episodes: {}", e)))?;
130
131 for result in iter {
132 if episodes.len() >= effective_limit {
134 break;
135 }
136
137 let (_, bytes_guard) = result
138 .map_err(|e| Error::Storage(format!("Failed to read episode entry: {}", e)))?;
139
140 let episode: Episode = postcard::from_bytes(bytes_guard.value())
141 .map_err(|e| Error::Storage(format!("Failed to deserialize episode: {}", e)))?;
142
143 if let Some(metadata_value) = episode.metadata.get(key_str.as_str()) {
145 if metadata_value == value_str.as_str() {
146 episodes.push(episode);
147 }
148 }
149 }
150
151 episodes.sort_by_key(|b| std::cmp::Reverse(b.start_time));
153
154 episodes.truncate(effective_limit);
156
157 info!(
158 "Found {} episodes with metadata {} = {} in cache (limit: {})",
159 episodes.len(),
160 key_str,
161 value_str,
162 effective_limit
163 );
164 Ok(episodes)
165 })
166 .await
167 .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174 use do_memory_core::{Episode, TaskContext, TaskType};
175 use tempfile::tempdir;
176
177 async fn create_test_storage() -> Result<RedbStorage> {
178 let dir = tempdir().unwrap();
179 let db_path = dir.path().join("test.redb");
180 RedbStorage::new(&db_path).await
181 }
182
183 #[tokio::test]
184 async fn test_query_episodes_by_metadata_sorting() {
185 let storage = create_test_storage().await.unwrap();
186 let now = chrono::Utc::now();
187 for i in 0..5 {
188 let mut episode = Episode::new(
189 format!("task-{}", i),
190 TaskContext::default(),
191 TaskType::CodeGeneration,
192 );
193 episode.start_time = now + chrono::Duration::minutes(i as i64);
194 episode
195 .metadata
196 .insert("category".to_string(), "test".to_string());
197 storage.store_episode(&episode).await.unwrap();
198 }
199 let results = storage
200 .query_episodes_by_metadata("category", "test", None)
201 .await
202 .unwrap();
203 assert_eq!(results.len(), 5);
204 for i in 0..4 {
205 assert!(results[i].start_time >= results[i + 1].start_time);
206 }
207 }
208}