lantern 0.2.3

Local-first, provenance-aware semantic search for agent activity
Documentation
#![cfg(unix)]

use std::io::Write;
use std::process::Command;
use std::thread;
use std::time::Duration;

use lantern::ingest::{self, FollowOptions, IngestOptions, ingest_fifo, ingest_path};
use lantern::search::{SearchOptions, search};
use lantern::store::Store;
use tempfile::tempdir;

/// Creates a named pipe at `path` by running the external `mkfifo` command.
///
/// # Panics
///
/// Panics if `mkfifo` cannot be spawned or exits with a non-success status.
fn make_fifo(path: &std::path::Path) {
    let mut mkfifo = Command::new("mkfifo");
    mkfifo.arg(path);

    let exit = mkfifo.status().expect("spawn mkfifo");
    assert!(exit.success(), "mkfifo failed for {}", path.display());
}

/// Feeds a single payload through a named pipe and checks that `ingest_path`
/// records it as exactly one `fifo://` source that is then searchable.
#[test]
fn ingest_path_detects_fifo_and_reads_until_writer_closes() {
    const PAYLOAD: &[u8] = b"hello via named pipe";

    let tmp = tempdir().unwrap();
    let pipe = tmp.path().join("session.pipe");
    make_fifo(&pipe);
    let store_dir = tmp.path().join("store");
    let mut store = Store::initialize(&store_dir).unwrap();

    // The writer lives on its own thread so that this thread can act as the
    // reader; the write end is closed when `sink` is dropped at closure exit.
    let producer = {
        let pipe = pipe.clone();
        thread::spawn(move || {
            let mut options = std::fs::OpenOptions::new();
            options.write(true);
            let mut sink = options.open(&pipe).unwrap();
            sink.write_all(PAYLOAD).unwrap();
        })
    };

    let report = ingest_path(&mut store, &pipe).unwrap();
    producer.join().unwrap();

    // Exactly one source, described as a FIFO rather than a regular file.
    assert_eq!(report.ingested.len(), 1);
    let src = &report.ingested[0];
    assert!(
        src.uri.starts_with("fifo://"),
        "expected fifo:// URI, got {}",
        src.uri
    );
    assert!(
        src.uri.contains('#'),
        "fifo URI must carry append suffix, got {}",
        src.uri
    );
    assert!(src.path.is_none(), "fifo sources have no filesystem path");
    assert_eq!(src.kind, "text/plain");
    assert_eq!(src.bytes, PAYLOAD.len() as u64);

    // The ingested text must be reachable through search.
    let hits = search(&store, "pipe", SearchOptions::default()).unwrap();
    assert_eq!(hits.len(), 1);
    assert!(hits[0].uri.starts_with("fifo://"));
}

/// Ingests two separate writer sessions from the same pipe via `ingest_fifo`
/// and checks that each one is stored as its own `fifo://` source.
#[test]
fn repeated_fifo_reads_accumulate_as_distinct_sources() {
    let tmp = tempdir().unwrap();
    let pipe = tmp.path().join("stream.pipe");
    make_fifo(&pipe);
    let store_dir = tmp.path().join("store");
    let mut store = Store::initialize(&store_dir).unwrap();

    let batches: [&[u8]; 2] = [b"first batch body", b"second batch body"];
    for batch in batches {
        // One writer thread per batch; the pipe's write end closes when the
        // closure returns and `sink` is dropped.
        let producer = {
            let pipe = pipe.clone();
            let bytes = batch.to_vec();
            thread::spawn(move || {
                let mut options = std::fs::OpenOptions::new();
                options.write(true);
                let mut sink = options.open(&pipe).unwrap();
                sink.write_all(&bytes).unwrap();
            })
        };

        let report = ingest_fifo(&mut store, &pipe).unwrap();
        producer.join().unwrap();
        assert_eq!(report.ingested.len(), 1);
    }

    // Read the stored URIs straight from the `sources` table.
    let mut stmt = store
        .conn()
        .prepare("SELECT uri FROM sources ORDER BY ingested_at, uri")
        .unwrap();
    let sources: Vec<String> = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .unwrap()
        .map(Result::unwrap)
        .collect();

    assert_eq!(sources.len(), 2, "each read must create its own source");
    assert_ne!(sources[0], sources[1]);
    for uri in sources.iter() {
        assert!(uri.starts_with("fifo://"));
    }

    // Each batch must be individually searchable.
    for needle in ["first", "second"] {
        let hits = search(&store, needle, SearchOptions::default()).unwrap();
        assert_eq!(hits.len(), 1);
    }
}

