Skip to main content

talon_core/sync/
mod.rs

//! Sync orchestration: advisory lock, full scan, reconcile.
//!
//! Ports `services/talon/sync/*.ts`. Callers that need several
//! [`crate::indexer`] invocations to run as one logical "sync" go through
//! here. The [`lock`] module ensures that only one Talon process (across PIDs)
//! touches the index at a time. [`run_sync`] drives the full pipeline: scan,
//! reconcile deleted and ignored notes, relink unresolved links, rebuild the
//! link graph when needed, and run the optional embed pass. Tombstone pruning
//! is not wired up yet.
8
9pub mod lock;
10mod relink;
11#[cfg(test)]
12#[allow(clippy::unwrap_used, clippy::expect_used)]
13mod tests;
14
15use std::path::Path;
16use std::time::Instant;
17
18use fs_err as fs;
19use rusqlite::{Connection, OptionalExtension};
20use time::OffsetDateTime;
21
22use crate::TalonError;
23use crate::config::ChunkerConfig;
24use crate::embed::{EmbedPassOptions, EmbedPassStats, run_embed_pass};
25use crate::graph::{GraphBuildInput, rebuild_graph};
26use crate::indexer::{
27    IndexerConfig, IndexerStats, reconcile_deletions, reconcile_ignored_notes,
28    run_full_scan_with_chunker,
29};
30use crate::indexing::change_tracking::TOMBSTONE_RETENTION_MS;
31use crate::indexing::migrations::read_db_version;
32use crate::inference::EmbeddingClient;
33
34pub use lock::{SyncLock, SyncLockError, acquire_sync_lock, is_sync_lock_held_by_live_process};
35pub use relink::relink_unresolved;
36
/// Deletes the `SQLite` index database and its companion files.
///
/// `SQLite` names its sidecar files by appending a suffix to the *full*
/// database file name (`index.db` → `index.db-wal`, `index.db-shm`,
/// `index.db-journal`). The companion paths are therefore built by suffixing
/// the whole name, not by replacing the extension. Files that do not exist are
/// skipped, so calling this twice is harmless.
///
/// Callers should hold the sync lock before invoking this so no other Talon
/// process can read or write the database while it is being replaced.
///
/// # Errors
///
/// Returns the first filesystem error other than `NotFound`. Files later in
/// the list are left in place when that happens.
pub fn remove_index_files(db_path: &Path) -> std::io::Result<()> {
    // `<database file name><suffix>`, mirroring how SQLite derives sidecar names.
    let sidecar = |suffix: &str| {
        let mut name = db_path.as_os_str().to_os_string();
        name.push(suffix);
        std::path::PathBuf::from(name)
    };
    // A leftover rollback journal next to a freshly created database could be
    // mistaken for a hot journal, so it is removed together with WAL/SHM.
    for path in [
        db_path.to_path_buf(),
        sidecar("-wal"),
        sidecar("-shm"),
        sidecar("-journal"),
    ] {
        match fs::remove_file(&path) {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
59
60/// No-embed sync used by query surfaces to keep the index in step with
61/// on-disk state without paying the embedding round-trip.
62///
63/// Equivalent to [`run_sync_with_chunker`] called with `embed_config = None`
64/// and `inference = None` — runs the full scan, reconciles deletions, and
65/// re-resolves links, then returns. Recently-edited files are searchable
66/// via BM25 immediately; their semantic embeddings catch up on the next
67/// explicit `talon sync`.
68///
69/// # Errors
70///
71/// Same as [`run_sync_with_chunker`]. Errors loudly on lock contention
72/// (`SyncError::LockBusy`) — query commands should propagate this rather
73/// than silently fall back to stale state.
74pub fn refresh_index(
75    conn: &mut Connection,
76    vault_root: &Path,
77    lock_path: &Path,
78    config: &IndexerConfig,
79    chunker: &ChunkerConfig,
80) -> Result<IndexerStats, SyncError> {
81    let (stats, _embed) =
82        run_sync_with_chunker(conn, vault_root, lock_path, config, None, None, chunker)?;
83    Ok(stats)
84}
85
86/// Like [`refresh_index`] when the caller already owns the sync lock.
87///
88/// This is useful for process entry points that need to serialize database
89/// migrations before opening a write-capable connection.
90///
91/// # Errors
92///
93/// Same as [`refresh_index`], except lock acquisition has already happened.
94pub fn refresh_index_locked(
95    conn: &mut Connection,
96    vault_root: &Path,
97    config: &IndexerConfig,
98    chunker: &ChunkerConfig,
99    lock: SyncLock,
100) -> Result<IndexerStats, SyncError> {
101    let (stats, _embed) =
102        run_sync_with_chunker_locked(conn, vault_root, config, None, None, chunker, lock)?;
103    Ok(stats)
104}
105
106/// One-shot sync over a vault.
107///
108/// Holds [`SyncLock`] for the duration of the call so concurrent Talon
109/// processes serialize. Runs the full scan, then reconciles deletions, then
110/// (best-effort) prunes tombstones older than [`TOMBSTONE_RETENTION_MS`],
111/// then optionally runs the embed pass.
112///
113/// When `embed_config` and `embedding` are both `Some`, the embed pass runs
114/// after reconciliation. When either is `None` (e.g. `--fast` mode), the
115/// embed pass is skipped entirely.
116///
117/// # Errors
118///
119/// Returns [`SyncError::LockBusy`] if another process holds the lock,
120/// [`SyncError::Indexer`] if the underlying scan or reconcile fails,
121/// [`SyncError::Embed`] if the embed pass fails, and
122/// [`SyncError::Lock`] if the lock file itself cannot be created/removed.
123pub fn run_sync(
124    conn: &mut Connection,
125    vault_root: &Path,
126    lock_path: &Path,
127    config: &IndexerConfig,
128    embed_config: Option<EmbedPassOptions>,
129    embedding: Option<&EmbeddingClient>,
130) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
131    run_sync_with_chunker(
132        conn,
133        vault_root,
134        lock_path,
135        config,
136        embed_config,
137        embedding,
138        &ChunkerConfig::default(),
139    )
140}
141
142/// Like [`run_sync`] but with an explicit [`ChunkerConfig`].
143///
144/// Per-note scope names are resolved from `config.talon_config` when present.
145///
146/// # Errors
147///
148/// See [`run_sync`] for error variants.
149pub fn run_sync_with_chunker(
150    conn: &mut Connection,
151    vault_root: &Path,
152    lock_path: &Path,
153    config: &IndexerConfig,
154    embed_config: Option<EmbedPassOptions>,
155    embedding: Option<&EmbeddingClient>,
156    chunker_config: &ChunkerConfig,
157) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
158    let lock = acquire_sync_lock(lock_path).map_err(SyncError::from_lock)?;
159    run_sync_with_chunker_locked(
160        conn,
161        vault_root,
162        config,
163        embed_config,
164        embedding,
165        chunker_config,
166        lock,
167    )
168}
169
170/// Like [`run_sync_with_chunker`] when the caller already owns the sync lock.
171///
172/// # Errors
173///
174/// See [`run_sync`] for non-lock error variants.
175pub fn run_sync_with_chunker_locked(
176    conn: &mut Connection,
177    vault_root: &Path,
178    config: &IndexerConfig,
179    embed_config: Option<EmbedPassOptions>,
180    embedding: Option<&EmbeddingClient>,
181    chunker_config: &ChunkerConfig,
182    _lock: SyncLock,
183) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
184    let profile = RefreshProfile::start();
185    let mut stats = run_full_scan_with_chunker(conn, vault_root, config, chunker_config)
186        .map_err(SyncError::Indexer)?;
187    profile.mark("scan");
188    let deleted = reconcile_deletions(conn, vault_root).map_err(SyncError::Indexer)?;
189    stats.deleted = stats.deleted.saturating_add(deleted);
190    profile.mark("deletions");
191    let ignored = reconcile_ignored_notes(conn, config).map_err(SyncError::Indexer)?;
192    stats.deleted = stats.deleted.saturating_add(ignored);
193    profile.mark("ignored");
194
195    // Closes the link-staleness window: incremental indexing only refreshes
196    // a source file's resolved links when that source file is touched, so an
197    // alias added to a target leaves prior links to it unresolved until the
198    // sources change. This pass re-resolves any link still pointing at a
199    // missing `to_path` and lets the new aliases / new target files satisfy
200    // existing references.
201    let graph_version_before_relink = graph_db_version(conn).map_err(SyncError::Indexer)?;
202    relink_unresolved(conn).map_err(SyncError::Indexer)?;
203    profile.mark("relink");
204    if graph_version_before_relink != Some(read_db_version(conn)) {
205        stats.graph = Some(rebuild_graph(conn, &GraphBuildInput).map_err(SyncError::Indexer)?);
206    }
207    profile.mark("graph");
208
209    // Tombstone state currently lives in the in-memory `ChangeIndex` (see
210    // `crate::indexing::change_tracking`); the persistent change-feed table will land in
211    // Phase 5 alongside `query::changes`. The constants below are referenced
212    // here so the eventual prune wiring has an obvious home.
213    let _ = TOMBSTONE_RETENTION_MS;
214    let _ = OffsetDateTime::now_utc();
215
216    // Run embed pass after reconciliation if configured.
217    let embed_stats = if let (Some(opts), Some(client)) = (embed_config, embedding) {
218        Some(run_embed_pass(conn, client, &opts).map_err(|e| SyncError::Embed(e.to_string()))?)
219    } else {
220        None
221    };
222    profile.mark("embed");
223
224    Ok((stats, embed_stats))
225}
226
/// Opt-in stage timer for the sync pipeline.
///
/// It is enabled when the `TALON_PROFILE` environment variable is present,
/// with any value. Otherwise [`RefreshProfile::mark`] is a no-op. Each mark
/// prints two durations to stderr: the time since the previous mark, and the
/// time since [`RefreshProfile::start`].
struct RefreshProfile {
    /// Whether `TALON_PROFILE` was present when profiling started.
    enabled: bool,
    /// Origin of the `total=` duration.
    started: Instant,
    /// End of the previous stage, which is the origin of the `stage=` duration.
    /// A `Cell` lets `mark` advance it through `&self`.
    previous: std::cell::Cell<Instant>,
}

