1use crate::chunks::{
15 SymbolRow, build_commit_chunk, build_markdown_chunk, build_symbol_chunk,
16 split_markdown_sections,
17};
18use crate::config::EmbeddingsConfig;
19use crate::embedder::{Embedder, encode_vector};
20use crate::git_staleness::compute_staleness_batch;
21use crate::store;
22use crate::vec_ext::VecConnection;
23use gix::bstr::ByteSlice as _;
24use libsql::Connection;
25use std::collections::HashMap;
26use std::path::Path;
27use std::time::Instant;
28use tracing::{info, warn};
29
/// Counters accumulated while populating embeddings, returned so callers
/// can report what was embedded and how many failures occurred.
#[derive(Debug, Default)]
pub struct PopulateStats {
    /// Symbol chunks successfully embedded and stored.
    pub symbols_embedded: usize,
    /// Symbols skipped. NOTE(review): never incremented in this module —
    /// presumably kept for callers/compatibility; confirm before removing.
    pub symbols_skipped: usize,
    /// Markdown document sections embedded (`source_type = "doc"`).
    pub docs_embedded: usize,
    /// Commit messages embedded (`source_type = "commit"`).
    pub commits_embedded: usize,
    /// Context-block sections embedded (`source_type = "context"`).
    pub contexts_embedded: usize,
    /// Embedding/storage failures; a failed batch counts every item in it.
    pub errors: usize,
}
40
/// Number of chunks passed to each `Embedder::embed_batch` call and
/// written inside one DB transaction.
const EMBED_BATCH_SIZE: usize = 64;