/// Runs `follow_path_with` for two iterations against a pipe that receives
/// two consecutive writer sessions, and checks that each iteration reports
/// one distinct `fifo://` source.
#[test]
fn follow_on_fifo_captures_consecutive_writer_sessions() {
    let tmp = tempdir().unwrap();
    let pipe = tmp.path().join("follow.pipe");
    make_fifo(&pipe);
    let store_dir = tmp.path().join("store");
    let mut store = Store::initialize(&store_dir).unwrap();

    // Two back-to-back sessions: open, write, close, then pause before the
    // next one. NOTE(review): the 80 ms pause presumably gives the follower
    // time to finish a pass before the next session starts — confirm.
    let producer = {
        let pipe = pipe.clone();
        thread::spawn(move || {
            let sessions: [&[u8]; 2] = [b"follow batch one", b"follow batch two"];
            for payload in sessions {
                {
                    let mut options = std::fs::OpenOptions::new();
                    options.write(true);
                    let mut sink = options.open(&pipe).unwrap();
                    sink.write_all(payload).unwrap();
                } // `sink` dropped here, closing this session's write end.
                thread::sleep(Duration::from_millis(80));
            }
        })
    };

    let ingest_opts = IngestOptions::default();
    let follow_opts = FollowOptions {
        interval: Duration::from_millis(20),
        max_iterations: Some(2),
        idle_timeout: None,
    };

    // Record (iteration, number of ingested sources, first URI) per callback.
    let mut passes = Vec::new();
    ingest::follow_path_with(
        &mut store,
        &pipe,
        &ingest_opts,
        &follow_opts,
        |iteration, report| {
            let count = report.ingested.len();
            let first = report.ingested.first().map(|src| src.uri.clone());
            passes.push((iteration, count, first));
        },
    )
    .unwrap();

    producer.join().unwrap();

    assert_eq!(passes.len(), 2);
    let (_, first_count, first_seen) = &passes[0];
    let (_, second_count, second_seen) = &passes[1];
    assert_eq!(*first_count, 1);
    assert_eq!(*second_count, 1);
    let first_uri = first_seen.as_ref().unwrap();
    let second_uri = second_seen.as_ref().unwrap();
    assert!(first_uri.starts_with("fifo://"));
    assert!(second_uri.starts_with("fifo://"));
    assert_ne!(first_uri, second_uri);

    // Cross-check against what was actually persisted in `sources`.
    let mut stmt = store
        .conn()
        .prepare("SELECT uri FROM sources ORDER BY ingested_at, uri")
        .unwrap();
    let sources: Vec<String> = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .unwrap()
        .map(Result::unwrap)
        .collect();
    assert_eq!(sources.len(), 2);
    assert!(sources.iter().all(|uri| uri.starts_with("fifo://")));
    assert_ne!(sources[0], sources[1]);

    // Both sessions must be searchable by their full text.
    for needle in ["follow batch one", "follow batch two"] {
        let hits = search(&store, needle, SearchOptions::default()).unwrap();
        assert_eq!(hits.len(), 1);
    }
}

/// Writes a two-line JSONL transcript into a pipe named `session.jsonl` and
/// checks that it is ingested as `application/jsonl` with two chunks, and
/// that a kind-filtered search returns the assistant turn.
#[test]
fn fifo_with_jsonl_extension_routes_through_transcript_extractor() {
    const TRANSCRIPT: &[u8] = b"{\"role\":\"user\",\"content\":\"who is on call\"}\n\
          {\"role\":\"assistant\",\"content\":\"the oncall rotation is here\"}\n";

    let tmp = tempdir().unwrap();
    let pipe = tmp.path().join("session.jsonl");
    make_fifo(&pipe);
    let store_dir = tmp.path().join("store");
    let mut store = Store::initialize(&store_dir).unwrap();

    // Writer thread: sends the transcript, then closes the pipe on drop.
    let producer = {
        let pipe = pipe.clone();
        thread::spawn(move || {
            let mut options = std::fs::OpenOptions::new();
            options.write(true);
            let mut sink = options.open(&pipe).unwrap();
            sink.write_all(TRANSCRIPT).unwrap();
        })
    };

    let report = ingest_path(&mut store, &pipe).unwrap();
    producer.join().unwrap();

    assert_eq!(report.ingested.len(), 1);
    let src = &report.ingested[0];
    assert_eq!(src.kind, "application/jsonl");
    assert_eq!(src.chunks, 2);

    // Restrict the search to JSONL sources only.
    let only_jsonl = SearchOptions {
        kind: Some("application/jsonl".into()),
        ..SearchOptions::default()
    };
    let hits = search(&store, "oncall", only_jsonl).unwrap();
    assert_eq!(hits.len(), 1);
    assert!(hits[0].text.contains("[assistant]"));
}