impl RefreshProfile {
    /// Starts the clock and samples `TALON_PROFILE` once.
    fn start() -> Self {
        let started = Instant::now();
        Self {
            enabled: std::env::var_os("TALON_PROFILE").is_some(),
            started,
            previous: std::cell::Cell::new(started),
        }
    }

    /// Records the end of `stage` and, when enabled, reports its timing.
    ///
    /// Both durations are measured to the single instant `now`, and `now` also
    /// becomes the start of the next stage. Consecutive stages therefore never
    /// overlap, and `stage`/`total` refer to the same moment.
    fn mark(&self, stage: &str) {
        if !self.enabled {
            return;
        }
        let now = Instant::now();
        let previous = self.previous.replace(now);
        eprintln!(
            "talon profile refresh {stage}: stage={}ms total={}ms",
            now.duration_since(previous).as_millis(),
            now.duration_since(self.started).as_millis()
        );
    }
}
256
257fn graph_db_version(conn: &Connection) -> Result<Option<u64>, TalonError> {
258    let version = conn
259        .query_row(
260            "SELECT value FROM graph_meta WHERE key = 'db_version'",
261            [],
262            |row| row.get::<_, String>(0),
263        )
264        .optional()
265        .map_err(|source| TalonError::Sqlite {
266            context: "read graph db version",
267            source,
268        })?;
269    Ok(version.and_then(|value| value.parse().ok()))
270}
271
272/// Errors returned by [`run_sync`].
273#[derive(Debug, thiserror::Error)]
274#[non_exhaustive]
275pub enum SyncError {
276    /// Another Talon process holds the sync lock.
277    #[error("sync lock is held by another process")]
278    LockBusy,
279    /// Lock-file IO failed for a reason other than contention.
280    #[error("sync lock IO error: {0}")]
281    Lock(#[source] std::io::Error),
282    /// Indexer-side failure (DB or filesystem).
283    #[error(transparent)]
284    Indexer(#[from] TalonError),
285    /// Embed-pass failure (HTTP error, dim mismatch, etc.).
286    #[error("embed pass failed: {0}")]
287    Embed(String),
288}
289
290impl SyncError {
291    fn from_lock(err: SyncLockError) -> Self {
292        match err {
293            SyncLockError::Busy => Self::LockBusy,
294            SyncLockError::Io(io) => Self::Lock(io),
295        }
296    }
297}