use std::io::{self, Read};
use std::sync::Arc;
use std::time::Duration;
use relayburn_sdk::{
ingest_all, start_watch_loop, IngestReport, Ledger, LedgerHandle, LedgerOpenOptions,
StartWatchLoopOptions,
};
use crate::cli::{GlobalArgs, IngestArgs};
use crate::render::error::report_error;
use crate::render::progress::TaskProgress;
/// Exit code returned when the ingest flags are rejected before any work
/// starts: `--watch` combined with `--hook`, a zero `--interval`, or an
/// unsupported hook harness name.
const EXIT_FLAG_MISUSE: i32 = 2;
/// Entry point for the `ingest` subcommand.
///
/// Dispatches on the flag combination:
/// - `--hook` together with `--watch` is rejected with [`EXIT_FLAG_MISUSE`];
/// - `--hook <harness>` alone runs the hook path;
/// - `--watch` alone runs the foreground watch loop;
/// - neither flag runs a single ingest pass.
///
/// Returns the process exit code produced by the selected mode.
pub fn run(globals: &GlobalArgs, args: IngestArgs) -> i32 {
    match (args.hook.as_deref(), args.watch) {
        (Some(_), true) => {
            eprintln!("burn: ingest --watch and --hook are mutually exclusive");
            EXIT_FLAG_MISUSE
        }
        (Some(harness), false) => run_hook(globals, harness, args.quiet),
        (None, true) => run_watch(globals, &args),
        (None, false) => run_once(globals, args.quiet),
    }
}
/// Performs a single ingest pass and prints the resulting report to stdout.
///
/// Steps: open the ledger, build a current-thread tokio runtime, run
/// `ingest_all` to completion, then print one summary line. The progress
/// indicator is cleared before anything is reported so the output is not
/// interleaved with it.
///
/// Returns `0` on success; on failure returns whatever `report_error` yields.
fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 {
    // `quiet` is accepted for signature parity but deliberately unused here:
    // the one-shot report is always printed.
    let _ = quiet;
    let spinner = TaskProgress::new(globals, "ingest");

    spinner.set_task("opening ledger");
    let mut ledger = match open_handle(globals) {
        Ok(ledger) => ledger,
        Err(open_err) => {
            spinner.finish_and_clear();
            return report_error(&open_err, globals);
        }
    };

    spinner.set_task("starting runtime");
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();
    let runtime = match runtime_builder.build() {
        Ok(runtime) => runtime,
        Err(build_err) => {
            spinner.finish_and_clear();
            return report_error(&build_err, globals);
        }
    };

    spinner.set_task("scanning sessions");
    let ingest_opts = spinner.ingest_options(globals.ledger_path.clone());
    let outcome = runtime.block_on(ingest_all(ledger.raw_mut(), &ingest_opts));
    spinner.finish_and_clear();

    let report = match outcome {
        Ok(report) => report,
        Err(ingest_err) => return report_error(&ingest_err, globals),
    };
    log_report_oneshot(&report);
    0
}
fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 {
let interval_ms = match args.interval {
Some(0) => {
eprintln!("burn: ingest --interval must be a positive integer in milliseconds");
return EXIT_FLAG_MISUSE;
}
Some(n) => n,
None => 1000,
};
let progress = TaskProgress::new(globals, "ingest");
progress.set_task("opening ledger");
let handle = match open_handle(globals) {
Ok(h) => h,
Err(err) => {
progress.finish_and_clear();
return report_error(&err, globals);
}
};
progress.set_task("starting watcher");
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(err) => {
progress.finish_and_clear();
return report_error(&err, globals);
}
};
let quiet = args.quiet;
let watch_message = format!("watching every {interval_ms}ms; Ctrl-C to stop");
if !quiet {
if progress.is_visible() {
progress.set_task(watch_message.clone());
} else {
eprintln!("[burn] ingest: foreground ingest every {interval_ms}ms; Ctrl-C to stop",);
}
}
let ledger_home = globals.ledger_path.clone();
let progress_for_loop = progress.clone();
rt.block_on(async move {
let handle_arc: Arc<tokio::sync::Mutex<LedgerHandle>> =
Arc::new(tokio::sync::Mutex::new(handle));
let handle_for_ingest = handle_arc.clone();
let progress_for_ingest = progress_for_loop.clone();
let watch_message_for_ingest = watch_message.clone();
let ingest_fn: relayburn_sdk::IngestFn = Arc::new(move || {
let h = handle_for_ingest.clone();
let progress = progress_for_ingest.clone();
let ledger_home = ledger_home.clone();
let watch_message = watch_message_for_ingest.clone();
Box::pin(async move {
progress.set_task("scanning sessions");
let mut guard = h.lock().await;
let opts = if quiet {
TaskProgress::quiet_ingest_options(ledger_home)
} else {
progress.ingest_options(ledger_home)
};
let result = ingest_all(guard.raw_mut(), &opts).await;
progress.set_task(watch_message);
result
})
});
let progress_for_report = progress_for_loop.clone();
let on_report: relayburn_sdk::ReportSink = Arc::new(move |report: &IngestReport| {
if !quiet && report.appended_turns > 0 {
progress_for_report.suspend(|| {
eprint!("{}", render_ingest_line(report));
});
}
});
let progress_for_error = progress_for_loop.clone();
let on_error: relayburn_sdk::ErrorSink = Arc::new(move |err: &anyhow::Error| {
progress_for_error.suspend(|| {
eprintln!("[burn] ingest: {err}");
});
});
let opts = StartWatchLoopOptions::new(ingest_fn)
.with_interval(Duration::from_millis(interval_ms))
.with_immediate(true)
.with_on_report(on_report)
.with_on_error(on_error);
let controller = start_watch_loop(opts);
wait_for_stop_signal().await;
controller.stop().await;
});
progress.finish_and_clear();
0
}
/// Handles `ingest --hook <harness>`: validates the JSON payload on stdin,
/// then runs one ingest pass.
///
/// Only the `claude` harness is accepted; any other name returns
/// [`EXIT_FLAG_MISUSE`]. After that check every path returns `0`, including
/// stdin, JSON, ledger, runtime and ingest failures, which are only logged
/// to stderr.
/// NOTE(review): presumably this keeps a failing hook from disturbing the
/// calling harness; confirm the intent.
///
/// The payload must be a JSON value with string `session_id` and
/// `transcript_path` entries. Those values are only checked for presence
/// here; they are not passed to the ingest call.
///
/// With `quiet` set, no progress indicator is created and informational
/// messages are suppressed; error messages are still printed.
fn run_hook(globals: &GlobalArgs, hook: &str, quiet: bool) -> i32 {
    if hook != "claude" {
        eprintln!("burn: unsupported hook harness: {hook}");
        return EXIT_FLAG_MISUSE;
    }

    let payload_text = match read_stdin() {
        Ok(text) => text,
        Err(err) => {
            eprintln!("[burn] ingest: failed to read stdin: {err}");
            return 0;
        }
    };
    if payload_text.trim().is_empty() {
        if !quiet {
            eprintln!("[burn] ingest: empty stdin payload, nothing to do");
        }
        return 0;
    }

    let payload: serde_json::Value = match serde_json::from_str(&payload_text) {
        Ok(value) => value,
        Err(err) => {
            eprintln!("[burn] ingest: invalid JSON payload: {err}");
            return 0;
        }
    };
    // A key counts as present only when its value is a JSON string.
    let has_string_field =
        |key: &str| payload.get(key).map_or(false, serde_json::Value::is_string);
    if !(has_string_field("session_id") && has_string_field("transcript_path")) {
        if !quiet {
            eprintln!("[burn] ingest: payload missing session_id or transcript_path; ignoring");
        }
        return 0;
    }

    // Quiet mode runs without any progress indicator.
    let progress = if quiet {
        None
    } else {
        Some(TaskProgress::new(globals, "ingest"))
    };
    let announce = |task: &'static str| {
        if let Some(bar) = &progress {
            bar.set_task(task);
        }
    };
    let clear_progress = || {
        if let Some(bar) = &progress {
            bar.finish_and_clear();
        }
    };

    announce("opening ledger");
    let mut ledger = match open_handle(globals) {
        Ok(ledger) => ledger,
        Err(err) => {
            clear_progress();
            eprintln!("[burn] ingest: {err}");
            return 0;
        }
    };

    announce("starting runtime");
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();
    let runtime = match runtime_builder.build() {
        Ok(runtime) => runtime,
        Err(err) => {
            clear_progress();
            eprintln!("[burn] ingest: {err}");
            return 0;
        }
    };

    announce("scanning sessions");
    let ingest_opts = progress.as_ref().map_or_else(
        || TaskProgress::quiet_ingest_options(globals.ledger_path.clone()),
        |bar| bar.ingest_options(globals.ledger_path.clone()),
    );
    let outcome = runtime.block_on(ingest_all(ledger.raw_mut(), &ingest_opts));
    clear_progress();

    match outcome {
        Ok(report) if !quiet && report.appended_turns > 0 => {
            eprint!("{}", render_ingest_line(&report));
        }
        Ok(_) => {}
        Err(err) => {
            eprintln!("[burn] ingest: {err}");
        }
    }
    0
}
/// Opens the ledger, rooted at `globals.ledger_path` when one is set and at
/// the default open options otherwise.
///
/// # Errors
///
/// Propagates whatever error `Ledger::open` returns.
fn open_handle(globals: &GlobalArgs) -> anyhow::Result<LedgerHandle> {
    let open_opts = globals.ledger_path.as_deref().map_or_else(
        || LedgerOpenOptions::default(),
        |home| LedgerOpenOptions::with_home(home),
    );
    Ledger::open(open_opts)
}
/// Formats the one-line ingest summary, newline included.
///
/// "session" and "turn" are singular only when the matching count is
/// exactly 1; every other count, zero included, uses the plural.
fn render_ingest_line(report: &IngestReport) -> String {
    let sessions = &report.ingested_sessions;
    let turns = &report.appended_turns;

    let session_word = match report.ingested_sessions {
        1 => "session",
        _ => "sessions",
    };
    let turn_word = match report.appended_turns {
        1 => "turn",
        _ => "turns",
    };

    format!("[burn] ingest: ingested {sessions} {session_word} (+{turns} {turn_word})\n")
}
/// Writes the one-shot ingest summary line to stdout.
fn log_report_oneshot(report: &IngestReport) {
    let summary = render_ingest_line(report);
    print!("{summary}");
}
/// Reads all of stdin into a `String`.
///
/// When stdin is a terminal nothing is read and an empty string is returned,
/// so an interactive invocation does not block waiting for input.
///
/// # Errors
///
/// Returns the I/O error from reading stdin to end as a UTF-8 string.
fn read_stdin() -> io::Result<String> {
    use std::io::IsTerminal;

    let input = io::stdin();
    let mut payload = String::new();
    if !input.is_terminal() {
        input.lock().read_to_string(&mut payload)?;
    }
    Ok(payload)
}
/// Resolves once the process is asked to stop.
///
/// On Unix this completes on whichever arrives first, Ctrl-C or SIGTERM. If
/// the SIGTERM listener cannot be registered, only Ctrl-C is awaited. On
/// other platforms only Ctrl-C is awaited. Errors from the Ctrl-C future are
/// ignored.
async fn wait_for_stop_signal() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        if let Ok(mut terminate) = signal(SignalKind::terminate()) {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {}
                _ = terminate.recv() => {}
            }
        } else {
            // No SIGTERM listener available: fall back to Ctrl-C alone.
            let _ = tokio::signal::ctrl_c().await;
        }
    }
    #[cfg(not(unix))]
    {
        let _ = tokio::signal::ctrl_c().await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks singular/plural wording for counts of one, several, and zero.
    #[test]
    fn render_ingest_line_pluralizes_consistently() {
        let cases = [
            (
                IngestReport {
                    scanned_sessions: 1,
                    ingested_sessions: 1,
                    appended_turns: 1,
                    applied_pending_stamps: 0,
                },
                "[burn] ingest: ingested 1 session (+1 turn)\n",
            ),
            (
                IngestReport {
                    scanned_sessions: 3,
                    ingested_sessions: 2,
                    appended_turns: 5,
                    applied_pending_stamps: 0,
                },
                "[burn] ingest: ingested 2 sessions (+5 turns)\n",
            ),
            (
                IngestReport::default(),
                "[burn] ingest: ingested 0 sessions (+0 turns)\n",
            ),
        ];

        for (report, expected) in &cases {
            assert_eq!(render_ingest_line(report), *expected);
        }
    }
}