1use std::collections::HashSet;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use argyph_embed::Embedder;
7use argyph_fs::{self, ChangeKind, ChangedPath, FileEntry, Language, Walker};
8use argyph_graph::builder::DefaultGraphBuilder;
9use argyph_graph::GraphBuilder;
10use argyph_parse::DefaultParser;
11use argyph_parse::Parser;
12use argyph_store::search::VectorEntry;
13use argyph_store::Store;
14use camino::{Utf8Path, Utf8PathBuf};
15use tokio::sync::{mpsc, Semaphore};
16use tokio::task::JoinSet;
17
18use crate::error::Result;
19
/// Indexing stage a workspace has reached, from nothing indexed up to fully ready.
///
/// Each intermediate variant carries the counter relevant to that stage.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum TierState {
    /// No index data is available.
    Offline,
    /// Tier 0 (file walk) stage; carries the number of files recorded.
    Tier0 { files_indexed: usize },
    /// Tier 1 (parse) stage; carries the number of symbols extracted.
    Tier1 { symbols_indexed: usize },
    /// Tier 1.5 (structural indexing) stage; carries the number of structural files indexed.
    Tier1_5 { structural_files: usize },
    /// Tier 2 (embedding) stage; carries embedding progress.
    Tier2 { embedded: usize, total: usize },
    /// All tiers are complete.
    Ready,
}
29
30use std::fmt;
31impl fmt::Display for TierState {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 match self {
34 Self::Offline => write!(f, "offline"),
35 Self::Tier0 { .. } => write!(f, "tier0"),
36 Self::Tier1 { .. } => write!(f, "tier1"),
37 Self::Tier1_5 { .. } => write!(f, "tier1_5"),
38 Self::Tier2 { .. } => write!(f, "tier2"),
39 Self::Ready => write!(f, "ready"),
40 }
41 }
42}
43
44impl TierState {
45 pub fn is_ready(&self) -> bool {
46 matches!(
47 self,
48 Self::Tier0 { .. }
49 | Self::Tier1 { .. }
50 | Self::Tier1_5 { .. }
51 | Self::Tier2 { .. }
52 | Self::Ready
53 )
54 }
55
56 pub fn tier_number(&self) -> u8 {
57 match self {
58 Self::Offline => 0,
59 Self::Tier0 { .. } => 1,
60 Self::Tier1 { .. } | Self::Tier1_5 { .. } => 2,
61 Self::Tier2 { .. } | Self::Ready => 3,
62 }
63 }
64
65 #[must_use]
66 pub fn symbol_count(&self) -> u64 {
67 match self {
68 Self::Tier1 {
69 symbols_indexed, ..
70 } => *symbols_indexed as u64,
71 _ => 0,
72 }
73 }
74}
75
76#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
77pub async fn run_tier0(root: &Utf8Path, store: &dyn Store) -> Result<Vec<FileEntry>> {
78 tracing::info!("starting Tier 0 walk");
79 let started = std::time::Instant::now();
80
81 let walker = argyph_fs::IgnoreWalker::new();
82 let entries: Vec<FileEntry> = walker.walk(root).collect();
83
84 tracing::info!(
85 count = entries.len(),
86 elapsed_ms = started.elapsed().as_millis() as u64,
87 "Tier 0 walk complete"
88 );
89
90 if !entries.is_empty() {
91 store.upsert_files(&entries).await?;
92 tracing::info!("Tier 0 upsert complete");
93 }
94
95 tracing::info!(
96 total_ms = started.elapsed().as_millis() as u64,
97 "Tier 0 finished"
98 );
99
100 Ok(entries)
101}
102
103#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
104pub async fn run_tier1(root: &Utf8Path, store: &dyn Store) -> Result<u64> {
105 tracing::info!("starting Tier 1 parse");
106 let started = std::time::Instant::now();
107
108 let files = store.list_files().await?;
109 let parser = DefaultParser::new();
110 let builder = DefaultGraphBuilder;
111
112 let mut parsed: Vec<(Utf8PathBuf, argyph_parse::ParsedFile)> = Vec::with_capacity(files.len());
113 let mut total_symbols: u64 = 0;
114
115 const FLUSH_BATCH: usize = 4000;
121 let mut sym_batch: Vec<argyph_parse::Symbol> = Vec::new();
122 let mut chunk_batch: Vec<argyph_parse::Chunk> = Vec::new();
123
124 for entry in &files {
125 let path = root.join(entry.path.as_str());
126 let source = match std::fs::read_to_string(path.as_str()) {
127 Ok(s) => s,
128 Err(e) => {
129 tracing::warn!(file = %entry.path, error = %e, "skipping unreadable file");
130 continue;
131 }
132 };
133
134 let pf = parser.parse(entry, &source)?;
135 total_symbols += pf.symbols.len() as u64;
136
137 sym_batch.extend(pf.symbols.iter().cloned());
138 chunk_batch.extend(pf.chunks.iter().cloned());
139 if sym_batch.len() >= FLUSH_BATCH {
140 store.upsert_symbols(&sym_batch).await?;
141 sym_batch.clear();
142 }
143 if chunk_batch.len() >= FLUSH_BATCH {
144 store.upsert_chunks(&chunk_batch).await?;
145 chunk_batch.clear();
146 }
147 parsed.push((entry.path.clone(), pf));
148 }
149
150 if !sym_batch.is_empty() {
151 store.upsert_symbols(&sym_batch).await?;
152 }
153 if !chunk_batch.is_empty() {
154 store.upsert_chunks(&chunk_batch).await?;
155 }
156
157 tracing::info!(
158 files_parsed = parsed.len(),
159 symbols = total_symbols,
160 elapsed_ms = started.elapsed().as_millis() as u64,
161 "Tier 1 parse complete, building edges"
162 );
163
164 let edges = builder.build_edges(&parsed)?;
165 store.upsert_edges(&edges).await?;
166
167 tracing::info!(
168 edges = edges.len(),
169 total_ms = started.elapsed().as_millis() as u64,
170 "Tier 1 finished"
171 );
172
173 Ok(total_symbols)
174}
175
176#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
177pub async fn run_tier1_5(store: &dyn Store, root: &Utf8Path, max_file_bytes: u64) -> Result<usize> {
178 tracing::info!("starting Tier 1.5 structural indexing");
179 let files = store.list_files().await?;
180
181 let candidates: Vec<_> = files
182 .into_iter()
183 .filter(|f| f.size <= max_file_bytes && f.language.is_some())
184 .collect();
185
186 let mut count = 0usize;
187 for f in &candidates {
188 if reindex_structural_for_file(store, root, f).await.is_ok() {
189 count += 1;
190 }
191 }
192
193 tracing::info!(structural_files = count, "Tier 1.5 complete");
194 Ok(count)
195}
196
197async fn reindex_structural_for_file(
202 store: &dyn Store,
203 root: &Utf8Path,
204 f: &FileEntry,
205) -> Result<()> {
206 use argyph_parse::structural::{self, StructuralNode};
207 use argyph_store::StructuralNodeRecord;
208
209 let path = root.join(f.path.as_str());
210 let source = std::fs::read_to_string(path.as_str())
211 .map_err(|e| crate::error::CoreError::Other(format!("read failed: {e}")))?;
212
213 let lang = f
214 .language
215 .ok_or_else(|| crate::error::CoreError::Other("no language".into()))?;
216 let file_key = f.path.as_str().len() as u64;
217 let nodes: Vec<StructuralNode> = match lang {
218 Language::Markdown => structural::markdown::parse(file_key, &source),
219 Language::Json => structural::json::parse(file_key, &source),
220 Language::Yaml => structural::yaml::parse(file_key, &source),
221 Language::Toml => structural::toml_parser::parse(file_key, &source),
222 Language::Csv => structural::csv::parse(file_key, &source),
223 _ => {
224 return Err(crate::error::CoreError::Other(
225 "non-structural language".into(),
226 ))
227 }
228 };
229
230 let file_id = store
231 .get_file_id(&f.path)
232 .await?
233 .ok_or_else(|| crate::error::CoreError::Other("file_id missing".into()))?;
234
235 let records: Vec<StructuralNodeRecord> = nodes
236 .into_iter()
237 .map(|n| StructuralNodeRecord {
238 id: n.id.0 as i64,
239 file_id,
240 kind: format!("{:?}", n.kind),
241 label: n.label,
242 path_joined: n.path.join("/"),
243 path: n.path,
244 byte_range: (n.byte_range.0 as u32, n.byte_range.1 as u32),
245 line_range: n.line_range,
246 parent_id: n.parent.map(|p| p.0 as i64),
247 depth: n.depth as u16,
248 })
249 .collect();
250
251 store.upsert_structural_nodes(file_id, &records).await?;
254 Ok(())
255}
256
/// Progress snapshot emitted by `run_tier2` each time an embedding batch task finishes.
#[derive(Debug, Clone)]
pub struct Tier2Progress {
    /// Chunks processed so far in the current pass, counted whether or not a vector was stored.
    pub embedded: usize,
    /// Number of chunks that lacked vectors at the start of the current pass.
    pub total: usize,
}
263
264#[tracing::instrument(skip(store, embedder, progress_tx))]
265pub async fn run_tier2(
266 store: Arc<dyn Store>,
267 embedder: Arc<dyn Embedder>,
268 progress_tx: mpsc::UnboundedSender<Tier2Progress>,
269 concurrency: usize,
270) -> Result<()> {
271 let model = embedder.model_id().to_string();
272 let dim = embedder.dimension();
273 let batch_size = 32;
274 let sem = Arc::new(Semaphore::new(concurrency));
275 let pending = Arc::new(AtomicUsize::new(0));
276 const BACKPRESSURE_THRESHOLD: usize = 10_000;
277
278 tracing::info!(model = %model, dim, concurrency, "Tier 2 embedding started");
279
280 loop {
281 let missing = store.missing_vectors(&model).await?;
282 if missing.is_empty() {
283 break;
284 }
285
286 let total = missing.len();
287 let done = Arc::new(AtomicUsize::new(0));
288 let mut join_set: JoinSet<Result<()>> = JoinSet::new();
289
290 for chunk_ids in missing.chunks(batch_size) {
291 while pending.load(Ordering::Relaxed) > BACKPRESSURE_THRESHOLD {
292 if let Some(res) = join_set.join_next().await {
293 res.map_err(|e| crate::CoreError::Embed(format!("task join error: {e}")))??;
294 }
295 }
296
297 let chunk_ids = chunk_ids.to_vec();
298 let n = chunk_ids.len();
299 pending.fetch_add(n, Ordering::Relaxed);
300
301 let store = Arc::clone(&store);
302 let embedder = Arc::clone(&embedder);
303 let progress_tx = progress_tx.clone();
304 let model = model.clone();
305 let sem = Arc::clone(&sem);
306 let pending = Arc::clone(&pending);
307 let done = Arc::clone(&done);
308
309 join_set.spawn(async move {
310 let result = async {
311 let _permit = Arc::clone(&sem)
312 .acquire_owned()
313 .await
314 .map_err(|_| crate::CoreError::Embed("semaphore closed".into()))?;
315
316 let pairs = store.get_chunk_texts(&chunk_ids).await?;
317
318 let chunk_order: Vec<&str> = chunk_ids.iter().map(|s| s.as_str()).collect();
319 let text_map: std::collections::HashMap<&str, &str> = pairs
320 .iter()
321 .map(|(id, text)| (id.as_str(), text.as_str()))
322 .collect();
323
324 let texts: Vec<String> = chunk_order
325 .iter()
326 .filter_map(|id| text_map.get(id).map(|t| t.to_string()))
327 .collect();
328
329 if texts.is_empty() {
330 return Ok(());
331 }
332
333 let embeddings = embedder
334 .embed(&texts)
335 .await
336 .map_err(|e| crate::CoreError::Embed(format!("{e}")))?;
337
338 let entries: Vec<VectorEntry> = chunk_ids
339 .iter()
340 .zip(embeddings.iter())
341 .map(|(id, vec)| VectorEntry {
342 chunk_id: id.clone(),
343 vector: vec.clone(),
344 model: model.clone(),
345 dimension: dim,
346 })
347 .collect();
348
349 store.upsert_vectors(&entries).await?;
350 Ok(())
351 }
352 .await;
353
354 pending.fetch_sub(n, Ordering::Relaxed);
355 let prev = done.fetch_add(n, Ordering::Relaxed);
356 let _ = progress_tx.send(Tier2Progress {
357 embedded: prev + n,
358 total,
359 });
360
361 result
362 });
363 }
364
365 while let Some(res) = join_set.join_next().await {
366 res.map_err(|e| crate::CoreError::Embed(format!("task join error: {e}")))??;
367 }
368 }
369
370 tracing::info!("Tier 2 embedding complete");
371 Ok(())
372}
373
374#[tracing::instrument(skip(store), fields(root = %root.as_str()))]
375pub async fn incremental_reindex(
376 root: &Utf8Path,
377 store: &dyn Store,
378 changes: &[ChangedPath],
379) -> Result<()> {
380 let parser = DefaultParser::new();
381 let builder = DefaultGraphBuilder;
382
383 let mut changed_files: HashSet<Utf8PathBuf> = HashSet::new();
384 let mut parsed: Vec<(Utf8PathBuf, argyph_parse::ParsedFile)> = Vec::new();
385
386 for change in changes {
387 let path = &change.path;
388
389 if change.kind == ChangeKind::Removed {
390 store.delete_file(path).await?;
392 changed_files.insert(path.clone());
393 continue;
394 }
395
396 changed_files.insert(path.clone());
397
398 let abs = root.join(path.as_str());
399
400 let entry = match read_file_entry(root, path) {
401 Ok(e) => e,
402 Err(e) => {
403 tracing::warn!(file = %path, error = %e, "skipping changed file");
404 continue;
405 }
406 };
407
408 let source = match std::fs::read_to_string(abs.as_str()) {
409 Ok(s) => s,
410 Err(e) => {
411 tracing::warn!(file = %path, error = %e, "skipping unreadable file");
412 continue;
413 }
414 };
415
416 store.upsert_files(&[entry.clone()]).await?;
418
419 let is_structural = matches!(
421 entry.language,
422 Some(Language::Markdown)
423 | Some(Language::Json)
424 | Some(Language::Yaml)
425 | Some(Language::Toml)
426 | Some(Language::Csv)
427 );
428 if is_structural {
429 if let Err(e) = reindex_structural_for_file(store, root, &entry).await {
430 tracing::warn!(file = %path, error = %e, "structural reindex failed");
431 }
432 continue;
433 }
434
435 let pf = match parser.parse(&entry, &source) {
436 Ok(pf) => pf,
437 Err(e) => {
438 tracing::warn!(file = %path, error = %e, "parse failed");
439 continue;
440 }
441 };
442
443 if !pf.symbols.is_empty() {
444 store.upsert_symbols(&pf.symbols).await?;
445 }
446 if !pf.chunks.is_empty() {
447 store.upsert_chunks(&pf.chunks).await?;
448 }
449 parsed.push((path.clone(), pf));
450 }
451
452 if parsed.is_empty() && changed_files.is_empty() {
453 return Ok(());
454 }
455
456 let neighbors = find_import_neighbors(store, &changed_files).await;
457 let neighbor_files: HashSet<&Utf8PathBuf> = neighbors.iter().collect();
458
459 let all_files = store.list_files().await?;
460 for entry in &all_files {
461 if parsed.iter().any(|(p, _)| p == &entry.path) {
462 continue;
463 }
464 if !neighbor_files.contains(&entry.path) {
465 continue;
466 }
467
468 let abs = root.join(entry.path.as_str());
469 let source = match std::fs::read_to_string(abs.as_str()) {
470 Ok(s) => s,
471 Err(_) => continue,
472 };
473 let pf = match parser.parse(entry, &source) {
474 Ok(pf) => pf,
475 Err(_) => continue,
476 };
477 parsed.push((entry.path.clone(), pf));
478 }
479
480 let edges = builder.build_edges(&parsed)?;
481
482 let mut affected: HashSet<&Utf8PathBuf> = parsed.iter().map(|(p, _)| p).collect();
483 for change in changes {
484 affected.insert(&change.path);
485 }
486
487 for file_path in affected {
488 store.replace_edges_for_file(file_path, &edges).await?;
489 }
490
491 Ok(())
492}
493
494fn read_file_entry(root: &Utf8Path, path: &Utf8Path) -> Result<FileEntry> {
495 let abs = root.join(path.as_str());
496 let meta = std::fs::metadata(abs.as_str())?;
497 let size = meta.len();
498 let modified = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
499
500 let hash = argyph_fs::hash_file(&abs)
501 .map_err(|e| crate::CoreError::Io(std::io::Error::other(e.to_string())))?;
502
503 let ext = path.extension().unwrap_or("");
504 let language = Language::from_extension(ext);
505
506 Ok(FileEntry {
507 path: path.to_path_buf(),
508 hash,
509 language,
510 size,
511 modified,
512 })
513}
514
515async fn find_import_neighbors(
516 store: &dyn Store,
517 files: &HashSet<Utf8PathBuf>,
518) -> Vec<Utf8PathBuf> {
519 let mut result = HashSet::new();
520 for file in files {
521 if let Ok(edges) = store.get_imports(file).await {
522 for e in &edges {
523 if let Some((imported, _, _)) = parse_sid_prefix(e.to.as_str()) {
524 if !files.contains(&imported) {
525 result.insert(imported);
526 }
527 }
528 if let Some((importer, _, _)) = parse_sid_prefix(e.from.as_str()) {
529 if !files.contains(&importer) {
530 result.insert(importer);
531 }
532 }
533 }
534 }
535 }
536 result.into_iter().collect()
537}
538
539fn parse_sid_prefix(id: &str) -> Option<(Utf8PathBuf, String, usize)> {
540 let rest = id.rsplit_once("::")?;
541 let (prefix, start_str) = rest;
542 let start: usize = start_str.parse().ok()?;
543 let (file, name) = prefix.rsplit_once("::")?;
544 Some((Utf8PathBuf::from(file), name.to_string(), start))
545}
546
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn tier_state_display() {
        // Each variant renders as its short lowercase label.
        let cases = [
            (TierState::Offline, "offline"),
            (TierState::Tier0 { files_indexed: 0 }, "tier0"),
            (TierState::Tier1 { symbols_indexed: 100 }, "tier1"),
            (TierState::Tier1_5 { structural_files: 5 }, "tier1_5"),
            (TierState::Tier2 { embedded: 25, total: 50 }, "tier2"),
            (TierState::Ready, "ready"),
        ];
        for &(state, label) in &cases {
            assert_eq!(state.to_string(), label, "label for {state:?}");
        }
    }

    #[test]
    fn tier_state_is_ready() {
        // Offline is the only state that is not ready.
        assert!(!TierState::Offline.is_ready());

        let ready_states = [
            TierState::Tier0 { files_indexed: 0 },
            TierState::Tier1 { symbols_indexed: 1 },
            TierState::Tier1_5 { structural_files: 1 },
            TierState::Tier2 { embedded: 1, total: 2 },
            TierState::Ready,
        ];
        for state in &ready_states {
            assert!(state.is_ready(), "{state:?} should be ready");
        }
    }

    #[test]
    fn tier_number_progression() {
        let cases = [
            (TierState::Offline, 0u8),
            (TierState::Tier0 { files_indexed: 0 }, 1),
            (TierState::Tier1 { symbols_indexed: 0 }, 2),
            (TierState::Tier1_5 { structural_files: 0 }, 2),
            (TierState::Tier2 { embedded: 0, total: 0 }, 3),
            (TierState::Ready, 3),
        ];
        for &(state, expected) in &cases {
            assert_eq!(state.tier_number(), expected, "tier number for {state:?}");
        }
    }
}