//! Episode query operations for redb cache

use crate::{EPISODES_TABLE, RedbStorage};
use do_memory_core::{Episode, Error, Result, apply_query_limit};
use redb::{ReadableDatabase, ReadableTable};
use std::sync::Arc;
use tracing::{debug, info};
9impl RedbStorage {
10    /// Query episodes modified since a given timestamp
11    ///
12    /// Returns all episodes where start_time >= the given timestamp.
13    /// This is used for incremental synchronization.
14    ///
15    /// Note: This scans all episodes in the cache and filters by timestamp,
16    /// which may be slow for large datasets. Consider using Turso for
17    /// efficient timestamp-based queries.
18    ///
19    /// # Arguments
20    ///
21    /// * `since` - Timestamp to query from
22    /// * `limit` - Maximum number of episodes to return (default: 100, max: 1000)
23    pub async fn query_episodes_since(
24        &self,
25        since: chrono::DateTime<chrono::Utc>,
26        limit: Option<usize>,
27    ) -> Result<Vec<Episode>> {
28        // Apply limit with defaults and bounds
29        let effective_limit = apply_query_limit(limit);
30        debug!(
31            "Querying episodes since {} from cache (limit: {})",
32            since, effective_limit
33        );
34        let db = Arc::clone(&self.db);
35
36        tokio::task::spawn_blocking(move || {
37            let read_txn = db
38                .begin_read()
39                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
40
41            let table = read_txn
42                .open_table(EPISODES_TABLE)
43                .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
44
45            let mut episodes = Vec::new();
46            let iter = table
47                .iter()
48                .map_err(|e| Error::Storage(format!("Failed to iterate episodes: {}", e)))?;
49
50            for result in iter {
51                // Check if we've hit the limit
52                if episodes.len() >= effective_limit {
53                    break;
54                }
55
56                let (_, bytes_guard) = result
57                    .map_err(|e| Error::Storage(format!("Failed to read episode entry: {}", e)))?;
58
59                let episode: Episode = postcard::from_bytes(bytes_guard.value())
60                    .map_err(|e| Error::Storage(format!("Failed to deserialize episode: {}", e)))?;
61
62                // Filter by timestamp
63                if episode.start_time >= since {
64                    episodes.push(episode);
65                }
66            }
67
68            // Sort by start_time descending (most recent first)
69            episodes.sort_by_key(|b| std::cmp::Reverse(b.start_time));
70
71            // Apply limit after sorting (in case we collected more than limit during filtering)
72            episodes.truncate(effective_limit);
73
74            info!(
75                "Found {} episodes since {} in cache (limit: {})",
76                episodes.len(),
77                since,
78                effective_limit
79            );
80            Ok(episodes)
81        })
82        .await
83        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
84    }
85
86    /// Query episodes by metadata key-value pair
87    ///
88    /// This method searches through all episodes and returns those whose metadata
89    /// contains the specified key-value pair. This is less efficient than
90    /// timestamp-based queries but necessary for metadata-based searches.
91    ///
92    /// # Arguments
93    ///
94    /// * `key` - Metadata key to search for
95    /// * `value` - Metadata value to match
96    /// * `limit` - Maximum number of episodes to return (default: 100, max: 1000)
97    ///
98    /// # Returns
99    ///
100    /// Vector of episodes matching the metadata criteria
101    pub async fn query_episodes_by_metadata(
102        &self,
103        key: &str,
104        value: &str,
105        limit: Option<usize>,
106    ) -> Result<Vec<Episode>> {
107        // Apply limit with defaults and bounds
108        let effective_limit = apply_query_limit(limit);
109        debug!(
110            "Querying episodes by metadata: {} = {} (limit: {})",
111            key, value, effective_limit
112        );
113        let db = Arc::clone(&self.db);
114        let key_str = key.to_string();
115        let value_str = value.to_string();
116
117        tokio::task::spawn_blocking(move || {
118            let read_txn = db
119                .begin_read()
120                .map_err(|e| Error::Storage(format!("Failed to begin read transaction: {}", e)))?;
121
122            let table = read_txn
123                .open_table(EPISODES_TABLE)
124                .map_err(|e| Error::Storage(format!("Failed to open episodes table: {}", e)))?;
125
126            let mut episodes = Vec::new();
127            let iter = table
128                .iter()
129                .map_err(|e| Error::Storage(format!("Failed to iterate episodes: {}", e)))?;
130
131            for result in iter {
132                // Check if we've hit the limit
133                if episodes.len() >= effective_limit {
134                    break;
135                }
136
137                let (_, bytes_guard) = result
138                    .map_err(|e| Error::Storage(format!("Failed to read episode entry: {}", e)))?;
139
140                let episode: Episode = postcard::from_bytes(bytes_guard.value())
141                    .map_err(|e| Error::Storage(format!("Failed to deserialize episode: {}", e)))?;
142
143                // Check if metadata contains the key-value pair
144                if let Some(metadata_value) = episode.metadata.get(key_str.as_str()) {
145                    if metadata_value == value_str.as_str() {
146                        episodes.push(episode);
147                    }
148                }
149            }
150
151            // Sort by start_time descending (most recent first)
152            episodes.sort_by_key(|b| std::cmp::Reverse(b.start_time));
153
154            // Apply limit after sorting
155            episodes.truncate(effective_limit);
156
157            info!(
158                "Found {} episodes with metadata {} = {} in cache (limit: {})",
159                episodes.len(),
160                key_str,
161                value_str,
162                effective_limit
163            );
164            Ok(episodes)
165        })
166        .await
167        .map_err(|e| Error::Storage(format!("Task join error: {}", e)))?
168    }
169}
#[cfg(test)]
mod tests {
    use super::*;
    use do_memory_core::{Episode, TaskContext, TaskType};
    use tempfile::tempdir;

    /// Create a `RedbStorage` backed by a database file inside a fresh temp
    /// directory.
    ///
    /// Returns the `TempDir` guard alongside the storage: `tempfile::TempDir`
    /// deletes its directory on drop, so the previous version — which dropped
    /// the guard before returning — handed back a storage pointing into a
    /// deleted directory. Callers must keep the guard alive for the test's
    /// duration.
    async fn create_test_storage() -> (tempfile::TempDir, RedbStorage) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.redb");
        let storage = RedbStorage::new(&db_path).await.unwrap();
        (dir, storage)
    }

    #[tokio::test]
    async fn test_query_episodes_by_metadata_sorting() {
        // `_dir` keeps the temp directory alive until the end of the test.
        let (_dir, storage) = create_test_storage().await;
        let base = chrono::Utc::now();

        // Store five episodes with staggered start times, all sharing the
        // same metadata key/value so the query matches every one of them.
        for offset in 0..5 {
            let mut episode = Episode::new(
                format!("task-{}", offset),
                TaskContext::default(),
                TaskType::CodeGeneration,
            );
            episode.start_time = base + chrono::Duration::minutes(offset as i64);
            episode
                .metadata
                .insert("category".to_string(), "test".to_string());
            storage.store_episode(&episode).await.unwrap();
        }

        let results = storage
            .query_episodes_by_metadata("category", "test", None)
            .await
            .unwrap();

        assert_eq!(results.len(), 5);
        // Results must come back most-recent-first (descending start_time).
        assert!(
            results
                .windows(2)
                .all(|pair| pair[0].start_time >= pair[1].start_time)
        );
    }
}