Skip to main content

normalize_semantic/
populate.rs

1//! Embedding population: walk symbols from the structural index and embed them.
2//!
3//! Called after `structure rebuild` when `embeddings.enabled = true`.
4//! For incremental rebuilds, only symbols in changed files are re-embedded.
5//!
//! Additional source types:
//! - [`populate_markdown_docs`] -- embed `*.md` files under `docs/`, `SUMMARY.md`,
//!   `CLAUDE.md`, and `README.md` chunked by heading section.
//! - [`populate_context_blocks`] -- embed `*.md` files under `.normalize/context/`,
//!   chunked by heading section and stored with `source_type = "context"`.
//! - [`populate_commit_messages`] -- embed recent git commit messages (last N commits)
//!   keyed by commit hash.
//! - [`populate_incremental_for_paths`] -- incremental re-embed for a set of changed
//!   paths; called by the daemon on file change without a full rebuild.
13
14use crate::chunks::{
15    SymbolRow, build_commit_chunk, build_markdown_chunk, build_symbol_chunk,
16    split_markdown_sections,
17};
18use crate::config::EmbeddingsConfig;
19use crate::embedder::{Embedder, encode_vector};
20use crate::git_staleness::compute_staleness_batch;
21use crate::store;
22use crate::vec_ext::VecConnection;
23use gix::bstr::ByteSlice as _;
24use libsql::Connection;
25use std::collections::HashMap;
26use std::path::Path;
27use std::time::Instant;
28use tracing::{info, warn};
29
/// Result of a population run.
///
/// All counters start at zero (via `Default`) and are incremented by the
/// populate/flush routines as chunks are embedded and stored.
#[derive(Debug, Default)]
pub struct PopulateStats {
    /// Number of symbol chunks stored.
    /// NOTE(review): presumably incremented by `flush_batch`, which is not
    /// visible in this part of the file -- confirm.
    pub symbols_embedded: usize,
    /// Number of symbols skipped.
    /// NOTE(review): not updated anywhere in the visible code -- confirm where
    /// (or whether) this is set.
    pub symbols_skipped: usize,
    /// Number of markdown sections stored with `source_type = "doc"`.
    pub docs_embedded: usize,
    /// Number of commit messages stored with `source_type = "commit"`.
    pub commits_embedded: usize,
    /// Number of markdown sections stored with `source_type = "context"`.
    pub contexts_embedded: usize,
    /// Number of failures: chunks whose embedding or store call failed, and
    /// markdown files that could not be read.
    pub errors: usize,
}
40
/// Batch size for embedding calls (fastembed handles batching internally but
/// we chunk to keep peak memory bounded).
///
/// Symbol and commit chunks are handed to the embedder in groups of at most
/// this many texts.
const EMBED_BATCH_SIZE: usize = 64;

