Skip to main content

argyph_core/
tiers.rs

1use std::collections::HashSet;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use argyph_embed::Embedder;
7use argyph_fs::{self, ChangeKind, ChangedPath, FileEntry, Language, Walker};
8use argyph_graph::builder::DefaultGraphBuilder;
9use argyph_graph::GraphBuilder;
10use argyph_parse::DefaultParser;
11use argyph_parse::Parser;
12use argyph_store::search::VectorEntry;
13use argyph_store::Store;
14use camino::{Utf8Path, Utf8PathBuf};
15use tokio::sync::{mpsc, Semaphore};
16use tokio::task::JoinSet;
17
18use crate::error::Result;
19
/// Indexing progress of a workspace, from nothing indexed (`Offline`) to
/// fully indexed (`Ready`).
///
/// Intermediate variants carry the progress counter for their tier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TierState {
    /// Nothing has been indexed.
    Offline,
    /// Tier 0 (file walk) reached; `files_indexed` files recorded.
    Tier0 { files_indexed: usize },
    /// Tier 1 (code parse) reached; `symbols_indexed` symbols extracted.
    Tier1 { symbols_indexed: usize },
    /// Tier 1.5 (structural nodes) reached for `structural_files` files.
    Tier1_5 { structural_files: usize },
    /// Tier 2 (embeddings) in progress: `embedded` out of `total`.
    Tier2 { embedded: usize, total: usize },
    /// Every tier has finished.
    Ready,
}

use std::fmt;
impl fmt::Display for TierState {
    /// Writes the short lowercase label of the variant; payloads are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Offline => "offline",
            Self::Tier0 { .. } => "tier0",
            Self::Tier1 { .. } => "tier1",
            Self::Tier1_5 { .. } => "tier1_5",
            Self::Tier2 { .. } => "tier2",
            Self::Ready => "ready",
        };
        f.write_str(label)
    }
}

impl TierState {
    /// `true` for every state except `Offline`, i.e. once Tier 0 has run.
    pub fn is_ready(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide its readiness.
        match self {
            Self::Offline => false,
            Self::Tier0 { .. }
            | Self::Tier1 { .. }
            | Self::Tier1_5 { .. }
            | Self::Tier2 { .. }
            | Self::Ready => true,
        }
    }

    /// Coarse progress level in `0..=3`.
    ///
    /// `Offline` is 0, `Tier0` is 1, `Tier1`/`Tier1_5` are 2 and
    /// `Tier2`/`Ready` are 3. The number is one higher than the tier name.
    pub fn tier_number(&self) -> u8 {
        match self {
            Self::Ready | Self::Tier2 { .. } => 3,
            Self::Tier1_5 { .. } | Self::Tier1 { .. } => 2,
            Self::Tier0 { .. } => 1,
            Self::Offline => 0,
        }
    }

