1use std::path::{Path, PathBuf};
36use std::sync::Arc;
37use std::time::Duration;
38
39use anyhow::Context as _;
40use tokio::sync::Mutex;
41
42pub use forgekit_core::{
44 FlushStats, IncrementalIndexer, PathFilter, QueryCache, WatchEvent, Watcher,
45};
46
47pub mod metrics;
48pub use metrics::{MetricKind, MetricsSummary, RuntimeMetrics};
49
50#[derive(Clone, Debug)]
52pub struct RuntimeConfig {
53 pub watch_enabled: bool,
55 pub debounce_ms: u64,
57 pub cache_size: usize,
59 pub cache_ttl_secs: u64,
61 pub watch_dir: String,
63}
64
65impl Default for RuntimeConfig {
66 fn default() -> Self {
67 Self {
68 watch_enabled: false,
69 debounce_ms: 500,
70 cache_size: 10_000,
71 cache_ttl_secs: 300,
72 watch_dir: "src".to_string(),
73 }
74 }
75}
76
77#[derive(Clone, Debug)]
79pub struct RuntimeStats {
80 pub cache_size: usize,
82 pub watch_active: bool,
84 pub reindex_count: u64,
86 pub metrics: MetricsSummary,
88}
89
90pub struct ForgeRuntime {
95 codebase_path: PathBuf,
97 config: RuntimeConfig,
99 store: Option<Arc<forgekit_core::UnifiedGraphStore>>,
101 watcher: Option<Watcher>,
103 indexer: Option<IncrementalIndexer>,
105 cache: Option<QueryCache<String, String>>,
107 metrics: RuntimeMetrics,
109 watch_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
111 watch_active: Arc<std::sync::atomic::AtomicBool>,
113}
114
115impl ForgeRuntime {
116 pub async fn new(codebase_path: impl AsRef<Path>) -> anyhow::Result<Self> {
122 Self::with_config(codebase_path, RuntimeConfig::default()).await
123 }
124
125 pub async fn with_config(
132 codebase_path: impl AsRef<Path>,
133 config: RuntimeConfig,
134 ) -> anyhow::Result<Self> {
135 let codebase_path = codebase_path
136 .as_ref()
137 .canonicalize()
138 .context("Failed to canonicalize codebase path")?;
139
140 let store = Arc::new(
142 forgekit_core::UnifiedGraphStore::open(
143 &codebase_path,
144 forgekit_core::BackendKind::default(),
145 )
146 .await
147 .context("Failed to open graph store")?,
148 );
149
150 let filter = PathFilter::include_dirs(&[&config.watch_dir]);
152 let indexer = IncrementalIndexer::with_filter(Arc::clone(&store), filter);
153
154 let cache = QueryCache::new(
156 config.cache_size,
157 Duration::from_secs(config.cache_ttl_secs),
158 );
159
160 Ok(Self {
161 codebase_path,
162 config,
163 store: Some(store),
164 watcher: None,
165 indexer: Some(indexer),
166 cache: Some(cache),
167 metrics: RuntimeMetrics::new(),
168 watch_handle: Arc::new(Mutex::new(None)),
169 watch_active: Arc::new(std::sync::atomic::AtomicBool::new(false)),
170 })
171 }
172
173 pub async fn watch(&mut self) -> anyhow::Result<()> {
179 if !self.config.watch_enabled {
180 return Err(anyhow::anyhow!("File watching is not enabled in config"));
181 }
182
183 if self.watch_active.load(std::sync::atomic::Ordering::Relaxed) {
184 return Err(anyhow::anyhow!("File watching is already active"));
185 }
186
187 let store = self
188 .store
189 .clone()
190 .ok_or_else(|| anyhow::anyhow!("Store not initialized"))?;
191 let indexer = self
192 .indexer
193 .clone()
194 .ok_or_else(|| anyhow::anyhow!("Indexer not initialized"))?;
195
196 let (tx, rx) = Watcher::channel();
198 let watcher = Watcher::new(store, tx);
199 self.watcher = Some(watcher);
200
201 let watch_path = self.codebase_path.join(&self.config.watch_dir);
203
204 if !watch_path.exists() {
206 return Err(anyhow::anyhow!(
207 "Watch directory does not exist: {}",
208 watch_path.display()
209 ));
210 }
211
212 if let Some(watcher) = &self.watcher {
213 watcher
214 .start(watch_path.clone())
215 .await
216 .context("Failed to start file watcher")?;
217 }
218
219 let metrics = self.metrics.clone();
221 let indexer_clone = indexer.clone();
222 let watch_active = self.watch_active.clone();
223 let debounce = Duration::from_millis(self.config.debounce_ms);
224
225 let handle = tokio::spawn(async move {
226 watch_active.store(true, std::sync::atomic::Ordering::Relaxed);
227
228 let mut rx = rx;
229 let mut last_flush = std::time::Instant::now();
230
231 loop {
232 let is_running = watch_active.load(std::sync::atomic::Ordering::Relaxed);
233 if !is_running {
234 break;
235 }
236
237 match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
238 Ok(Some(event)) => {
239 indexer_clone.queue(event);
241 }
242 Ok(None) => {
243 break;
245 }
246 Err(_) => {
247 }
249 }
250
251 if last_flush.elapsed() >= debounce {
253 if let Ok(_stats) = indexer_clone.flush().await {
254 metrics.record(MetricKind::Reindex);
255 }
256 last_flush = std::time::Instant::now();
257 }
258 }
259
260 let _ = indexer_clone.flush().await;
262 });
263
264 *self.watch_handle.lock().await = Some(handle);
265
266 Ok(())
267 }
268
269 pub async fn stop_watching(&mut self) -> anyhow::Result<()> {
271 self.watch_active
272 .store(false, std::sync::atomic::Ordering::Relaxed);
273
274 if let Some(handle) = self.watch_handle.lock().await.take() {
275 handle.await.ok();
276 }
277
278 Ok(())
279 }
280
281 pub fn cache(&self) -> Option<&QueryCache<String, String>> {
283 self.cache.as_ref()
284 }
285
286 pub fn metrics(&self) -> &RuntimeMetrics {
288 &self.metrics
289 }
290
291 pub async fn clear_cache(&self) -> anyhow::Result<()> {
293 if let Some(cache) = &self.cache {
294 cache.clear().await;
295 }
296 Ok(())
297 }
298
299 pub fn stats(&self) -> RuntimeStats {
301 RuntimeStats {
302 cache_size: self
303 .cache
304 .as_ref()
305 .map(|c| futures::executor::block_on(c.len()))
306 .unwrap_or(0),
307 watch_active: self.watch_active.load(std::sync::atomic::Ordering::Relaxed),
308 reindex_count: self.metrics.count(MetricKind::Reindex),
309 metrics: self.metrics.summary(),
310 }
311 }
312
313 pub fn codebase_path(&self) -> &Path {
315 &self.codebase_path
316 }
317
318 pub fn config(&self) -> &RuntimeConfig {
320 &self.config
321 }
322}
323
324impl Drop for ForgeRuntime {
325 fn drop(&mut self) {
326 self.watch_active
328 .store(false, std::sync::atomic::Ordering::Relaxed);
329 }
330}
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335
336 #[tokio::test]
337 async fn test_runtime_config_default() {
338 let config = RuntimeConfig::default();
339 assert_eq!(config.debounce_ms, 500);
340 assert_eq!(config.cache_size, 10_000);
341 assert_eq!(config.cache_ttl_secs, 300);
342 }
343
344 #[tokio::test]
345 async fn test_runtime_creation() {
346 let temp = tempfile::tempdir().unwrap();
347 let config = RuntimeConfig::default();
348 let runtime = ForgeRuntime::with_config(temp.path(), config)
349 .await
350 .unwrap();
351
352 let stats = runtime.stats();
353 assert_eq!(stats.cache_size, 0);
354 assert!(!stats.watch_active);
355 }
356
357 #[tokio::test]
358 async fn test_runtime_with_custom_config() {
359 let temp = tempfile::tempdir().unwrap();
360 let config = RuntimeConfig {
361 watch_enabled: false,
362 debounce_ms: 1000,
363 cache_size: 100,
364 cache_ttl_secs: 600,
365 watch_dir: "src".to_string(),
366 };
367
368 let runtime = ForgeRuntime::with_config(temp.path(), config)
369 .await
370 .unwrap();
371
372 assert_eq!(runtime.config().debounce_ms, 1000);
373 assert_eq!(runtime.config().cache_size, 100);
374 }
375
376 #[tokio::test]
377 async fn test_runtime_cache_operations() {
378 let temp = tempfile::tempdir().unwrap();
379 let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
380
381 let cache = runtime.cache().expect("Cache should be initialized");
382
383 cache.insert("key1".to_string(), "value1".to_string()).await;
384 let value = cache.get(&"key1".to_string()).await;
385
386 assert_eq!(value, Some("value1".to_string()));
387 }
388
389 #[tokio::test]
390 async fn test_runtime_metrics() {
391 let temp = tempfile::tempdir().unwrap();
392 let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
393
394 runtime.metrics().record(MetricKind::GraphQuery);
395 runtime.metrics().record(MetricKind::GraphQuery);
396
397 assert_eq!(runtime.metrics().count(MetricKind::GraphQuery), 2);
398 }
399
400 #[tokio::test]
401 async fn test_runtime_clear_cache() {
402 let temp = tempfile::tempdir().unwrap();
403 let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
404
405 let cache = runtime.cache().expect("Cache should be initialized");
406 cache.insert("key1".to_string(), "value1".to_string()).await;
407
408 runtime.clear_cache().await.unwrap();
409
410 assert!(cache.is_empty().await);
411 }
412
413 #[tokio::test]
414 async fn test_runtime_stats() {
415 let temp = tempfile::tempdir().unwrap();
416 let runtime = ForgeRuntime::new(temp.path()).await.unwrap();
417
418 runtime.metrics().record(MetricKind::GraphQuery);
419 runtime.metrics().record_cache_access(true);
420
421 let stats = runtime.stats();
422
423 assert_eq!(stats.metrics.graph_queries, 1);
424 assert_eq!(stats.metrics.cache_hit_rate, 1.0);
425 }
426
427 #[tokio::test]
428 async fn test_runtime_watch_fails_when_disabled() {
429 let temp = tempfile::tempdir().unwrap();
430 let config = RuntimeConfig {
431 watch_enabled: false,
432 ..Default::default()
433 };
434
435 let mut runtime = ForgeRuntime::with_config(temp.path(), config)
436 .await
437 .unwrap();
438
439 assert!(runtime.watch().await.is_err());
440 }
441
442 #[tokio::test]
443 async fn test_runtime_watch_fails_for_nonexistent_dir() {
444 let temp = tempfile::tempdir().unwrap();
445 let config = RuntimeConfig {
446 watch_enabled: true,
447 watch_dir: "nonexistent".to_string(),
448 ..Default::default()
449 };
450
451 let mut runtime = ForgeRuntime::with_config(temp.path(), config)
452 .await
453 .unwrap();
454
455 assert!(runtime.watch().await.is_err());
456 }
457}