relayburn-cli 2.7.1

The `burn` CLI — published to crates.io. Crate name is relayburn-cli because `burn` is taken on crates.io; the binary keeps the `burn` invocation.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
//! `burn ingest` — passive-ingest entrypoint. No flags scans every
//! known session store once; `--watch` keeps polling; `--hook claude
//! --quiet` is the stdin-driven Claude hook path.
//!
//! Thin presenter over the SDK ingest verb plus the SDK's watch-loop
//! controller. TS source of truth: `packages/cli/src/commands/ingest.ts`
//! plus `packages/ingest/src/watch-loop.ts`.
//!
//! The Rust port keeps the three modes as a single subcommand so
//! `burn ingest` retains its TS muscle memory:
//!
//! - No flags = `runIngestOnce` — one full sweep, then exit.
//! - `--watch` = `runIngestWatch` — foreground poll loop until SIGINT
//!   / SIGTERM.
//! - `--hook claude` = `runIngestHook` — stdin-driven hook payload.
//!   Today only `--hook claude` is wired here (Codex / OpenCode hooks
//!   were never part of the TS surface either). The hook path
//!   currently ingests via a full `ingest_all` sweep, since the SDK
//!   does not yet expose a single-transcript verb. Practically this
//!   is no slower than the TS hook because Claude hooks fire at
//!   session-end and the sweep short-circuits on unchanged cursors;
//!   the cost is bounded by the number of new sessions, not by the
//!   hook payload.
//!
//! Output shape: every successful run writes a single
//! `[burn] ingest: ingested N session(s) (+M turn(s))` line. The
//! one-shot path emits it on **stdout** so pipelines can capture the
//! summary (matching the TS `runIngestOnce` source-of-truth at
//! `packages/cli/src/commands/ingest.ts:121-126`); `--watch` and
//! `--hook` modes log on **stderr** so the foreground banner / hook
//! breadcrumbs don't pollute downstream stdout consumers. `--quiet`
//! (only valid with `--hook`) suppresses the hook breadcrumb when the
//! report is empty.

use std::io::{self, Read};
use std::sync::Arc;
use std::time::Duration;

use relayburn_sdk::{
    ingest_all, start_watch_loop, IngestReport, Ledger, LedgerHandle, LedgerOpenOptions,
    StartWatchLoopOptions,
};

use crate::cli::{GlobalArgs, IngestArgs};
use crate::render::error::report_error;
use crate::render::progress::TaskProgress;

/// Exit codes mirror the TS CLI:
/// - `0` happy path (including hook-mode empty-payload no-op).
/// - `1` typed/unknown errors during a non-watch run (parse, IO).
/// - `2` flag misuse (`--watch` + `--hook`, unsupported `--hook`,
///   `--hook` without value, `--interval` not a positive integer).
const EXIT_FLAG_MISUSE: i32 = 2;

/// Entrypoint for the `burn ingest` subcommand. Dispatches on the flag
/// triple (`watch`, `hook`, default) and lets the SDK do the heavy
/// lifting.
pub fn run(globals: &GlobalArgs, args: IngestArgs) -> i32 {
    // Mutually-exclusive guard: TS rejects `--watch --hook` with exit 2
    // before doing any IO. Mirror that here so flag misuse gets a stable
    // shell-script-friendly contract.
    if args.watch && args.hook.is_some() {
        eprintln!("burn: ingest --watch and --hook are mutually exclusive");
        return EXIT_FLAG_MISUSE;
    }

    if let Some(hook) = args.hook.as_deref() {
        return run_hook(globals, hook, args.quiet);
    }
    if args.watch {
        return run_watch(globals, &args);
    }
    run_once(globals, args.quiet)
}