/// Default number of recent commits to embed.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// callers pass it as `max_commits` to `populate_commit_messages` -- confirm.
pub const DEFAULT_MAX_COMMITS: usize = 500;
47
48/// Populate embeddings for all symbols in the index.
49///
50/// If `incremental` is true and `changed_paths` is provided, only symbols from
51/// those files are re-embedded. Otherwise all symbols are re-embedded.
52///
53/// `db_path` is the path to the SQLite database file, used to open a parallel
54/// raw connection with sqlite-vec registered for ANN virtual table operations.
55pub async fn populate_embeddings(
56    conn: &Connection,
57    config: &EmbeddingsConfig,
58    changed_paths: Option<&[String]>,
59    head_commit: Option<&str>,
60    repo_root: Option<&std::path::Path>,
61    db_path: Option<&Path>,
62) -> anyhow::Result<PopulateStats> {
63    let started = Instant::now();
64    let is_full_rebuild = changed_paths.is_none();
65
66    // Open a parallel raw connection with sqlite-vec registered for ANN
67    // virtual table operations (CREATE VIRTUAL TABLE, INSERT, DELETE).
68    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
69
70    // For a full rebuild, drop and recreate tables instead of per-row deletes.
71    // This avoids leaving massive dead pages in SQLite.
72    if is_full_rebuild {
73        store::drop_embedding_tables(conn, vec_conn.as_ref()).await?;
74    }
75
76    store::ensure_schema(conn).await?;
77
78    eprintln!("Loading embedding model {}...", config.model);
79    let mut embedder = Embedder::load(&config.model, None)?;
80    info!(model = %config.model, dims = embedder.dimensions, "Embedding model loaded");
81    eprintln!(
82        "Loaded model {} ({} dimensions)",
83        config.model, embedder.dimensions
84    );
85
86    // Create the ANN virtual table now that we know the model's dimension count.
87    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;
88
89    let mut stats = PopulateStats::default();
90
91    // Load all symbols
92    let symbols = load_symbols(conn, changed_paths).await?;
93
94    if symbols.is_empty() {
95        eprintln!("No symbols to embed.");
96        return Ok(stats);
97    }
98
99    let total = symbols.len();
100    eprintln!("Embedding {total} symbols...");
101
102    // If incremental, delete old embeddings for changed paths first
103    if let Some(paths) = changed_paths {
104        for path in paths {
105            store::delete_embeddings_for_path(conn, path, vec_conn.as_ref()).await?;
106        }
107    }
108
109    // Build context maps with single bulk queries instead of per-symbol queries.
110    let co_change_map = load_co_change_map(conn).await?;
111    eprintln!("Loading callers...");
112    let all_callers = load_all_callers(conn).await;
113    eprintln!("Loading callees...");
114    let all_callees = load_all_callees(conn).await;
115    eprintln!("Loading doc comments...");
116    let all_docs = load_all_doc_comments(conn).await;
117
118    // Compute staleness per unique file path
119    let file_paths: Vec<&str> = symbols.iter().map(|s| s.file.as_str()).collect();
120    let staleness_map: HashMap<String, f64> = if let Some(root) = repo_root {
121        compute_staleness_batch(root, &file_paths)
122    } else {
123        HashMap::new()
124    };
125
126    // Process in batches
127    let mut batch_symbols: Vec<SymbolRow> = Vec::new();
128    let mut batch_texts: Vec<String> = Vec::new();
129    let mut batch_staleness: Vec<f64> = Vec::new();
130    let mut done = 0usize;
131
132    for symbol in symbols {
133        let callers = all_callers.get(&symbol.name).cloned().unwrap_or_default();
134        let callees = all_callees
135            .get(&(symbol.name.clone(), symbol.file.clone()))
136            .cloned()
137            .unwrap_or_default();
138        let co_changes = co_change_map.get(&symbol.file).cloned().unwrap_or_default();
139        let doc = all_docs
140            .get(&(symbol.name.clone(), symbol.file.clone()))
141            .cloned();
142        let chunk_text =
143            build_symbol_chunk(&symbol, doc.as_deref(), &callers, &callees, &co_changes);
144        let staleness = *staleness_map.get(&symbol.file).unwrap_or(&0.0);
145
146        batch_symbols.push(symbol);
147        batch_texts.push(chunk_text);
148        batch_staleness.push(staleness);
149
150        if batch_texts.len() >= EMBED_BATCH_SIZE {
151            flush_batch(
152                conn,
153                &mut embedder,
154                &batch_symbols,
155                &batch_texts,
156                &batch_staleness,
157                head_commit,
158                &config.model,
159                &mut stats,
160                vec_conn.as_ref(),
161            )
162            .await;
163            done += batch_symbols.len();
164            eprintln!("Embedded {done}/{total} symbols");
165            batch_symbols.clear();
166            batch_texts.clear();
167            batch_staleness.clear();
168        }
169    }
170
171    if !batch_texts.is_empty() {
172        flush_batch(
173            conn,
174            &mut embedder,
175            &batch_symbols,
176            &batch_texts,
177            &batch_staleness,
178            head_commit,
179            &config.model,
180            &mut stats,
181            vec_conn.as_ref(),
182        )
183        .await;
184        done += batch_symbols.len();
185        eprintln!("Embedded {done}/{total} symbols");
186    }
187
188    if is_full_rebuild {
189        eprintln!("Running VACUUM to reclaim space...");
190        store::vacuum(conn).await;
191    }
192
193    let elapsed = started.elapsed().as_secs_f64();
194    eprintln!("Embedding complete. {total} symbols in {elapsed:.1}s");
195    info!(
196        embedded = stats.symbols_embedded,
197        errors = stats.errors,
198        elapsed_secs = elapsed,
199        "Embedding population complete"
200    );
201
202    Ok(stats)
203}
204
205/// Incremental re-embedding for a set of changed file paths.
206///
207/// Called by the daemon after `incremental_refresh` when `embeddings.enabled` is true.
208/// Deletes old embeddings for each changed path, then re-embeds symbols and markdown
209/// docs in those files. Lightweight -- no VACUUM, no full table drop.
210pub async fn populate_incremental_for_paths(
211    conn: &Connection,
212    config: &EmbeddingsConfig,
213    changed_paths: &[String],
214    head_commit: Option<&str>,
215    repo_root: Option<&Path>,
216    db_path: Option<&Path>,
217) -> anyhow::Result<PopulateStats> {
218    if changed_paths.is_empty() {
219        return Ok(PopulateStats::default());
220    }
221
222    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
223    store::ensure_schema(conn).await?;
224
225    let mut embedder = Embedder::load(&config.model, None)?;
226    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;
227
228    let mut stats = PopulateStats::default();
229
230    // Delete old embeddings for changed paths (all source types)
231    for path in changed_paths {
232        if let Err(e) = store::delete_embeddings_for_path(conn, path, vec_conn.as_ref()).await {
233            warn!(path, error = %e, "Failed to delete old embeddings for changed path");
234        }
235    }
236
237    // Re-embed symbols in changed source files
238    let symbols = load_symbols(conn, Some(changed_paths)).await?;
239    if !symbols.is_empty() {
240        let co_change_map = load_co_change_map(conn).await?;
241        let all_callers = load_all_callers(conn).await;
242        let all_callees = load_all_callees(conn).await;
243        let all_docs = load_all_doc_comments(conn).await;
244
245        let file_paths: Vec<&str> = symbols.iter().map(|s| s.file.as_str()).collect();
246        let staleness_map: HashMap<String, f64> = if let Some(root) = repo_root {
247            compute_staleness_batch(root, &file_paths)
248        } else {
249            HashMap::new()
250        };
251
252        let mut batch_symbols: Vec<SymbolRow> = Vec::new();
253        let mut batch_texts: Vec<String> = Vec::new();
254        let mut batch_staleness: Vec<f64> = Vec::new();
255
256        for symbol in symbols {
257            let callers = all_callers.get(&symbol.name).cloned().unwrap_or_default();
258            let callees = all_callees
259                .get(&(symbol.name.clone(), symbol.file.clone()))
260                .cloned()
261                .unwrap_or_default();
262            let co_changes = co_change_map.get(&symbol.file).cloned().unwrap_or_default();
263            let doc = all_docs
264                .get(&(symbol.name.clone(), symbol.file.clone()))
265                .cloned();
266            let chunk_text =
267                build_symbol_chunk(&symbol, doc.as_deref(), &callers, &callees, &co_changes);
268            let staleness = *staleness_map.get(&symbol.file).unwrap_or(&0.0);
269
270            batch_symbols.push(symbol);
271            batch_texts.push(chunk_text);
272            batch_staleness.push(staleness);
273
274            if batch_texts.len() >= EMBED_BATCH_SIZE {
275                flush_batch(
276                    conn,
277                    &mut embedder,
278                    &batch_symbols,
279                    &batch_texts,
280                    &batch_staleness,
281                    head_commit,
282                    &config.model,
283                    &mut stats,
284                    vec_conn.as_ref(),
285                )
286                .await;
287                batch_symbols.clear();
288                batch_texts.clear();
289                batch_staleness.clear();
290            }
291        }
292        if !batch_texts.is_empty() {
293            flush_batch(
294                conn,
295                &mut embedder,
296                &batch_symbols,
297                &batch_texts,
298                &batch_staleness,
299                head_commit,
300                &config.model,
301                &mut stats,
302                vec_conn.as_ref(),
303            )
304            .await;
305        }
306    }
307
308    // Re-embed any changed markdown files.
309    // Files under .normalize/context/ are stored as source_type='context';
310    // all other *.md files are stored as source_type='doc'.
311    if let Some(root) = repo_root {
312        for path in changed_paths {
313            if path.ends_with(".md") {
314                let abs_path = root.join(path);
315                if let Ok(content) = std::fs::read_to_string(&abs_path) {
316                    let sections = split_markdown_sections(&content);
317                    let mut md_texts: Vec<String> = Vec::new();
318                    let mut md_ids: Vec<i64> = Vec::new();
319                    for (i, (breadcrumb, body)) in sections.iter().enumerate() {
320                        if body.trim().is_empty() {
321                            continue;
322                        }
323                        md_texts.push(build_markdown_chunk(path, breadcrumb, body));
324                        md_ids.push(i as i64);
325                    }
326                    if !md_texts.is_empty() {
327                        let is_context = path.contains(".normalize/context/")
328                            || path.contains(".normalize\\context\\");
329                        if is_context {
330                            flush_context_batch(
331                                conn,
332                                &mut embedder,
333                                path,
334                                &md_texts,
335                                &md_ids,
336                                head_commit,
337                                &config.model,
338                                &mut stats,
339                                vec_conn.as_ref(),
340                            )
341                            .await;
342                        } else {
343                            flush_doc_batch(
344                                conn,
345                                &mut embedder,
346                                path,
347                                &md_texts,
348                                &md_ids,
349                                head_commit,
350                                &config.model,
351                                &mut stats,
352                                vec_conn.as_ref(),
353                            )
354                            .await;
355                        }
356                    }
357                }
358            }
359        }
360    }
361
362    info!(
363        symbols = stats.symbols_embedded,
364        docs = stats.docs_embedded,
365        contexts = stats.contexts_embedded,
366        paths = changed_paths.len(),
367        "Incremental re-embedding complete"
368    );
369
370    Ok(stats)
371}
372
373/// Embed all eligible markdown files in the workspace.
374///
375/// Eligible files:
376/// - `SUMMARY.md`, `CLAUDE.md`, `README.md` in the repo root
377/// - Any `*.md` under `docs/`
378///
379/// Each file is chunked by heading sections. Existing `doc` embeddings for the
380/// same paths are replaced via the UNIQUE constraint on `(source_type, source_path, source_id)`.
381pub async fn populate_markdown_docs(
382    conn: &Connection,
383    config: &EmbeddingsConfig,
384    repo_root: &Path,
385    head_commit: Option<&str>,
386    db_path: Option<&Path>,
387) -> anyhow::Result<PopulateStats> {
388    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
389    store::ensure_schema(conn).await?;
390
391    let mut embedder = Embedder::load(&config.model, None)?;
392    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;
393
394    let mut stats = PopulateStats::default();
395
396    let mut md_files: Vec<std::path::PathBuf> = Vec::new();
397
398    for name in &["SUMMARY.md", "CLAUDE.md", "README.md"] {
399        let p = repo_root.join(name);
400        if p.exists() {
401            md_files.push(p);
402        }
403    }
404
405    let docs_dir = repo_root.join("docs");
406    if docs_dir.is_dir() {
407        collect_md_files(&docs_dir, &mut md_files);
408    }
409
410    if md_files.is_empty() {
411        return Ok(stats);
412    }
413
414    eprintln!("Embedding {} markdown document(s)...", md_files.len());
415
416    for abs_path in &md_files {
417        let rel_path = abs_path
418            .strip_prefix(repo_root)
419            .unwrap_or(abs_path)
420            .to_string_lossy()
421            .into_owned();
422
423        if let Err(e) = store::delete_embeddings_for_path(conn, &rel_path, vec_conn.as_ref()).await
424        {
425            warn!(path = %rel_path, error = %e, "Failed to delete old doc embeddings");
426        }
427
428        let content = match std::fs::read_to_string(abs_path) {
429            Ok(c) => c,
430            Err(e) => {
431                warn!(path = %rel_path, error = %e, "Could not read markdown file");
432                stats.errors += 1;
433                continue;
434            }
435        };
436
437        let sections = split_markdown_sections(&content);
438        let mut texts: Vec<String> = Vec::new();
439        let mut section_ids: Vec<i64> = Vec::new();
440
441        for (i, (breadcrumb, body)) in sections.iter().enumerate() {
442            if body.trim().is_empty() {
443                continue;
444            }
445            texts.push(build_markdown_chunk(&rel_path, breadcrumb, body));
446            section_ids.push(i as i64);
447        }
448
449        if texts.is_empty() {
450            continue;
451        }
452
453        flush_doc_batch(
454            conn,
455            &mut embedder,
456            &rel_path,
457            &texts,
458            &section_ids,
459            head_commit,
460            &config.model,
461            &mut stats,
462            vec_conn.as_ref(),
463        )
464        .await;
465    }
466
467    info!(
468        docs = stats.docs_embedded,
469        files = md_files.len(),
470        "Markdown doc embedding complete"
471    );
472
473    Ok(stats)
474}
475
/// Append every `*.md` file found beneath `dir` to `out`.
///
/// Entries of each directory are visited in file-name order so the result is
/// deterministic; subdirectories are descended into at the point where they
/// sort among their siblings. A directory that cannot be read, and entries
/// that fail to enumerate, are silently ignored.
fn collect_md_files(dir: &Path, out: &mut Vec<std::path::PathBuf>) {
    let mut children: Vec<std::fs::DirEntry> = match std::fs::read_dir(dir) {
        Ok(listing) => listing.filter_map(Result::ok).collect(),
        Err(_) => return,
    };
    children.sort_by_key(std::fs::DirEntry::file_name);

    for child in &children {
        let candidate = child.path();
        if candidate.is_dir() {
            collect_md_files(&candidate, out);
            continue;
        }
        let is_markdown = matches!(
            candidate.extension().and_then(|ext| ext.to_str()),
            Some("md")
        );
        if is_markdown {
            out.push(candidate);
        }
    }
}
492
493/// Embed all `.normalize/context/` markdown files in the workspace.
494///
495/// Walks `<repo_root>/.normalize/context/` recursively. Each `*.md` file is
496/// chunked by heading section and stored with `source_type = "context"` so
497/// `normalize context --semantic` can filter to context blocks exclusively.
498///
499/// Existing `context` embeddings for the same paths are replaced via the UNIQUE
500/// constraint on `(source_type, source_path, source_id)`.
501pub async fn populate_context_blocks(
502    conn: &Connection,
503    config: &EmbeddingsConfig,
504    repo_root: &Path,
505    head_commit: Option<&str>,
506    db_path: Option<&Path>,
507) -> anyhow::Result<PopulateStats> {
508    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
509    store::ensure_schema(conn).await?;
510
511    let mut embedder = Embedder::load(&config.model, None)?;
512    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;
513
514    let mut stats = PopulateStats::default();
515
516    let context_dir = repo_root.join(".normalize").join("context");
517    if !context_dir.is_dir() {
518        return Ok(stats);
519    }
520
521    let mut md_files: Vec<std::path::PathBuf> = Vec::new();
522    collect_md_files(&context_dir, &mut md_files);
523
524    if md_files.is_empty() {
525        return Ok(stats);
526    }
527
528    eprintln!("Embedding {} context block(s)...", md_files.len());
529
530    for abs_path in &md_files {
531        let rel_path = abs_path
532            .strip_prefix(repo_root)
533            .unwrap_or(abs_path)
534            .to_string_lossy()
535            .into_owned();
536
537        if let Err(e) = store::delete_embeddings_for_path(conn, &rel_path, vec_conn.as_ref()).await
538        {
539            warn!(path = %rel_path, error = %e, "Failed to delete old context embeddings");
540        }
541
542        let content = match std::fs::read_to_string(abs_path) {
543            Ok(c) => c,
544            Err(e) => {
545                warn!(path = %rel_path, error = %e, "Could not read context file");
546                stats.errors += 1;
547                continue;
548            }
549        };
550
551        let sections = split_markdown_sections(&content);
552        let mut texts: Vec<String> = Vec::new();
553        let mut section_ids: Vec<i64> = Vec::new();
554
555        for (i, (breadcrumb, body)) in sections.iter().enumerate() {
556            if body.trim().is_empty() {
557                continue;
558            }
559            texts.push(build_markdown_chunk(&rel_path, breadcrumb, body));
560            section_ids.push(i as i64);
561        }
562
563        if texts.is_empty() {
564            continue;
565        }
566
567        flush_context_batch(
568            conn,
569            &mut embedder,
570            &rel_path,
571            &texts,
572            &section_ids,
573            head_commit,
574            &config.model,
575            &mut stats,
576            vec_conn.as_ref(),
577        )
578        .await;
579    }
580
581    info!(
582        contexts = stats.contexts_embedded,
583        files = md_files.len(),
584        "Context block embedding complete"
585    );
586
587    Ok(stats)
588}
589
590/// Embed recent git commit messages.
591///
592/// Walks up to `max_commits` commits from HEAD (using `gix`) and embeds each
593/// commit's subject + body as a `commit` source chunk, keyed by the short hash.
594///
595/// Commits already present in the embeddings table (same hash) are skipped so
596/// repeated runs only embed new commits.
597pub async fn populate_commit_messages(
598    conn: &Connection,
599    config: &EmbeddingsConfig,
600    repo_root: &Path,
601    head_commit: Option<&str>,
602    db_path: Option<&Path>,
603    max_commits: usize,
604) -> anyhow::Result<PopulateStats> {
605    let vec_conn: Option<VecConnection> = db_path.and_then(VecConnection::open);
606    store::ensure_schema(conn).await?;
607
608    let mut embedder = Embedder::load(&config.model, None)?;
609    store::ensure_vec_schema(conn, embedder.dimensions, vec_conn.as_ref()).await;
610
611    let mut stats = PopulateStats::default();
612
613    let embedded_hashes = load_embedded_commit_hashes(conn, &config.model).await;
614
615    let commits = load_recent_commits(repo_root, max_commits);
616    if commits.is_empty() {
617        return Ok(stats);
618    }
619
620    let new_commits: Vec<CommitInfo> = commits
621        .into_iter()
622        .filter(|c| !embedded_hashes.contains(c.hash.as_str()))
623        .collect();
624
625    if new_commits.is_empty() {
626        return Ok(stats);
627    }
628
629    eprintln!("Embedding {} new commit message(s)...", new_commits.len());
630
631    let mut texts: Vec<String> = Vec::new();
632    let mut hashes: Vec<String> = Vec::new();
633
634    for commit in &new_commits {
635        let chunk = build_commit_chunk(&commit.hash, &commit.date, &commit.subject, &commit.body);
636        texts.push(chunk);
637        hashes.push(commit.hash.clone());
638    }
639
640    for (chunk_texts, chunk_hashes) in texts
641        .chunks(EMBED_BATCH_SIZE)
642        .zip(hashes.chunks(EMBED_BATCH_SIZE))
643    {
644        let text_refs: Vec<&str> = chunk_texts.iter().map(String::as_str).collect();
645        match embedder.embed_batch(&text_refs) {
646            Ok(vectors) => {
647                if let Err(e) = conn.execute("BEGIN", ()).await {
648                    warn!(error = %e, "Failed to BEGIN transaction for commit batch");
649                }
650                for (i, (text, vec)) in chunk_texts.iter().zip(vectors.iter()).enumerate() {
651                    let hash = &chunk_hashes[i];
652                    let blob = encode_vector(vec);
653                    match store::upsert_embedding(
654                        conn,
655                        "commit",
656                        hash,
657                        None,
658                        &config.model,
659                        head_commit,
660                        0.0,
661                        text,
662                        &blob,
663                        vec_conn.as_ref(),
664                    )
665                    .await
666                    {
667                        Ok(()) => stats.commits_embedded += 1,
668                        Err(e) => {
669                            warn!(hash, error = %e, "Failed to store commit embedding");
670                            stats.errors += 1;
671                        }
672                    }
673                }
674                if let Err(e) = conn.execute("COMMIT", ()).await {
675                    warn!(error = %e, "Failed to COMMIT transaction for commit batch");
676                }
677            }
678            Err(e) => {
679                warn!(error = %e, "Commit embedding batch failed");
680                stats.errors += chunk_texts.len();
681            }
682        }
683    }
684
685    info!(
686        commits = stats.commits_embedded,
687        "Commit message embedding complete"
688    );
689
690    Ok(stats)
691}
692
/// A parsed commit entry from the git history.
///
/// Built by `load_recent_commits` and turned into an embedding chunk by
/// `populate_commit_messages`.
struct CommitInfo {
    /// Abbreviated commit id: the first 12 characters of the full hash
    /// (or the whole hash if it is shorter).
    hash: String,
    /// Commit date as `YYYY-MM-DD`, produced by `epoch_to_date`.
    date: String,
    /// Trimmed commit summary; entries with an empty subject are never built.
    subject: String,
    /// Remainder of the commit message after the subject, trimmed; may be empty.
    body: String,
}
700
701/// Walk up to `max_commits` commits from HEAD using `gix` and return their info.
702fn load_recent_commits(root: &Path, max_commits: usize) -> Vec<CommitInfo> {
703    let repo = match gix::discover(root) {
704        Ok(r) => r.into_sync().to_thread_local(),
705        Err(_) => return Vec::new(),
706    };
707
708    let head_id = match repo.head_id() {
709        Ok(id) => id,
710        Err(_) => return Vec::new(),
711    };
712
713    let walk = match head_id
714        .ancestors()
715        .sorting(gix::revision::walk::Sorting::ByCommitTime(
716            gix::traverse::commit::simple::CommitTimeOrder::NewestFirst,
717        ))
718        .all()
719    {
720        Ok(w) => w,
721        Err(_) => return Vec::new(),
722    };
723
724    let mut commits = Vec::new();
725
726    for info in walk.take(max_commits) {
727        let Ok(info) = info else { continue };
728        let Ok(commit_obj) = info.object() else {
729            continue;
730        };
731        let Ok(commit) = commit_obj.decode() else {
732            continue;
733        };
734
735        let hash = info.id().to_string();
736        let short_hash = if hash.len() >= 12 {
737            hash[..12].to_string()
738        } else {
739            hash.clone()
740        };
741
742        // commit.time() returns Result<Time, Error>; fall back to 0 on decode error.
743        let timestamp = commit.time().map(|t| t.seconds).unwrap_or(0);
744        let date = epoch_to_date(timestamp);
745
746        // commit.message is &BStr; ByteSlice::to_str_lossy for UTF-8 conversion.
747        let full_message = commit.message.to_str_lossy().into_owned();
748        let msg_ref = commit.message();
749        let subject = msg_ref.summary().to_str_lossy().trim().to_string();
750        let body = full_message
751            .trim_start_matches(subject.as_str())
752            .trim()
753            .to_string();
754
755        if subject.is_empty() {
756            continue;
757        }
758
759        commits.push(CommitInfo {
760            hash: short_hash,
761            date,
762            subject,
763            body,
764        });
765    }
766
767    commits
768}
769
/// Convert Unix epoch seconds to a `YYYY-MM-DD` string (UTC, ignoring leap seconds).
///
/// Uses pure arithmetic -- no chrono dependency. Timestamps before the epoch
/// are supported: the day number is computed with floor division, so `-1`
/// maps to `1969-12-31` rather than being clamped to `1970-01-01`.
fn epoch_to_date(secs: i64) -> String {
    // Floor division keeps negative timestamps on the correct calendar day.
    let days_since_epoch = secs.div_euclid(86_400);

    // Civil calendar from Howard Hinnant's date algorithms.
    // Shift the epoch to 0000-03-01 so leap days fall at the end of a "year".
    let z = days_since_epoch + 719_468;
    let era = z.div_euclid(146_097); // 400-year cycle index
    let doe = z.rem_euclid(146_097); // day of era, [0, 146096]
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; // [0, 399]
    let y = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // [0, 365], March-based
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let d = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
    let m = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
    // January and February belong to the following civil year.
    let y = if m <= 2 { y + 1 } else { y };

    format!("{y:04}-{m:02}-{d:02}")
}
790
791/// Load the set of commit hashes already present in the embeddings table.
792async fn load_embedded_commit_hashes(
793    conn: &Connection,
794    model: &str,
795) -> std::collections::HashSet<String> {
796    let mut set = std::collections::HashSet::new();
797    let Ok(mut rows) = conn
798        .query(
799            "SELECT source_path FROM embeddings WHERE source_type = 'commit' AND model = ?1",
800            [model],
801        )
802        .await
803    else {
804        return set;
805    };
806    while let Ok(Some(row)) = rows.next().await {
807        if let Ok(hash) = row.get::<String>(0) {
808            set.insert(hash);
809        }
810    }
811    set
812}
813
814/// Flush a batch of doc (markdown) chunks through the embedder and store.
815///
816/// `section_ids` are used as `source_id` so each heading section gets its own
817/// unique slot under the UNIQUE constraint `(source_type='doc', source_path, source_id=i)`.
818#[allow(clippy::too_many_arguments)]
819async fn flush_doc_batch(
820    conn: &Connection,
821    embedder: &mut Embedder,
822    rel_path: &str,
823    texts: &[String],
824    section_ids: &[i64],
825    head_commit: Option<&str>,
826    model_name: &str,
827    stats: &mut PopulateStats,
828    vec_conn: Option<&VecConnection>,
829) {
830    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
831    match embedder.embed_batch(&text_refs) {
832        Ok(vectors) => {
833            if let Err(e) = conn.execute("BEGIN", ()).await {
834                warn!(error = %e, "Failed to BEGIN transaction for doc batch");
835            }
836            for (i, (text, vec)) in texts.iter().zip(vectors.iter()).enumerate() {
837                let blob = encode_vector(vec);
838                let sid = section_ids.get(i).copied().unwrap_or(i as i64);
839                match store::upsert_embedding(
840                    conn,
841                    "doc",
842                    rel_path,
843                    Some(sid),
844                    model_name,
845                    head_commit,
846                    0.0,
847                    text,
848                    &blob,
849                    vec_conn,
850                )
851                .await
852                {
853                    Ok(()) => stats.docs_embedded += 1,
854                    Err(e) => {
855                        warn!(path = %rel_path, section = i, error = %e, "Failed to store doc embedding");
856                        stats.errors += 1;
857                    }
858                }
859            }
860            if let Err(e) = conn.execute("COMMIT", ()).await {
861                warn!(error = %e, "Failed to COMMIT transaction for doc batch");
862            }
863        }
864        Err(e) => {
865            warn!(error = %e, path = %rel_path, "Doc embedding batch failed");
866            stats.errors += texts.len();
867        }
868    }
869}
870
871/// Flush a batch of context block chunks through the embedder and store.
872///
873/// Identical to [`flush_doc_batch`] but uses `source_type = "context"` so context
874/// blocks are stored separately and can be filtered in semantic search.
875#[allow(clippy::too_many_arguments)]
876async fn flush_context_batch(
877    conn: &Connection,
878    embedder: &mut Embedder,
879    rel_path: &str,
880    texts: &[String],
881    section_ids: &[i64],
882    head_commit: Option<&str>,
883    model_name: &str,
884    stats: &mut PopulateStats,
885    vec_conn: Option<&VecConnection>,
886) {
887    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
888    match embedder.embed_batch(&text_refs) {
889        Ok(vectors) => {
890            if let Err(e) = conn.execute("BEGIN", ()).await {
891                warn!(error = %e, "Failed to BEGIN transaction for context batch");
892            }
893            for (i, (text, vec)) in texts.iter().zip(vectors.iter()).enumerate() {
894                let blob = encode_vector(vec);
895                let sid = section_ids.get(i).copied().unwrap_or(i as i64);
896                match store::upsert_embedding(
897                    conn,
898                    "context",
899                    rel_path,
900                    Some(sid),
901                    model_name,
902                    head_commit,
903                    0.0,
904                    text,
905                    &blob,
906                    vec_conn,
907                )
908                .await
909                {
910                    Ok(()) => stats.contexts_embedded += 1,
911                    Err(e) => {
912                        warn!(path = %rel_path, section = i, error = %e, "Failed to store context embedding");
913                        stats.errors += 1;
914                    }
915                }
916            }
917            if let Err(e) = conn.execute("COMMIT", ()).await {
918                warn!(error = %e, "Failed to COMMIT transaction for context batch");
919            }
920        }
921        Err(e) => {
922            warn!(error = %e, path = %rel_path, "Context embedding batch failed");
923            stats.errors += texts.len();
924        }
925    }
926}
927
928/// Flush one batch of symbols through the embedder and store.
929#[allow(clippy::too_many_arguments)]
930async fn flush_batch(
931    conn: &Connection,
932    embedder: &mut Embedder,
933    symbols: &[SymbolRow],
934    texts: &[String],
935    staleness: &[f64],
936    head_commit: Option<&str>,
937    model_name: &str,
938    stats: &mut PopulateStats,
939    vec_conn: Option<&VecConnection>,
940) {
941    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
942    match embedder.embed_batch(&text_refs) {
943        Ok(vectors) => {
944            if let Err(e) = conn.execute("BEGIN", ()).await {
945                warn!(error = %e, "Failed to BEGIN transaction for batch");
946            }
947            for (idx, (sym, (text, vec))) in symbols
948                .iter()
949                .zip(texts.iter().zip(vectors.iter()))
950                .enumerate()
951            {
952                let blob = encode_vector(vec);
953                let sym_staleness = staleness.get(idx).copied().unwrap_or(0.0) as f32;
954                match store::upsert_embedding(
955                    conn,
956                    "symbol",
957                    &sym.file,
958                    Some(sym.rowid),
959                    model_name,
960                    head_commit,
961                    sym_staleness,
962                    text,
963                    &blob,
964                    vec_conn,
965                )
966                .await
967                {
968                    Ok(()) => stats.symbols_embedded += 1,
969                    Err(e) => {
970                        warn!(symbol = %sym.name, file = %sym.file, error = %e, "Failed to store embedding");
971                        stats.errors += 1;
972                    }
973                }
974            }
975            if let Err(e) = conn.execute("COMMIT", ()).await {
976                warn!(error = %e, "Failed to COMMIT transaction for batch");
977            }
978        }
979        Err(e) => {
980            warn!(error = %e, batch_size = symbols.len(), "Embedding batch failed");
981            stats.errors += symbols.len();
982        }
983    }
984}
985
986/// Load all symbols from the index (or only those in changed_paths).
987async fn load_symbols(
988    conn: &Connection,
989    changed_paths: Option<&[String]>,
990) -> anyhow::Result<Vec<SymbolRow>> {
991    let sql = "SELECT rowid, file, name, kind, start_line, end_line, parent FROM symbols";
992
993    let mut rows = conn.query(sql, ()).await?;
994    let mut symbols = Vec::new();
995
996    let path_set: Option<std::collections::HashSet<&str>> =
997        changed_paths.map(|paths| paths.iter().map(String::as_str).collect());
998
999    while let Some(row) = rows.next().await? {
1000        let file: String = row.get(1)?;
1001
1002        if path_set
1003            .as_ref()
1004            .is_some_and(|set| !set.contains(file.as_str()))
1005        {
1006            continue;
1007        }
1008
1009        symbols.push(SymbolRow {
1010            rowid: row.get(0)?,
1011            file,
1012            name: row.get(2)?,
1013            kind: row.get(3)?,
1014            start_line: row.get(4)?,
1015            end_line: row.get(5)?,
1016            parent: row.get(6)?,
1017        });
1018    }
1019
1020    Ok(symbols)
1021}
1022
1023/// Load all callers in one query, grouped by callee name.
1024/// Returns a map: callee_name -> top-10 caller names (by call count).
1025async fn load_all_callers(conn: &Connection) -> HashMap<String, Vec<String>> {
1026    let mut map: HashMap<String, Vec<String>> = HashMap::new();
1027
1028    let Ok(mut rows) = conn
1029        .query(
1030            "SELECT callee_name, caller_symbol, COUNT(*) as cnt \
1031             FROM calls \
1032             GROUP BY callee_name, caller_symbol \
1033             ORDER BY callee_name, cnt DESC",
1034            (),
1035        )
1036        .await
1037    else {
1038        return map;
1039    };
1040
1041    while let Ok(Some(row)) = rows.next().await {
1042        let Ok(callee) = row.get::<String>(0) else {
1043            continue;
1044        };
1045        let Ok(caller) = row.get::<String>(1) else {
1046            continue;
1047        };
1048        let entry = map.entry(callee).or_default();
1049        if entry.len() < 10 {
1050            entry.push(caller);
1051        }
1052    }
1053
1054    map
1055}
1056
1057/// Load all callees in one query, grouped by (caller_symbol, caller_file).
1058/// Returns a map: (caller_symbol, caller_file) -> callee names (up to 10).
1059async fn load_all_callees(conn: &Connection) -> HashMap<(String, String), Vec<String>> {
1060    let mut map: HashMap<(String, String), Vec<String>> = HashMap::new();
1061
1062    let Ok(mut rows) = conn
1063        .query(
1064            "SELECT caller_symbol, caller_file, callee_name FROM calls ORDER BY caller_symbol, caller_file",
1065            (),
1066        )
1067        .await
1068    else {
1069        return map;
1070    };
1071
1072    while let Ok(Some(row)) = rows.next().await {
1073        let Ok(caller_sym) = row.get::<String>(0) else {
1074            continue;
1075        };
1076        let Ok(caller_file) = row.get::<String>(1) else {
1077            continue;
1078        };
1079        let Ok(callee) = row.get::<String>(2) else {
1080            continue;
1081        };
1082        let entry = map.entry((caller_sym, caller_file)).or_default();
1083        if entry.len() < 10 {
1084            entry.push(callee);
1085        }
1086    }
1087
1088    map
1089}
1090
1091/// Load all doc comments in one query from the symbol_attributes table.
1092/// Returns a map: (name, file) -> doc text.
1093async fn load_all_doc_comments(conn: &Connection) -> HashMap<(String, String), String> {
1094    let mut map: HashMap<(String, String), String> = HashMap::new();
1095
1096    let Ok(mut rows) = conn
1097        .query(
1098            "SELECT name, file, attribute FROM symbol_attributes WHERE attribute LIKE 'doc:%' ORDER BY name, file",
1099            (),
1100        )
1101        .await
1102    else {
1103        return map;
1104    };
1105
1106    while let Ok(Some(row)) = rows.next().await {
1107        let Ok(name) = row.get::<String>(0) else {
1108            continue;
1109        };
1110        let Ok(file) = row.get::<String>(1) else {
1111            continue;
1112        };
1113        let Ok(attr) = row.get::<String>(2) else {
1114            continue;
1115        };
1116        if let Some(doc) = attr.strip_prefix("doc:") {
1117            let entry = map.entry((name, file)).or_default();
1118            if !entry.is_empty() {
1119                entry.push('\n');
1120            }
1121            entry.push_str(doc);
1122        }
1123    }
1124
1125    map
1126}
1127
1128/// Load co-change neighbors for all files as a map: file -> [neighbor_files].
1129/// Returns an empty map if the co_change_edges table doesn't exist yet.
1130async fn load_co_change_map(
1131    conn: &Connection,
1132) -> anyhow::Result<std::collections::HashMap<String, Vec<String>>> {
1133    let mut map: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
1134
1135    let mut rows = match conn
1136        .query(
1137            "SELECT file_a, file_b FROM co_change_edges ORDER BY count DESC",
1138            (),
1139        )
1140        .await
1141    {
1142        Ok(rows) => rows,
1143        Err(_) => {
1144            return Ok(map);
1145        }
1146    };
1147
1148    while let Some(row) = rows.next().await? {
1149        let a: String = row.get(0)?;
1150        let b: String = row.get(1)?;
1151        map.entry(a.clone()).or_default().push(b.clone());
1152        map.entry(b).or_default().push(a);
1153    }
1154
1155    Ok(map)
1156}