/// Default cap on how many commits `populate_commit_messages` walks.
/// Public so callers outside this module can use the same default.
pub const DEFAULT_MAX_COMMITS: usize = 500;
47
/// Embed all (or a changed subset of) code symbols into the embeddings
/// store.
///
/// When `changed_paths` is `None` this is a full rebuild: embedding tables
/// are dropped up front and a VACUUM runs at the end. When `Some`, only
/// symbols from those files are (re-)embedded, after deleting their stale
/// rows. `head_commit` and per-file staleness (computed when `repo_root`
/// is given) are stored alongside each vector. `db_path` optionally opens
/// a sidecar vector connection.
///
/// # Errors
/// Fails on schema setup, model load, symbol loading, or stale-row
/// deletion. Per-batch embedding/storage failures are counted in
/// `PopulateStats::errors` instead of aborting.
pub async fn populate_embeddings(
    conn: &Connection,
    config: &EmbeddingsConfig,
    changed_paths: Option<&[String]>,
    head_commit: Option<&str>,
    repo_root: Option<&std::path::Path>,
    db_path: Option<&Path>,
) -> anyhow::Result<PopulateStats> {
    let started = Instant::now();
    let is_full_rebuild = changed_paths.is_none();

    // Optional sidecar vector store; a failed open degrades to None.
    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);

    if is_full_rebuild {
        store::drop_embedding_tables(conn, vec_conn.as_ref()).await?;
    }

    store::ensure_schema(conn).await?;

    eprintln!("Loading embedding model {}...", config.model);
    let mut embedder = Embedder::load(&config.model, None)?;
    info!(model = %config.model, dims = embedder.dimensions, "Embedding model loaded");
    eprintln!(
        "Loaded model {} ({} dimensions)",
        config.model, embedder.dimensions
    );

    // Vector schema depends on the model's dimensionality, so this must
    // come after the model is loaded.
    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;

    let mut stats = PopulateStats::default();

    let symbols = load_symbols(conn, changed_paths).await?;

    if symbols.is_empty() {
        eprintln!("No symbols to embed.");
        return Ok(stats);
    }

    let total = symbols.len();
    eprintln!("Embedding {total} symbols...");

    // Incremental mode: purge old rows for the changed files before
    // re-inserting fresh embeddings.
    if let Some(paths) = changed_paths {
        for path in paths {
            store::delete_embeddings_for_path(conn, path, vec_conn.as_ref()).await?;
        }
    }

    // Preload cross-reference context (callers/callees/co-changes/docs)
    // once, so chunk building below is pure map lookups.
    let co_change_map = load_co_change_map(conn).await?;
    eprintln!("Loading callers...");
    let all_callers = load_all_callers(conn).await;
    eprintln!("Loading callees...");
    let all_callees = load_all_callees(conn).await;
    eprintln!("Loading doc comments...");
    let all_docs = load_all_doc_comments(conn).await;

    let file_paths: Vec<&str> = symbols.iter().map(|s| s.file.as_str()).collect();
    // Git staleness per file; without a repo root, everything defaults to 0.
    let staleness_map: HashMap<String, f64> = if let Some(root) = repo_root {
        compute_staleness_batch(root, &file_paths)
    } else {
        HashMap::new()
    };

    let mut batch_symbols: Vec<SymbolRow> = Vec::new();
    let mut batch_texts: Vec<String> = Vec::new();
    let mut batch_staleness: Vec<f64> = Vec::new();
    let mut done = 0usize;

    for symbol in symbols {
        let callers = all_callers.get(&symbol.name).cloned().unwrap_or_default();
        let callees = all_callees
            .get(&(symbol.name.clone(), symbol.file.clone()))
            .cloned()
            .unwrap_or_default();
        let co_changes = co_change_map.get(&symbol.file).cloned().unwrap_or_default();
        let doc = all_docs
            .get(&(symbol.name.clone(), symbol.file.clone()))
            .cloned();
        let chunk_text =
            build_symbol_chunk(&symbol, doc.as_deref(), &callers, &callees, &co_changes);
        let staleness = *staleness_map.get(&symbol.file).unwrap_or(&0.0);

        batch_symbols.push(symbol);
        batch_texts.push(chunk_text);
        batch_staleness.push(staleness);

        // Flush a full batch: one embed_batch call + one DB transaction.
        if batch_texts.len() >= EMBED_BATCH_SIZE {
            flush_batch(
                conn,
                &mut embedder,
                &batch_symbols,
                &batch_texts,
                &batch_staleness,
                head_commit,
                &config.model,
                &mut stats,
                vec_conn.as_ref(),
            )
            .await;
            done += batch_symbols.len();
            eprintln!("Embedded {done}/{total} symbols");
            batch_symbols.clear();
            batch_texts.clear();
            batch_staleness.clear();
        }
    }

    // Flush the final partial batch, if any.
    if !batch_texts.is_empty() {
        flush_batch(
            conn,
            &mut embedder,
            &batch_symbols,
            &batch_texts,
            &batch_staleness,
            head_commit,
            &config.model,
            &mut stats,
            vec_conn.as_ref(),
        )
        .await;
        done += batch_symbols.len();
        eprintln!("Embedded {done}/{total} symbols");
    }

    // A full rebuild dropped whole tables above, so reclaim the space.
    if is_full_rebuild {
        eprintln!("Running VACUUM to reclaim space...");
        store::vacuum(conn).await;
    }

    let elapsed = started.elapsed().as_secs_f64();
    eprintln!("Embedding complete. {total} symbols in {elapsed:.1}s");
    info!(
        embedded = stats.symbols_embedded,
        errors = stats.errors,
        elapsed_secs = elapsed,
        "Embedding population complete"
    );

    Ok(stats)
}
204
/// Re-embed only the given changed paths: symbol chunks for code files,
/// and (when `repo_root` is available) section chunks for changed `.md`
/// files — routed to "context" embeddings for files under
/// `.normalize/context/`, otherwise to "doc" embeddings.
///
/// Old embeddings for each changed path are deleted first (best-effort;
/// failures are logged and skipped).
///
/// # Errors
/// Fails on schema setup, model load, or symbol loading; per-batch
/// embedding/storage failures are counted in `PopulateStats::errors`.
pub async fn populate_incremental_for_paths(
    conn: &Connection,
    config: &EmbeddingsConfig,
    changed_paths: &[String],
    head_commit: Option<&str>,
    repo_root: Option<&Path>,
    db_path: Option<&Path>,
) -> anyhow::Result<PopulateStats> {
    if changed_paths.is_empty() {
        return Ok(PopulateStats::default());
    }

    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
    store::ensure_schema(conn).await?;

    let mut embedder = Embedder::load(&config.model, None)?;
    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;

    let mut stats = PopulateStats::default();

    // Drop stale rows for every changed path before re-embedding.
    for path in changed_paths {
        if let Err(e) = store::delete_embeddings_for_path(conn, path, vec_conn.as_ref()).await {
            warn!(path, error = %e, "Failed to delete old embeddings for changed path");
        }
    }

    let symbols = load_symbols(conn, Some(changed_paths)).await?;
    if !symbols.is_empty() {
        // Same cross-reference preloading as the full populate path.
        let co_change_map = load_co_change_map(conn).await?;
        let all_callers = load_all_callers(conn).await;
        let all_callees = load_all_callees(conn).await;
        let all_docs = load_all_doc_comments(conn).await;

        let file_paths: Vec<&str> = symbols.iter().map(|s| s.file.as_str()).collect();
        let staleness_map: HashMap<String, f64> = if let Some(root) = repo_root {
            compute_staleness_batch(root, &file_paths)
        } else {
            HashMap::new()
        };

        let mut batch_symbols: Vec<SymbolRow> = Vec::new();
        let mut batch_texts: Vec<String> = Vec::new();
        let mut batch_staleness: Vec<f64> = Vec::new();

        for symbol in symbols {
            let callers = all_callers.get(&symbol.name).cloned().unwrap_or_default();
            let callees = all_callees
                .get(&(symbol.name.clone(), symbol.file.clone()))
                .cloned()
                .unwrap_or_default();
            let co_changes = co_change_map.get(&symbol.file).cloned().unwrap_or_default();
            let doc = all_docs
                .get(&(symbol.name.clone(), symbol.file.clone()))
                .cloned();
            let chunk_text =
                build_symbol_chunk(&symbol, doc.as_deref(), &callers, &callees, &co_changes);
            let staleness = *staleness_map.get(&symbol.file).unwrap_or(&0.0);

            batch_symbols.push(symbol);
            batch_texts.push(chunk_text);
            batch_staleness.push(staleness);

            if batch_texts.len() >= EMBED_BATCH_SIZE {
                flush_batch(
                    conn,
                    &mut embedder,
                    &batch_symbols,
                    &batch_texts,
                    &batch_staleness,
                    head_commit,
                    &config.model,
                    &mut stats,
                    vec_conn.as_ref(),
                )
                .await;
                batch_symbols.clear();
                batch_texts.clear();
                batch_staleness.clear();
            }
        }
        // Flush the final partial batch, if any.
        if !batch_texts.is_empty() {
            flush_batch(
                conn,
                &mut embedder,
                &batch_symbols,
                &batch_texts,
                &batch_staleness,
                head_commit,
                &config.model,
                &mut stats,
                vec_conn.as_ref(),
            )
            .await;
        }
    }

    // Markdown handling needs the repo root to resolve relative paths.
    if let Some(root) = repo_root {
        for path in changed_paths {
            if path.ends_with(".md") {
                let abs_path = root.join(path);
                // Unreadable files are silently skipped (may be deleted).
                if let Ok(content) = std::fs::read_to_string(&abs_path) {
                    let sections = split_markdown_sections(&content);
                    let mut md_texts: Vec<String> = Vec::new();
                    let mut md_ids: Vec<i64> = Vec::new();
                    for (i, (breadcrumb, body)) in sections.iter().enumerate() {
                        if body.trim().is_empty() {
                            continue;
                        }
                        md_texts.push(build_markdown_chunk(path, breadcrumb, body));
                        md_ids.push(i as i64);
                    }
                    if !md_texts.is_empty() {
                        // Both separators so Windows-style paths match too.
                        let is_context = path.contains(".normalize/context/")
                            || path.contains(".normalize\\context\\");
                        if is_context {
                            flush_context_batch(
                                conn,
                                &mut embedder,
                                path,
                                &md_texts,
                                &md_ids,
                                head_commit,
                                &config.model,
                                &mut stats,
                                vec_conn.as_ref(),
                            )
                            .await;
                        } else {
                            flush_doc_batch(
                                conn,
                                &mut embedder,
                                path,
                                &md_texts,
                                &md_ids,
                                head_commit,
                                &config.model,
                                &mut stats,
                                vec_conn.as_ref(),
                            )
                            .await;
                        }
                    }
                }
            }
        }
    }

    info!(
        symbols = stats.symbols_embedded,
        docs = stats.docs_embedded,
        contexts = stats.contexts_embedded,
        paths = changed_paths.len(),
        "Incremental re-embedding complete"
    );

    Ok(stats)
}
372
/// Embed the repository's markdown documentation: `SUMMARY.md`,
/// `CLAUDE.md`, and `README.md` at the root (when present) plus every
/// `.md` file under `docs/`, one chunk per markdown section.
///
/// Existing embeddings for each file are deleted before re-embedding, so
/// repeated runs are idempotent. Unreadable files are logged and counted
/// in `stats.errors` but do not abort the run.
///
/// # Errors
/// Fails only on schema setup or model-load failure.
pub async fn populate_markdown_docs(
    conn: &Connection,
    config: &EmbeddingsConfig,
    repo_root: &Path,
    head_commit: Option<&str>,
    db_path: Option<&Path>,
) -> anyhow::Result<PopulateStats> {
    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
    store::ensure_schema(conn).await?;

    let mut embedder = Embedder::load(&config.model, None)?;
    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;

    let mut stats = PopulateStats::default();

    let mut md_files: Vec<std::path::PathBuf> = Vec::new();

    // Well-known root-level docs first, then everything under docs/.
    for name in &["SUMMARY.md", "CLAUDE.md", "README.md"] {
        let p = repo_root.join(name);
        if p.exists() {
            md_files.push(p);
        }
    }

    let docs_dir = repo_root.join("docs");
    if docs_dir.is_dir() {
        collect_md_files(&docs_dir, &mut md_files);
    }

    if md_files.is_empty() {
        return Ok(stats);
    }

    eprintln!("Embedding {} markdown document(s)...", md_files.len());

    for abs_path in &md_files {
        // Store paths relative to the repo root for stable keys.
        let rel_path = abs_path
            .strip_prefix(repo_root)
            .unwrap_or(abs_path)
            .to_string_lossy()
            .into_owned();

        if let Err(e) = store::delete_embeddings_for_path(conn, &rel_path, vec_conn.as_ref()).await
        {
            warn!(path = %rel_path, error = %e, "Failed to delete old doc embeddings");
        }

        let content = match std::fs::read_to_string(abs_path) {
            Ok(c) => c,
            Err(e) => {
                warn!(path = %rel_path, error = %e, "Could not read markdown file");
                stats.errors += 1;
                continue;
            }
        };

        let sections = split_markdown_sections(&content);
        let mut texts: Vec<String> = Vec::new();
        let mut section_ids: Vec<i64> = Vec::new();

        // One chunk per non-empty section; the section index is the id.
        for (i, (breadcrumb, body)) in sections.iter().enumerate() {
            if body.trim().is_empty() {
                continue;
            }
            texts.push(build_markdown_chunk(&rel_path, breadcrumb, body));
            section_ids.push(i as i64);
        }

        if texts.is_empty() {
            continue;
        }

        flush_doc_batch(
            conn,
            &mut embedder,
            &rel_path,
            &texts,
            &section_ids,
            head_commit,
            &config.model,
            &mut stats,
            vec_conn.as_ref(),
        )
        .await;
    }

    info!(
        docs = stats.docs_embedded,
        files = md_files.len(),
        "Markdown doc embedding complete"
    );

    Ok(stats)
}
475
/// Recursively append every `*.md` file under `dir` to `out`.
///
/// Entries in each directory are visited in file-name order so the result
/// is deterministic across platforms; directories that cannot be read are
/// silently skipped. The extension match is case-sensitive (`md` only).
fn collect_md_files(dir: &Path, out: &mut Vec<std::path::PathBuf>) {
    let reader = match std::fs::read_dir(dir) {
        Ok(r) => r,
        Err(_) => return,
    };

    let mut children: Vec<_> = reader.flatten().collect();
    children.sort_by_key(|child| child.file_name());

    for child in children {
        let path = child.path();
        if path.is_dir() {
            collect_md_files(&path, out);
            continue;
        }
        if path.extension().and_then(|ext| ext.to_str()) == Some("md") {
            out.push(path);
        }
    }
}
492
/// Embed curated context blocks: every `.md` file under
/// `.normalize/context/`, one chunk per markdown section, stored as
/// `source_type = "context"` rows.
///
/// Existing embeddings for each file are deleted before re-embedding.
/// Returns default stats when the context directory does not exist.
///
/// # Errors
/// Fails only on schema setup or model-load failure; per-file read
/// failures are logged and counted in `stats.errors`.
pub async fn populate_context_blocks(
    conn: &Connection,
    config: &EmbeddingsConfig,
    repo_root: &Path,
    head_commit: Option<&str>,
    db_path: Option<&Path>,
) -> anyhow::Result<PopulateStats> {
    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
    store::ensure_schema(conn).await?;

    let mut embedder = Embedder::load(&config.model, None)?;
    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;

    let mut stats = PopulateStats::default();

    let context_dir = repo_root.join(".normalize").join("context");
    if !context_dir.is_dir() {
        return Ok(stats);
    }

    let mut md_files: Vec<std::path::PathBuf> = Vec::new();
    collect_md_files(&context_dir, &mut md_files);

    if md_files.is_empty() {
        return Ok(stats);
    }

    eprintln!("Embedding {} context block(s)...", md_files.len());

    for abs_path in &md_files {
        // Store paths relative to the repo root for stable keys.
        let rel_path = abs_path
            .strip_prefix(repo_root)
            .unwrap_or(abs_path)
            .to_string_lossy()
            .into_owned();

        if let Err(e) = store::delete_embeddings_for_path(conn, &rel_path, vec_conn.as_ref()).await
        {
            warn!(path = %rel_path, error = %e, "Failed to delete old context embeddings");
        }

        let content = match std::fs::read_to_string(abs_path) {
            Ok(c) => c,
            Err(e) => {
                warn!(path = %rel_path, error = %e, "Could not read context file");
                stats.errors += 1;
                continue;
            }
        };

        let sections = split_markdown_sections(&content);
        let mut texts: Vec<String> = Vec::new();
        let mut section_ids: Vec<i64> = Vec::new();

        // One chunk per non-empty section; the section index is the id.
        for (i, (breadcrumb, body)) in sections.iter().enumerate() {
            if body.trim().is_empty() {
                continue;
            }
            texts.push(build_markdown_chunk(&rel_path, breadcrumb, body));
            section_ids.push(i as i64);
        }

        if texts.is_empty() {
            continue;
        }

        flush_context_batch(
            conn,
            &mut embedder,
            &rel_path,
            &texts,
            &section_ids,
            head_commit,
            &config.model,
            &mut stats,
            vec_conn.as_ref(),
        )
        .await;
    }

    info!(
        contexts = stats.contexts_embedded,
        files = md_files.len(),
        "Context block embedding complete"
    );

    Ok(stats)
}
589
/// Embed recent commit messages from the repository at `repo_root`.
///
/// Walks up to `max_commits` commits (newest first), skips hashes that
/// already have an embedding for this model, and stores the rest as
/// `source_type = "commit"` rows (the short hash is the source path).
/// Batches share one best-effort BEGIN/COMMIT transaction each.
///
/// # Errors
/// Fails only on schema setup or model-load failure; embedding/storage
/// failures are logged and counted in `stats.errors`.
pub async fn populate_commit_messages(
    conn: &Connection,
    config: &EmbeddingsConfig,
    repo_root: &Path,
    head_commit: Option<&str>,
    db_path: Option<&Path>,
    max_commits: usize,
) -> anyhow::Result<PopulateStats> {
    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
    store::ensure_schema(conn).await?;

    let mut embedder = Embedder::load(&config.model, None)?;
    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;

    let mut stats = PopulateStats::default();

    // Already-embedded hashes for this model, so reruns only add new work.
    let embedded_hashes = load_embedded_commit_hashes(conn, &config.model).await;

    let commits = load_recent_commits(repo_root, max_commits);
    if commits.is_empty() {
        return Ok(stats);
    }

    let new_commits: Vec<CommitInfo> = commits
        .into_iter()
        .filter(|c| !embedded_hashes.contains(c.hash.as_str()))
        .collect();

    if new_commits.is_empty() {
        return Ok(stats);
    }

    eprintln!("Embedding {} new commit message(s)...", new_commits.len());

    // Parallel arrays: texts[i] is the chunk for hashes[i].
    let mut texts: Vec<String> = Vec::new();
    let mut hashes: Vec<String> = Vec::new();

    for commit in &new_commits {
        let chunk = build_commit_chunk(&commit.hash, &commit.date, &commit.subject, &commit.body);
        texts.push(chunk);
        hashes.push(commit.hash.clone());
    }

    for (chunk_texts, chunk_hashes) in texts
        .chunks(EMBED_BATCH_SIZE)
        .zip(hashes.chunks(EMBED_BATCH_SIZE))
    {
        let text_refs: Vec<&str> = chunk_texts.iter().map(String::as_str).collect();
        match embedder.embed_batch(&text_refs) {
            Ok(vectors) => {
                // Best-effort transaction: a failed BEGIN/COMMIT is logged
                // but the upserts still run.
                if let Err(e) = conn.execute("BEGIN", ()).await {
                    warn!(error = %e, "Failed to BEGIN transaction for commit batch");
                }
                for (i, (text, vec)) in chunk_texts.iter().zip(vectors.iter()).enumerate() {
                    let hash = &chunk_hashes[i];
                    let blob = encode_vector(vec);
                    match store::upsert_embedding(
                        conn,
                        "commit",
                        hash,
                        None,
                        &config.model,
                        head_commit,
                        0.0,
                        text,
                        &blob,
                        vec_conn.as_ref(),
                    )
                    .await
                    {
                        Ok(()) => stats.commits_embedded += 1,
                        Err(e) => {
                            warn!(hash, error = %e, "Failed to store commit embedding");
                            stats.errors += 1;
                        }
                    }
                }
                if let Err(e) = conn.execute("COMMIT", ()).await {
                    warn!(error = %e, "Failed to COMMIT transaction for commit batch");
                }
            }
            Err(e) => {
                // A failed batch counts every text it contained.
                warn!(error = %e, "Commit embedding batch failed");
                stats.errors += chunk_texts.len();
            }
        }
    }

    info!(
        commits = stats.commits_embedded,
        "Commit message embedding complete"
    );

    Ok(stats)
}
692
/// Commit metadata captured by `load_recent_commits` for embedding.
struct CommitInfo {
    /// Abbreviated commit hash (up to 12 hex chars).
    hash: String,
    /// Commit date as `YYYY-MM-DD`, derived from the epoch timestamp.
    date: String,
    /// First line of the commit message, trimmed; never empty.
    subject: String,
    /// Remainder of the message after the subject, trimmed (may be empty).
    body: String,
}
700
701fn load_recent_commits(root: &Path, max_commits: usize) -> Vec<CommitInfo> {
703 let repo = match gix::discover(root) {
704 Ok(r) => r.into_sync().to_thread_local(),
705 Err(_) => return Vec::new(),
706 };
707
708 let head_id = match repo.head_id() {
709 Ok(id) => id,
710 Err(_) => return Vec::new(),
711 };
712
713 let walk = match head_id
714 .ancestors()
715 .sorting(gix::revision::walk::Sorting::ByCommitTime(
716 gix::traverse::commit::simple::CommitTimeOrder::NewestFirst,
717 ))
718 .all()
719 {
720 Ok(w) => w,
721 Err(_) => return Vec::new(),
722 };
723
724 let mut commits = Vec::new();
725
726 for info in walk.take(max_commits) {
727 let Ok(info) = info else { continue };
728 let Ok(commit_obj) = info.object() else {
729 continue;
730 };
731 let Ok(commit) = commit_obj.decode() else {
732 continue;
733 };
734
735 let hash = info.id().to_string();
736 let short_hash = if hash.len() >= 12 {
737 hash[..12].to_string()
738 } else {
739 hash.clone()
740 };
741
742 let timestamp = commit.time().map(|t| t.seconds).unwrap_or(0);
744 let date = epoch_to_date(timestamp);
745
746 let full_message = commit.message.to_str_lossy().into_owned();
748 let msg_ref = commit.message();
749 let subject = msg_ref.summary().to_str_lossy().trim().to_string();
750 let body = full_message
751 .trim_start_matches(subject.as_str())
752 .trim()
753 .to_string();
754
755 if subject.is_empty() {
756 continue;
757 }
758
759 commits.push(CommitInfo {
760 hash: short_hash,
761 date,
762 subject,
763 body,
764 });
765 }
766
767 commits
768}
769
/// Format Unix-epoch seconds as a `YYYY-MM-DD` civil date.
///
/// Pre-epoch timestamps are clamped to 1970-01-01. Uses Howard Hinnant's
/// days-to-civil algorithm (eras of 146097 days = 400 Gregorian years),
/// avoiding any date-library dependency.
fn epoch_to_date(secs: i64) -> String {
    // Whole days since 1970-01-01, never negative.
    let days = secs.max(0) / 86_400;

    // Shift so day 0 is 0000-03-01, making leap days fall at era ends.
    let shifted = days + 719_468;
    let era = if shifted >= 0 { shifted } else { shifted - 146_096 } / 146_097;
    let day_of_era = shifted - era * 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    // month_index 0 is March; wrap Jan/Feb into the next calendar year.
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    let mut year = year_of_era + era * 400;
    if month <= 2 {
        year += 1;
    }

    format!("{year:04}-{month:02}-{day:02}")
}
790
791async fn load_embedded_commit_hashes(
793 conn: &Connection,
794 model: &str,
795) -> std::collections::HashSet<String> {
796 let mut set = std::collections::HashSet::new();
797 let Ok(mut rows) = conn
798 .query(
799 "SELECT source_path FROM embeddings WHERE source_type = 'commit' AND model = ?1",
800 [model],
801 )
802 .await
803 else {
804 return set;
805 };
806 while let Ok(Some(row)) = rows.next().await {
807 if let Ok(hash) = row.get::<String>(0) {
808 set.insert(hash);
809 }
810 }
811 set
812}
813
/// Embed one batch of markdown section chunks and upsert them as
/// `source_type = "doc"` rows for `rel_path`.
///
/// `texts[i]` pairs with `section_ids[i]` (falling back to index `i` if
/// the id slice is short). All upserts run inside a best-effort
/// BEGIN/COMMIT: failures of the transaction statements are logged but do
/// not stop the writes. A failed `embed_batch` counts every text in
/// `stats.errors`; per-row upsert failures are counted individually.
#[allow(clippy::too_many_arguments)]
async fn flush_doc_batch(
    conn: &Connection,
    embedder: &mut Embedder,
    rel_path: &str,
    texts: &[String],
    section_ids: &[i64],
    head_commit: Option<&str>,
    model_name: &str,
    stats: &mut PopulateStats,
    vec_conn: Option<&VecConnection>,
) {
    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
    match embedder.embed_batch(&text_refs) {
        Ok(vectors) => {
            if let Err(e) = conn.execute("BEGIN", ()).await {
                warn!(error = %e, "Failed to BEGIN transaction for doc batch");
            }
            for (i, (text, vec)) in texts.iter().zip(vectors.iter()).enumerate() {
                let blob = encode_vector(vec);
                let sid = section_ids.get(i).copied().unwrap_or(i as i64);
                match store::upsert_embedding(
                    conn,
                    "doc",
                    rel_path,
                    Some(sid),
                    model_name,
                    head_commit,
                    0.0,
                    text,
                    &blob,
                    vec_conn,
                )
                .await
                {
                    Ok(()) => stats.docs_embedded += 1,
                    Err(e) => {
                        warn!(path = %rel_path, section = i, error = %e, "Failed to store doc embedding");
                        stats.errors += 1;
                    }
                }
            }
            if let Err(e) = conn.execute("COMMIT", ()).await {
                warn!(error = %e, "Failed to COMMIT transaction for doc batch");
            }
        }
        Err(e) => {
            warn!(error = %e, path = %rel_path, "Doc embedding batch failed");
            stats.errors += texts.len();
        }
    }
}
870
/// Embed one batch of context-block section chunks and upsert them as
/// `source_type = "context"` rows for `rel_path`.
///
/// Mirrors `flush_doc_batch` except for the source type and the stats
/// counter: best-effort BEGIN/COMMIT around the upserts, per-row failures
/// counted individually, and a failed `embed_batch` counting every text
/// in `stats.errors`.
#[allow(clippy::too_many_arguments)]
async fn flush_context_batch(
    conn: &Connection,
    embedder: &mut Embedder,
    rel_path: &str,
    texts: &[String],
    section_ids: &[i64],
    head_commit: Option<&str>,
    model_name: &str,
    stats: &mut PopulateStats,
    vec_conn: Option<&VecConnection>,
) {
    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
    match embedder.embed_batch(&text_refs) {
        Ok(vectors) => {
            if let Err(e) = conn.execute("BEGIN", ()).await {
                warn!(error = %e, "Failed to BEGIN transaction for context batch");
            }
            for (i, (text, vec)) in texts.iter().zip(vectors.iter()).enumerate() {
                let blob = encode_vector(vec);
                let sid = section_ids.get(i).copied().unwrap_or(i as i64);
                match store::upsert_embedding(
                    conn,
                    "context",
                    rel_path,
                    Some(sid),
                    model_name,
                    head_commit,
                    0.0,
                    text,
                    &blob,
                    vec_conn,
                )
                .await
                {
                    Ok(()) => stats.contexts_embedded += 1,
                    Err(e) => {
                        warn!(path = %rel_path, section = i, error = %e, "Failed to store context embedding");
                        stats.errors += 1;
                    }
                }
            }
            if let Err(e) = conn.execute("COMMIT", ()).await {
                warn!(error = %e, "Failed to COMMIT transaction for context batch");
            }
        }
        Err(e) => {
            warn!(error = %e, path = %rel_path, "Context embedding batch failed");
            stats.errors += texts.len();
        }
    }
}
927
/// Embed one batch of symbol chunks and upsert them as
/// `source_type = "symbol"` rows keyed by file path + symbol rowid.
///
/// `symbols[i]`, `texts[i]`, and `staleness[i]` are parallel; a missing
/// staleness entry defaults to 0. All upserts run inside a best-effort
/// BEGIN/COMMIT (statement failures are logged, not fatal). A failed
/// `embed_batch` counts the whole batch in `stats.errors`; per-row upsert
/// failures are counted individually.
#[allow(clippy::too_many_arguments)]
async fn flush_batch(
    conn: &Connection,
    embedder: &mut Embedder,
    symbols: &[SymbolRow],
    texts: &[String],
    staleness: &[f64],
    head_commit: Option<&str>,
    model_name: &str,
    stats: &mut PopulateStats,
    vec_conn: Option<&VecConnection>,
) {
    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
    match embedder.embed_batch(&text_refs) {
        Ok(vectors) => {
            if let Err(e) = conn.execute("BEGIN", ()).await {
                warn!(error = %e, "Failed to BEGIN transaction for batch");
            }
            for (idx, (sym, (text, vec))) in symbols
                .iter()
                .zip(texts.iter().zip(vectors.iter()))
                .enumerate()
            {
                let blob = encode_vector(vec);
                // Stored as f32; the f64 precision loss is irrelevant for
                // a staleness score.
                let sym_staleness = staleness.get(idx).copied().unwrap_or(0.0) as f32;
                match store::upsert_embedding(
                    conn,
                    "symbol",
                    &sym.file,
                    Some(sym.rowid),
                    model_name,
                    head_commit,
                    sym_staleness,
                    text,
                    &blob,
                    vec_conn,
                )
                .await
                {
                    Ok(()) => stats.symbols_embedded += 1,
                    Err(e) => {
                        warn!(symbol = %sym.name, file = %sym.file, error = %e, "Failed to store embedding");
                        stats.errors += 1;
                    }
                }
            }
            if let Err(e) = conn.execute("COMMIT", ()).await {
                warn!(error = %e, "Failed to COMMIT transaction for batch");
            }
        }
        Err(e) => {
            warn!(error = %e, batch_size = symbols.len(), "Embedding batch failed");
            stats.errors += symbols.len();
        }
    }
}
985
986async fn load_symbols(
988 conn: &Connection,
989 changed_paths: Option<&[String]>,
990) -> anyhow::Result<Vec<SymbolRow>> {
991 let sql = "SELECT rowid, file, name, kind, start_line, end_line, parent FROM symbols";
992
993 let mut rows = conn.query(sql, ()).await?;
994 let mut symbols = Vec::new();
995
996 let path_set: Option<std::collections::HashSet<&str>> =
997 changed_paths.map(|paths| paths.iter().map(String::as_str).collect());
998
999 while let Some(row) = rows.next().await? {
1000 let file: String = row.get(1)?;
1001
1002 if path_set
1003 .as_ref()
1004 .is_some_and(|set| !set.contains(file.as_str()))
1005 {
1006 continue;
1007 }
1008
1009 symbols.push(SymbolRow {
1010 rowid: row.get(0)?,
1011 file,
1012 name: row.get(2)?,
1013 kind: row.get(3)?,
1014 start_line: row.get(4)?,
1015 end_line: row.get(5)?,
1016 parent: row.get(6)?,
1017 });
1018 }
1019
1020 Ok(symbols)
1021}
1022
1023async fn load_all_callers(conn: &Connection) -> HashMap<String, Vec<String>> {
1026 let mut map: HashMap<String, Vec<String>> = HashMap::new();
1027
1028 let Ok(mut rows) = conn
1029 .query(
1030 "SELECT callee_name, caller_symbol, COUNT(*) as cnt \
1031 FROM calls \
1032 GROUP BY callee_name, caller_symbol \
1033 ORDER BY callee_name, cnt DESC",
1034 (),
1035 )
1036 .await
1037 else {
1038 return map;
1039 };
1040
1041 while let Ok(Some(row)) = rows.next().await {
1042 let Ok(callee) = row.get::<String>(0) else {
1043 continue;
1044 };
1045 let Ok(caller) = row.get::<String>(1) else {
1046 continue;
1047 };
1048 let entry = map.entry(callee).or_default();
1049 if entry.len() < 10 {
1050 entry.push(caller);
1051 }
1052 }
1053
1054 map
1055}
1056
1057async fn load_all_callees(conn: &Connection) -> HashMap<(String, String), Vec<String>> {
1060 let mut map: HashMap<(String, String), Vec<String>> = HashMap::new();
1061
1062 let Ok(mut rows) = conn
1063 .query(
1064 "SELECT caller_symbol, caller_file, callee_name FROM calls ORDER BY caller_symbol, caller_file",
1065 (),
1066 )
1067 .await
1068 else {
1069 return map;
1070 };
1071
1072 while let Ok(Some(row)) = rows.next().await {
1073 let Ok(caller_sym) = row.get::<String>(0) else {
1074 continue;
1075 };
1076 let Ok(caller_file) = row.get::<String>(1) else {
1077 continue;
1078 };
1079 let Ok(callee) = row.get::<String>(2) else {
1080 continue;
1081 };
1082 let entry = map.entry((caller_sym, caller_file)).or_default();
1083 if entry.len() < 10 {
1084 entry.push(callee);
1085 }
1086 }
1087
1088 map
1089}
1090
1091async fn load_all_doc_comments(conn: &Connection) -> HashMap<(String, String), String> {
1094 let mut map: HashMap<(String, String), String> = HashMap::new();
1095
1096 let Ok(mut rows) = conn
1097 .query(
1098 "SELECT name, file, attribute FROM symbol_attributes WHERE attribute LIKE 'doc:%' ORDER BY name, file",
1099 (),
1100 )
1101 .await
1102 else {
1103 return map;
1104 };
1105
1106 while let Ok(Some(row)) = rows.next().await {
1107 let Ok(name) = row.get::<String>(0) else {
1108 continue;
1109 };
1110 let Ok(file) = row.get::<String>(1) else {
1111 continue;
1112 };
1113 let Ok(attr) = row.get::<String>(2) else {
1114 continue;
1115 };
1116 if let Some(doc) = attr.strip_prefix("doc:") {
1117 let entry = map.entry((name, file)).or_default();
1118 if !entry.is_empty() {
1119 entry.push('\n');
1120 }
1121 entry.push_str(doc);
1122 }
1123 }
1124
1125 map
1126}
1127
1128async fn load_co_change_map(
1131 conn: &Connection,
1132) -> anyhow::Result<std::collections::HashMap<String, Vec<String>>> {
1133 let mut map: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
1134
1135 let mut rows = match conn
1136 .query(
1137 "SELECT file_a, file_b FROM co_change_edges ORDER BY count DESC",
1138 (),
1139 )
1140 .await
1141 {
1142 Ok(rows) => rows,
1143 Err(_) => {
1144 return Ok(map);
1145 }
1146 };
1147
1148 while let Some(row) = rows.next().await? {
1149 let a: String = row.get(0)?;
1150 let b: String = row.get(1)?;
1151 map.entry(a.clone()).or_default().push(b.clone());
1152 map.entry(b).or_default().push(a);
1153 }
1154
1155 Ok(map)
1156}