pub mod lock;
mod relink;
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests;
use std::path::Path;
use std::time::Instant;
use fs_err as fs;
use rusqlite::{Connection, OptionalExtension};
use time::OffsetDateTime;
use crate::TalonError;
use crate::config::ChunkerConfig;
use crate::embed::{EmbedPassOptions, EmbedPassStats, run_embed_pass};
use crate::graph::{GraphBuildInput, rebuild_graph};
use crate::indexer::{
IndexerConfig, IndexerStats, reconcile_deletions, reconcile_ignored_notes,
run_full_scan_with_chunker,
};
use crate::indexing::change_tracking::TOMBSTONE_RETENTION_MS;
use crate::indexing::migrations::read_db_version;
use crate::inference::EmbeddingClient;
pub use lock::{SyncLock, SyncLockError, acquire_sync_lock, is_sync_lock_held_by_live_process};
pub use relink::relink_unresolved;
/// Deletes the SQLite index database at `db_path` together with its
/// write-ahead-log (`-wal`) and shared-memory (`-shm`) sidecar files.
///
/// SQLite names the sidecars by appending the suffix to the complete database
/// file name, so the names are built the same way here instead of swapping the
/// extension. For `index.sqlite` this still yields `index.sqlite-wal` and
/// `index.sqlite-shm`; for any other name (for example `index.db`, or a path
/// without an extension) it yields `index.db-wal` / `index.db-shm` rather than
/// a `*.sqlite-wal` path that SQLite never creates.
///
/// # Errors
///
/// Returns the first I/O error whose kind is not
/// [`std::io::ErrorKind::NotFound`]. Files that are already absent are
/// skipped silently, so calling this on a missing index succeeds.
pub fn remove_index_files(db_path: &Path) -> std::io::Result<()> {
    // Append `suffix` to the whole path, e.g. `vault/index.db` + `-wal`.
    let sidecar = |suffix: &str| {
        let mut raw = db_path.as_os_str().to_os_string();
        raw.push(suffix);
        std::path::PathBuf::from(raw)
    };

    for path in [db_path.to_path_buf(), sidecar("-wal"), sidecar("-shm")] {
        match fs::remove_file(&path) {
            Ok(()) => {}
            // A file that does not exist is already in the desired state.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
/// Refreshes the index for `vault_root` without running an embedding pass.
///
/// Delegates to [`run_sync_with_chunker`], which acquires the sync lock at
/// `lock_path`, passing `None` for both the embed options and the embedding
/// client. Only the indexer statistics are returned; the (always absent)
/// embed statistics are discarded.
///
/// # Errors
///
/// Propagates any [`SyncError`] produced by the underlying sync run.
pub fn refresh_index(
    conn: &mut Connection,
    vault_root: &Path,
    lock_path: &Path,
    config: &IndexerConfig,
    chunker: &ChunkerConfig,
) -> Result<IndexerStats, SyncError> {
    run_sync_with_chunker(conn, vault_root, lock_path, config, None, None, chunker)
        .map(|(indexer_stats, _embed)| indexer_stats)
}
/// Refreshes the index for `vault_root` using a sync lock the caller already
/// holds, without running an embedding pass.
///
/// Delegates to [`run_sync_with_chunker_locked`] with `None` for both the
/// embed options and the embedding client, handing over ownership of `lock`.
/// Only the indexer statistics are returned.
///
/// # Errors
///
/// Propagates any [`SyncError`] produced by the underlying sync run.
pub fn refresh_index_locked(
    conn: &mut Connection,
    vault_root: &Path,
    config: &IndexerConfig,
    chunker: &ChunkerConfig,
    lock: SyncLock,
) -> Result<IndexerStats, SyncError> {
    run_sync_with_chunker_locked(conn, vault_root, config, None, None, chunker, lock)
        .map(|(indexer_stats, _embed)| indexer_stats)
}
/// Runs a full sync using the default chunker configuration.
///
/// Equivalent to calling [`run_sync_with_chunker`] with
/// `&ChunkerConfig::default()`; every other argument is forwarded unchanged.
///
/// # Errors
///
/// Propagates any [`SyncError`] produced by the underlying sync run.
pub fn run_sync(
    conn: &mut Connection,
    vault_root: &Path,
    lock_path: &Path,
    config: &IndexerConfig,
    embed_config: Option<EmbedPassOptions>,
    embedding: Option<&EmbeddingClient>,
) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
    let default_chunker = ChunkerConfig::default();
    run_sync_with_chunker(
        conn,
        vault_root,
        lock_path,
        config,
        embed_config,
        embedding,
        &default_chunker,
    )
}
/// Acquires the sync lock at `lock_path`, then runs a full sync with the
/// given chunker configuration.
///
/// The acquired lock is moved into [`run_sync_with_chunker_locked`], which
/// performs the actual work.
///
/// # Errors
///
/// Returns the [`SyncError`] that [`SyncError::from_lock`] maps a lock
/// acquisition failure to, or any error propagated from the sync run itself.
pub fn run_sync_with_chunker(
    conn: &mut Connection,
    vault_root: &Path,
    lock_path: &Path,
    config: &IndexerConfig,
    embed_config: Option<EmbedPassOptions>,
    embedding: Option<&EmbeddingClient>,
    chunker_config: &ChunkerConfig,
) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
    let held_lock = match acquire_sync_lock(lock_path) {
        Ok(acquired) => acquired,
        Err(lock_err) => return Err(SyncError::from_lock(lock_err)),
    };

    run_sync_with_chunker_locked(
        conn,
        vault_root,
        config,
        embed_config,
        embedding,
        chunker_config,
        held_lock,
    )
}
pub fn run_sync_with_chunker_locked(
conn: &mut Connection,
vault_root: &Path,
config: &IndexerConfig,
embed_config: Option<EmbedPassOptions>,
embedding: Option<&EmbeddingClient>,
chunker_config: &ChunkerConfig,
_lock: SyncLock,
) -> Result<(IndexerStats, Option<EmbedPassStats>), SyncError> {
let profile = RefreshProfile::start();
let mut stats = run_full_scan_with_chunker(conn, vault_root, config, chunker_config)
.map_err(SyncError::Indexer)?;
profile.mark("scan");
let deleted = reconcile_deletions(conn, vault_root).map_err(SyncError::Indexer)?;
stats.deleted = stats.deleted.saturating_add(deleted);
profile.mark("deletions");
let ignored = reconcile_ignored_notes(conn, config).map_err(SyncError::Indexer)?;
stats.deleted = stats.deleted.saturating_add(ignored);
profile.mark("ignored");
let graph_version_before_relink = graph_db_version(conn).map_err(SyncError::Indexer)?;
relink_unresolved(conn).map_err(SyncError::Indexer)?;
profile.mark("relink");
if graph_version_before_relink != Some(read_db_version(conn)) {
stats.graph = Some(rebuild_graph(conn, &GraphBuildInput).map_err(SyncError::Indexer)?);
}
profile.mark("graph");
let _ = TOMBSTONE_RETENTION_MS;
let _ = OffsetDateTime::now_utc();
let embed_stats = if let (Some(opts), Some(client)) = (embed_config, embedding) {
Some(run_embed_pass(conn, client, &opts).map_err(|e| SyncError::Embed(e.to_string()))?)
} else {
None
};
profile.mark("embed");
Ok((stats, embed_stats))
}
/// Optional stage timer for a refresh run, reporting to stderr.
struct RefreshProfile {
    /// Whether `mark` prints anything; fixed when the profile is created.
    enabled: bool,
    /// Instant at which the profile was created; basis for the `total` figure.
    started: Instant,
    /// Instant of the most recent enabled `mark` call (initially `started`).
    previous: std::cell::Cell<Instant>,
}

impl RefreshProfile {
    /// Creates a profile anchored at the current instant.
    ///
    /// Profiling is enabled when the `TALON_PROFILE` environment variable is
    /// present, regardless of its value.
    fn start() -> Self {
        let origin = Instant::now();
        let enabled = std::env::var_os("TALON_PROFILE").is_some();
        Self {
            enabled,
            started: origin,
            previous: std::cell::Cell::new(origin),
        }
    }

    /// Reports the time since the previous mark and since the start, labelled
    /// with `stage`, on stderr. Does nothing when profiling is disabled.
    fn mark(&self, stage: &str) {
        if self.enabled {
            // Swap in the new checkpoint and measure from the old one.
            let stage_start = self.previous.replace(Instant::now());
            let stage_ms = stage_start.elapsed().as_millis();
            let total_ms = self.started.elapsed().as_millis();
            eprintln!(
                "talon profile refresh {stage}: stage={}ms total={}ms",
                stage_ms, total_ms
            );
        }
    }
}
/// Reads the `db_version` entry from the `graph_meta` table.
///
/// Returns `Ok(None)` when the row is absent, and also when the stored text
/// does not parse as a `u64`.
///
/// # Errors
///
/// Returns [`TalonError::Sqlite`] with context `"read graph db version"` when
/// the query itself fails.
fn graph_db_version(conn: &Connection) -> Result<Option<u64>, TalonError> {
    let lookup = conn.query_row(
        "SELECT value FROM graph_meta WHERE key = 'db_version'",
        [],
        |row| row.get::<_, String>(0),
    );

    // `optional()` turns "no rows" into `Ok(None)`; other failures stay errors.
    let stored = lookup.optional().map_err(|source| TalonError::Sqlite {
        context: "read graph db version",
        source,
    })?;

    let parsed = match stored {
        // An unparseable value is treated the same as a missing one.
        Some(text) => text.parse().ok(),
        None => None,
    };
    Ok(parsed)
}
/// Errors returned by the sync entry points in this module.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum SyncError {
/// The sync lock could not be acquired because it is already held
/// (mapped from `SyncLockError::Busy`).
#[error("sync lock is held by another process")]
LockBusy,
/// An I/O failure reported while acquiring the sync lock
/// (mapped from `SyncLockError::Io`).
#[error("sync lock IO error: {0}")]
Lock(#[source] std::io::Error),
/// A `TalonError` from the scan, reconciliation, relink or graph stages;
/// displayed transparently as the wrapped error.
#[error(transparent)]
Indexer(#[from] TalonError),
/// The embedding pass failed; carries the stringified underlying error.
#[error("embed pass failed: {0}")]
Embed(String),
}
impl SyncError {
fn from_lock(err: SyncLockError) -> Self {
match err {
SyncLockError::Busy => Self::LockBusy,
SyncLockError::Io(io) => Self::Lock(io),
}
}
}