use std::fs;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use lantern::ingest::{self, FollowOptions, IngestOptions};
use lantern::store::Store;
use tempfile::tempdir;
#[test]
fn follow_picks_up_new_transcript_between_passes() {
let root = tempdir().unwrap();
let mut store = Store::initialize(&root.path().join("store")).unwrap();
let watch_dir = root.path().join("transcripts");
fs::create_dir_all(&watch_dir).unwrap();
let first = watch_dir.join("sess-a.jsonl");
fs::write(
&first,
br#"{"role":"user","content":"hello from session a","session_id":"a"}
"#,
)
.unwrap();
let second = watch_dir.join("sess-b.jsonl");
let watch_dir_for_thread = watch_dir.clone();
let second_for_thread = second.clone();
let reports: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(Vec::new()));
let reports_cb = Arc::clone(&reports);
let spawner = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(120));
fs::write(
&second_for_thread,
br#"{"role":"assistant","content":"reply in session b","session_id":"b"}
"#,
)
.unwrap();
drop(watch_dir_for_thread);
});
let follow_opts = FollowOptions {
interval: Duration::from_millis(150),
max_iterations: Some(3),
idle_timeout: None,
};
ingest::follow_path_with(
&mut store,
&watch_dir,
&IngestOptions::default(),
&follow_opts,
|iteration, report| {
reports_cb
.lock()
.unwrap()
.push((iteration, report.ingested.len()));
},
)
.unwrap();
spawner.join().unwrap();
let passes = reports.lock().unwrap().clone();
assert_eq!(passes.len(), 3);
assert_eq!(passes[0].1, 1);
let later_ingests: usize = passes.iter().skip(1).map(|(_, n)| n).sum();
assert_eq!(later_ingests, 1);
let session_count: i64 = store
.conn()
.query_row(
"SELECT COUNT(DISTINCT session_id) FROM chunks WHERE session_id IS NOT NULL",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(session_count, 2);
}
#[test]
fn follow_terminates_at_max_iterations_without_changes() {
let root = tempdir().unwrap();
let mut store = Store::initialize(&root.path().join("store")).unwrap();
let watch_dir = root.path().join("empty");
fs::create_dir_all(&watch_dir).unwrap();
let follow_opts = FollowOptions {
interval: Duration::from_millis(20),
max_iterations: Some(2),
idle_timeout: None,
};
let mut passes = 0usize;
ingest::follow_path_with(
&mut store,
&watch_dir,
&IngestOptions::default(),
&follow_opts,
|_, _| passes += 1,
)
.unwrap();
assert_eq!(passes, 2);
}
#[test]
fn follow_exits_after_idle_timeout_without_new_files() {
let root = tempdir().unwrap();
let mut store = Store::initialize(&root.path().join("store")).unwrap();
let watch_dir = root.path().join("idle");
fs::create_dir_all(&watch_dir).unwrap();
fs::write(
watch_dir.join("one.jsonl"),
br#"{"role":"user","content":"initial content","session_id":"idle"}
"#,
)
.unwrap();
let follow_opts = FollowOptions {
interval: Duration::from_millis(10),
max_iterations: Some(20),
idle_timeout: Some(Duration::from_millis(35)),
};
let started = Instant::now();
let exit = ingest::follow_path_with(
&mut store,
&watch_dir,
&IngestOptions::default(),
&follow_opts,
|_, _| {},
)
.unwrap();
let elapsed = started.elapsed();
assert_eq!(
exit,
ingest::FollowExit::IdleTimeout(Duration::from_millis(35))
);
assert!(elapsed >= Duration::from_millis(30));
assert!(elapsed < Duration::from_millis(500));
}
#[test]
fn follow_wakes_before_poll_interval_when_file_appears() {
let root = tempdir().unwrap();
let mut store = Store::initialize(&root.path().join("store")).unwrap();
let watch_dir = root.path().join("live");
fs::create_dir_all(&watch_dir).unwrap();
let target = watch_dir.join("sess.jsonl");
let target_for_thread = target.clone();
let spawner = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(100));
fs::write(
&target_for_thread,
br#"{"role":"user","content":"live write","session_id":"live"}
"#,
)
.unwrap();
});
let reports: Arc<Mutex<Vec<usize>>> = Arc::new(Mutex::new(Vec::new()));
let reports_cb = Arc::clone(&reports);
let follow_opts = FollowOptions {
interval: Duration::from_secs(2),
max_iterations: Some(2),
idle_timeout: None,
};
let started = Instant::now();
ingest::follow_path_with(
&mut store,
&watch_dir,
&IngestOptions::default(),
&follow_opts,
|_, report| {
reports_cb.lock().unwrap().push(report.ingested.len());
},
)
.unwrap();
let elapsed = started.elapsed();
spawner.join().unwrap();
let passes = reports.lock().unwrap().clone();
assert_eq!(passes.len(), 2);
assert_eq!(passes[0], 0);
assert_eq!(passes[1], 1);
assert!(
elapsed < Duration::from_millis(1500),
"follow loop did not wake early on fs events: elapsed={:?}",
elapsed
);
}
#[test]
fn follow_rejects_zero_interval() {
let root = tempdir().unwrap();
let mut store = Store::initialize(&root.path().join("store")).unwrap();
let watch_dir = root.path().join("data");
fs::create_dir_all(&watch_dir).unwrap();
let err = ingest::follow_path_with(
&mut store,
&watch_dir,
&IngestOptions::default(),
&FollowOptions {
interval: Duration::from_secs(0),
max_iterations: Some(1),
idle_timeout: None,
},
|_, _| {},
)
.unwrap_err();
assert!(err.to_string().contains("interval"));
}