    /// Symbol count carried by `Tier1`; every other state reports 0.
    #[must_use]
    pub fn symbol_count(&self) -> u64 {
        if let Self::Tier1 { symbols_indexed } = *self {
            symbols_indexed as u64
        } else {
            0
        }
    }
}
75
76#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
77pub async fn run_tier0(root: &Utf8Path, store: &dyn Store) -> Result<Vec<FileEntry>> {
78    tracing::info!("starting Tier 0 walk");
79    let started = std::time::Instant::now();
80
81    let walker = argyph_fs::IgnoreWalker::new();
82    let entries: Vec<FileEntry> = walker.walk(root).collect();
83
84    tracing::info!(
85        count = entries.len(),
86        elapsed_ms = started.elapsed().as_millis() as u64,
87        "Tier 0 walk complete"
88    );
89
90    if !entries.is_empty() {
91        store.upsert_files(&entries).await?;
92        tracing::info!("Tier 0 upsert complete");
93    }
94
95    tracing::info!(
96        total_ms = started.elapsed().as_millis() as u64,
97        "Tier 0 finished"
98    );
99
100    Ok(entries)
101}
102
103#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
104pub async fn run_tier1(root: &Utf8Path, store: &dyn Store) -> Result<u64> {
105    tracing::info!("starting Tier 1 parse");
106    let started = std::time::Instant::now();
107
108    let files = store.list_files().await?;
109    let parser = DefaultParser::new();
110    let builder = DefaultGraphBuilder;
111
112    let mut parsed: Vec<(Utf8PathBuf, argyph_parse::ParsedFile)> = Vec::with_capacity(files.len());
113    let mut total_symbols: u64 = 0;
114
115    // Symbol/chunk rows are flushed in large batches rather than one
116    // transaction per file. On a big repo, per-file commits dominated
117    // Tier 1 wall-clock — 79K files meant ~158K transaction commits,
118    // each forcing a WAL write. Batching collapses that to a few
119    // hundred commits.
120    const FLUSH_BATCH: usize = 4000;
121    let mut sym_batch: Vec<argyph_parse::Symbol> = Vec::new();
122    let mut chunk_batch: Vec<argyph_parse::Chunk> = Vec::new();
123
124    for entry in &files {
125        let path = root.join(entry.path.as_str());
126        let source = match std::fs::read_to_string(path.as_str()) {
127            Ok(s) => s,
128            Err(e) => {
129                tracing::warn!(file = %entry.path, error = %e, "skipping unreadable file");
130                continue;
131            }
132        };
133
134        let pf = parser.parse(entry, &source)?;
135        total_symbols += pf.symbols.len() as u64;
136
137        sym_batch.extend(pf.symbols.iter().cloned());
138        chunk_batch.extend(pf.chunks.iter().cloned());
139        if sym_batch.len() >= FLUSH_BATCH {
140            store.upsert_symbols(&sym_batch).await?;
141            sym_batch.clear();
142        }
143        if chunk_batch.len() >= FLUSH_BATCH {
144            store.upsert_chunks(&chunk_batch).await?;
145            chunk_batch.clear();
146        }
147        parsed.push((entry.path.clone(), pf));
148    }
149
150    if !sym_batch.is_empty() {
151        store.upsert_symbols(&sym_batch).await?;
152    }
153    if !chunk_batch.is_empty() {
154        store.upsert_chunks(&chunk_batch).await?;
155    }
156
157    tracing::info!(
158        files_parsed = parsed.len(),
159        symbols = total_symbols,
160        elapsed_ms = started.elapsed().as_millis() as u64,
161        "Tier 1 parse complete, building edges"
162    );
163
164    let edges = builder.build_edges(&parsed)?;
165    store.upsert_edges(&edges).await?;
166
167    tracing::info!(
168        edges = edges.len(),
169        total_ms = started.elapsed().as_millis() as u64,
170        "Tier 1 finished"
171    );
172
173    Ok(total_symbols)
174}
175
176#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
177pub async fn run_tier1_5(store: &dyn Store, root: &Utf8Path, max_file_bytes: u64) -> Result<usize> {
178    tracing::info!("starting Tier 1.5 structural indexing");
179    let files = store.list_files().await?;
180
181    let candidates: Vec<_> = files
182        .into_iter()
183        .filter(|f| f.size <= max_file_bytes && f.language.is_some())
184        .collect();
185
186    let mut count = 0usize;
187    for f in &candidates {
188        if reindex_structural_for_file(store, root, f).await.is_ok() {
189            count += 1;
190        }
191    }
192
193    tracing::info!(structural_files = count, "Tier 1.5 complete");
194    Ok(count)
195}
196
197/// Re-parse and upsert structural nodes for a single (non-code) file.
198/// Returns Ok(()) when records were written, an error otherwise.
199/// `upsert_structural_nodes` clears existing rows for the file first,
200/// so this safely handles "file changed" as well as "file added".
201async fn reindex_structural_for_file(
202    store: &dyn Store,
203    root: &Utf8Path,
204    f: &FileEntry,
205) -> Result<()> {
206    use argyph_parse::structural::{self, StructuralNode};
207    use argyph_store::StructuralNodeRecord;
208
209    let path = root.join(f.path.as_str());
210    let source = std::fs::read_to_string(path.as_str())
211        .map_err(|e| crate::error::CoreError::Other(format!("read failed: {e}")))?;
212
213    let lang = f
214        .language
215        .ok_or_else(|| crate::error::CoreError::Other("no language".into()))?;
216    let file_key = f.path.as_str().len() as u64;
217    let nodes: Vec<StructuralNode> = match lang {
218        Language::Markdown => structural::markdown::parse(file_key, &source),
219        Language::Json => structural::json::parse(file_key, &source),
220        Language::Yaml => structural::yaml::parse(file_key, &source),
221        Language::Toml => structural::toml_parser::parse(file_key, &source),
222        Language::Csv => structural::csv::parse(file_key, &source),
223        _ => {
224            return Err(crate::error::CoreError::Other(
225                "non-structural language".into(),
226            ))
227        }
228    };
229
230    let file_id = store
231        .get_file_id(&f.path)
232        .await?
233        .ok_or_else(|| crate::error::CoreError::Other("file_id missing".into()))?;
234
235    let records: Vec<StructuralNodeRecord> = nodes
236        .into_iter()
237        .map(|n| StructuralNodeRecord {
238            id: n.id.0 as i64,
239            file_id,
240            kind: format!("{:?}", n.kind),
241            label: n.label,
242            path_joined: n.path.join("/"),
243            path: n.path,
244            byte_range: (n.byte_range.0 as u32, n.byte_range.1 as u32),
245            line_range: n.line_range,
246            parent_id: n.parent.map(|p| p.0 as i64),
247            depth: n.depth as u16,
248        })
249        .collect();
250
251    // Always upsert (even when empty) so a file that no longer has any
252    // structural nodes gets its stale rows cleared.
253    store.upsert_structural_nodes(file_id, &records).await?;
254    Ok(())
255}
256
/// Progress update emitted during Tier 2 embedding.
///
/// One update is sent as each embedding batch finishes.
#[derive(Debug, Clone)]
pub struct Tier2Progress {
    /// Chunk ids handled so far in the current embedding pass.
    pub embedded: usize,
    /// Number of chunks that were missing vectors when the pass started.
    pub total: usize,
}
263
264#[tracing::instrument(skip(store, embedder, progress_tx))]
265pub async fn run_tier2(
266    store: Arc<dyn Store>,
267    embedder: Arc<dyn Embedder>,
268    progress_tx: mpsc::UnboundedSender<Tier2Progress>,
269    concurrency: usize,
270) -> Result<()> {
271    let model = embedder.model_id().to_string();
272    let dim = embedder.dimension();
273    let batch_size = 32;
274    let sem = Arc::new(Semaphore::new(concurrency));
275    let pending = Arc::new(AtomicUsize::new(0));
276    const BACKPRESSURE_THRESHOLD: usize = 10_000;
277
278    tracing::info!(model = %model, dim, concurrency, "Tier 2 embedding started");
279
280    loop {
281        let missing = store.missing_vectors(&model).await?;
282        if missing.is_empty() {
283            break;
284        }
285
286        let total = missing.len();
287        let done = Arc::new(AtomicUsize::new(0));
288        let mut join_set: JoinSet<Result<()>> = JoinSet::new();
289
290        for chunk_ids in missing.chunks(batch_size) {
291            while pending.load(Ordering::Relaxed) > BACKPRESSURE_THRESHOLD {
292                if let Some(res) = join_set.join_next().await {
293                    res.map_err(|e| crate::CoreError::Embed(format!("task join error: {e}")))??;
294                }
295            }
296
297            let chunk_ids = chunk_ids.to_vec();
298            let n = chunk_ids.len();
299            pending.fetch_add(n, Ordering::Relaxed);
300
301            let store = Arc::clone(&store);
302            let embedder = Arc::clone(&embedder);
303            let progress_tx = progress_tx.clone();
304            let model = model.clone();
305            let sem = Arc::clone(&sem);
306            let pending = Arc::clone(&pending);
307            let done = Arc::clone(&done);
308
309            join_set.spawn(async move {
310                let result = async {
311                    let _permit = Arc::clone(&sem)
312                        .acquire_owned()
313                        .await
314                        .map_err(|_| crate::CoreError::Embed("semaphore closed".into()))?;
315
316                    let pairs = store.get_chunk_texts(&chunk_ids).await?;
317
318                    let chunk_order: Vec<&str> = chunk_ids.iter().map(|s| s.as_str()).collect();
319                    let text_map: std::collections::HashMap<&str, &str> = pairs
320                        .iter()
321                        .map(|(id, text)| (id.as_str(), text.as_str()))
322                        .collect();
323
324                    let texts: Vec<String> = chunk_order
325                        .iter()
326                        .filter_map(|id| text_map.get(id).map(|t| t.to_string()))
327                        .collect();
328
329                    if texts.is_empty() {
330                        return Ok(());
331                    }
332
333                    let embeddings = embedder
334                        .embed(&texts)
335                        .await
336                        .map_err(|e| crate::CoreError::Embed(format!("{e}")))?;
337
338                    let entries: Vec<VectorEntry> = chunk_ids
339                        .iter()
340                        .zip(embeddings.iter())
341                        .map(|(id, vec)| VectorEntry {
342                            chunk_id: id.clone(),
343                            vector: vec.clone(),
344                            model: model.clone(),
345                            dimension: dim,
346                        })
347                        .collect();
348
349                    store.upsert_vectors(&entries).await?;
350                    Ok(())
351                }
352                .await;
353
354                pending.fetch_sub(n, Ordering::Relaxed);
355                let prev = done.fetch_add(n, Ordering::Relaxed);
356                let _ = progress_tx.send(Tier2Progress {
357                    embedded: prev + n,
358                    total,
359                });
360
361                result
362            });
363        }
364
365        while let Some(res) = join_set.join_next().await {
366            res.map_err(|e| crate::CoreError::Embed(format!("task join error: {e}")))??;
367        }
368    }
369
370    tracing::info!("Tier 2 embedding complete");
371    Ok(())
372}
373
374#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
375pub async fn incremental_reindex(
376    root: &Utf8Path,
377    store: &dyn Store,
378    changes: &[ChangedPath],
379) -> Result<()> {
380    let parser = DefaultParser::new();
381    let builder = DefaultGraphBuilder;
382
383    let mut changed_files: HashSet<Utf8PathBuf> = HashSet::new();
384    let mut parsed: Vec<(Utf8PathBuf, argyph_parse::ParsedFile)> = Vec::new();
385
386    for change in changes {
387        let path = &change.path;
388
389        if change.kind == ChangeKind::Removed {
390            // FK cascade clears structural_nodes and symbol/chunk rows.
391            store.delete_file(path).await?;
392            changed_files.insert(path.clone());
393            continue;
394        }
395
396        changed_files.insert(path.clone());
397
398        let abs = root.join(path.as_str());
399
400        let entry = match read_file_entry(root, path) {
401            Ok(e) => e,
402            Err(e) => {
403                tracing::warn!(file = %path, error = %e, "skipping changed file");
404                continue;
405            }
406        };
407
408        let source = match std::fs::read_to_string(abs.as_str()) {
409            Ok(s) => s,
410            Err(e) => {
411                tracing::warn!(file = %path, error = %e, "skipping unreadable file");
412                continue;
413            }
414        };
415
416        // Persist file metadata for either branch below.
417        store.upsert_files(&[entry.clone()]).await?;
418
419        // Structural-only languages bypass the code parser and refresh Tier 1.5 nodes.
420        let is_structural = matches!(
421            entry.language,
422            Some(Language::Markdown)
423                | Some(Language::Json)
424                | Some(Language::Yaml)
425                | Some(Language::Toml)
426                | Some(Language::Csv)
427        );
428        if is_structural {
429            if let Err(e) = reindex_structural_for_file(store, root, &entry).await {
430                tracing::warn!(file = %path, error = %e, "structural reindex failed");
431            }
432            continue;
433        }
434
435        let pf = match parser.parse(&entry, &source) {
436            Ok(pf) => pf,
437            Err(e) => {
438                tracing::warn!(file = %path, error = %e, "parse failed");
439                continue;
440            }
441        };
442
443        if !pf.symbols.is_empty() {
444            store.upsert_symbols(&pf.symbols).await?;
445        }
446        if !pf.chunks.is_empty() {
447            store.upsert_chunks(&pf.chunks).await?;
448        }
449        parsed.push((path.clone(), pf));
450    }
451
452    if parsed.is_empty() && changed_files.is_empty() {
453        return Ok(());
454    }
455
456    let neighbors = find_import_neighbors(store, &changed_files).await;
457    let neighbor_files: HashSet<&Utf8PathBuf> = neighbors.iter().collect();
458
459    let all_files = store.list_files().await?;
460    for entry in &all_files {
461        if parsed.iter().any(|(p, _)| p == &entry.path) {
462            continue;
463        }
464        if !neighbor_files.contains(&entry.path) {
465            continue;
466        }
467
468        let abs = root.join(entry.path.as_str());
469        let source = match std::fs::read_to_string(abs.as_str()) {
470            Ok(s) => s,
471            Err(_) => continue,
472        };
473        let pf = match parser.parse(entry, &source) {
474            Ok(pf) => pf,
475            Err(_) => continue,
476        };
477        parsed.push((entry.path.clone(), pf));
478    }
479
480    let edges = builder.build_edges(&parsed)?;
481
482    let mut affected: HashSet<&Utf8PathBuf> = parsed.iter().map(|(p, _)| p).collect();
483    for change in changes {
484        affected.insert(&change.path);
485    }
486
487    for file_path in affected {
488        store.replace_edges_for_file(file_path, &edges).await?;
489    }
490
491    Ok(())
492}
493
494fn read_file_entry(root: &Utf8Path, path: &Utf8Path) -> Result<FileEntry> {
495    let abs = root.join(path.as_str());
496    let meta = std::fs::metadata(abs.as_str())?;
497    let size = meta.len();
498    let modified = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
499
500    let hash = argyph_fs::hash_file(&abs)
501        .map_err(|e| crate::CoreError::Io(std::io::Error::other(e.to_string())))?;
502
503    let ext = path.extension().unwrap_or("");
504    let language = Language::from_extension(ext);
505
506    Ok(FileEntry {
507        path: path.to_path_buf(),
508        hash,
509        language,
510        size,
511        modified,
512    })
513}
514
515async fn find_import_neighbors(
516    store: &dyn Store,
517    files: &HashSet<Utf8PathBuf>,
518) -> Vec<Utf8PathBuf> {
519    let mut result = HashSet::new();
520    for file in files {
521        if let Ok(edges) = store.get_imports(file).await {
522            for e in &edges {
523                if let Some((imported, _, _)) = parse_sid_prefix(e.to.as_str()) {
524                    if !files.contains(&imported) {
525                        result.insert(imported);
526                    }
527                }
528                if let Some((importer, _, _)) = parse_sid_prefix(e.from.as_str()) {
529                    if !files.contains(&importer) {
530                        result.insert(importer);
531                    }
532                }
533            }
534        }
535    }
536    result.into_iter().collect()
537}
538
539fn parse_sid_prefix(id: &str) -> Option<(Utf8PathBuf, String, usize)> {
540    let rest = id.rsplit_once("::")?;
541    let (prefix, start_str) = rest;
542    let start: usize = start_str.parse().ok()?;
543    let (file, name) = prefix.rsplit_once("::")?;
544    Some((Utf8PathBuf::from(file), name.to_string(), start))
545}
546
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn tier_state_display() {
        let cases = [
            (TierState::Offline, "offline"),
            (TierState::Tier0 { files_indexed: 0 }, "tier0"),
            (TierState::Tier1 { symbols_indexed: 100 }, "tier1"),
            (TierState::Tier1_5 { structural_files: 5 }, "tier1_5"),
            (TierState::Tier2 { embedded: 25, total: 50 }, "tier2"),
            (TierState::Ready, "ready"),
        ];
        for (state, expected) in cases {
            assert_eq!(state.to_string(), expected, "display of {state:?}");
        }
    }

    #[test]
    fn tier_state_is_ready() {
        let cases = [
            (TierState::Offline, false),
            (TierState::Tier0 { files_indexed: 0 }, true),
            (TierState::Tier1 { symbols_indexed: 1 }, true),
            (TierState::Tier1_5 { structural_files: 1 }, true),
            (TierState::Tier2 { embedded: 1, total: 2 }, true),
            (TierState::Ready, true),
        ];
        for (state, ready) in cases {
            assert_eq!(state.is_ready(), ready, "is_ready of {state:?}");
        }
    }

    #[test]
    fn tier_number_progression() {
        let cases = [
            (TierState::Offline, 0),
            (TierState::Tier0 { files_indexed: 0 }, 1),
            (TierState::Tier1 { symbols_indexed: 0 }, 2),
            (TierState::Tier1_5 { structural_files: 0 }, 2),
            (TierState::Tier2 { embedded: 0, total: 0 }, 3),
            (TierState::Ready, 3),
        ];
        for (state, tier) in cases {
            assert_eq!(state.tier_number(), tier, "tier_number of {state:?}");
        }
    }
}