/// One-shot scan: open the ledger, run a single `ingest_all`, log the
/// summary, exit. Drives a current-thread tokio runtime so the otherwise
/// sync presenter can drive the async SDK verb.
///
/// Summary line is emitted on **stdout** (matching TS `runIngestOnce`
/// at `packages/cli/src/commands/ingest.ts:121-126`) so callers can
/// capture pipeline output without redirecting stderr.
fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 {
    let _ = quiet; // `--quiet` is hook-only (clap `requires = "hook"`); kept in
                   // the dispatch signature for symmetry with run_watch / run_hook.
    let progress = TaskProgress::new(globals, "ingest");
    progress.set_task("opening ledger");
    let mut handle = match open_handle(globals) {
        Ok(h) => h,
        Err(err) => {
            progress.finish_and_clear();
            return report_error(&err, globals);
        }
    };
    progress.set_task("starting runtime");
    let rt = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(err) => {
            progress.finish_and_clear();
            return report_error(&err, globals);
        }
    };
    progress.set_task("scanning sessions");
    let opts = progress.ingest_options(globals.ledger_path.clone());
    let result = rt.block_on(ingest_all(handle.raw_mut(), &opts));
    progress.finish_and_clear();
    match result {
        Ok(report) => {
            log_report_oneshot(&report);
            0
        }
        Err(err) => report_error(&err, globals),
    }
}

/// `--watch` mode: spin up [`start_watch_loop`] over a persistent ledger
/// handle and a tokio runtime, then park on SIGINT / SIGTERM.
///
/// We share the ledger handle across ticks via an `Arc<Mutex>` so the
/// poll loop reuses one open SQLite connection per process — same shape
/// as the TS adapter, which keeps a single `withLock('ledger', …)`
/// guarded handle alive for the duration of the watch. `RawIngestOptions`
/// is `Default` per tick because none of the per-tick state (progress
/// callbacks, etc.) needs to survive across ticks.
fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 {
    let interval_ms = match args.interval {
        Some(0) => {
            eprintln!("burn: ingest --interval must be a positive integer in milliseconds");
            return EXIT_FLAG_MISUSE;
        }
        Some(n) => n,
        None => 1000,
    };

    let progress = TaskProgress::new(globals, "ingest");
    progress.set_task("opening ledger");
    let handle = match open_handle(globals) {
        Ok(h) => h,
        Err(err) => {
            progress.finish_and_clear();
            return report_error(&err, globals);
        }
    };

    progress.set_task("starting watcher");
    let rt = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(err) => {
            progress.finish_and_clear();
            return report_error(&err, globals);
        }
    };

    let quiet = args.quiet;
    let watch_message = format!("watching every {interval_ms}ms; Ctrl-C to stop");
    if !quiet {
        if progress.is_visible() {
            progress.set_task(watch_message.clone());
        } else {
            eprintln!("[burn] ingest: foreground ingest every {interval_ms}ms; Ctrl-C to stop",);
        }
    }

    let ledger_home = globals.ledger_path.clone();
    let progress_for_loop = progress.clone();
    rt.block_on(async move {
        let handle_arc: Arc<tokio::sync::Mutex<LedgerHandle>> =
            Arc::new(tokio::sync::Mutex::new(handle));
        let handle_for_ingest = handle_arc.clone();
        let progress_for_ingest = progress_for_loop.clone();
        let watch_message_for_ingest = watch_message.clone();
        let ingest_fn: relayburn_sdk::IngestFn = Arc::new(move || {
            let h = handle_for_ingest.clone();
            let progress = progress_for_ingest.clone();
            let ledger_home = ledger_home.clone();
            let watch_message = watch_message_for_ingest.clone();
            Box::pin(async move {
                progress.set_task("scanning sessions");
                let mut guard = h.lock().await;
                let opts = if quiet {
                    TaskProgress::quiet_ingest_options(ledger_home)
                } else {
                    progress.ingest_options(ledger_home)
                };
                let result = ingest_all(guard.raw_mut(), &opts).await;
                progress.set_task(watch_message);
                result
            })
        });

        let progress_for_report = progress_for_loop.clone();
        let on_report: relayburn_sdk::ReportSink = Arc::new(move |report: &IngestReport| {
            // Match TS: only log a summary when the tick actually
            // appended turns. Empty ticks would otherwise drown the
            // user with zero-progress lines.
            if !quiet && report.appended_turns > 0 {
                progress_for_report.suspend(|| {
                    eprint!("{}", render_ingest_line(report));
                });
            }
        });

        let progress_for_error = progress_for_loop.clone();
        let on_error: relayburn_sdk::ErrorSink = Arc::new(move |err: &anyhow::Error| {
            progress_for_error.suspend(|| {
                eprintln!("[burn] ingest: {err}");
            });
        });

        let opts = StartWatchLoopOptions::new(ingest_fn)
            .with_interval(Duration::from_millis(interval_ms))
            .with_immediate(true)
            .with_on_report(on_report)
            .with_on_error(on_error);
        let controller = start_watch_loop(opts);

        wait_for_stop_signal().await;
        controller.stop().await;
    });
    progress.finish_and_clear();

    0
}

