use crate::cache::QueryCache;
use crate::indexing::IncrementalIndexer;
use crate::pool::ConnectionPool;
use crate::storage::UnifiedGraphStore;
use crate::watcher::Watcher;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
/// Bundles the graph store with the watcher, indexer, query cache and
/// connection pool that work alongside it.
///
/// All fields are public; the accessor methods on `Runtime` expose the same
/// cache and pool by reference.
#[derive(Clone, Debug)]
pub struct Runtime {
/// Shared handle to the graph store; `Runtime::new` hands a clone of this
/// `Arc` to the indexer, and starting the watcher hands it another.
pub store: Arc<UnifiedGraphStore>,
/// File watcher handle. `None` after `Runtime::new` and after
/// `stop_watching`; `Some` once watching has been started.
pub watcher: Option<Watcher>,
/// Indexer whose pending count and flush are surfaced by
/// `pending_changes` and `process_events`.
pub indexer: IncrementalIndexer,
/// Query cache keyed and valued by `String`.
pub cache: QueryCache<String, String>,
/// Connection pool. `Runtime::new` always stores `Some` here.
pub pool: Option<ConnectionPool>,
}
impl Runtime {
    /// Builds a runtime rooted at `path`.
    ///
    /// Opens the graph store at `path` with the SQLite backend, creates an
    /// indexer that shares that store, a query cache, and a connection pool
    /// pointed at `<path>/.forge/graph.db`. No watcher is started.
    ///
    /// # Errors
    ///
    /// Propagates the error from `UnifiedGraphStore::open`.
    pub async fn new(path: PathBuf) -> anyhow::Result<Self> {
        let store =
            Arc::new(UnifiedGraphStore::open(&path, crate::storage::BackendKind::SQLite).await?);
        let indexer = IncrementalIndexer::new(Arc::clone(&store));
        // NOTE(review): 1000 and 300 s are presumably the cache capacity and
        // entry lifetime — confirm against `QueryCache::new`.
        let cache = QueryCache::new(1000, Duration::from_secs(300));
        let db_path = path.join(".forge/graph.db");
        // NOTE(review): 10 is presumably the pool size — confirm against
        // `ConnectionPool::new`.
        let pool = Some(ConnectionPool::new(&db_path, 10));
        Ok(Self {
            store,
            watcher: None,
            indexer,
            cache,
            pool,
        })
    }

    /// Starts a file watcher on the current working directory and keeps its
    /// handle in `self.watcher`.
    ///
    /// Calling this while a watcher is already held is a no-op that returns
    /// `Ok(())`.
    ///
    /// # Errors
    ///
    /// Fails if the current working directory cannot be determined or if
    /// `Watcher::start` reports an error. In either case `self.watcher` is
    /// left unchanged.
    pub async fn start_with_watching(&mut self) -> anyhow::Result<()> {
        // Without this guard a repeated call would start a second watcher and
        // then overwrite (discard) the handle of the first one.
        if self.watcher.is_some() {
            return Ok(());
        }

        // NOTE(review): `_rx` is dropped when this function returns, so
        // nothing here consumes what the watcher sends on `tx` — confirm
        // whether events are expected to reach the indexer through this
        // channel.
        let (tx, _rx) = Watcher::channel();
        let watcher = Watcher::new(Arc::clone(&self.store), tx);
        // NOTE(review): this watches the process working directory, not the
        // path that was passed to `Runtime::new` — confirm this is intended.
        let path = std::env::current_dir()?;
        watcher.start(path).await?;
        self.watcher = Some(watcher);
        Ok(())
    }

    /// Flushes the indexer and returns the statistics it reports.
    ///
    /// # Errors
    ///
    /// Propagates any error from `IncrementalIndexer::flush`.
    pub async fn process_events(&self) -> anyhow::Result<crate::indexing::FlushStats> {
        self.indexer.flush().await
    }

    /// Borrows the query cache.
    pub fn cache(&self) -> &QueryCache<String, String> {
        &self.cache
    }

    /// Borrows the connection pool, if one is present.
    pub fn pool(&self) -> Option<&ConnectionPool> {
        self.pool.as_ref()
    }

    /// Returns the indexer's current pending count.
    pub async fn pending_changes(&self) -> usize {
        self.indexer.pending_count().await
    }

    /// Reports whether a watcher handle is currently held.
    pub fn is_watching(&self) -> bool {
        self.watcher.is_some()
    }

    /// Alias for [`Runtime::start_with_watching`].
    ///
    /// # Errors
    ///
    /// Same as [`Runtime::start_with_watching`].
    pub async fn start_watching(&mut self) -> anyhow::Result<()> {
        self.start_with_watching().await
    }

    /// Drops the watcher handle, if any, so that `is_watching` returns
    /// `false` afterwards.
    pub fn stop_watching(&mut self) {
        self.watcher = None;
    }

