1use crate::cache::QueryCache;
7use crate::indexing::IncrementalIndexer;
8use crate::pool::ConnectionPool;
9use crate::storage::UnifiedGraphStore;
10use crate::watcher::Watcher;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15#[derive(Clone, Debug)]
39pub struct Runtime {
40 pub store: Arc<UnifiedGraphStore>,
42 pub watcher: Option<Watcher>,
44 pub indexer: IncrementalIndexer,
46 pub cache: QueryCache<String, String>,
48 pub pool: Option<ConnectionPool>,
50}
51
52impl Runtime {
53 pub async fn new(path: PathBuf) -> anyhow::Result<Self> {
67 let store =
68 Arc::new(UnifiedGraphStore::open(&path, crate::storage::BackendKind::SQLite).await?);
69 let indexer = IncrementalIndexer::new(Arc::clone(&store));
70
71 let cache = QueryCache::new(1000, Duration::from_secs(300));
73
74 let db_path = path.join(".forge/graph.db");
76 let pool = Some(ConnectionPool::new(&db_path, 10));
77
78 Ok(Self {
79 store,
80 watcher: None,
81 indexer,
82 cache,
83 pool,
84 })
85 }
86
87 pub async fn start_with_watching(&mut self) -> anyhow::Result<()> {
97 let (tx, _rx) = Watcher::channel();
98 let watcher = Watcher::new(Arc::clone(&self.store), tx);
99
100 let path = std::env::current_dir()?;
102 watcher.start(path).await?;
103
104 self.watcher = Some(watcher);
105
106 Ok(())
111 }
112
113 pub async fn process_events(&self) -> anyhow::Result<crate::indexing::FlushStats> {
126 self.indexer.flush().await
127 }
128
129 pub fn cache(&self) -> &QueryCache<String, String> {
131 &self.cache
132 }
133
134 pub fn pool(&self) -> Option<&ConnectionPool> {
136 self.pool.as_ref()
137 }
138
139 pub async fn pending_changes(&self) -> usize {
141 self.indexer.pending_count().await
142 }
143
144 pub fn is_watching(&self) -> bool {
146 self.watcher.is_some()
147 }
148
149 pub async fn start_watching(&mut self) -> anyhow::Result<()> {
159 self.start_with_watching().await
160 }
161
162 pub fn stop_watching(&mut self) {
166 self.watcher = None;
167 }
168
169 pub async fn indexer_stats(&self) -> crate::indexing::FlushStats {
173 crate::indexing::FlushStats {
174 indexed: 0,
175 deleted: 0,
176 }
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::watcher::WatchEvent;
    use tempfile::TempDir;

    /// Pause inserted after queueing an event and before inspecting the indexer.
    const SETTLE: Duration = Duration::from_millis(50);

    /// Creates a runtime rooted in a brand-new temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the runtime so the directory
    /// stays on disk for as long as the calling test holds it.
    async fn fresh_runtime() -> (TempDir, Runtime) {
        let dir = TempDir::new().unwrap();
        let runtime = Runtime::new(dir.path().to_path_buf()).await.unwrap();
        (dir, runtime)
    }

    /// Queues `event` on the runtime's indexer, then waits for `SETTLE`.
    async fn queue_and_settle(runtime: &Runtime, event: WatchEvent) {
        runtime.indexer.queue(event);
        tokio::time::sleep(SETTLE).await;
    }

    /// A new runtime is idle: no watcher, nothing pending, pool present.
    #[tokio::test]
    async fn test_runtime_creation() {
        let (_dir, runtime) = fresh_runtime().await;

        assert!(!runtime.is_watching());
        assert_eq!(runtime.pending_changes().await, 0);
        assert!(runtime.pool().is_some());
    }

    /// A value inserted through the `cache` field can be read back.
    #[tokio::test]
    async fn test_runtime_cache() {
        let (_dir, runtime) = fresh_runtime().await;

        let key = String::from("test");
        runtime
            .cache
            .insert(key.clone(), String::from("value"))
            .await;

        let value = runtime.cache.get(&key).await;
        assert_eq!(value, Some("value".to_string()));
    }

    /// One queued event shows up as one pending change and is cleared by a flush.
    #[tokio::test]
    async fn test_runtime_pending_changes() {
        let (_dir, runtime) = fresh_runtime().await;

        queue_and_settle(&runtime, WatchEvent::Modified(PathBuf::from("src/lib.rs"))).await;
        assert_eq!(runtime.pending_changes().await, 1);

        runtime.process_events().await.unwrap();
        assert_eq!(runtime.pending_changes().await, 0);
    }

    /// Flushing after a queued `Created` event succeeds.
    #[tokio::test]
    async fn test_runtime_process_events() {
        let (_dir, runtime) = fresh_runtime().await;

        queue_and_settle(&runtime, WatchEvent::Created(PathBuf::from("test.rs"))).await;

        let _stats = runtime.process_events().await.unwrap();
    }

    /// A runtime that was never started reports that it is not watching.
    #[tokio::test]
    async fn test_runtime_is_watching() {
        let (_dir, runtime) = fresh_runtime().await;

        assert!(!runtime.is_watching());
    }

    /// The `cache()` and `pool()` accessors expose usable components.
    #[tokio::test]
    async fn test_runtime_cache_and_pool_access() {
        let (_dir, runtime) = fresh_runtime().await;

        let cache = runtime.cache();
        let key = String::from("test");
        cache.insert(key.clone(), String::from("value")).await;
        let value = cache.get(&key).await;
        assert_eq!(value, Some("value".to_string()));

        let maybe_pool = runtime.pool();
        assert!(maybe_pool.is_some());
        let pool = maybe_pool.unwrap();
        assert!(pool.available_connections() > 0);
    }

    /// With watching started, a queued event becomes pending and a flush drains it.
    #[tokio::test]
    async fn test_runtime_indexer_integration() {
        let (_dir, mut runtime) = fresh_runtime().await;

        runtime.start_watching().await.unwrap();
        assert!(runtime.is_watching());

        queue_and_settle(&runtime, WatchEvent::Created(PathBuf::from("src/main.rs"))).await;

        let pending = runtime.pending_changes().await;
        assert!(pending >= 1, "Expected pending changes but got {}", pending);

        let _stats = runtime.process_events().await.unwrap();
        assert_eq!(runtime.pending_changes().await, 0);
    }

    /// Cache, indexer and pool can all be exercised on the same runtime.
    #[tokio::test]
    async fn test_runtime_full_orchestration() {
        let (_dir, runtime) = fresh_runtime().await;

        let key = String::from("query");
        runtime
            .cache
            .insert(key.clone(), String::from("result"))
            .await;
        let cached = runtime.cache.get(&key).await;
        assert_eq!(cached, Some("result".to_string()));

        queue_and_settle(&runtime, WatchEvent::Modified(PathBuf::from("modified.rs"))).await;

        let _stats = runtime.process_events().await.unwrap();
        let pool = runtime.pool().unwrap();
        assert!(pool.available_connections() > 0);
    }

    /// Starting watching twice succeeds and leaves the runtime watching.
    #[tokio::test]
    async fn test_runtime_double_start_watching() {
        let (_dir, mut runtime) = fresh_runtime().await;

        runtime.start_watching().await.unwrap();
        assert!(runtime.is_watching());

        let second_start = runtime.start_watching().await;
        assert!(second_start.is_ok());
        assert!(runtime.is_watching());
    }

    /// Stopping clears the watching flag and leaves nothing pending.
    #[tokio::test]
    async fn test_runtime_stop_watching() {
        let (_dir, mut runtime) = fresh_runtime().await;

        runtime.start_watching().await.unwrap();
        assert!(runtime.is_watching());

        runtime.stop_watching();
        assert!(!runtime.is_watching());

        let pending = runtime.pending_changes().await;
        assert_eq!(pending, 0);
    }

    /// Construction fails for a missing directory and succeeds once it exists.
    #[tokio::test]
    async fn test_runtime_error_handling() {
        // The outcome for an empty path is deliberately not asserted; the value
        // is bound (not discarded) so it stays alive until the test ends.
        let _empty_path_outcome = Runtime::new(PathBuf::from("")).await;

        let temp_dir = TempDir::new().unwrap();
        let nonexistent = temp_dir
            .path()
            .join("nonexistent")
            .join("deep")
            .join("path");

        let missing_dir_outcome = Runtime::new(nonexistent.clone()).await;
        assert!(
            missing_dir_outcome.is_err(),
            "Runtime should reject non-existent paths"
        );

        tokio::fs::create_dir_all(&nonexistent).await.unwrap();
        let created_dir_outcome = Runtime::new(nonexistent).await;
        assert!(
            created_dir_outcome.is_ok(),
            "Runtime should work after directory is created"
        );

        let runtime = created_dir_outcome.unwrap();
        assert!(!runtime.is_watching());
        assert_eq!(runtime.pending_changes().await, 0);
    }
}