1pub mod lock;
10mod relink;
11#[cfg(test)]
12#[allow(clippy::unwrap_used, clippy::expect_used)]
13mod tests;
14
15use std::path::Path;
16use std::time::Instant;
17
18use fs_err as fs;
19use rusqlite::{Connection, OptionalExtension};
20use time::OffsetDateTime;
21
22use crate::TalonError;
23use crate::config::ChunkerConfig;
24use crate::embed::{EmbedPassOptions, EmbedPassStats, run_embed_pass};
25use crate::graph::{GraphBuildInput, rebuild_graph};
26use crate::indexer::{
27 IndexerConfig, IndexerStats, reconcile_deletions, reconcile_ignored_notes,
28 run_full_scan_with_chunker,
29};
30use crate::indexing::change_tracking::TOMBSTONE_RETENTION_MS;
31use crate::indexing::migrations::read_db_version;
32use crate::inference::EmbeddingClient;
33
34pub use lock::{SyncLock, SyncLockError, acquire_sync_lock, is_sync_lock_held_by_live_process};
35pub use relink::relink_unresolved;
36
/// Deletes the SQLite index database at `db_path` together with its WAL
/// (`<db>-wal`) and shared-memory (`<db>-shm`) sidecar files.
///
/// Files that do not exist are skipped, so calling this on a partially or
/// fully removed index is not an error.
///
/// # Errors
///
/// Returns the first I/O error other than [`std::io::ErrorKind::NotFound`]
/// hit while removing a file. Files later in the list are then left in place.
pub fn remove_index_files(db_path: &Path) -> std::io::Result<()> {
    // SQLite names its sidecars by appending a suffix to the *full* database
    // file name (`index.db` -> `index.db-wal`). `Path::with_extension` would
    // replace the extension instead (`index.db` -> `index.sqlite-wal`), which
    // only matched when the database itself used the `.sqlite` extension.
    let sidecar = |suffix: &str| {
        let mut name = db_path.as_os_str().to_os_string();
        name.push(suffix);
        std::path::PathBuf::from(name)
    };
    for path in [db_path.to_path_buf(), sidecar("-wal"), sidecar("-shm")] {
        match fs::remove_file(&path) {
            Ok(()) => {}
            // Already gone: nothing to clean up for this file.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
59
60pub fn refresh_index(
75 conn: &mut Connection,
76 vault_root: &Path,
77 lock_path: &Path,
78 config: &IndexerConfig,
79 chunker: &ChunkerConfig,
80) -> Result<IndexerStats, SyncError> {
81 let (stats, _embed) =
82 run_sync_with_chunker(conn, vault_root, lock_path, config, None, None, chunker)?;
83 Ok(stats)
84}
85
86pub fn refresh_index_locked(
95 conn: &mut Connection,
96 vault_root: &Path,
97 config: &IndexerConfig,
98 chunker: &ChunkerConfig,
99 lock: SyncLock,
100) -> Result<IndexerStats, SyncError> {
101 let (stats, _embed) =
102 run_sync_with_chunker_locked(conn, vault_root, config, None, None, chunker, lock)?;
103 Ok(stats)
104}
105
106pub fn run_sync(
124 conn: &mut Connection,
125 vault_root: &Path,
126 lock_path: &Path,
127 config: &IndexerConfig,
128 embed_config: Option<EmbedPassOptions>,
129 embedding: Option<&EmbeddingClient>,
130) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
131 run_sync_with_chunker(
132 conn,
133 vault_root,
134 lock_path,
135 config,
136 embed_config,
137 embedding,
138 &ChunkerConfig::default(),
139 )
140}
141
142pub fn run_sync_with_chunker(
150 conn: &mut Connection,
151 vault_root: &Path,
152 lock_path: &Path,
153 config: &IndexerConfig,
154 embed_config: Option<EmbedPassOptions>,
155 embedding: Option<&EmbeddingClient>,
156 chunker_config: &ChunkerConfig,
157) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
158 let lock = acquire_sync_lock(lock_path).map_err(SyncError::from_lock)?;
159 run_sync_with_chunker_locked(
160 conn,
161 vault_root,
162 config,
163 embed_config,
164 embedding,
165 chunker_config,
166 lock,
167 )
168}
169
170pub fn run_sync_with_chunker_locked(
176 conn: &mut Connection,
177 vault_root: &Path,
178 config: &IndexerConfig,
179 embed_config: Option<EmbedPassOptions>,
180 embedding: Option<&EmbeddingClient>,
181 chunker_config: &ChunkerConfig,
182 _lock: SyncLock,
183) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
184 let profile = RefreshProfile::start();
185 let mut stats = run_full_scan_with_chunker(conn, vault_root, config, chunker_config)
186 .map_err(SyncError::Indexer)?;
187 profile.mark("scan");
188 let deleted = reconcile_deletions(conn, vault_root).map_err(SyncError::Indexer)?;
189 stats.deleted = stats.deleted.saturating_add(deleted);
190 profile.mark("deletions");
191 let ignored = reconcile_ignored_notes(conn, config).map_err(SyncError::Indexer)?;
192 stats.deleted = stats.deleted.saturating_add(ignored);
193 profile.mark("ignored");
194
195 let graph_version_before_relink = graph_db_version(conn).map_err(SyncError::Indexer)?;
202 relink_unresolved(conn).map_err(SyncError::Indexer)?;
203 profile.mark("relink");
204 if graph_version_before_relink != Some(read_db_version(conn)) {
205 stats.graph = Some(rebuild_graph(conn, &GraphBuildInput).map_err(SyncError::Indexer)?);
206 }
207 profile.mark("graph");
208
209 let _ = TOMBSTONE_RETENTION_MS;
214 let _ = OffsetDateTime::now_utc();
215
216 let embed_stats = if let (Some(opts), Some(client)) = (embed_config, embedding) {
218 Some(run_embed_pass(conn, client, &opts).map_err(|e| SyncError::Embed(e.to_string()))?)
219 } else {
220 None
221 };
222 profile.mark("embed");
223
224 Ok((stats, embed_stats))
225}
226
/// Optional wall-clock profiler for the refresh pipeline.
///
/// Enabled when the `TALON_PROFILE` environment variable is set (to any
/// value) at the time [`RefreshProfile::start`] is called. Otherwise
/// [`RefreshProfile::mark`] is a no-op.
struct RefreshProfile {
    /// Whether `mark` prints timings to stderr.
    enabled: bool,
    /// Creation instant; basis for the `total` column.
    started: Instant,
    /// Instant of the most recent lap (initially `started`); basis for the
    /// `stage` column. A `Cell` so `mark` can advance it through `&self`.
    previous: std::cell::Cell<Instant>,
}

