use anyhow::{Context, Result};
use rusqlite::params;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, channel};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use crate::chunk::{Chunk, ChunkOptions, chunk_text};
use crate::entities::{extract_entities, record_chunk_entities};
use crate::ignore::IgnoreRules;
use crate::jsonl::extract_jsonl;
use crate::store::Store;
/// Content kind assigned to `.jsonl` / `.ndjson` files by `kind_for`; sources of this
/// kind are chunked with `extract_jsonl` instead of the generic text chunker.
const JSONL_KIND: &str = "application/jsonl";
/// Size limit (50 MiB) above which `ingest_path_with` skips a file with reason
/// `too_large` instead of ingesting it.
const MAX_INGEST_BYTES: u64 = 50 * 1024 * 1024;
/// Outcome of one ingest call: what was written, what was skipped (and why), and how
/// many walk entries the ignore rules filtered out.
#[derive(Debug, Serialize)]
pub struct IngestReport {
/// Sources whose content was (re)written to the store.
pub ingested: Vec<IngestedSource>,
/// Sources that were not written, each carrying a reason.
pub skipped: Vec<SkippedSource>,
/// Number of entries rejected by the ignore rules (an ignored directory counts once,
/// since its contents are never visited). Omitted from JSON when zero.
#[serde(default, skip_serializing_if = "is_zero")]
pub ignored: usize,
}
/// `skip_serializing_if` predicate: true when the counter holds zero.
fn is_zero(n: &usize) -> bool {
    matches!(*n, 0)
}
/// Options controlling path ingestion (`ingest_path_with`, `follow_path_with`).
#[derive(Debug, Default, Clone)]
pub struct IngestOptions {
/// When true, `IgnoreRules::disabled()` is used instead of loading ignore rules for the path.
pub no_ignore: bool,
/// Extra file extensions to accept in addition to the built-in supported list.
pub include_exts: Vec<String>,
}
/// Summary of one source that was written to the store.
#[derive(Debug, Serialize)]
pub struct IngestedSource {
/// Hex of the first 16 bytes of SHA-256(uri); stable for a given URI.
pub source_id: String,
/// `file://` / `fifo://` URI, or the caller-supplied stdin URI (with a `#suffix` in append mode).
pub uri: String,
/// Filesystem path for file sources; `None` for stdin/FIFO input and then omitted from JSON.
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
/// Content kind, e.g. `text/plain`, `text/markdown`, or `application/jsonl`.
pub kind: String,
/// Size of the ingested content in bytes.
pub bytes: u64,
/// Hex SHA-256 of the full content; used to detect unchanged re-ingests.
pub content_sha256: String,
/// Number of chunks written for this source.
pub chunks: usize,
}
/// A source that was not written, with a machine-readable code and a human message.
#[derive(Debug, Serialize)]
pub struct SkippedSource {
pub uri: String,
/// Short reason code; this file produces `unchanged`, `too_large`, and `error`.
#[serde(skip_serializing_if = "Option::is_none")]
pub skipped_reason: Option<String>,
/// Human-readable explanation (e.g. the size limit hit, or the formatted error chain).
pub reason: String,
}
/// Ingests `path` (file, directory, or FIFO) with default [`IngestOptions`]:
/// ignore rules enabled and no extra extensions.
pub fn ingest_path(store: &mut Store, path: &Path) -> Result<IngestReport> {
    let defaults = IngestOptions::default();
    ingest_path_with(store, path, &defaults)
}
/// Default number of seconds between follow passes. Not referenced in this file —
/// presumably consumed by the CLI layer when no interval is given (TODO confirm).
pub const DEFAULT_FOLLOW_INTERVAL_SECS: u64 = 5;
/// Settings for `follow_path_with`.
#[derive(Debug, Clone)]
pub struct FollowOptions {
/// Wait between passes; must be non-zero (zero is rejected by `follow_path_with`).
pub interval: Duration,
/// Stop after this many completed passes; `None` means no limit.
pub max_iterations: Option<usize>,
/// Stop once a pass ingests nothing and no pass has ingested anything for this long.
pub idle_timeout: Option<Duration>,
}
impl FollowOptions {
    /// Creates follow settings with the given `interval`, no pass limit, and no idle
    /// timeout.
    pub fn new(interval: Duration) -> Self {
        let unlimited_passes = None;
        let never_idle_out = None;
        Self {
            interval,
            idle_timeout: never_idle_out,
            max_iterations: unlimited_passes,
        }
    }
}
/// Why `follow_path_with` returned normally.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FollowExit {
/// `FollowOptions::max_iterations` passes were completed.
MaxIterations,
/// No new content for the configured idle timeout (carried here).
IdleTimeout(Duration),
}
/// Repeatedly ingests `path`, calling `on_pass(pass_index, &report)` after each pass.
///
/// Between passes it waits via a filesystem watcher (directories) or a plain sleep of
/// `follow_opts.interval`. It returns [`FollowExit::MaxIterations`] once the configured
/// number of passes has completed. It returns [`FollowExit::IdleTimeout`] when a pass
/// ingests nothing and nothing has been ingested for at least the idle timeout. Without
/// either limit it runs until an ingest pass fails.
///
/// # Errors
/// Fails immediately when `follow_opts.interval` is zero, and propagates any error
/// from `ingest_path_with`.
pub fn follow_path_with(
    store: &mut Store,
    path: &Path,
    opts: &IngestOptions,
    follow_opts: &FollowOptions,
    mut on_pass: impl FnMut(usize, &IngestReport),
) -> Result<FollowExit> {
    anyhow::ensure!(
        !follow_opts.interval.is_zero(),
        "follow interval must be greater than zero"
    );
    let waiter = FollowWaiter::setup(path);
    let mut passes_done: usize = 0;
    let mut last_activity = Instant::now();
    loop {
        let report = ingest_path_with(store, path, opts)?;
        // Only newly written content counts as activity for the idle timeout.
        let saw_new_content = !report.ingested.is_empty();
        if saw_new_content {
            last_activity = Instant::now();
        }
        on_pass(passes_done, &report);
        passes_done += 1;

        if matches!(follow_opts.max_iterations, Some(max) if passes_done >= max) {
            return Ok(FollowExit::MaxIterations);
        }
        let idle_exit = follow_opts
            .idle_timeout
            .filter(|limit| !saw_new_content && last_activity.elapsed() >= *limit);
        if let Some(limit) = idle_exit {
            return Ok(FollowExit::IdleTimeout(limit));
        }

        waiter.wait(follow_opts.interval);
    }
}
/// How `follow_path_with` waits between passes.
enum FollowWaiter {
/// Filesystem-event driven: `rx` receives one unit signal per relevant event. The
/// watcher is never read; it is held so it is not dropped while the waiter is in use.
Events {
rx: Receiver<()>,
_watcher: RecommendedWatcher,
},
/// Plain fixed-interval sleep.
Polling,
}
impl FollowWaiter {
    /// Chooses how to wait between follow passes over `path`.
    ///
    /// Only an existing directory gets a recursive filesystem watcher. Files, FIFOs and
    /// missing paths fall back to [`FollowWaiter::Polling`], as does any failure to
    /// create or register the watcher.
    fn setup(path: &Path) -> Self {
        let Ok(meta) = fs::metadata(path) else {
            return FollowWaiter::Polling;
        };
        if !meta.is_dir() {
            return FollowWaiter::Polling;
        }
        // The watcher callback owns the only sender, so the channel disconnects exactly
        // when the watcher drops its handler.
        let (tx, rx) = channel::<()>();
        let mut watcher = match RecommendedWatcher::new(
            move |res: notify::Result<notify::Event>| {
                if let Ok(ev) = res {
                    if matches!(
                        ev.kind,
                        EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
                    ) {
                        // A closed receiver only means the follow loop has finished.
                        let _ = tx.send(());
                    }
                }
            },
            notify::Config::default(),
        ) {
            Ok(w) => w,
            Err(_) => return FollowWaiter::Polling,
        };
        if watcher.watch(path, RecursiveMode::Recursive).is_err() {
            return FollowWaiter::Polling;
        }
        FollowWaiter::Events {
            rx,
            _watcher: watcher,
        }
    }

