1use std::future::Future;
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::Duration;
4
5use argyph_embed::config::EmbedConfig;
6use argyph_embed::{self, Embedder, Provider};
7use camino::Utf8PathBuf;
8use tokio::sync::RwLock;
9use tokio::task::JoinSet;
10use tokio_util::sync::CancellationToken;
11
12use argyph_store::SqliteStore;
13use argyph_store::Store;
14
15use crate::config::Config;
16use crate::error::Result;
17use crate::index::Index;
18use crate::tiers::{self, Tier2Progress, TierState};
19
20pub struct Supervisor {
21 #[allow(dead_code)]
22 config: Arc<Config>,
23 root: Utf8PathBuf,
24 index: Arc<Index>,
25 tier_state: Arc<RwLock<TierState>>,
26 tasks: Mutex<JoinSet<()>>,
27 shutdown: CancellationToken,
28 store: Arc<dyn Store>,
29 watcher_active: bool,
30 embedder: Arc<OnceLock<Arc<dyn Embedder>>>,
31}
32
33impl Supervisor {
34 #[tracing::instrument(skip(config), fields(root = %root.as_str()))]
35 pub async fn boot(root: Utf8PathBuf, config: Config) -> Result<Self> {
36 tracing::info!("booting supervisor");
37
38 let store: Arc<dyn Store> = {
39 let sqlite = SqliteStore::open_at(&root)?;
40 Arc::new(sqlite)
41 };
42
43 let (embedder, tier2_concurrency) = build_embedder();
44 let embedder_for_t2 = Arc::clone(&embedder);
45
46 let index = Arc::new(Index::new(Arc::clone(&store), Arc::clone(&embedder)));
47 let tier_state = Arc::new(RwLock::new(TierState::Offline));
48
49 let entries = tiers::run_tier0(&root, &*store).await?;
50 let files_indexed = entries.len();
51
52 *tier_state.write().await = TierState::Tier0 { files_indexed };
53 tracing::info!(files_indexed, "Tier 0 ready");
54
55 let tier_state_clone = Arc::clone(&tier_state);
56 let root_clone = root.clone();
57 let store_clone = Arc::clone(&store);
58
59 let sup = Self {
60 config: Arc::new(config),
61 root,
62 index,
63 tier_state,
64 tasks: Mutex::new(JoinSet::new()),
65 shutdown: CancellationToken::new(),
66 store,
67 watcher_active: false,
68 embedder,
69 };
70
71 let (tier2_start_tx, tier2_start_rx) = tokio::sync::oneshot::channel::<()>();
73 let (tier1_5_start_tx, tier1_5_start_rx) = tokio::sync::oneshot::channel::<()>();
74
75 let root_for_t1 = sup.root.clone();
77 let store_for_t1 = Arc::clone(&store_clone);
78 let tier_for_t1 = Arc::clone(&tier_state_clone);
79 sup.spawn(async move {
80 match tiers::run_tier1(&root_for_t1, &*store_for_t1).await {
81 Ok(symbol_count) => {
82 *tier_for_t1.write().await = TierState::Tier1 {
83 symbols_indexed: symbol_count as usize,
84 };
85 tracing::info!(symbol_count, "Tier 1 ready");
86 }
87 Err(e) => {
88 tracing::error!(error = %e, "Tier 1 failed");
89 }
90 }
91 let _ = tier2_start_tx.send(());
93 let _ = tier1_5_start_tx.send(());
94 });
95
96 {
98 let root_for_t1_5 = sup.root.clone();
99 let store_for_t1_5 = Arc::clone(&store_clone);
100 let tier_for_t1_5 = Arc::clone(&tier_state_clone);
101 let max_bytes = sup.config.locate.max_file_bytes;
102 sup.spawn(async move {
103 let _ = tier1_5_start_rx.await;
104 match tiers::run_tier1_5(&*store_for_t1_5, &root_for_t1_5, max_bytes).await {
105 Ok(count) => {
106 let mut s = tier_for_t1_5.write().await;
107 if matches!(*s, TierState::Tier1 { .. })
108 || matches!(*s, TierState::Tier1_5 { .. })
109 {
110 *s = TierState::Tier1_5 {
111 structural_files: count,
112 };
113 }
114 tracing::info!(structural_files = count, "Tier 1.5 ready");
115 }
116 Err(e) => {
117 tracing::warn!("Tier 1.5 failed: {e}");
118 }
119 }
120 });
121 }
122
123 {
125 let store_for_t2 = Arc::clone(&store_clone);
126 let tier_for_t2 = Arc::clone(&tier_state_clone);
127 let (progress_tx, mut progress_rx) =
128 tokio::sync::mpsc::unbounded_channel::<Tier2Progress>();
129
130 let tier_prog = Arc::clone(&tier_for_t2);
132 sup.spawn(async move {
133 while let Some(prog) = progress_rx.recv().await {
134 *tier_prog.write().await = TierState::Tier2 {
135 embedded: prog.embedded,
136 total: prog.total,
137 };
138 }
139 });
140
141 sup.spawn(async move {
143 let _ = tier2_start_rx.await;
144
145 let Some(embedder) = embedder_for_t2.get().cloned() else {
146 return;
147 };
148
149 tracing::info!("Tier 2 starting semantic embedding");
150 match tiers::run_tier2(store_for_t2, embedder, progress_tx, tier2_concurrency).await
151 {
152 Ok(()) => {
153 *tier_for_t2.write().await = TierState::Ready;
154 tracing::info!("Tier 2 ready — all chunks embedded");
155 }
156 Err(e) => {
157 tracing::error!(%e, "Tier 2 embedding failed");
158 }
159 }
160 });
161 }
162
163 let mut sup = sup;
164 let watcher = crate::watcher::create_watcher(&root_clone, Duration::from_millis(500));
165
166 let orch = crate::watcher::WatcherOrchestrator::new(
167 root_clone.clone(),
168 watcher,
169 store_clone,
170 tier_state_clone,
171 );
172 sup.spawn(async move {
173 orch.run().await;
174 });
175 sup.watcher_active = true;
176
177 Ok(sup)
178 }
179
180 pub async fn run(&self) -> Result<()> {
181 tracing::info!("supervisor running");
182 self.shutdown.cancelled().await;
183 tracing::info!("supervisor shutdown signal received");
184 Ok(())
185 }
186
187 pub fn watcher_active(&self) -> bool {
188 self.watcher_active
189 }
190
191 pub fn index(&self) -> Arc<Index> {
192 Arc::clone(&self.index)
193 }
194
195 pub fn store(&self) -> Arc<dyn Store> {
196 Arc::clone(&self.store)
197 }
198
199 pub fn embedder(&self) -> Option<Arc<dyn Embedder>> {
200 self.embedder.get().cloned()
201 }
202
203 pub fn config(&self) -> &Config {
204 &self.config
205 }
206
207 pub fn root(&self) -> &Utf8PathBuf {
208 &self.root
209 }
210
211 pub async fn get_tier_state(&self) -> TierState {
212 *self.tier_state.read().await
213 }
214
215 #[allow(clippy::expect_used)]
216 pub async fn shutdown(self) -> Result<()> {
217 tracing::info!("supervisor shutting down");
218 self.shutdown.cancel();
219
220 let mut tasks = self.tasks.into_inner().unwrap_or_else(|e| e.into_inner());
221 while let Some(result) = tasks.join_next().await {
222 if let Err(e) = result {
223 tracing::warn!(error = %e, "task panicked during shutdown");
224 }
225 }
226
227 self.store.close().await?;
228
229 tracing::info!("supervisor shut down");
230 Ok(())
231 }
232
233 #[allow(clippy::expect_used)]
234 pub fn spawn<F, T>(&self, fut: F)
235 where
236 F: Future<Output = T> + Send + 'static,
237 T: Send + 'static,
238 {
239 let token = self.shutdown.child_token();
240 let mut tasks = self.tasks.lock().expect("mutex poisoned");
241 tasks.spawn(async move {
242 tokio::select! {
243 _ = fut => {},
244 _ = token.cancelled() => {},
245 }
246 });
247 }
248}
249
250fn build_embedder() -> (Arc<OnceLock<Arc<dyn Embedder>>>, usize) {
251 let provider = if std::env::var("OPENAI_API_KEY").is_ok() {
252 Provider::OpenAi
253 } else {
254 Provider::Local
255 };
256 let tier2_concurrency = provider.default_concurrency();
257 let config = EmbedConfig::for_provider(&provider);
258 let slot = Arc::new(OnceLock::new());
259 let slot_clone = Arc::clone(&slot);
260 tokio::task::spawn(async move {
261 let result =
262 tokio::task::spawn_blocking(move || argyph_embed::build(provider, config)).await;
263 match result {
264 Ok(Ok(e)) => {
265 let _ = slot_clone.set(e);
266 }
267 Ok(Err(err)) => {
268 tracing::warn!(%err, "embedding unavailable — semantic search disabled");
269 }
270 Err(join_err) => {
271 tracing::warn!(%join_err, "embedder build panicked — semantic search disabled");
272 }
273 }
274 });
275 (slot, tier2_concurrency)
276}
277
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use argyph_fs::{ChangeKind, ChangedPath};
    use std::time::Duration;

    /// A private copy of the example repository inside a temporary directory.
    struct TestFixture {
        /// Held only so the temporary directory outlives the test.
        _dir: tempfile::TempDir,
        /// Root of the copied repository.
        root: Utf8PathBuf,
    }

    /// Copies `examples/tiny-rust-app` into a fresh temporary directory so
    /// each test works on its own files.
    fn temp_fixture() -> TestFixture {
        let dir = tempfile::tempdir().unwrap();
        let src = std::path::Path::new(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/../../examples/tiny-rust-app"
        ));
        let dst = dir.path().join("repo");
        copy_dir_all(src, &dst).unwrap();
        let root = Utf8PathBuf::from_path_buf(dst).unwrap();
        TestFixture { _dir: dir, root }
    }

    /// Recursively copies `src` into `dst`, recreating symlinks where the
    /// platform allows it.
    fn copy_dir_all(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> {
        std::fs::create_dir_all(dst)?;
        for entry in std::fs::read_dir(src)? {
            let entry = entry?;
            let ty = entry.file_type()?;
            let src_path = entry.path();
            let dst_path = dst.join(entry.file_name());
            if ty.is_dir() {
                copy_dir_all(&src_path, &dst_path)?;
            } else if ty.is_symlink() {
                let target = std::fs::read_link(&src_path)?;
                #[cfg(unix)]
                {
                    std::os::unix::fs::symlink(&target, &dst_path)?;
                }
                #[cfg(windows)]
                {
                    // `read_link` can return a relative target, and calling
                    // `is_dir` on it would resolve against the process CWD.
                    // Asking the link itself follows it from the right place.
                    if src_path.is_dir() {
                        // Creating symlinks may need extra privileges. Fall
                        // back to a recursive copy, because `fs::copy` cannot
                        // copy a directory.
                        std::os::windows::fs::symlink_dir(&target, &dst_path)
                            .or_else(|_| copy_dir_all(&src_path, &dst_path))?;
                    } else {
                        std::os::windows::fs::symlink_file(&target, &dst_path)
                            .or_else(|_| std::fs::copy(&src_path, &dst_path).map(|_| ()))?;
                    }
                }
            } else {
                std::fs::copy(&src_path, &dst_path)?;
            }
        }
        Ok(())
    }

    /// `boot` returns with the index usable and without a long stall.
    #[tokio::test]
    async fn boot_reaches_tier0_without_blocking() {
        let fixture = temp_fixture();
        let root = fixture.root;
        let config = Config::default();
        let started = std::time::Instant::now();

        let sup = Supervisor::boot(root, config).await.unwrap();

        assert!(
            started.elapsed() < Duration::from_secs(30),
            "boot took {:?} — far beyond the expected sub-second cold \
             start; this indicates a blocking call on the boot path",
            started.elapsed()
        );

        let state = sup.get_tier_state().await;
        assert!(state.is_ready(), "expected Tier 0 ready, got {state:?}");

        let status = sup.index().status().await.unwrap();
        assert!(status.file_count > 0, "expected at least one indexed file");

        sup.shutdown().await.unwrap();
    }

    /// Right after `boot`, the state is `Tier0` with a non-zero file count.
    #[tokio::test]
    async fn boot_sets_tier_state_fields() {
        let fixture = temp_fixture();
        let config = Config::default();
        let sup = Supervisor::boot(fixture.root, config).await.unwrap();

        let state = sup.get_tier_state().await;
        match state {
            TierState::Tier0 { files_indexed } => {
                assert!(files_indexed > 0, "expected at least 1 file indexed");
            }
            other => panic!("expected Tier 0, got {other:?}"),
        }

        sup.shutdown().await.unwrap();
    }

    /// Booting and shutting down straight away completes without error.
    #[tokio::test]
    async fn shutdown_cleans_up_without_panicking() {
        let fixture = temp_fixture();
        let sup = Supervisor::boot(fixture.root, Config::default())
            .await
            .unwrap();
        sup.shutdown().await.unwrap();
    }

    /// A future passed to `spawn` actually runs to completion.
    #[tokio::test]
    async fn spawn_registers_cancellation_aware_task() {
        let fixture = temp_fixture();
        let sup = Supervisor::boot(fixture.root, Config::default())
            .await
            .unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        sup.spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = tx.send(42);
        });

        let val = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .unwrap();
        assert_eq!(val, Some(42));

        sup.shutdown().await.unwrap();
    }

    /// A file created after boot becomes searchable once `reindex` is called
    /// with the matching change.
    #[tokio::test]
    async fn incremental_reindex_picks_up_new_file() {
        let fixture = temp_fixture();
        let root = fixture.root.clone();
        let sup = Supervisor::boot(root.clone(), Config::default())
            .await
            .unwrap();

        // Poll until the background Tier 1 (or Tier 1.5) has completed.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
        let mut tier1_ready = false;
        while tokio::time::Instant::now() < deadline {
            let state = sup.get_tier_state().await;
            if matches!(state, TierState::Tier1 { .. } | TierState::Tier1_5 { .. }) {
                tier1_ready = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        assert!(
            tier1_ready,
            "Tier 1 / Tier 1.5 did not become ready within 30s"
        );

        let new_file_path = camino::Utf8PathBuf::from("src/new_module.rs");
        let new_file_abs = root.join(new_file_path.as_str());
        std::fs::write(
            new_file_abs.as_str(),
            "pub fn watcher_test_symbol() -> u32 { 42 }\n",
        )
        .unwrap();

        // Feed the change to the index directly instead of waiting for the
        // file watcher to notice it.
        let changes = vec![ChangedPath {
            path: new_file_path.clone(),
            kind: ChangeKind::Created,
        }];

        let start = std::time::Instant::now();
        sup.index()
            .reindex(&root, &changes)
            .await
            .expect("reindex should succeed");
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_millis(3000),
            "reindex took {elapsed:?}, expected <3s"
        );

        let found = sup
            .index()
            .find_symbol("watcher_test_symbol", None)
            .await
            .expect("find_symbol should succeed");
        assert!(
            !found.is_empty(),
            "newly created watcher_test_symbol not found after reindex"
        );
        assert_eq!(
            found[0].file.as_str(),
            "src/new_module.rs",
            "symbol should be associated with the new file"
        );

        sup.shutdown().await.unwrap();
    }
}