/// `--hook <harness>`: read a JSON payload from stdin and ingest the
/// transcript it references. Today only `--hook claude` is supported.
///
/// The TS implementation tries hard not to fail Claude Code hooks (a
/// non-zero exit can block the surrounding tool call); the Rust port
/// keeps that policy — every error is logged to stderr but the exit
/// code is `0` so the calling Claude Code session continues.
fn run_hook(globals: &GlobalArgs, hook: &str, quiet: bool) -> i32 {
    if hook != "claude" {
        eprintln!("burn: unsupported hook harness: {hook}");
        return EXIT_FLAG_MISUSE;
    }
    let raw = match read_stdin() {
        Ok(s) => s,
        Err(err) => {
            // Hook callers expect us not to break the parent. Log + 0.
            eprintln!("[burn] ingest: failed to read stdin: {err}");
            return 0;
        }
    };
    if raw.trim().is_empty() {
        if !quiet {
            eprintln!("[burn] ingest: empty stdin payload, nothing to do");
        }
        return 0;
    }

    // Validate the payload shape so we don't trigger a full sweep on
    // unrelated stdin content. The TS hook ignores payloads missing
    // `session_id` / `transcript_path`; mirror that.
    match serde_json::from_str::<serde_json::Value>(&raw) {
        Ok(v) => {
            let has_session = v.get("session_id").and_then(|x| x.as_str()).is_some();
            let has_transcript = v.get("transcript_path").and_then(|x| x.as_str()).is_some();
            if !has_session || !has_transcript {
                if !quiet {
                    eprintln!(
                        "[burn] ingest: payload missing session_id or transcript_path; ignoring",
                    );
                }
                return 0;
            }
        }
        Err(err) => {
            eprintln!("[burn] ingest: invalid JSON payload: {err}");
            return 0;
        }
    }

    // Drive a full sweep. The SDK does not (yet) expose a
    // single-transcript verb; `ingest_all` short-circuits unchanged
    // cursors so the practical cost is bounded by the new turns this
    // hook fires for. Matches the TS hook's "ingest the matching
    // session" intent — the Claude transcript that just changed will
    // be picked up by `ingest_claude_into` on the same sweep.
    let progress = (!quiet).then(|| TaskProgress::new(globals, "ingest"));
    if let Some(progress) = &progress {
        progress.set_task("opening ledger");
    }
    let mut handle = match open_handle(globals) {
        Ok(h) => h,
        Err(err) => {
            // Hook policy: never fail the parent.
            if let Some(progress) = &progress {
                progress.finish_and_clear();
            }
            eprintln!("[burn] ingest: {err}");
            return 0;
        }
    };
    if let Some(progress) = &progress {
        progress.set_task("starting runtime");
    }
    let rt = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(err) => {
            if let Some(progress) = &progress {
                progress.finish_and_clear();
            }
            eprintln!("[burn] ingest: {err}");
            return 0;
        }
    };
    if let Some(progress) = &progress {
        progress.set_task("scanning sessions");
    }
    let opts = match &progress {
        Some(progress) => progress.ingest_options(globals.ledger_path.clone()),
        None => TaskProgress::quiet_ingest_options(globals.ledger_path.clone()),
    };
    let result = rt.block_on(ingest_all(handle.raw_mut(), &opts));
    if let Some(progress) = &progress {
        progress.finish_and_clear();
    }
    match result {
        Ok(report) => {
            // In hook mode we keep stderr quiet by default; only log
            // when work was actually done so a per-tool-call hook
            // doesn't spam the user.
            if !quiet && report.appended_turns > 0 {
                eprint!("{}", render_ingest_line(&report));
            }
        }
        Err(err) => {
            eprintln!("[burn] ingest: {err}");
        }
    }
    0
}

