pond-db 0.12.0

Lossless storage and hybrid search for sessions from any AI agent client
//! Per-host sync coordination: the single-flight lock that keeps a manual
//! `pond sync` and the scheduled one from running concurrently against the
//! same store, and the last-sync record `pond status` reports. Local-process
//! coordination only - cross-host writers stay pure OCC on the Lance store;
//! nothing here ever touches store bytes. Bin-only module: both artifacts are
//! CLI-surface state.

use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Resolve the root directory for per-user state files.
///
/// Resolution order:
/// 1. `XDG_STATE_HOME`, but only when it holds an absolute path (an empty or
///    relative value is ignored);
/// 2. `$HOME/.local/state`;
/// 3. the relative path `.pond-state` when `HOME` is unset too.
///
/// Scheduler registrations pin this same value into the job environment, so
/// a scheduled sync and a manual one agree on one state dir even when the
/// variable is only exported from a shell rc file.
pub(crate) fn state_root() -> PathBuf {
    if let Some(raw) = std::env::var_os("XDG_STATE_HOME") {
        let candidate = PathBuf::from(raw);
        if candidate.is_absolute() {
            return candidate;
        }
    }
    match std::env::var_os("HOME") {
        Some(home) => PathBuf::from(home).join(".local/state"),
        None => PathBuf::from(".pond-state"),
    }
}

/// The `pond` subdirectory of [`state_root`] (by default
/// `~/.local/state/pond`), home of the scheduler log, the sync lock, and the
/// last-sync record.
///
/// These live under the XDG *state* dir rather than data or cache: they are
/// operational state that should survive a cache wipe and never needs backup.
pub(crate) fn pond_state_dir() -> PathBuf {
    let mut dir = state_root();
    dir.push("pond");
    dir
}

/// Who holds the sync lock, written into the lock file on acquire so a
/// blocked sibling can name what it is waiting for.
///
/// Purely informational: the lock is the OS file lock, not these bytes, so a
/// missing or unparsable holder record never affects mutual exclusion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct SyncLockHolder {
    /// Process id of the pond process that took the lock.
    pub pid: u32,
    /// Wall-clock time (UTC) at which that process took the lock.
    pub started_at: DateTime<Utc>,
}

/// Result of a non-blocking attempt to take the per-store sync lock.
pub(crate) enum SyncLockState {
    /// The caller may run the sync; keep the guard alive for the whole run.
    /// Also returned, without an actual lock, when the lock attempt fails
    /// with an I/O error (e.g. a filesystem without lock support).
    Acquired(SyncLockGuard),
    /// Another local pond process holds the lock. Holder info is best-effort:
    /// `None` when it could not be read back (e.g. a mid-write race).
    Busy(Option<SyncLockHolder>),
}

/// Held for the whole sync run. The OS drops the flock when the file closes,
/// so a killed sync can never leave a stale lock. The lock file is never
/// unlinked - unlink while a sibling holds the path open hands out two locks.
///
/// Dropping the guard closes the file and thereby releases the lock. In the
/// degraded "locking failed" path the guard wraps an unlocked file and only
/// serves to keep the caller's control flow uniform.
pub(crate) struct SyncLockGuard {
    // Never read; owned solely so the open handle (and its lock) lives as
    // long as the guard does.
    _file: File,
}

/// Try, without blocking, to take the sync lock for `store_key` in the
/// default pond state directory.
///
/// Returns [`SyncLockState::Busy`] when another local process already holds
/// it. Errors only when the state directory or lock file cannot be
/// created/opened.
pub(crate) fn try_acquire_sync_lock(store_key: &str) -> Result<SyncLockState> {
    let state_dir = pond_state_dir();
    try_acquire_sync_lock_in(&state_dir, store_key)
}