    /// Blocks until the next pass should start.
    ///
    /// - Polling mode sleeps for the full `timeout`.
    /// - Event mode returns as soon as a relevant filesystem event arrives, or after
    ///   `timeout` with none. It then drains any queued events, so a burst of changes
    ///   triggers a single pass.
    /// - If the event channel has disconnected, it sleeps for `timeout` like polling.
    fn wait(&self, timeout: Duration) {
        use std::sync::mpsc::RecvTimeoutError;
        match self {
            FollowWaiter::Polling => std::thread::sleep(timeout),
            FollowWaiter::Events { rx, .. } => match rx.recv_timeout(timeout) {
                Ok(()) => {
                    while rx.try_recv().is_ok() {}
                }
                Err(RecvTimeoutError::Timeout) => {}
                // With no live sender `recv_timeout` fails instantly; returning here
                // would turn the follow loop into a busy spin of back-to-back passes.
                Err(RecvTimeoutError::Disconnected) => std::thread::sleep(timeout),
            },
        }
    }
}
/// Ingests `path` with explicit options and reports the outcome for every source.
///
/// A FIFO is handed to `ingest_fifo`. Otherwise:
/// - Ignore rules are loaded for `path`, unless `opts.no_ignore` is set.
/// - The target files are collected and each one is ingested.
/// - Files larger than `MAX_INGEST_BYTES` are skipped as `too_large`, with a warning
///   on stderr.
/// - Files whose content hash is unchanged are skipped as `unchanged`.
/// - Per-file failures are recorded as `error` skips instead of aborting the run.
///
/// # Errors
/// Fails if the ignore rules cannot be loaded, if target collection fails (missing
/// path, unsupported single file, walk error), or if ingesting a FIFO fails.
pub fn ingest_path_with(
    store: &mut Store,
    path: &Path,
    opts: &IngestOptions,
) -> Result<IngestReport> {
    if path_is_fifo(path) {
        return ingest_fifo(store, path);
    }
    let rules = if !opts.no_ignore {
        IgnoreRules::load(path)?
    } else {
        IgnoreRules::disabled()
    };
    let (targets, ignored) = collect_targets(path, &rules, &opts.include_exts)?;
    let mut report = IngestReport {
        ingested: Vec::new(),
        skipped: Vec::new(),
        ignored,
    };
    for file in targets {
        let uri = path_to_uri(&file);
        let oversize = fs::metadata(&file)
            .ok()
            .filter(|meta| meta.is_file() && meta.len() > MAX_INGEST_BYTES)
            .map(|meta| meta.len());
        if let Some(len) = oversize {
            let reason = format!(
                "file is {} bytes; exceeds {}-byte limit",
                len, MAX_INGEST_BYTES
            );
            eprintln!("warning: skipping {}: {}", file.display(), reason);
            report.skipped.push(skipped_source(uri, "too_large", reason));
            continue;
        }
        match ingest_file(store, &file) {
            Ok(Some(stored)) => report.ingested.push(stored),
            Ok(None) => report.skipped.push(skipped_source(
                uri,
                "unchanged",
                "unchanged since last ingest".into(),
            )),
            Err(err) => report
                .skipped
                .push(skipped_source(uri, "error", format!("{err:#}"))),
        }
    }
    Ok(report)
}
/// Resolves `path` into the files to ingest plus the number of ignored entries.
///
/// - A regular file must be a supported type, or this is an error. It is returned
///   alone, unless the ignore rules exclude it; then the list is empty and the count
///   is 1.
/// - A directory is walked recursively in file-name order. Entries matched by the
///   ignore rules are pruned (an ignored directory is not descended into and counts
///   once). Every remaining supported file is collected.
/// - Anything else is an error.
fn collect_targets(
    path: &Path,
    rules: &IgnoreRules,
    include_exts: &[String],
) -> Result<(Vec<PathBuf>, usize)> {
    let meta = fs::metadata(path).with_context(|| format!("stat {}", path.display()))?;
    if meta.is_file() {
        anyhow::ensure!(
            is_supported_file_with(path, include_exts),
            "unsupported file extension: {}",
            path.display()
        );
        let single = if rules.is_ignored(path, false) {
            (Vec::new(), 1)
        } else {
            (vec![path.to_path_buf()], 0)
        };
        return Ok(single);
    }
    if !meta.is_dir() {
        anyhow::bail!("not a file or directory: {}", path.display());
    }

    let mut pruned = 0usize;
    let mut files = Vec::new();
    let walk = walkdir::WalkDir::new(path)
        .sort_by_file_name()
        .into_iter()
        .filter_entry(|entry| {
            let keep = !rules.is_ignored(entry.path(), entry.file_type().is_dir());
            if !keep {
                pruned += 1;
            }
            keep
        });
    for item in walk {
        let item = item.with_context(|| format!("walk {}", path.display()))?;
        let candidate = item.path();
        if candidate.is_file() && is_supported_file_with(candidate, include_exts) {
            files.push(candidate.to_path_buf());
        }
    }
    Ok((files, pruned))
}
/// Returns true when `p` is ingestible, using only the built-in extension list and the
/// plain-text content sniff (no extra extensions).
pub(crate) fn is_supported_file(p: &Path) -> bool {
    let no_extra_exts: &[String] = &[];
    is_supported_file_with(p, no_extra_exts)
}
/// Returns true when `p` should be ingested as text.
///
/// A file is accepted if any of these holds:
/// - its extension matches one of `include_exts`;
/// - its extension is one of the built-in text/code extensions;
/// - as a fallback, its content looks like plain text (`looks_like_plaintext`).
///
/// Extension matching is ASCII case-insensitive. Entries in `include_exts` may be given
/// with or without a leading dot, so `"log"`, `".log"` and `"LOG"` are equivalent.
pub(crate) fn is_supported_file_with(p: &Path, include_exts: &[String]) -> bool {
    let ext = p
        .extension()
        .and_then(|s| s.to_str())
        .map(|s| s.to_ascii_lowercase());
    if let Some(ext) = ext.as_deref() {
        // Compare user-supplied extensions leniently so `.log` / `LOG` match `log`.
        if include_exts
            .iter()
            .any(|candidate| candidate.trim_start_matches('.').eq_ignore_ascii_case(ext))
        {
            return true;
        }
    }
    matches!(
        ext.as_deref(),
        Some(
            "txt"
                | "md"
                | "markdown"
                | "jsonl"
                | "ndjson"
                | "rs"
                | "py"
                | "c"
                | "h"
                | "cpp"
                | "hpp"
                | "go"
                | "js"
                | "ts"
                | "tsx"
                | "sh"
                | "lua"
                | "java"
                | "kt"
                | "swift"
                | "toml"
                | "yaml"
                | "yml"
                | "json"
                | "cfg"
                | "ini"
                | "conf"
                | "zig"
        )
    ) || looks_like_plaintext(p)
}
fn looks_like_plaintext(p: &Path) -> bool {
let bytes = match fs::read(p) {
Ok(bytes) => bytes,
Err(_) => return false,
};
!bytes.contains(&0) && std::str::from_utf8(&bytes).is_ok()
}
/// Maps a file's extension to the content kind stored with the source.
///
/// The extension is compared ASCII case-insensitively, matching how
/// `is_supported_file_with` lowercases extensions. So `NOTES.MD` is markdown and
/// `log.JSONL` is chunked as JSONL. Anything unrecognized is `text/plain`.
fn kind_for(p: &Path) -> &'static str {
    let ext = p
        .extension()
        .and_then(|s| s.to_str())
        .map(|s| s.to_ascii_lowercase());
    match ext.as_deref() {
        Some("md" | "markdown") => "text/markdown",
        Some("jsonl" | "ndjson") => JSONL_KIND,
        _ => "text/plain",
    }
}
/// Splits `text` into chunks using the strategy appropriate for `kind`: the JSONL
/// record extractor for JSONL sources, the generic text chunker for everything else.
fn extract_chunks(kind: &str, text: &str) -> Vec<Chunk> {
    match kind {
        JSONL_KIND => extract_jsonl(text),
        _ => chunk_text(text, ChunkOptions::default()),
    }
}
/// Builds the `file://` URI for `p`, using the canonical absolute path when it can be
/// resolved and the path as given otherwise.
fn path_to_uri(p: &Path) -> String {
    let resolved = match fs::canonicalize(p) {
        Ok(abs) => abs,
        Err(_) => p.to_owned(),
    };
    format!("file://{}", resolved.display())
}
/// Returns true when `p` itself (not a symlink target) is a named pipe. Any stat
/// failure counts as "not a FIFO".
#[cfg(unix)]
fn path_is_fifo(p: &Path) -> bool {
    use std::os::unix::fs::FileTypeExt;
    match fs::symlink_metadata(p) {
        Ok(meta) => meta.file_type().is_fifo(),
        Err(_) => false,
    }
}
/// Non-Unix stub: FIFO detection is not implemented, so nothing is treated as a FIFO.
#[cfg(not(unix))]
fn path_is_fifo(_path: &Path) -> bool {
    false
}
/// Builds the `fifo://` base URI for a named pipe, canonicalizing the path when
/// possible and falling back to the path as given.
fn fifo_uri(p: &Path) -> String {
    let target = fs::canonicalize(p).unwrap_or_else(|_| PathBuf::from(p));
    format!("fifo://{}", target.display())
}
/// Reads a named pipe until EOF and stores everything read as one new source.
///
/// The source is stored under `fifo://<path>` in append mode, so every read becomes a
/// separate source with a unique suffix. Its kind is derived from the pipe's extension.
/// NOTE(review): unlike file ingest, this read is not bounded by `MAX_INGEST_BYTES`.
///
/// # Errors
/// Fails if the FIFO cannot be read.
pub fn ingest_fifo(store: &mut Store, path: &Path) -> Result<IngestReport> {
    let payload = fs::read(path).with_context(|| format!("read fifo {}", path.display()))?;
    let base_uri = fifo_uri(path);
    let append_mode = StdinIngestOptions { append: true };
    ingest_stdin_with(store, &base_uri, Some(kind_for(path)), &payload, &append_mode)
}
/// Stable source id: hex of the first 16 bytes of SHA-256 over the URI.
fn source_id_for(uri: &str) -> String {
    let digest = Sha256::digest(uri.as_bytes());
    hex::encode(&digest[..16])
}
/// Chunk id: hex of the first 16 bytes of SHA-256 over the source id, the ordinal as
/// big-endian `u64`, and the chunk text, hashed in that order.
fn chunk_id_for(source_id: &str, ordinal: usize, text: &str) -> String {
    let ordinal_be = (ordinal as u64).to_be_bytes();
    let parts: [&[u8]; 3] = [source_id.as_bytes(), &ordinal_be, text.as_bytes()];
    let mut hasher = Sha256::new();
    for part in parts {
        hasher.update(part);
    }
    let digest = hasher.finalize();
    hex::encode(&digest[..16])
}
/// Full lowercase-hex SHA-256 digest of `bytes`.
pub(crate) fn sha256_hex(bytes: &[u8]) -> String {
    hex::encode(Sha256::digest(bytes))
}
/// Current Unix time in whole seconds; 0 if the system clock is before the epoch.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
/// Reads one file and persists it under its `file://` URI.
///
/// Records the file path, its modification time (when available), and the kind derived
/// from its extension. Returns `Ok(None)` when the stored content is unchanged.
///
/// # Errors
/// Fails if the file cannot be read or if `persist` fails (errors prefixed with the path).
fn ingest_file(store: &mut Store, file: &Path) -> Result<Option<IngestedSource>> {
    let uri = path_to_uri(file);
    let bytes = fs::read(file).with_context(|| format!("read {}", file.display()))?;
    let mtime_unix = fs::metadata(file)
        .and_then(|meta| meta.modified())
        .ok()
        .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
        .map(|since_epoch| since_epoch.as_secs() as i64);
    let display_path = file.to_string_lossy().into_owned();
    persist(
        store,
        &uri,
        Some(display_path),
        kind_for(file),
        &bytes,
        mtime_unix,
        |what| format!("{} {what}", file.display()),
    )
}
/// Options for `ingest_stdin_with`.
#[derive(Debug, Default, Clone, Copy)]
pub struct StdinIngestOptions {
/// When true, a unique `#<nanos>-<seq>` suffix is appended to the URI, so each ingest
/// becomes a new source instead of replacing the one stored under the same URI.
pub append: bool,
}
/// Ingests in-memory `bytes` under `uri` with default options (replace, not append).
pub fn ingest_stdin(
    store: &mut Store,
    uri: &str,
    kind: Option<&str>,
    bytes: &[u8],
) -> Result<IngestReport> {
    let replace_mode = StdinIngestOptions::default();
    ingest_stdin_with(store, uri, kind, bytes, &replace_mode)
}
/// Ingests in-memory `bytes` as a single source identified by `uri`.
///
/// The kind defaults to `text/plain`. In append mode a unique suffix is added to the
/// URI, so each call creates a new source. Persist failures and unchanged content are
/// reported as skips rather than errors.
///
/// # Errors
/// Fails only when `uri` is empty or whitespace.
pub fn ingest_stdin_with(
    store: &mut Store,
    uri: &str,
    kind: Option<&str>,
    bytes: &[u8],
    opts: &StdinIngestOptions,
) -> Result<IngestReport> {
    anyhow::ensure!(
        !uri.trim().is_empty(),
        "stdin ingest requires a non-empty --uri"
    );
    let kind = kind.unwrap_or("text/plain").to_owned();
    let effective_uri = if !opts.append {
        uri.to_owned()
    } else {
        format!("{uri}#{}", unique_suffix())
    };
    let outcome = persist(store, &effective_uri, None, &kind, bytes, None, |what| {
        format!("stdin ({effective_uri}) {what}")
    });
    let mut report = IngestReport {
        ingested: Vec::new(),
        skipped: Vec::new(),
        ignored: 0,
    };
    match outcome {
        Ok(Some(stored)) => report.ingested.push(stored),
        Ok(None) => report.skipped.push(skipped_source(
            effective_uri,
            "unchanged",
            "unchanged since last ingest".into(),
        )),
        Err(err) => report
            .skipped
            .push(skipped_source(effective_uri, "error", format!("{err:#}"))),
    }
    Ok(report)
}
/// Returns `"<unix-nanos>-<seq>"`. The per-process counter keeps suffixes distinct even
/// when two calls observe the same clock reading.
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{nanos}-{seq}")
}
/// Builds a [`SkippedSource`] with a short reason code and a human-readable message.
fn skipped_source(uri: impl Into<String>, skipped_reason: &str, reason: String) -> SkippedSource {
    let uri = uri.into();
    let code = skipped_reason.to_owned();
    SkippedSource {
        uri,
        skipped_reason: Some(code),
        reason,
    }
}
fn persist(
store: &mut Store,
uri: &str,
path: Option<String>,
kind: &str,
bytes: &[u8],
mtime_unix: Option<i64>,
err_ctx: impl Fn(&str) -> String,
) -> Result<Option<IngestedSource>> {
let content_sha = sha256_hex(bytes);
let text = std::str::from_utf8(bytes)
.with_context(|| err_ctx("is not valid UTF-8"))?
.to_string();
let source_id = source_id_for(uri);
let already: Option<String> = store
.conn()
.query_row(
"SELECT content_sha256 FROM sources WHERE id = ?1",
params![source_id],
|row| row.get(0),
)
.ok();
if already.as_deref() == Some(content_sha.as_str()) {
return Ok(None);
}
let chunks = extract_chunks(kind, &text);
let ingested_at = now_unix();
let tx = store.conn_mut().transaction()?;
tx.execute(
"DELETE FROM chunks WHERE source_id = ?1",
params![source_id],
)?;
tx.execute(
"INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
ON CONFLICT(id) DO UPDATE SET
uri = excluded.uri,
path = excluded.path,
kind = excluded.kind,
bytes = excluded.bytes,
content_sha256 = excluded.content_sha256,
mtime_unix = excluded.mtime_unix,
ingested_at = excluded.ingested_at",
params![
source_id,
uri,
path,
kind,
bytes.len() as i64,
content_sha,
mtime_unix,
ingested_at,
],
)?;
{
let mut stmt = tx.prepare(
"INSERT INTO chunks
(id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, created_at,
role, session_id, turn_id, parent_turn_id, tool_name, tool_call_id, timestamp_unix,
project, user, topic, thread)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20)",
)?;
for chunk in &chunks {
let chunk_id = chunk_id_for(&source_id, chunk.ordinal, &chunk.text);
let chunk_sha = sha256_hex(chunk.text.as_bytes());
stmt.execute(params![
chunk_id,
source_id,
chunk.ordinal as i64,
chunk.byte_start as i64,
chunk.byte_end as i64,
chunk.text.chars().count() as i64,
chunk.text,
chunk_sha,
ingested_at,
chunk.role,
chunk.session_id,
chunk.turn_id,
chunk.parent_turn_id,
chunk.tool_name,
chunk.tool_call_id,
chunk.timestamp_unix,
chunk.project,
chunk.user,
chunk.topic,
chunk.thread,
])?;
let entities = extract_entities(&chunk.text);
if !entities.is_empty() {
record_chunk_entities(&tx, &chunk_id, &entities, ingested_at)?;
}
}
}
tx.commit()?;
Ok(Some(IngestedSource {
source_id,
uri: uri.to_string(),
path,
kind: kind.to_string(),
bytes: bytes.len() as u64,
content_sha256: content_sha,
chunks: chunks.len(),
}))
}
/// JSON-Lines events emitted in follow mode. Each is serialized with an `"event"` tag
/// in snake_case: `follow_start`, `follow_pass`, `follow_exit`.
#[derive(Debug, Serialize)]
#[serde(tag = "event", rename_all = "snake_case")]
enum FollowEvent<'a> {
/// Announces the followed path and cadence (durations truncated to whole seconds).
FollowStart {
path: String,
interval_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
idle_timeout_secs: Option<u64>,
},
/// One completed pass; the report is borrowed, not cloned.
FollowPass {
pass: usize,
report: &'a IngestReport,
},
/// Why following stopped; `idle_timeout_secs` is set only for the idle-timeout exit.
FollowExit {
reason: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
idle_timeout_secs: Option<u64>,
},
}
/// Writes `ev` as one compact JSON object followed by `\n` (a single JSON-Lines record).
fn write_event<W: Write>(out: &mut W, ev: &FollowEvent<'_>) -> Result<()> {
    serde_json::to_writer(&mut *out, ev).context("serializing follow event")?;
    out.write_all(b"\n")
        .context("writing follow event newline")
}
/// Writes the `follow_start` event for `path`.
///
/// Durations are truncated to whole seconds. `idle_timeout_secs` is omitted when
/// `idle_timeout` is `None`.
pub fn write_follow_start_json<W: Write>(
    out: &mut W,
    path: &Path,
    interval: Duration,
    idle_timeout: Option<Duration>,
) -> Result<()> {
    let idle_timeout_secs = idle_timeout.as_ref().map(Duration::as_secs);
    write_event(
        out,
        &FollowEvent::FollowStart {
            path: path.display().to_string(),
            interval_secs: interval.as_secs(),
            idle_timeout_secs,
        },
    )
}
/// Writes the `follow_pass` event carrying the pass index and its full report.
pub fn write_follow_pass_json<W: Write>(
    out: &mut W,
    iteration: usize,
    report: &IngestReport,
) -> Result<()> {
    write_event(
        out,
        &FollowEvent::FollowPass {
            pass: iteration,
            report,
        },
    )
}
/// Writes the `follow_exit` event: `reason` is `max_iterations` or `idle_timeout`, and
/// the latter also carries the timeout in whole seconds.
pub fn write_follow_exit_json<W: Write>(out: &mut W, exit: &FollowExit) -> Result<()> {
    let (reason, idle_timeout_secs) = match exit {
        FollowExit::MaxIterations => ("max_iterations", None),
        FollowExit::IdleTimeout(limit) => ("idle_timeout", Some(limit.as_secs())),
    };
    write_event(
        out,
        &FollowEvent::FollowExit {
            reason,
            idle_timeout_secs,
        },
    )
}
/// Writes the `follow_start` event to stdout (holding the stdout lock for the write).
pub fn print_follow_start_json(
    path: &Path,
    interval: Duration,
    idle_timeout: Option<Duration>,
) -> Result<()> {
    let mut out = std::io::stdout().lock();
    write_follow_start_json(&mut out, path, interval, idle_timeout)
}
/// Writes the `follow_pass` event to stdout.
///
/// Write errors are deliberately ignored: this is used as the per-pass callback, whose
/// `(usize, &IngestReport)` signature has no way to report failure.
pub fn print_follow_pass_json(iteration: usize, report: &IngestReport) {
    let mut out = std::io::stdout().lock();
    let _ = write_follow_pass_json(&mut out, iteration, report);
}
/// Writes the `follow_exit` event to stdout.
pub fn print_follow_exit_json(exit: &FollowExit) -> Result<()> {
    let mut out = std::io::stdout().lock();
    write_follow_exit_json(&mut out, exit)
}
/// Prints the text-mode summary line for one follow pass, then the full per-source
/// report when the pass was active.
///
/// A pass is active when it ingested something, or skipped a source for a reason other
/// than `unchanged` (e.g. `too_large` or `error`). Every already-ingested file is
/// reported as an `unchanged` skip on every pass. Counting those would make every pass
/// over a non-empty tree "active" and repeat the same per-file lines each interval.
pub fn print_follow_pass(iteration: usize, report: &IngestReport) {
    let noteworthy_skip = report
        .skipped
        .iter()
        .any(|s| s.skipped_reason.as_deref() != Some("unchanged"));
    let active = !report.ingested.is_empty() || noteworthy_skip;
    println!(
        "[follow pass={} ingested={} skipped={} ignored={}]",
        iteration,
        report.ingested.len(),
        report.skipped.len(),
        report.ignored,
    );
    if active {
        print_text(report);
    }
}
/// Prints a human-readable report to stdout.
///
/// Output is one line per ingested source, one line per skipped source, then a summary
/// line. The summary includes `ignored=` only when that count is nonzero.
pub fn print_text(report: &IngestReport) {
    report.ingested.iter().for_each(|src| {
        println!(
            "ingested source={} chunks={} bytes={} kind={} uri={}",
            src.source_id, src.chunks, src.bytes, src.kind, src.uri
        )
    });
    report
        .skipped
        .iter()
        .for_each(|skip| println!("skipped uri={} reason={}", skip.uri, skip.reason));
    let (ingested, skipped) = (report.ingested.len(), report.skipped.len());
    match report.ignored {
        0 => println!("summary ingested={} skipped={}", ingested, skipped),
        ignored => println!(
            "summary ingested={} skipped={} ignored={}",
            ingested, skipped, ignored
        ),
    }
}
/// Prints the report to stdout as pretty-printed JSON.
///
/// # Errors
/// Fails if the report cannot be serialized.
pub fn print_json(report: &IngestReport) -> Result<()> {
    let pretty = serde_json::to_string_pretty(report)?;
    println!("{pretty}");
    Ok(())
}
// Unit tests for this module: JSONL metadata persistence, URL/domain entity
// extraction, supported-file detection, skipped-source JSON shape, and the
// follow-mode JSON-Lines event writers.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// A single JSONL tool record becomes one chunk with its session/turn/tool metadata
// columns populated. Per the assertions: `parentMessageId` is expected in
// `parent_turn_id`, the stored text gets a `[tool:<name>] ` prefix, and no role is set.
#[test]
fn persists_jsonl_metadata_into_chunks_table() {
let dir = tempdir().unwrap();
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let bytes = br#"{"session_id":"sess-42","turn_id":"turn-3","parentMessageId":"turn-2","tool_name":"search","tool_call_id":"call-42","timestamp":1700000002,"content":"tool output"}
"#;
let report = ingest_stdin(
&mut store,
"stdin://sess-42",
Some("application/jsonl"),
bytes,
)
.unwrap();
assert_eq!(report.ingested.len(), 1);
let row = store
.conn()
.query_row(
"SELECT text, role, session_id, turn_id, parent_turn_id, tool_name, tool_call_id, timestamp_unix FROM chunks",
[],
|row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, Option<String>>(5)?,
row.get::<_, Option<String>>(6)?,
row.get::<_, Option<i64>>(7)?,
))
},
)
.unwrap();
assert_eq!(row.0, "[tool:search] tool output");
assert_eq!(row.1.as_deref(), None);
assert_eq!(row.2.as_deref(), Some("sess-42"));
assert_eq!(row.3.as_deref(), Some("turn-3"));
assert_eq!(row.4.as_deref(), Some("turn-2"));
assert_eq!(row.5.as_deref(), Some("search"));
assert_eq!(row.6.as_deref(), Some("call-42"));
assert_eq!(row.7, Some(1_700_000_002));
}
// Both `project` and `repository` keys are expected to populate the `project` column;
// a record with neither leaves it NULL. Rows are checked in chunk-ordinal order.
#[test]
fn persists_jsonl_project_into_chunks_table() {
let dir = tempdir().unwrap();
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let bytes = br#"{"session_id":"sess-99","project":"lantern","content":"hello"}
{"session_id":"sess-99","repository":"https://github.com/diogenes/lantern","content":"hello again"}
{"session_id":"sess-99","content":"no project"}
"#;
let report = ingest_stdin(
&mut store,
"stdin://sess-99",
Some("application/jsonl"),
bytes,
)
.unwrap();
assert_eq!(report.ingested.len(), 1);
let mut rows: Vec<Option<String>> = store
.conn()
.prepare("SELECT project FROM chunks ORDER BY ordinal")
.unwrap()
.query_map([], |row| row.get::<_, Option<String>>(0))
.unwrap()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 3);
assert_eq!(rows.remove(0).as_deref(), Some("lantern"));
assert_eq!(
rows.remove(0).as_deref(),
Some("https://github.com/diogenes/lantern")
);
assert_eq!(rows.remove(0), None);
}
// Both `user` and `author` keys are expected to populate the `user` column.
#[test]
fn persists_jsonl_user_into_chunks_table() {
let dir = tempdir().unwrap();
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let bytes = br#"{"session_id":"sess-99","user":"alice","content":"hello"}
{"session_id":"sess-99","author":"Bob Example","content":"hello again"}
{"session_id":"sess-99","content":"no user"}
"#;
let report = ingest_stdin(
&mut store,
"stdin://sess-99-user",
Some("application/jsonl"),
bytes,
)
.unwrap();
assert_eq!(report.ingested.len(), 1);
let mut rows: Vec<Option<String>> = store
.conn()
.prepare("SELECT user FROM chunks ORDER BY ordinal")
.unwrap()
.query_map([], |row| row.get::<_, Option<String>>(0))
.unwrap()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 3);
assert_eq!(rows.remove(0).as_deref(), Some("alice"));
assert_eq!(rows.remove(0).as_deref(), Some("Bob Example"));
assert_eq!(rows.remove(0), None);
}
// Both `topic` and `subject` keys are expected to populate the `topic` column.
#[test]
fn persists_jsonl_topic_into_chunks_table() {
let dir = tempdir().unwrap();
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let bytes = br#"{"session_id":"sess-99","topic":"onboarding","content":"hello"}
{"session_id":"sess-99","subject":"Re: contract review","content":"hello again"}
{"session_id":"sess-99","content":"no topic"}
"#;
let report = ingest_stdin(
&mut store,
"stdin://sess-99-topic",
Some("application/jsonl"),
bytes,
)
.unwrap();
assert_eq!(report.ingested.len(), 1);
let mut rows: Vec<Option<String>> = store
.conn()
.prepare("SELECT topic FROM chunks ORDER BY ordinal")
.unwrap()
.query_map([], |row| row.get::<_, Option<String>>(0))
.unwrap()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 3);
assert_eq!(rows.remove(0).as_deref(), Some("onboarding"));
assert_eq!(rows.remove(0).as_deref(), Some("Re: contract review"));
assert_eq!(rows.remove(0), None);
}
// Two distinct URLs (one repeated, one followed by punctuation) should yield exactly
// two `url` and two `domain` entities, linked to this source's chunks (4 edges expected,
// presumably one per distinct entity). Re-ingesting identical bytes under the same URI
// must short-circuit as unchanged and must not add entities.
#[test]
fn ingest_populates_url_entities_and_links_them_to_chunks() {
let dir = tempdir().unwrap();
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let bytes =
b"see https://example.com/a and https://example.com/a, plus https://other.test/page.\n";
let report =
ingest_stdin(&mut store, "stdin://entities", Some("text/plain"), bytes).unwrap();
assert_eq!(report.ingested.len(), 1);
let mut entity_values: Vec<(String, String)> = store
.conn()
.prepare("SELECT kind, value FROM entities ORDER BY kind, value")
.unwrap()
.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
})
.unwrap()
.map(|r| r.unwrap())
.collect();
entity_values.sort();
assert_eq!(
entity_values,
vec![
("domain".to_string(), "example.com".to_string()),
("domain".to_string(), "other.test".to_string()),
("url".to_string(), "https://example.com/a".to_string()),
("url".to_string(), "https://other.test/page".to_string()),
],
);
let edge_count: i64 = store
.conn()
.query_row(
"SELECT COUNT(*) FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
JOIN sources s ON s.id = c.source_id
WHERE s.uri = 'stdin://entities'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(edge_count, 4);
let report2 =
ingest_stdin(&mut store, "stdin://entities", Some("text/plain"), bytes).unwrap();
assert_eq!(
report2.ingested.len(),
0,
"unchanged content must short-circuit"
);
let entity_count: i64 = store
.conn()
.query_row("SELECT COUNT(*) FROM entities", [], |row| row.get(0))
.unwrap();
assert_eq!(entity_count, 4);
}
// `.tsx` is on the built-in extension list.
#[test]
fn plain_text_tsx_files_are_supported_for_ingest() {
let dir = tempdir().unwrap();
let file = dir.path().join("page.tsx");
std::fs::write(
&file,
"export default function Page() { return <div>Hello</div>; }",
)
.unwrap();
assert!(is_supported_file(&file));
}
// Files without an extension are accepted via the plain-text content sniff.
#[test]
fn extensionless_utf8_files_are_supported_for_ingest() {
let dir = tempdir().unwrap();
let file = dir.path().join("README");
std::fs::write(&file, "just some plain text without an extension").unwrap();
assert!(is_supported_file(&file));
}
// JSON output of a skipped source carries both the reason code and the message.
#[test]
fn skipped_sources_include_structured_reason_codes_in_json() {
let report = IngestReport {
ingested: Vec::new(),
skipped: vec![SkippedSource {
uri: "file:///tmp/huge.log".into(),
skipped_reason: Some("too_large".into()),
reason: "file is 52428801 bytes; exceeds 52428800-byte limit".into(),
}],
ignored: 0,
};
let json = serde_json::to_value(&report).unwrap();
let skipped = json["skipped"].as_array().unwrap();
assert_eq!(skipped.len(), 1);
assert_eq!(skipped[0]["skipped_reason"], "too_large");
assert_eq!(
skipped[0]["reason"],
"file is 52428801 bytes; exceeds 52428800-byte limit"
);
}
// Both `thread` and `threadId` keys are expected to populate the `thread` column.
#[test]
fn persists_jsonl_thread_into_chunks_table() {
let dir = tempdir().unwrap();
let mut store = Store::initialize(&dir.path().join("store")).unwrap();
let bytes = br#"{"session_id":"sess-99","thread":"telegram:ops","content":"hello"}
{"session_id":"sess-99","threadId":"discord:eng","content":"hello again"}
{"session_id":"sess-99","content":"no thread"}
"#;
let report = ingest_stdin(
&mut store,
"stdin://sess-99-thread",
Some("application/jsonl"),
bytes,
)
.unwrap();
assert_eq!(report.ingested.len(), 1);
let mut rows: Vec<Option<String>> = store
.conn()
.prepare("SELECT thread FROM chunks ORDER BY ordinal")
.unwrap()
.query_map([], |row| row.get::<_, Option<String>>(0))
.unwrap()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 3);
assert_eq!(rows.remove(0).as_deref(), Some("telegram:ops"));
assert_eq!(rows.remove(0).as_deref(), Some("discord:eng"));
assert_eq!(rows.remove(0), None);
}
// Fixture: a report with nothing ingested, skipped, or ignored.
fn empty_report() -> IngestReport {
IngestReport {
ingested: Vec::new(),
skipped: Vec::new(),
ignored: 0,
}
}
// Fixture: a report with exactly one ingested source and nothing else.
fn ingested_only_report() -> IngestReport {
IngestReport {
ingested: vec![IngestedSource {
source_id: "src-1".into(),
uri: "file:///tmp/sess.jsonl".into(),
path: Some("/tmp/sess.jsonl".into()),
kind: "application/jsonl".into(),
bytes: 42,
content_sha256: "deadbeef".into(),
chunks: 1,
}],
skipped: Vec::new(),
ignored: 0,
}
}
// follow_start is one newline-terminated JSON object; idle timeout absent when None.
#[test]
fn follow_start_json_includes_path_and_interval() {
let mut buf = Vec::new();
write_follow_start_json(
&mut buf,
Path::new("/tmp/watch"),
Duration::from_secs(5),
None,
)
.unwrap();
let line = std::str::from_utf8(&buf).unwrap();
assert!(line.ends_with('\n'), "follow_start must terminate the line");
let value: serde_json::Value = serde_json::from_str(line.trim_end()).unwrap();
assert_eq!(value["event"], "follow_start");
assert_eq!(value["path"], "/tmp/watch");
assert_eq!(value["interval_secs"], 5);
assert!(value.get("idle_timeout_secs").is_none());
}
#[test]
fn follow_start_json_includes_idle_timeout_when_set() {
let mut buf = Vec::new();
write_follow_start_json(
&mut buf,
Path::new("/tmp/watch"),
Duration::from_secs(2),
Some(Duration::from_secs(30)),
)
.unwrap();
let value: serde_json::Value = serde_json::from_slice(&buf).unwrap();
assert_eq!(value["idle_timeout_secs"], 30);
}
// follow_pass embeds the full report under `report`.
#[test]
fn follow_pass_json_carries_pass_index_and_report() {
let mut buf = Vec::new();
let report = ingested_only_report();
write_follow_pass_json(&mut buf, 7, &report).unwrap();
let value: serde_json::Value = serde_json::from_slice(&buf).unwrap();
assert_eq!(value["event"], "follow_pass");
assert_eq!(value["pass"], 7);
let report_field = &value["report"];
assert_eq!(report_field["ingested"].as_array().unwrap().len(), 1);
assert_eq!(report_field["ingested"][0]["source_id"], "src-1");
assert_eq!(report_field["ingested"][0]["chunks"], 1);
assert_eq!(report_field["skipped"].as_array().unwrap().len(), 0);
}
// Even an empty pass produces exactly one line (compact, not pretty-printed JSON).
#[test]
fn follow_pass_json_empty_report_still_one_line() {
let mut buf = Vec::new();
let report = empty_report();
write_follow_pass_json(&mut buf, 0, &report).unwrap();
let s = std::str::from_utf8(&buf).unwrap();
assert_eq!(s.matches('\n').count(), 1);
assert!(s.ends_with('\n'));
let value: serde_json::Value = serde_json::from_str(s.trim_end()).unwrap();
assert_eq!(value["event"], "follow_pass");
assert_eq!(value["pass"], 0);
}
#[test]
fn follow_exit_json_max_iterations_has_no_timeout_field() {
let mut buf = Vec::new();
write_follow_exit_json(&mut buf, &FollowExit::MaxIterations).unwrap();
let value: serde_json::Value = serde_json::from_slice(&buf).unwrap();
assert_eq!(value["event"], "follow_exit");
assert_eq!(value["reason"], "max_iterations");
assert!(value.get("idle_timeout_secs").is_none());
}
#[test]
fn follow_exit_json_idle_timeout_carries_seconds() {
let mut buf = Vec::new();
write_follow_exit_json(&mut buf, &FollowExit::IdleTimeout(Duration::from_secs(45)))
.unwrap();
let value: serde_json::Value = serde_json::from_slice(&buf).unwrap();
assert_eq!(value["event"], "follow_exit");
assert_eq!(value["reason"], "idle_timeout");
assert_eq!(value["idle_timeout_secs"], 45);
}
// start + pass + exit written to one buffer form three parseable JSON-Lines records.
#[test]
fn follow_event_stream_is_newline_delimited_json() {
let mut buf = Vec::new();
write_follow_start_json(
&mut buf,
Path::new("/tmp/watch"),
Duration::from_secs(5),
Some(Duration::from_secs(30)),
)
.unwrap();
let report = ingested_only_report();
write_follow_pass_json(&mut buf, 0, &report).unwrap();
write_follow_exit_json(&mut buf, &FollowExit::IdleTimeout(Duration::from_secs(30)))
.unwrap();
let text = std::str::from_utf8(&buf).unwrap();
let lines: Vec<&str> = text.lines().collect();
assert_eq!(lines.len(), 3, "expected three JSON-Lines events");
let events: Vec<serde_json::Value> = lines
.iter()
.map(|l| serde_json::from_str::<serde_json::Value>(l).unwrap())
.collect();
assert_eq!(events[0]["event"], "follow_start");
assert_eq!(events[1]["event"], "follow_pass");
assert_eq!(events[2]["event"], "follow_exit");
}
}