#![cfg(unix)]
use std::io::Write;
use std::process::Command;
use std::thread;
use std::time::Duration;
use lantern::ingest::{self, FollowOptions, IngestOptions, ingest_fifo, ingest_path};
use lantern::search::{SearchOptions, search};
use lantern::store::Store;
use tempfile::tempdir;
fn make_fifo(path: &std::path::Path) {
let status = Command::new("mkfifo")
.arg(path)
.status()
.expect("spawn mkfifo");
assert!(status.success(), "mkfifo failed for {}", path.display());
}
#[test]
fn ingest_path_detects_fifo_and_reads_until_writer_closes() {
let dir = tempdir().unwrap();
let fifo = dir.path().join("session.pipe");
make_fifo(&fifo);
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let writer_path = fifo.clone();
let writer = thread::spawn(move || {
let mut f = std::fs::OpenOptions::new()
.write(true)
.open(&writer_path)
.unwrap();
f.write_all(b"hello via named pipe").unwrap();
});
let report = ingest_path(&mut store, &fifo).unwrap();
writer.join().unwrap();
assert_eq!(report.ingested.len(), 1);
let src = &report.ingested[0];
assert!(
src.uri.starts_with("fifo://"),
"expected fifo:// URI, got {}",
src.uri
);
assert!(
src.uri.contains('#'),
"fifo URI must carry append suffix, got {}",
src.uri
);
assert!(src.path.is_none(), "fifo sources have no filesystem path");
assert_eq!(src.kind, "text/plain");
assert_eq!(src.bytes, b"hello via named pipe".len() as u64);
let hits = search(&store, "pipe", SearchOptions::default()).unwrap();
assert_eq!(hits.len(), 1);
assert!(hits[0].uri.starts_with("fifo://"));
}
#[test]
fn repeated_fifo_reads_accumulate_as_distinct_sources() {
let dir = tempdir().unwrap();
let fifo = dir.path().join("stream.pipe");
make_fifo(&fifo);
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
for payload in [b"first batch body".as_slice(), b"second batch body"] {
let fifo_path = fifo.clone();
let buf = payload.to_vec();
let writer = thread::spawn(move || {
let mut f = std::fs::OpenOptions::new()
.write(true)
.open(&fifo_path)
.unwrap();
f.write_all(&buf).unwrap();
});
let report = ingest_fifo(&mut store, &fifo).unwrap();
writer.join().unwrap();
assert_eq!(report.ingested.len(), 1);
}
let sources: Vec<String> = store
.conn()
.prepare("SELECT uri FROM sources ORDER BY ingested_at, uri")
.unwrap()
.query_map([], |row| row.get::<_, String>(0))
.unwrap()
.map(Result::unwrap)
.collect();
assert_eq!(sources.len(), 2, "each read must create its own source");
assert_ne!(sources[0], sources[1]);
for uri in &sources {
assert!(uri.starts_with("fifo://"));
}
assert_eq!(
search(&store, "first", SearchOptions::default())
.unwrap()
.len(),
1
);
assert_eq!(
search(&store, "second", SearchOptions::default())
.unwrap()
.len(),
1
);
}
#[test]
fn follow_on_fifo_captures_consecutive_writer_sessions() {
let dir = tempdir().unwrap();
let fifo = dir.path().join("follow.pipe");
make_fifo(&fifo);
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let fifo_path = fifo.clone();
let writer = thread::spawn(move || {
for payload in [b"follow batch one".as_slice(), b"follow batch two"] {
let mut f = std::fs::OpenOptions::new()
.write(true)
.open(&fifo_path)
.unwrap();
f.write_all(payload).unwrap();
drop(f);
std::thread::sleep(Duration::from_millis(80));
}
});
let mut pass_rows = Vec::new();
ingest::follow_path_with(
&mut store,
&fifo,
&IngestOptions::default(),
&FollowOptions {
interval: Duration::from_millis(20),
max_iterations: Some(2),
idle_timeout: None,
},
|iteration, report| {
pass_rows.push((
iteration,
report.ingested.len(),
report.ingested.first().map(|src| src.uri.clone()),
));
},
)
.unwrap();
writer.join().unwrap();
assert_eq!(pass_rows.len(), 2);
assert_eq!(pass_rows[0].1, 1);
assert_eq!(pass_rows[1].1, 1);
let first_uri = pass_rows[0].2.as_ref().unwrap();
let second_uri = pass_rows[1].2.as_ref().unwrap();
assert!(first_uri.starts_with("fifo://"));
assert!(second_uri.starts_with("fifo://"));
assert_ne!(first_uri, second_uri);
let sources: Vec<String> = store
.conn()
.prepare("SELECT uri FROM sources ORDER BY ingested_at, uri")
.unwrap()
.query_map([], |row| row.get::<_, String>(0))
.unwrap()
.map(Result::unwrap)
.collect();
assert_eq!(sources.len(), 2);
assert!(sources.iter().all(|uri| uri.starts_with("fifo://")));
assert_ne!(sources[0], sources[1]);
assert_eq!(
search(&store, "follow batch one", SearchOptions::default())
.unwrap()
.len(),
1
);
assert_eq!(
search(&store, "follow batch two", SearchOptions::default())
.unwrap()
.len(),
1
);
}
#[test]
fn fifo_with_jsonl_extension_routes_through_transcript_extractor() {
let dir = tempdir().unwrap();
let fifo = dir.path().join("session.jsonl");
make_fifo(&fifo);
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let writer_path = fifo.clone();
let writer = thread::spawn(move || {
let mut f = std::fs::OpenOptions::new()
.write(true)
.open(&writer_path)
.unwrap();
f.write_all(
b"{\"role\":\"user\",\"content\":\"who is on call\"}\n\
{\"role\":\"assistant\",\"content\":\"the oncall rotation is here\"}\n",
)
.unwrap();
});
let report = ingest_path(&mut store, &fifo).unwrap();
writer.join().unwrap();
assert_eq!(report.ingested.len(), 1);
assert_eq!(report.ingested[0].kind, "application/jsonl");
assert_eq!(report.ingested[0].chunks, 2);
let hits = search(
&store,
"oncall",
SearchOptions {
kind: Some("application/jsonl".into()),
..SearchOptions::default()
},
)
.unwrap();
assert_eq!(hits.len(), 1);
assert!(hits[0].text.contains("[assistant]"));
}