use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use relayburn_sdk::{
start_watch_loop, write_pending_stamp, IngestFn, IngestReport, PendingStampHarness,
PendingStampWriteOptions, ReportSink, StartWatchLoopOptions,
};
use super::{HarnessAdapter, PlanCtx, SpawnPlan, WatcherController};
/// Shared, thread-safe async ingest callback.
///
/// The callback receives an optional path (this module always passes the
/// `ledger_home` of the current `PlanCtx`) and returns a boxed, `Send` future
/// resolving to `anyhow::Result<IngestReport>`. It is stored behind an `Arc`
/// so it can be cloned into the watch-loop closure as well as invoked
/// directly after the child exits.
pub type IngestSessionsFn = Arc<
dyn Fn(Option<PathBuf>) -> Pin<Box<dyn Future<Output = anyhow::Result<IngestReport>> + Send>>
+ Send
+ Sync,
>;
/// Configuration for a pending-stamp based harness adapter.
///
/// Pass a value of this type to [`adapter`] or [`adapter_static`] to obtain a
/// `HarnessAdapter` implementation.
#[derive(Clone)]
pub struct PendingStampAdapter {
/// Harness name. Only `"codex"` and `"opencode"` are accepted; any other
/// value makes adapter construction panic.
pub name: &'static str,
/// Callback producing the harness session root. It is invoked each time
/// the path is needed rather than being cached.
pub session_root: Arc<dyn Fn() -> PathBuf + Send + Sync>,
/// Ingest callback used by the watch loop and by the `after_exit` hook.
pub ingest_sessions: IngestSessionsFn,
/// Interval handed to the watch loop; `PendingStampAdapter::new` sets it
/// to 1000 ms.
pub watch_interval: Duration,
}
impl PendingStampAdapter {
    /// Builds a configuration with the default one-second watch interval.
    ///
    /// * `name` - harness name; adapter construction later accepts only
    ///   `"codex"` or `"opencode"`.
    /// * `session_root` - callback yielding the harness session root.
    /// * `ingest_sessions` - ingest callback for the watcher and exit hook.
    ///
    /// `watch_interval` is a public field, so callers may overwrite it on the
    /// returned value.
    pub fn new(
        name: &'static str,
        session_root: Arc<dyn Fn() -> PathBuf + Send + Sync>,
        ingest_sessions: IngestSessionsFn,
    ) -> Self {
        // One second, i.e. the same value as `Duration::from_millis(1000)`.
        let watch_interval = Duration::from_secs(1);
        Self {
            name,
            session_root,
            ingest_sessions,
            watch_interval,
        }
    }
}
/// Builds a boxed `HarnessAdapter` from `config`.
///
/// # Panics
///
/// Panics when `config.name` is neither `"codex"` nor `"opencode"`.
pub fn adapter(config: PendingStampAdapter) -> Box<dyn HarnessAdapter> {
    let implementation = PendingStampAdapterImpl::new(config);
    Box::new(implementation)
}
/// Builds a `HarnessAdapter` from `config` and returns it as a `'static`
/// reference.
///
/// The adapter is heap-allocated and intentionally leaked, so its memory is
/// never reclaimed.
///
/// # Panics
///
/// Panics when `config.name` is neither `"codex"` nor `"opencode"`.
pub fn adapter_static(config: PendingStampAdapter) -> &'static dyn HarnessAdapter {
    let boxed = Box::new(PendingStampAdapterImpl::new(config));
    Box::leak(boxed)
}
/// Private `HarnessAdapter` implementation backing [`adapter`] and
/// [`adapter_static`].
///
/// Mirrors the fields of `PendingStampAdapter` and adds the SDK harness
/// variant resolved from `name` at construction time.
struct PendingStampAdapterImpl {
// Harness name as supplied in the configuration.
name: &'static str,
// SDK harness variant matching `name` (`Codex` or `Opencode`).
harness: PendingStampHarness,
// Callback producing the harness session root.
session_root: Arc<dyn Fn() -> PathBuf + Send + Sync>,
// Ingest callback used by the watcher and by `after_exit`.
ingest_sessions: IngestSessionsFn,
// Interval handed to the watch loop.
watch_interval: Duration,
}
impl PendingStampAdapterImpl {
fn new(config: PendingStampAdapter) -> Self {
let harness = match config.name {
"codex" => PendingStampHarness::Codex,
"opencode" => PendingStampHarness::Opencode,
other => {
panic!(
"pending_stamp::adapter only supports codex|opencode, got {other:?}; \
extending the protocol requires an SDK change"
)
}
};
Self {
name: config.name,
harness,
session_root: config.session_root,
ingest_sessions: config.ingest_sessions,
watch_interval: config.watch_interval,
}
}
fn ingest_fn(&self, ledger_home: Option<PathBuf>) -> IngestFn {
let ingest_sessions = self.ingest_sessions.clone();
Arc::new(move || {
let f = ingest_sessions.clone();
let ledger_home = ledger_home.clone();
Box::pin(async move { f(ledger_home).await })
})
}
fn manifest_basename(path: &Path) -> String {
path.file_name()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_else(|| path.display().to_string())
}
}
#[async_trait]
impl HarnessAdapter for PendingStampAdapterImpl {
    /// Harness name supplied in the configuration.
    fn name(&self) -> &'static str {
        self.name
    }

    /// Session root, obtained by invoking the configured callback.
    fn session_root(&self) -> PathBuf {
        (self.session_root)()
    }

    /// Plans a spawn of the binary called `name`, forwarding the passthrough
    /// arguments from `ctx` unchanged.
    async fn plan(&self, ctx: &PlanCtx) -> anyhow::Result<SpawnPlan> {
        let args = ctx.passthrough.clone();
        Ok(SpawnPlan::new(self.name, args))
    }

    /// Writes a pending stamp describing the upcoming spawn and logs the
    /// stamp's file name to stderr.
    ///
    /// # Errors
    ///
    /// Returns an error naming the harness when the stamp cannot be written.
    async fn before_spawn(&self, ctx: &PlanCtx, _plan: &SpawnPlan) -> anyhow::Result<()> {
        let hint = self.session_root().to_string_lossy().into_owned();
        let cwd = ctx.cwd.to_string_lossy().into_owned();
        let stamp = PendingStampWriteOptions {
            harness: self.harness,
            ledger_home: ctx.ledger_home.clone(),
            cwd,
            enrichment: ctx.tags.clone(),
            session_dir_hint: Some(hint),
            spawn_start_ts: Some(ctx.spawn_start_ts),
            spawner_pid: None,
        };

        let written = match write_pending_stamp(stamp) {
            Ok(outcome) => outcome,
            Err(err) => {
                return Err(anyhow::anyhow!(
                    "failed to write {} pending stamp: {err}",
                    self.name
                ));
            }
        };

        let basename = Self::manifest_basename(&written.file);
        eprintln!("[burn] {} spawn: pending stamp {}", self.name, basename);
        Ok(())
    }

    /// Starts the watch loop that runs the ingest callback at the configured
    /// interval and forwards reports to `on_report`. Always returns `Some`.
    fn start_watcher(
        &self,
        ctx: &PlanCtx,
        on_report: ReportSink,
    ) -> Option<WatcherController> {
        let ingest = self.ingest_fn(ctx.ledger_home.clone());
        // NOTE(review): `with_immediate(false)` presumably skips an ingest
        // pass at startup; confirm against the SDK documentation.
        let loop_options = StartWatchLoopOptions::new(ingest)
            .with_immediate(false)
            .with_interval(self.watch_interval)
            .with_on_report(on_report);
        let handle = start_watch_loop(loop_options);
        Some(WatcherController::new(handle))
    }

    /// Runs the ingest callback once with the context's ledger home and
    /// returns its report.
    async fn after_exit(&self, ctx: &PlanCtx, _plan: &SpawnPlan) -> anyhow::Result<IngestReport> {
        let final_ingest = (self.ingest_sessions)(ctx.ledger_home.clone());
        final_ingest.await
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use relayburn_sdk::Enrichment;
    use super::*;

    /// Session-root callback that always yields `path`.
    fn fixed_root(path: &'static str) -> Arc<dyn Fn() -> PathBuf + Send + Sync> {
        Arc::new(move || PathBuf::from(path))
    }

    /// Ingest callback that ignores its argument and yields a default report.
    fn noop_ingest() -> IngestSessionsFn {
        Arc::new(|_ledger_home| Box::pin(async { Ok(IngestReport::default()) }))
    }

    /// Context with no passthrough arguments, empty tags, and the current
    /// time as the spawn start.
    fn bare_ctx(cwd: PathBuf, ledger_home: Option<PathBuf>) -> PlanCtx {
        PlanCtx {
            cwd,
            passthrough: vec![],
            tags: Enrichment::new(),
            ledger_home,
            spawn_start_ts: std::time::SystemTime::now(),
        }
    }

    /// Creates a fresh directory under the system temp dir whose name embeds
    /// `label`, the process id, and the current time in nanoseconds.
    fn tempdir(label: &str) -> PathBuf {
        let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_nanos(),
            Err(_) => 0,
        };
        let leaf = format!(
            "burn-pending-stamp-{label}-{}-{nanos}",
            std::process::id()
        );
        let dir = std::env::temp_dir().join(leaf);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[tokio::test]
    async fn codex_factory_round_trip() {
        let codex: Box<dyn HarnessAdapter> = adapter(PendingStampAdapter::new(
            "codex",
            fixed_root("/tmp/codex-sessions"),
            noop_ingest(),
        ));
        assert_eq!(codex.name(), "codex");
        assert_eq!(codex.session_root(), PathBuf::from("/tmp/codex-sessions"));

        let mut ctx = bare_ctx(PathBuf::from("/tmp"), None);
        ctx.passthrough = vec!["--help".into()];

        let plan = codex.plan(&ctx).await.unwrap();
        assert_eq!(plan.binary, "codex");
        assert_eq!(plan.args, vec!["--help".to_string()]);

        let report = codex.after_exit(&ctx, &plan).await.unwrap();
        assert_eq!(report.scanned_sessions, 0);
    }

    #[tokio::test]
    async fn opencode_factory_round_trip() {
        let opencode = adapter(PendingStampAdapter::new(
            "opencode",
            fixed_root("/tmp/opencode-storage"),
            noop_ingest(),
        ));
        assert_eq!(opencode.name(), "opencode");
        assert_eq!(
            opencode.session_root(),
            PathBuf::from("/tmp/opencode-storage")
        );
    }

    #[test]
    #[should_panic(expected = "pending_stamp::adapter only supports")]
    fn unknown_name_panics() {
        let unsupported = PendingStampAdapter::new("cursor", fixed_root("/tmp"), noop_ingest());
        let _ = adapter(unsupported);
    }

    #[tokio::test]
    async fn after_exit_invokes_supplied_ingest_fn() {
        let calls = Arc::new(AtomicUsize::new(0));
        let tally = Arc::clone(&calls);
        let counting_ingest: IngestSessionsFn = Arc::new(move |_ledger_home| {
            let tally = Arc::clone(&tally);
            Box::pin(async move {
                tally.fetch_add(1, Ordering::SeqCst);
                Ok(IngestReport::default())
            })
        });

        let codex = adapter(PendingStampAdapter::new(
            "codex",
            fixed_root("/tmp/codex-sessions"),
            counting_ingest,
        ));
        let ctx = bare_ctx(PathBuf::from("/tmp"), None);
        let plan = codex.plan(&ctx).await.unwrap();

        // Two exits must translate into exactly two ingest invocations.
        for _ in 0..2 {
            codex.after_exit(&ctx, &plan).await.unwrap();
        }
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn before_spawn_writes_pending_stamp_under_context_ledger_home() {
        let ledger_home = tempdir("ledger-home");
        let cwd = tempdir("cwd");
        let session_dir = tempdir("session-root");

        let session_root: Arc<dyn Fn() -> PathBuf + Send + Sync> =
            Arc::new(move || session_dir.clone());
        let codex = adapter(PendingStampAdapter::new(
            "codex",
            session_root,
            noop_ingest(),
        ));

        let ctx = bare_ctx(cwd, Some(ledger_home.clone()));
        let plan = codex.plan(&ctx).await.unwrap();
        codex.before_spawn(&ctx, &plan).await.unwrap();

        let stamp_count = std::fs::read_dir(ledger_home.join("pending-stamps"))
            .unwrap()
            .count();
        assert_eq!(stamp_count, 1);
    }
}