    /// Returns flush statistics with every counter set to zero.
    ///
    /// NOTE(review): this does not consult `self.indexer`; it looks like a
    /// placeholder — confirm whether real statistics should be returned.
    pub async fn indexer_stats(&self) -> crate::indexing::FlushStats {
        crate::indexing::FlushStats {
            indexed: 0,
            deleted: 0,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::watcher::WatchEvent;
    use tempfile::TempDir;

    /// Pause inserted between queueing an event and inspecting the indexer.
    const SETTLE: Duration = Duration::from_millis(50);

    /// Creates a fresh temporary directory and a `Runtime` rooted in it.
    ///
    /// The `TempDir` guard is returned as well so the caller can keep the
    /// directory alive for the duration of the test.
    async fn scratch_runtime() -> (TempDir, Runtime) {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();
        let runtime = Runtime::new(root).await.unwrap();
        (dir, runtime)
    }

    /// A new runtime is idle: not watching, nothing pending, pool present.
    #[tokio::test]
    async fn test_runtime_creation() {
        let (_dir, runtime) = scratch_runtime().await;

        assert!(!runtime.is_watching());
        assert_eq!(runtime.pending_changes().await, 0);
        assert!(runtime.pool().is_some());
    }

    /// A value inserted through the public `cache` field can be read back.
    #[tokio::test]
    async fn test_runtime_cache() {
        let (_dir, runtime) = scratch_runtime().await;

        let key = String::from("test");
        runtime.cache.insert(key.clone(), String::from("value")).await;

        let value = runtime.cache.get(&key).await;
        assert_eq!(value, Some(String::from("value")));
    }

    /// One queued event shows up as one pending change and is cleared by a flush.
    #[tokio::test]
    async fn test_runtime_pending_changes() {
        let (_dir, runtime) = scratch_runtime().await;

        let event = WatchEvent::Modified(PathBuf::from("src/lib.rs"));
        runtime.indexer.queue(event);
        tokio::time::sleep(SETTLE).await;
        assert_eq!(runtime.pending_changes().await, 1);

        runtime.process_events().await.unwrap();
        assert_eq!(runtime.pending_changes().await, 0);
    }

    /// Flushing after a queued creation event succeeds.
    #[tokio::test]
    async fn test_runtime_process_events() {
        let (_dir, runtime) = scratch_runtime().await;

        let event = WatchEvent::Created(PathBuf::from("test.rs"));
        runtime.indexer.queue(event);
        tokio::time::sleep(SETTLE).await;

        let _stats = runtime.process_events().await.unwrap();
    }

    /// Watching is off by default.
    #[tokio::test]
    async fn test_runtime_is_watching() {
        let (_dir, runtime) = scratch_runtime().await;

        assert!(!runtime.is_watching());
    }

    /// The `cache()` and `pool()` accessors expose working components.
    #[tokio::test]
    async fn test_runtime_cache_and_pool_access() {
        let (_dir, runtime) = scratch_runtime().await;

        let cache = runtime.cache();
        let key = String::from("test");
        cache.insert(key.clone(), String::from("value")).await;
        let value = cache.get(&key).await;
        assert_eq!(value, Some(String::from("value")));

        let maybe_pool = runtime.pool();
        assert!(maybe_pool.is_some());
        assert!(maybe_pool.unwrap().available_connections() > 0);
    }

    /// With watching started, a manually queued event is pending until flushed.
    #[tokio::test]
    async fn test_runtime_indexer_integration() {
        let (_dir, mut runtime) = scratch_runtime().await;

        runtime.start_watching().await.unwrap();
        assert!(runtime.is_watching());

        let event = WatchEvent::Created(PathBuf::from("src/main.rs"));
        runtime.indexer.queue(event);
        tokio::time::sleep(SETTLE).await;

        let pending = runtime.pending_changes().await;
        assert!(pending >= 1, "Expected pending changes but got {}", pending);

        let _stats = runtime.process_events().await.unwrap();
        assert_eq!(runtime.pending_changes().await, 0);
    }

    /// Cache, indexer and pool can all be exercised on one runtime.
    #[tokio::test]
    async fn test_runtime_full_orchestration() {
        let (_dir, runtime) = scratch_runtime().await;

        let key = String::from("query");
        runtime.cache.insert(key.clone(), String::from("result")).await;
        let cached = runtime.cache.get(&key).await;
        assert_eq!(cached, Some(String::from("result")));

        let event = WatchEvent::Modified(PathBuf::from("modified.rs"));
        runtime.indexer.queue(event);
        tokio::time::sleep(SETTLE).await;
        let _stats = runtime.process_events().await.unwrap();

        let pool = runtime.pool().unwrap();
        assert!(pool.available_connections() > 0);
    }

    /// Starting the watcher twice succeeds and leaves the runtime watching.
    #[tokio::test]
    async fn test_runtime_double_start_watching() {
        let (_dir, mut runtime) = scratch_runtime().await;

        runtime.start_watching().await.unwrap();
        assert!(runtime.is_watching());

        let second_start = runtime.start_watching().await;
        assert!(second_start.is_ok());
        assert!(runtime.is_watching());
    }

    /// Stopping clears the watching flag and leaves nothing pending.
    #[tokio::test]
    async fn test_runtime_stop_watching() {
        let (_dir, mut runtime) = scratch_runtime().await;

        runtime.start_watching().await.unwrap();
        assert!(runtime.is_watching());

        runtime.stop_watching();
        assert!(!runtime.is_watching());

        let pending = runtime.pending_changes().await;
        assert_eq!(pending, 0);
    }

    /// Construction fails for a missing directory and succeeds once it exists.
    #[tokio::test]
    async fn test_runtime_error_handling() {
        // Only checks that an empty path does not panic; the outcome itself
        // is deliberately not asserted.
        let _empty_path_outcome = Runtime::new(PathBuf::from("")).await;

        let scratch = TempDir::new().unwrap();
        let missing = scratch.path().join("nonexistent").join("deep").join("path");

        let before_mkdir = Runtime::new(missing.clone()).await;
        assert!(
            before_mkdir.is_err(),
            "Runtime should reject non-existent paths"
        );

        tokio::fs::create_dir_all(&missing).await.unwrap();

        let after_mkdir = Runtime::new(missing).await;
        assert!(
            after_mkdir.is_ok(),
            "Runtime should work after directory is created"
        );

        let runtime = after_mkdir.unwrap();
        assert!(!runtime.is_watching());
        assert_eq!(runtime.pending_changes().await, 0);
    }
}