// lantern 0.2.3
//
// Local-first, provenance-aware semantic search for agent activity.
// Integration tests for follow mode (`ingest::follow_path_with`).

use std::fs;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use lantern::ingest::{self, FollowOptions, IngestOptions};
use lantern::store::Store;
use tempfile::tempdir;

/// A transcript that appears in the watched directory after the first pass
/// must be ingested by a later pass, exactly once, alongside the transcript
/// that was present from the start.
#[test]
fn follow_picks_up_new_transcript_between_passes() {
    let root = tempdir().unwrap();
    let mut store = Store::initialize(&root.path().join("store")).unwrap();
    let watch_dir = root.path().join("transcripts");
    fs::create_dir_all(&watch_dir).unwrap();

    // First file exists before the loop starts.
    let first = watch_dir.join("sess-a.jsonl");
    fs::write(
        &first,
        br#"{"role":"user","content":"hello from session a","session_id":"a"}
"#,
    )
    .unwrap();

    // Drop the second file in ~120ms after the loop starts, i.e. after the
    // first pass is expected to have run. Nothing else in the test pokes the
    // loop, so follow mode has to notice the new file on its own (via its
    // filesystem watcher or its polling cadence).
    let second = watch_dir.join("sess-b.jsonl");
    let spawner = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(120));
        fs::write(
            &second,
            br#"{"role":"assistant","content":"reply in session b","session_id":"b"}
"#,
        )
        .unwrap();
    });

    let follow_opts = FollowOptions {
        interval: Duration::from_millis(150),
        max_iterations: Some(3),
        idle_timeout: None,
    };

    // (iteration, files ingested) per pass. The callback is only borrowed for
    // the duration of the call, so a plain local Vec is enough; no
    // Arc<Mutex<...>> is needed.
    let mut passes: Vec<(usize, usize)> = Vec::new();
    ingest::follow_path_with(
        &mut store,
        &watch_dir,
        &IngestOptions::default(),
        &follow_opts,
        |iteration, report| passes.push((iteration, report.ingested.len())),
    )
    .unwrap();

    spawner.join().unwrap();

    assert_eq!(passes.len(), 3, "unexpected passes: {:?}", passes);
    // First pass sees the file that existed up-front.
    assert_eq!(passes[0].1, 1, "unexpected passes: {:?}", passes);

    // Across the remaining passes we should have picked up the second file
    // exactly once (re-ingesting an unchanged file is a no-op at persist()).
    let later_ingests: usize = passes.iter().skip(1).map(|&(_, n)| n).sum();
    assert_eq!(later_ingests, 1, "unexpected passes: {:?}", passes);

    // Both sessions made it into the store with JSONL metadata preserved.
    let session_count: i64 = store
        .conn()
        .query_row(
            "SELECT COUNT(DISTINCT session_id) FROM chunks WHERE session_id IS NOT NULL",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(session_count, 2);
}

/// With nothing to ingest, follow mode still invokes the callback once per
/// iteration and stops after exactly `max_iterations` passes.
#[test]
fn follow_terminates_at_max_iterations_without_changes() {
    let tmp = tempdir().unwrap();
    let store_path = tmp.path().join("store");
    let mut store = Store::initialize(&store_path).unwrap();

    let empty_dir = tmp.path().join("empty");
    fs::create_dir_all(&empty_dir).unwrap();

    let opts = FollowOptions {
        interval: Duration::from_millis(20),
        max_iterations: Some(2),
        idle_timeout: None,
    };

    let mut pass_count = 0usize;
    ingest::follow_path_with(
        &mut store,
        &empty_dir,
        &IngestOptions::default(),
        &opts,
        |_iteration, _report| {
            pass_count += 1;
        },
    )
    .unwrap();

    assert_eq!(pass_count, 2);
}

/// Once the initial transcript is ingested and nothing else changes, the
/// loop must report `FollowExit::IdleTimeout` carrying the configured idle
/// duration, well before `max_iterations` would have stopped it.
#[test]
fn follow_exits_after_idle_timeout_without_new_files() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let idle_dir = tmp.path().join("idle");
    fs::create_dir_all(&idle_dir).unwrap();

    let transcript = idle_dir.join("one.jsonl");
    fs::write(
        transcript,
        br#"{"role":"user","content":"initial content","session_id":"idle"}
"#,
    )
    .unwrap();

    let idle_limit = Duration::from_millis(35);
    let opts = FollowOptions {
        interval: Duration::from_millis(10),
        // 20 passes at 10ms each comfortably exceeds the idle limit, so the
        // idle timeout is what should end the loop.
        max_iterations: Some(20),
        idle_timeout: Some(idle_limit),
    };

    let t0 = Instant::now();
    let outcome = ingest::follow_path_with(
        &mut store,
        &idle_dir,
        &IngestOptions::default(),
        &opts,
        |_, _| {},
    )
    .unwrap();
    let took = t0.elapsed();

    assert_eq!(outcome, ingest::FollowExit::IdleTimeout(idle_limit));
    // The lower bound sits a little under the idle limit, presumably as slack
    // for timer granularity. The upper bound is generous for CI.
    assert!(took >= Duration::from_millis(30));
    assert!(took < Duration::from_millis(500));
}

/// With a 2-second polling interval, the second pass must be triggered by the
/// filesystem watcher rather than by the poll timer. A polling-only loop
/// would idle for the full interval between passes. An early wake-up lets
/// both passes finish well before 2x the interval.
#[test]
fn follow_wakes_before_poll_interval_when_file_appears() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let live_dir = tmp.path().join("live");
    fs::create_dir_all(&live_dir).unwrap();

    // Write the transcript from a helper thread ~100ms in: after the first
    // (empty) pass, but long before the 2s poll interval elapses.
    let transcript = live_dir.join("sess.jsonl");
    let writer = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(100));
        fs::write(
            &transcript,
            br#"{"role":"user","content":"live write","session_id":"live"}
"#,
        )
        .unwrap();
    });

    let seen: Arc<Mutex<Vec<usize>>> = Arc::default();
    let seen_cb = Arc::clone(&seen);

    let opts = FollowOptions {
        interval: Duration::from_secs(2),
        max_iterations: Some(2),
        idle_timeout: None,
    };

    let t0 = Instant::now();
    ingest::follow_path_with(
        &mut store,
        &live_dir,
        &IngestOptions::default(),
        &opts,
        |_, report| {
            let ingested = report.ingested.len();
            seen_cb.lock().unwrap().push(ingested);
        },
    )
    .unwrap();
    let took = t0.elapsed();
    writer.join().unwrap();

    // First pass: empty dir. Second pass: the newly-written transcript.
    let per_pass = seen.lock().unwrap().clone();
    assert_eq!(per_pass, [0, 1]);

    // Sitting out the full 2s interval would blow well past this bound. The
    // slack absorbs CI jitter without letting a silent regression back to
    // pure polling sneak through.
    assert!(
        took < Duration::from_millis(1500),
        "follow loop did not wake early on fs events: elapsed={:?}",
        took
    );
}

/// A zero polling interval is rejected with an error whose message mentions
/// `interval`.
#[test]
fn follow_rejects_zero_interval() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let data_dir = tmp.path().join("data");
    fs::create_dir_all(&data_dir).unwrap();

    let zero_interval = FollowOptions {
        interval: Duration::from_secs(0),
        max_iterations: Some(1),
        idle_timeout: None,
    };

    let outcome = ingest::follow_path_with(
        &mut store,
        &data_dir,
        &IngestOptions::default(),
        &zero_interval,
        |_, _| {},
    );
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("interval"));
}