use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context as _;
use tokio::sync::Mutex;
pub use forgekit_core::{
FlushStats, IncrementalIndexer, PathFilter, QueryCache, WatchEvent, Watcher,
};
pub mod metrics;
pub use metrics::{MetricKind, MetricsSummary, RuntimeMetrics};
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
pub watch_enabled: bool,
pub debounce_ms: u64,
pub cache_size: usize,
pub cache_ttl_secs: u64,
pub watch_dir: String,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
watch_enabled: false,
debounce_ms: 500,
cache_size: 10_000,
cache_ttl_secs: 300,
watch_dir: "src".to_string(),
}
}
}
#[derive(Clone, Debug)]
pub struct RuntimeStats {
pub cache_size: usize,
pub watch_active: bool,
pub reindex_count: u64,
pub metrics: MetricsSummary,
}
pub struct ForgeRuntime {
codebase_path: PathBuf,
config: RuntimeConfig,
store: Option<Arc<forgekit_core::UnifiedGraphStore>>,
watcher: Option<Watcher>,
indexer: Option<IncrementalIndexer>,
cache: Option<QueryCache<String, String>>,
metrics: RuntimeMetrics,
watch_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
watch_active: Arc<std::sync::atomic::AtomicBool>,
}
impl ForgeRuntime {
pub async fn new(codebase_path: impl AsRef<Path>) -> anyhow::Result<Self> {
Self::with_config(codebase_path, RuntimeConfig::default()).await
}
pub async fn with_config(
codebase_path: impl AsRef<Path>,
config: RuntimeConfig,
) -> anyhow::Result<Self> {
let codebase_path = codebase_path
.as_ref()
.canonicalize()
.context("Failed to canonicalize codebase path")?;
let store = Arc::new(
forgekit_core::UnifiedGraphStore::open(
&codebase_path,
forgekit_core::BackendKind::default(),
)
.await
.context("Failed to open graph store")?,
);
let filter = PathFilter::include_dirs(&[&config.watch_dir]);
let indexer = IncrementalIndexer::with_filter(Arc::clone(&store), filter);
let cache = QueryCache::new(
config.cache_size,
Duration::from_secs(config.cache_ttl_secs),
);
Ok(Self {
codebase_path,
config,
store: Some(store),
watcher: None,
indexer: Some(indexer),
cache: Some(cache),
metrics: RuntimeMetrics::new(),
watch_handle: Arc::new(Mutex::new(None)),
watch_active: Arc::new(std::sync::atomic::AtomicBool::new(false)),
})
}
pub async fn watch(&mut self) -> anyhow::Result<()> {
if !self.config.watch_enabled {
return Err(anyhow::anyhow!("File watching is not enabled in config"));
}
if self.watch_active.load(std::sync::atomic::Ordering::Relaxed) {
return Err(anyhow::anyhow!("File watching is already active"));
}
let store = self
.store
.clone()
.ok_or_else(|| anyhow::anyhow!("Store not initialized"))?;
let indexer = self
.indexer
.clone()
.ok_or_else(|| anyhow::anyhow!("Indexer not initialized"))?;
let (tx, rx) = Watcher::channel();
let watcher = Watcher::new(store, tx);
self.watcher = Some(watcher);
let watch_path = self.codebase_path.join(&self.config.watch_dir);
if !watch_path.exists() {
return Err(anyhow::anyhow!(
"Watch directory does not exist: {}",
watch_path.display()
));
}
if let Some(watcher) = &self.watcher {
watcher
.start(watch_path.clone())
.await
.context("Failed to start file watcher")?;
}
let metrics = self.metrics.clone();
let indexer_clone = indexer.clone();
let watch_active = self.watch_active.clone();
let debounce = Duration::from_millis(self.config.debounce_ms);
let handle = tokio::spawn(async move {
watch_active.store(true, std::sync::atomic::Ordering::Relaxed);
let mut rx = rx;
let mut last_flush = std::time::Instant::now();
loop {
let is_running = watch_active.load(std::sync::atomic::Ordering::Relaxed);
if !is_running {
break;
}
match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
Ok(Some(event)) => {
indexer_clone.queue(event);
}
Ok(None) => {
break;
}
Err(_) => {
}
}
if last_flush.elapsed() >= debounce {
if let Ok(_stats) = indexer_clone.flush().await {
metrics.record(MetricKind::Reindex);
}
last_flush = std::time::Instant::now();
}
}
let _ = indexer_clone.flush().await;
});
*self.watch_handle.lock().await = Some(handle);
Ok(())
}
pub async fn stop_watching(&mut self) -> anyhow::Result<()> {
self.watch_active
.store(false, std::sync::atomic::Ordering::Relaxed);
if let Some(handle) = self.watch_handle.lock().await.take() {
handle.await.ok();
}
Ok(())
}
pub fn cache(&self) -> Option<&QueryCache<String, String>> {
self.cache.as_ref()
}
pub fn metrics(&self) -> &RuntimeMetrics {
&self.metrics
}
pub async fn clear_cache(&self) -> anyhow::Result<()> {
if let Some(cache) = &self.cache {
cache.clear().await;
}
Ok(())
}
pub fn stats(&self) -> RuntimeStats {
RuntimeStats {
cache_size: self
.cache
.as_ref()
.map(|c| futures::executor::block_on(c.len()))
.unwrap_or(0),
watch_active: self.watch_active.load(std::sync::atomic::Ordering::Relaxed),
reindex_count: self.metrics.count(MetricKind::Reindex),
metrics: self.metrics.summary(),
}
}
pub fn codebase_path(&self) -> &Path {
&self.codebase_path
}
pub fn config(&self) -> &RuntimeConfig {
&self.config
}
}
impl Drop for ForgeRuntime {
fn drop(&mut self) {
self.watch_active
.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_runtime_config_default() {
let config = RuntimeConfig::default();
assert_eq!(config.debounce_ms, 500);
assert_eq!(config.cache_size, 10_000);
assert_eq!(config.cache_ttl_secs, 300);
}
#[tokio::test]
async fn test_runtime_creation() {
let temp = tempfile::tempdir().unwrap();
let config = RuntimeConfig::default();
let runtime = ForgeRuntime::with_config(temp.path(), config)
.await
.unwrap();
let stats = runtime.stats();
assert_eq!(stats.cache_size, 0);
assert!(!stats.watch_active);
}
#[tokio::test]
async fn test_runtime_with_custom_config() {
let temp = tempfile::tempdir().unwrap();
let config = RuntimeConfig {
watch_enabled: false,
debounce_ms: 1000,
cache_size: 100,
cache_ttl_secs: 600,
watch_dir: "src".to_string(),
};
let runtime = ForgeRuntime::with_config(temp.path(), config)
.await
.unwrap();
assert_eq!(runtime.config().debounce_ms, 1000);
assert_eq!(runtime.config().cache_size, 100);
}
#[tokio::test]
async fn test_runtime_cache_operations() {
let temp = tempfile::tempdir().unwrap();
let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
let cache = runtime.cache().expect("Cache should be initialized");
cache.insert("key1".to_string(), "value1".to_string()).await;
let value = cache.get(&"key1".to_string()).await;
assert_eq!(value, Some("value1".to_string()));
}
#[tokio::test]
async fn test_runtime_metrics() {
let temp = tempfile::tempdir().unwrap();
let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
runtime.metrics().record(MetricKind::GraphQuery);
runtime.metrics().record(MetricKind::GraphQuery);
assert_eq!(runtime.metrics().count(MetricKind::GraphQuery), 2);
}
#[tokio::test]
async fn test_runtime_clear_cache() {
let temp = tempfile::tempdir().unwrap();
let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
let cache = runtime.cache().expect("Cache should be initialized");
cache.insert("key1".to_string(), "value1".to_string()).await;
runtime.clear_cache().await.unwrap();
assert!(cache.is_empty().await);
}
#[tokio::test]
async fn test_runtime_stats() {
let temp = tempfile::tempdir().unwrap();
let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
runtime.metrics().record(MetricKind::GraphQuery);
runtime.metrics().record_cache_access(true);
let stats = runtime.stats();
assert_eq!(stats.metrics.graph_queries, 1);
assert_eq!(stats.metrics.cache_hit_rate, 1.0);
}
#[tokio::test]
async fn test_runtime_watch_fails_when_disabled() {
let temp = tempfile::tempdir().unwrap();
let config = RuntimeConfig {
watch_enabled: false,
..Default::default()
};
let mut runtime = ForgeRuntime::with_config(temp.path(), config)
.await
.unwrap();
assert!(runtime.watch().await.is_err());
}
#[tokio::test]
async fn test_runtime_watch_fails_for_nonexistent_dir() {
let temp = tempfile::tempdir().unwrap();
let config = RuntimeConfig {
watch_enabled: true,
watch_dir: "nonexistent".to_string(),
..Default::default()
};
let mut runtime = ForgeRuntime::with_config(temp.path(), config)
.await
.unwrap();
assert!(runtime.watch().await.is_err());
}
}