Skip to main content

argyph_core/
supervisor.rs

1use std::future::Future;
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::Duration;
4
5use argyph_embed::config::EmbedConfig;
6use argyph_embed::{self, Embedder, Provider};
7use camino::Utf8PathBuf;
8use tokio::sync::RwLock;
9use tokio::task::JoinSet;
10use tokio_util::sync::CancellationToken;
11
12use argyph_store::SqliteStore;
13use argyph_store::Store;
14
15use crate::config::Config;
16use crate::error::Result;
17use crate::index::Index;
18use crate::tiers::{self, Tier2Progress, TierState};
19
20pub struct Supervisor {
21    #[allow(dead_code)]
22    config: Arc<Config>,
23    root: Utf8PathBuf,
24    index: Arc<Index>,
25    tier_state: Arc<RwLock<TierState>>,
26    tasks: Mutex<JoinSet<()>>,
27    shutdown: CancellationToken,
28    store: Arc<dyn Store>,
29    watcher_active: bool,
30    embedder: Arc<OnceLock<Arc<dyn Embedder>>>,
31}
32
33impl Supervisor {
34    #[tracing::instrument(skip(config), fields(root = %root.as_str()))]
35    pub async fn boot(root: Utf8PathBuf, config: Config) -> Result<Self> {
36        tracing::info!("booting supervisor");
37
38        let store: Arc<dyn Store> = {
39            let sqlite = SqliteStore::open_at(&root)?;
40            Arc::new(sqlite)
41        };
42
43        let (embedder, tier2_concurrency) = build_embedder();
44        let embedder_for_t2 = Arc::clone(&embedder);
45
46        let index = Arc::new(Index::new(Arc::clone(&store), Arc::clone(&embedder)));
47        let tier_state = Arc::new(RwLock::new(TierState::Offline));
48
49        let entries = tiers::run_tier0(&root, &*store).await?;
50        let files_indexed = entries.len();
51
52        *tier_state.write().await = TierState::Tier0 { files_indexed };
53        tracing::info!(files_indexed, "Tier 0 ready");
54
55        let tier_state_clone = Arc::clone(&tier_state);
56        let root_clone = root.clone();
57        let store_clone = Arc::clone(&store);
58
59        let sup = Self {
60            config: Arc::new(config),
61            root,
62            index,
63            tier_state,
64            tasks: Mutex::new(JoinSet::new()),
65            shutdown: CancellationToken::new(),
66            store,
67            watcher_active: false,
68            embedder,
69        };
70
71        // Channel to signal Tier 2 start after Tier 1 finishes
72        let (tier2_start_tx, tier2_start_rx) = tokio::sync::oneshot::channel::<()>();
73        let (tier1_5_start_tx, tier1_5_start_rx) = tokio::sync::oneshot::channel::<()>();
74
75        // Tier 1 — parse symbols, create chunks
76        let root_for_t1 = sup.root.clone();
77        let store_for_t1 = Arc::clone(&store_clone);
78        let tier_for_t1 = Arc::clone(&tier_state_clone);
79        sup.spawn(async move {
80            match tiers::run_tier1(&root_for_t1, &*store_for_t1).await {
81                Ok(symbol_count) => {
82                    *tier_for_t1.write().await = TierState::Tier1 {
83                        symbols_indexed: symbol_count as usize,
84                    };
85                    tracing::info!(symbol_count, "Tier 1 ready");
86                }
87                Err(e) => {
88                    tracing::error!(error = %e, "Tier 1 failed");
89                }
90            }
91            // Signal Tier 2 that chunks exist (or Tier 1 failed — Tier 2 exits quickly either way)
92            let _ = tier2_start_tx.send(());
93            let _ = tier1_5_start_tx.send(());
94        });
95
96        // Tier 1.5 — structural indexing (starts after Tier 1, parallel with Tier 2)
97        {
98            let root_for_t1_5 = sup.root.clone();
99            let store_for_t1_5 = Arc::clone(&store_clone);
100            let tier_for_t1_5 = Arc::clone(&tier_state_clone);
101            let max_bytes = sup.config.locate.max_file_bytes;
102            sup.spawn(async move {
103                let _ = tier1_5_start_rx.await;
104                match tiers::run_tier1_5(&*store_for_t1_5, &root_for_t1_5, max_bytes).await {
105                    Ok(count) => {
106                        let mut s = tier_for_t1_5.write().await;
107                        if matches!(*s, TierState::Tier1 { .. })
108                            || matches!(*s, TierState::Tier1_5 { .. })
109                        {
110                            *s = TierState::Tier1_5 {
111                                structural_files: count,
112                            };
113                        }
114                        tracing::info!(structural_files = count, "Tier 1.5 ready");
115                    }
116                    Err(e) => {
117                        tracing::warn!("Tier 1.5 failed: {e}");
118                    }
119                }
120            });
121        }
122
123        // Tier 2 — background semantic embedding (waits for Tier 1, then polls)
124        {
125            let store_for_t2 = Arc::clone(&store_clone);
126            let tier_for_t2 = Arc::clone(&tier_state_clone);
127            let (progress_tx, mut progress_rx) =
128                tokio::sync::mpsc::unbounded_channel::<Tier2Progress>();
129
130            // Progress-tracking task — reads progress events, updates tier_state
131            let tier_prog = Arc::clone(&tier_for_t2);
132            sup.spawn(async move {
133                while let Some(prog) = progress_rx.recv().await {
134                    *tier_prog.write().await = TierState::Tier2 {
135                        embedded: prog.embedded,
136                        total: prog.total,
137                    };
138                }
139            });
140
141            // Tier 2 embedding task — waits for Tier 1, then runs embedding loop
142            sup.spawn(async move {
143                let _ = tier2_start_rx.await;
144
145                let Some(embedder) = embedder_for_t2.get().cloned() else {
146                    return;
147                };
148
149                tracing::info!("Tier 2 starting semantic embedding");
150                match tiers::run_tier2(store_for_t2, embedder, progress_tx, tier2_concurrency).await
151                {
152                    Ok(()) => {
153                        *tier_for_t2.write().await = TierState::Ready;
154                        tracing::info!("Tier 2 ready — all chunks embedded");
155                    }
156                    Err(e) => {
157                        tracing::error!(%e, "Tier 2 embedding failed");
158                    }
159                }
160            });
161        }
162
163        let mut sup = sup;
164        let watcher = crate::watcher::create_watcher(&root_clone, Duration::from_millis(500));
165
166        let orch = crate::watcher::WatcherOrchestrator::new(
167            root_clone.clone(),
168            watcher,
169            store_clone,
170            tier_state_clone,
171        );
172        sup.spawn(async move {
173            orch.run().await;
174        });
175        sup.watcher_active = true;
176
177        Ok(sup)
178    }
179
180    pub async fn run(&self) -> Result<()> {
181        tracing::info!("supervisor running");
182        self.shutdown.cancelled().await;
183        tracing::info!("supervisor shutdown signal received");
184        Ok(())
185    }
186
187    pub fn watcher_active(&self) -> bool {
188        self.watcher_active
189    }
190
191    pub fn index(&self) -> Arc<Index> {
192        Arc::clone(&self.index)
193    }
194
195    pub fn store(&self) -> Arc<dyn Store> {
196        Arc::clone(&self.store)
197    }
198
199    pub fn embedder(&self) -> Option<Arc<dyn Embedder>> {
200        self.embedder.get().cloned()
201    }
202
203    pub fn config(&self) -> &Config {
204        &self.config
205    }
206
207    pub fn root(&self) -> &Utf8PathBuf {
208        &self.root
209    }
210
211    pub async fn get_tier_state(&self) -> TierState {
212        *self.tier_state.read().await
213    }
214
215    #[allow(clippy::expect_used)]
216    pub async fn shutdown(self) -> Result<()> {
217        tracing::info!("supervisor shutting down");
218        self.shutdown.cancel();
219
220        let mut tasks = self.tasks.into_inner().unwrap_or_else(|e| e.into_inner());
221        while let Some(result) = tasks.join_next().await {
222            if let Err(e) = result {
223                tracing::warn!(error = %e, "task panicked during shutdown");
224            }
225        }
226
227        self.store.close().await?;
228
229        tracing::info!("supervisor shut down");
230        Ok(())
231    }
232
233    #[allow(clippy::expect_used)]
234    pub fn spawn<F, T>(&self, fut: F)
235    where
236        F: Future<Output = T> + Send + 'static,
237        T: Send + 'static,
238    {
239        let token = self.shutdown.child_token();
240        let mut tasks = self.tasks.lock().expect("mutex poisoned");
241        tasks.spawn(async move {
242            tokio::select! {
243                _ = fut => {},
244                _ = token.cancelled() => {},
245            }
246        });
247    }
248}
249
250fn build_embedder() -> (Arc<OnceLock<Arc<dyn Embedder>>>, usize) {
251    let provider = if std::env::var("OPENAI_API_KEY").is_ok() {
252        Provider::OpenAi
253    } else {
254        Provider::Local
255    };
256    let tier2_concurrency = provider.default_concurrency();
257    let config = EmbedConfig::for_provider(&provider);
258    let slot = Arc::new(OnceLock::new());
259    let slot_clone = Arc::clone(&slot);
260    tokio::task::spawn(async move {
261        let result =
262            tokio::task::spawn_blocking(move || argyph_embed::build(provider, config)).await;
263        match result {
264            Ok(Ok(e)) => {
265                let _ = slot_clone.set(e);
266            }
267            Ok(Err(err)) => {
268                tracing::warn!(%err, "embedding unavailable — semantic search disabled");
269            }
270            Err(join_err) => {
271                tracing::warn!(%join_err, "embedder build panicked — semantic search disabled");
272            }
273        }
274    });
275    (slot, tier2_concurrency)
276}
277
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use argyph_fs::{ChangeKind, ChangedPath};
    use std::time::Duration;

    /// A throwaway copy of `examples/tiny-rust-app`. The temp dir is removed
    /// when `_dir` drops, so the fixture must outlive the supervisor using it.
    struct TestFixture {
        _dir: tempfile::TempDir,
        root: Utf8PathBuf,
    }

    fn temp_fixture() -> TestFixture {
        let dir = tempfile::tempdir().unwrap();
        let src = std::path::Path::new(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/../../examples/tiny-rust-app"
        ));
        let dst = dir.path().join("repo");
        copy_dir_all(src, &dst).unwrap();
        let root = Utf8PathBuf::from_path_buf(dst).unwrap();
        TestFixture { _dir: dir, root }
    }

    /// Recursively copies `src` into `dst`, recreating symlinks where the
    /// platform allows it.
    fn copy_dir_all(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> {
        std::fs::create_dir_all(dst)?;
        for entry in std::fs::read_dir(src)? {
            let entry = entry?;
            let ty = entry.file_type()?;
            let src_path = entry.path();
            let dst_path = dst.join(entry.file_name());
            if ty.is_dir() {
                copy_dir_all(&src_path, &dst_path)?;
            } else if ty.is_symlink() {
                let target = std::fs::read_link(&src_path)?;
                #[cfg(unix)]
                {
                    std::os::unix::fs::symlink(&target, &dst_path)?;
                }
                #[cfg(windows)]
                {
                    // Creating symlinks on Windows needs an admin token in CI, so
                    // fall back to copying whatever the link points at. The fixture
                    // tree under examples/ does not currently contain symlinks; this
                    // branch is defensive rather than load-bearing.
                    //
                    // Probe through `src_path` rather than `target`: `read_link`
                    // may return a path relative to the link's directory, while
                    // `Path::is_dir` follows the link from its real location.
                    if src_path.is_dir() {
                        std::os::windows::fs::symlink_dir(&target, &dst_path)
                            .or_else(|_| copy_dir_all(&src_path, &dst_path))?;
                    } else {
                        std::os::windows::fs::symlink_file(&target, &dst_path)
                            .or_else(|_| std::fs::copy(&src_path, &dst_path).map(|_| ()))?;
                    }
                }
            } else {
                std::fs::copy(&src_path, &dst_path)?;
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn boot_reaches_tier0_without_blocking() {
        // This is a *hang guard*, not a performance gate. Boot must
        // return and have Tier 0 ready; it must not block on the
        // background tiers. The actual sub-second cold-start figure is
        // a published benchmark (see docs/benchmarks.md / system_bench)
        // — asserting a hard wall-clock bound here flakes on loaded CI
        // runners and tells us nothing a benchmark doesn't.
        let fixture = temp_fixture();
        let root = fixture.root;
        let config = Config::default();
        let started = std::time::Instant::now();

        let sup = Supervisor::boot(root, config).await.unwrap();

        // A generous bound: only a genuine hang (deadlocked boot, a
        // blocking call on the hot path) trips this.
        assert!(
            started.elapsed() < Duration::from_secs(30),
            "boot took {:?} — far beyond the expected sub-second cold \
             start; this indicates a blocking call on the boot path",
            started.elapsed()
        );

        let state = sup.get_tier_state().await;
        assert!(state.is_ready(), "expected Tier 0 ready, got {state:?}");

        let status = sup.index().status().await.unwrap();
        assert!(status.file_count > 0, "expected at least one indexed file");

        sup.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn boot_sets_tier_state_fields() {
        let fixture = temp_fixture();
        let config = Config::default();
        let sup = Supervisor::boot(fixture.root, config).await.unwrap();

        let state = sup.get_tier_state().await;
        match state {
            TierState::Tier0 { files_indexed } => {
                assert!(files_indexed > 0, "expected at least 1 file indexed");
            }
            other => panic!("expected Tier 0, got {other:?}"),
        }

        sup.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn shutdown_cleans_up_without_panicking() {
        let fixture = temp_fixture();
        let sup = Supervisor::boot(fixture.root, Config::default())
            .await
            .unwrap();
        sup.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn spawn_registers_cancellation_aware_task() {
        let fixture = temp_fixture();
        let sup = Supervisor::boot(fixture.root, Config::default())
            .await
            .unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        sup.spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = tx.send(42);
        });

        let val = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .unwrap();
        assert_eq!(val, Some(42));

        sup.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn incremental_reindex_picks_up_new_file() {
        let fixture = temp_fixture();
        let root = fixture.root.clone();
        let sup = Supervisor::boot(root.clone(), Config::default())
            .await
            .unwrap();

        // Wait until Tier 1 has finished. Accept every state at or beyond it:
        // once the embedder is available, Tier 2 progress can replace
        // `Tier1`/`Tier1_5` between two polls. Accepting only those two states
        // would then spin until the deadline and fail spuriously.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
        let mut tier1_done = false;
        while tokio::time::Instant::now() < deadline {
            let state = sup.get_tier_state().await;
            if matches!(
                state,
                TierState::Tier1 { .. }
                    | TierState::Tier1_5 { .. }
                    | TierState::Tier2 { .. }
                    | TierState::Ready
            ) {
                tier1_done = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        assert!(tier1_done, "Tier 1 did not complete within 30s");

        let new_file_path = camino::Utf8PathBuf::from("src/new_module.rs");
        let new_file_abs = root.join(new_file_path.as_str());
        std::fs::write(
            new_file_abs.as_str(),
            "pub fn watcher_test_symbol() -> u32 { 42 }\n",
        )
        .unwrap();

        let changes = vec![ChangedPath {
            path: new_file_path.clone(),
            kind: ChangeKind::Created,
        }];

        let start = std::time::Instant::now();
        sup.index()
            .reindex(&root, &changes)
            .await
            .expect("reindex should succeed");
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_millis(3000),
            "reindex took {elapsed:?}, expected <3s"
        );

        let found = sup
            .index()
            .find_symbol("watcher_test_symbol", None)
            .await
            .expect("find_symbol should succeed");
        assert!(
            !found.is_empty(),
            "newly created watcher_test_symbol not found after reindex"
        );
        assert_eq!(
            found[0].file.as_str(),
            "src/new_module.rs",
            "symbol should be associated with the new file"
        );

        sup.shutdown().await.unwrap();
    }
}