fn try_acquire_sync_lock_in(dir: &Path, store_key: &str) -> Result<SyncLockState> {
    std::fs::create_dir_all(dir).with_context(|| format!("failed to create {}", dir.display()))?;
    let path = dir.join(format!("sync-{store_key}.lock"));
    let mut file = File::options()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&path)
        .with_context(|| format!("failed to open sync lock {}", path.display()))?;
    match file.try_lock() {
        Ok(()) => {
            let holder = SyncLockHolder {
                pid: std::process::id(),
                started_at: Utc::now(),
            };
            // Best-effort holder info; the lock itself is the flock, not the bytes.
            let _ = file.set_len(0);
            let _ = file.seek(SeekFrom::Start(0));
            let _ = serde_json::to_vec(&holder).map(|bytes| file.write_all(&bytes));
            let _ = file.flush();
            Ok(SyncLockState::Acquired(SyncLockGuard { _file: file }))
        }
        Err(std::fs::TryLockError::WouldBlock) => {
            let holder = std::fs::read_to_string(&path)
                .ok()
                .and_then(|text| serde_json::from_str(&text).ok());
            Ok(SyncLockState::Busy(holder))
        }
        Err(std::fs::TryLockError::Error(error)) => {
            // A filesystem without flock semantics (some network mounts) can't
            // single-flight. The lock is best-effort local coordination that
            // never touches store bytes - cross-writer safety is OCC - so
            // degrade to running unlocked instead of failing the sync.
            tracing::warn!(
                %error,
                path = %path.display(),
                "sync lock unsupported on this filesystem; proceeding without single-flight"
            );
            Ok(SyncLockState::Acquired(SyncLockGuard { _file: file }))
        }
    }
}

/// Outcome of the most recent `pond sync` against one store on this host.
/// Written by every sync run (success or failure) and rendered by
/// `pond status`, so a silently failing scheduled sync becomes visible.
///
/// Persisted as pretty-printed JSON in `last-sync-{store_key}.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct LastSyncRecord {
    /// UTC timestamp of the run's end (filled in by the caller; presumably
    /// taken when the sync finished - confirm at the call site).
    pub finished_at: DateTime<Utc>,
    /// Run duration in seconds, per the field name (filled in by the caller).
    pub duration_secs: f64,
    /// Sessions inserted by the run, per the field name.
    pub sessions_inserted: u64,
    /// Messages inserted by the run, per the field name.
    pub messages_inserted: u64,
    /// Whether the run succeeded or failed.
    pub outcome: SyncOutcome,
    /// Optional error text. Left out of the JSON entirely when `None`, and
    /// read back as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

/// Success/failure flag stored in a [`LastSyncRecord`]. Serialized in
/// snake_case, i.e. as the JSON strings `"ok"` and `"error"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SyncOutcome {
    /// The sync run completed successfully.
    Ok,
    /// The sync run failed.
    Error,
}

/// Location of the last-sync record for `store_key` inside `dir`:
/// `<dir>/last-sync-<store_key>.json`.
fn last_sync_path(dir: &Path, store_key: &str) -> PathBuf {
    let file_name = ["last-sync-", store_key, ".json"].concat();
    dir.join(file_name)
}

/// Best-effort: a sync must never fail because its status breadcrumb could
/// not be written, so errors degrade to a tracing warning.
pub(crate) fn write_last_sync(store_key: &str, record: &LastSyncRecord) {
    if let Err(error) = write_last_sync_in(&pond_state_dir(), store_key, record) {
        tracing::warn!(%error, "failed to write last-sync record");
    }
}