/// Open a ledger honoring the global `--ledger-path` override.
fn open_handle(globals: &GlobalArgs) -> anyhow::Result<LedgerHandle> {
    let opts = match globals.ledger_path.as_deref() {
        Some(h) => LedgerOpenOptions::with_home(h),
        None => LedgerOpenOptions::default(),
    };
    Ledger::open(opts)
}

/// Format an `IngestReport` as the canonical TS log line. Kept as a
/// pure helper so the watch loop and one-shot mode share output shape.
fn render_ingest_line(report: &IngestReport) -> String {
    let session_word = if report.ingested_sessions == 1 {
        "session"
    } else {
        "sessions"
    };
    let turn_word = if report.appended_turns == 1 {
        "turn"
    } else {
        "turns"
    };
    format!(
        "[burn] ingest: ingested {} {session_word} (+{} {turn_word})\n",
        report.ingested_sessions, report.appended_turns,
    )
}

/// Log the canonical `[burn] ingest: ...` line on **stdout** for the
/// one-shot path. TS source of truth: `runIngestOnce` at
/// `packages/cli/src/commands/ingest.ts:121-126` writes the rendered
/// report via `process.stdout.write`, so pipelines that capture stdout
/// see the summary. `--watch` and `--hook` keep their own stderr
/// emitters (`render_ingest_line` is the shared formatter).
fn log_report_oneshot(report: &IngestReport) {
    print!("{}", render_ingest_line(report));
}

/// Read all of stdin into a String. Returns empty string when stdin is
/// a TTY (no payload) — TS uses the same `process.stdin.isTTY` guard.
fn read_stdin() -> io::Result<String> {
    use std::io::IsTerminal;
    let stdin = io::stdin();
    if stdin.is_terminal() {
        return Ok(String::new());
    }
    let mut buf = String::new();
    stdin.lock().read_to_string(&mut buf)?;
    Ok(buf)
}

/// Park until SIGINT or SIGTERM. Cross-platform via tokio's `ctrl_c` for
/// SIGINT; SIGTERM is wired only on Unix because Windows lacks the
/// signal. The watch loop's controller will drain in-flight ticks before
/// returning so callers see all observable side effects.
async fn wait_for_stop_signal() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(_) => {
                // If we can't install SIGTERM, fall back to ctrl_c only.
                let _ = tokio::signal::ctrl_c().await;
                return;
            }
        };
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = sigterm.recv() => {}
        }
    }
    #[cfg(not(unix))]
    {
        let _ = tokio::signal::ctrl_c().await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn render_ingest_line_pluralizes_consistently() {
        let one = render_ingest_line(&IngestReport {
            scanned_sessions: 1,
            ingested_sessions: 1,
            appended_turns: 1,
            applied_pending_stamps: 0,
        });
        assert_eq!(one, "[burn] ingest: ingested 1 session (+1 turn)\n");

        let many = render_ingest_line(&IngestReport {
            scanned_sessions: 3,
            ingested_sessions: 2,
            appended_turns: 5,
            applied_pending_stamps: 0,
        });
        assert_eq!(many, "[burn] ingest: ingested 2 sessions (+5 turns)\n");

        let zero = render_ingest_line(&IngestReport::default());
        assert_eq!(zero, "[burn] ingest: ingested 0 sessions (+0 turns)\n");
    }
}