impl RefreshProfile {
    /// Starts the clock and samples `TALON_PROFILE` once.
    fn start() -> Self {
        let started = Instant::now();
        Self {
            enabled: std::env::var_os("TALON_PROFILE").is_some(),
            started,
            previous: std::cell::Cell::new(started),
        }
    }

    /// Closes the current stage and returns `(stage, total)`.
    ///
    /// Both durations are measured against one clock reading, which also
    /// becomes the start of the next stage. Consecutive stage durations
    /// therefore add up exactly to the total.
    fn lap(&self) -> (std::time::Duration, std::time::Duration) {
        let now = Instant::now();
        let previous = self.previous.replace(now);
        (now.duration_since(previous), now.duration_since(self.started))
    }

    /// Prints the time spent in `stage` and the running total to stderr, when
    /// profiling is enabled.
    fn mark(&self, stage: &str) {
        if !self.enabled {
            return;
        }
        let (stage_time, total_time) = self.lap();
        eprintln!(
            "talon profile refresh {stage}: stage={}ms total={}ms",
            stage_time.as_millis(),
            total_time.as_millis()
        );
    }
}
256
257fn graph_db_version(conn: &Connection) -> Result<Option<u64>, TalonError> {
258 let version = conn
259 .query_row(
260 "SELECT value FROM graph_meta WHERE key = 'db_version'",
261 [],
262 |row| row.get::<_, String>(0),
263 )
264 .optional()
265 .map_err(|source| TalonError::Sqlite {
266 context: "read graph db version",
267 source,
268 })?;
269 Ok(version.and_then(|value| value.parse().ok()))
270}
271
/// Errors returned by the sync and refresh entry points in this module.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum SyncError {
    /// The sync lock is already held (mapped from `SyncLockError::Busy`).
    #[error("sync lock is held by another process")]
    LockBusy,
    /// An I/O error occurred while acquiring the sync lock (mapped from
    /// `SyncLockError::Io`).
    // NOTE(review): the message interpolates `{0}` and the field is also
    // `#[source]`, so chain-walking reporters may print the I/O error twice.
    #[error("sync lock IO error: {0}")]
    Lock(#[source] std::io::Error),
    /// An indexing stage failed; displays as the wrapped `TalonError`.
    #[error(transparent)]
    Indexer(#[from] TalonError),
    /// The embed pass failed; holds the underlying error's `to_string()`.
    #[error("embed pass failed: {0}")]
    Embed(String),
}
289
290impl SyncError {
291 fn from_lock(err: SyncLockError) -> Self {
292 match err {
293 SyncLockError::Busy => Self::LockBusy,
294 SyncLockError::Io(io) => Self::Lock(io),
295 }
296 }
297}