/// Write `record` to `<dir>/last-sync-<store_key>.json`, creating `dir` if
/// needed.
///
/// The record is serialized as pretty JSON into a temporary sibling file and
/// then renamed over the final path, so a concurrent `pond status` never
/// reads a half-write.
///
/// The temporary name carries this process's id. Syncs can run concurrently
/// when the single-flight lock is unavailable (the lock degrades to
/// "unlocked" on filesystems without lock support); with one shared temp
/// name, a second writer could truncate and rewrite the file while the first
/// renames it into place, publishing a partial record. A per-process name
/// keeps each writer's staging file private.
///
/// # Errors
/// Fails when the directory cannot be created, the record cannot be
/// serialized, or the temp file cannot be written or renamed. On a failed
/// write or rename the temp file is removed best-effort so no litter is left.
fn write_last_sync_in(dir: &Path, store_key: &str, record: &LastSyncRecord) -> Result<()> {
    std::fs::create_dir_all(dir).with_context(|| format!("failed to create {}", dir.display()))?;
    let path = last_sync_path(dir, store_key);
    let bytes = serde_json::to_vec_pretty(record).context("serialize last-sync record")?;

    // e.g. `last-sync-<key>.json.<pid>.tmp`, unique per writing process.
    let tmp = path.with_extension(format!("json.{}.tmp", std::process::id()));

    let installed = std::fs::write(&tmp, &bytes)
        .with_context(|| format!("failed to write {}", tmp.display()))
        .and_then(|()| {
            std::fs::rename(&tmp, &path)
                .with_context(|| format!("failed to install {}", path.display()))
        });

    if installed.is_err() {
        // Best-effort cleanup; the original error is what the caller needs.
        let _ = std::fs::remove_file(&tmp);
    }
    installed
}

/// Load the last-sync record for `store_key` from the default pond state
/// directory. `None` when no record exists or it cannot be read or parsed.
pub(crate) fn read_last_sync(store_key: &str) -> Option<LastSyncRecord> {
    let state_dir = pond_state_dir();
    read_last_sync_in(&state_dir, store_key)
}

/// Load the last-sync record for `store_key` from `dir`.
///
/// Any failure - missing file, unreadable file, or JSON that does not parse
/// as a `LastSyncRecord` - collapses to `None`.
fn read_last_sync_in(dir: &Path, store_key: &str) -> Option<LastSyncRecord> {
    let record_path = last_sync_path(dir, store_key);
    match std::fs::read_to_string(record_path) {
        Ok(text) => serde_json::from_str(&text).ok(),
        Err(_) => None,
    }
}

#[cfg(test)]
mod tests {
    // Tests are allowed to unwrap/expect: a panic is the failure report.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;

    /// One holder at a time per store key; other keys are independent; and
    /// dropping the guard makes the lock available again.
    #[test]
    fn sync_lock_excludes_a_sibling_and_frees_on_drop() {
        let tmp = tempfile::TempDir::new().unwrap();
        let attempt = |key: &str| try_acquire_sync_lock_in(tmp.path(), key).unwrap();

        let SyncLockState::Acquired(first) = attempt("k1") else {
            panic!("fresh lock must acquire");
        };

        // While `first` is alive a second attempt on the same key is refused
        // and reports this process as the holder.
        let SyncLockState::Busy(holder) = attempt("k1") else {
            panic!("held lock must report busy");
        };
        let holder = holder.expect("holder info written on acquire");
        assert_eq!(holder.pid, std::process::id());

        // A different store key is a different lock.
        let other_key = attempt("k2");
        assert!(matches!(other_key, SyncLockState::Acquired(_)));
        drop(other_key);

        drop(first);
        let reacquired = attempt("k1");
        assert!(matches!(reacquired, SyncLockState::Acquired(_)));
    }

    /// Nothing to read before the first write; afterwards the written record
    /// comes back intact.
    #[test]
    fn last_sync_record_round_trips_and_is_absent_before_first_write() {
        let tmp = tempfile::TempDir::new().unwrap();
        let dir = tmp.path();
        assert!(read_last_sync_in(dir, "k").is_none());

        let written = LastSyncRecord {
            finished_at: Utc::now(),
            duration_secs: 12.5,
            sessions_inserted: 3,
            messages_inserted: 41,
            outcome: SyncOutcome::Error,
            error: Some("boom".to_owned()),
        };
        write_last_sync_in(dir, "k", &written).unwrap();

        let loaded = read_last_sync_in(dir, "k").expect("record present");
        assert_eq!(loaded.sessions_inserted, 3);
        assert_eq!(loaded.outcome, SyncOutcome::Error);
        assert_eq!(loaded.error.as_deref(), Some("boom"));
    }
}