use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use car_eventlog::{EventKind, EventLog};
use car_multi::{AgentWorkspace, WorkspaceConfig};
use super::contract::{CheckResult, OutcomeContract};
use super::router::EngineChoice;
/// Shared atomic boolean. Going by the name it signals cancellation; who sets
/// and who polls it is not visible in this file.
pub type CancelFlag = Arc<AtomicBool>;
/// Thread-safe callback that receives every [`CoderEvent`] an [`EventSink`]
/// emits (see `EventSink::emit`).
pub type EventEmitter = Arc<dyn Fn(CoderEvent) + Send + Sync>;
/// Single-slot rendezvous for a user-input request: one side parks and waits
/// on a oneshot receiver, the other side fulfills it with the answer string.
#[derive(Default)]
pub struct UserInputGate {
/// Sending half of the currently parked request, or `None` when nothing is
/// waiting. At most one request can be pending at a time.
pending: Mutex<Option<tokio::sync::oneshot::Sender<String>>>,
}
impl UserInputGate {
    /// Creates a gate with no pending request.
    pub fn new() -> Self {
        Self::default()
    }

    /// Locks the pending slot.
    ///
    /// # Panics
    /// Panics if a previous holder of the lock panicked (poisoned mutex).
    fn slot(
        &self,
    ) -> std::sync::MutexGuard<'_, Option<tokio::sync::oneshot::Sender<String>>> {
        self.pending.lock().expect("user-input gate poisoned")
    }

    /// Registers a new pending request and returns the receiver that will
    /// yield the answer.
    ///
    /// Any previously parked sender is replaced (and thereby dropped), so the
    /// receiver of an older request will observe a closed channel.
    pub fn park(&self) -> tokio::sync::oneshot::Receiver<String> {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        *self.slot() = Some(sender);
        receiver
    }

    /// Delivers `answer` to the parked request, consuming it.
    ///
    /// # Errors
    /// Fails when nothing is parked, or when the waiting side has already
    /// dropped its receiver.
    pub fn fulfill(&self, answer: String) -> Result<(), String> {
        // Take the sender in its own statement so the lock is released before
        // the answer is sent.
        let parked = self.slot().take();
        match parked {
            None => Err("no pending user-input request for this session".to_string()),
            Some(sender) => sender
                .send(answer)
                .map_err(|_| "the session is no longer waiting for input".to_string()),
        }
    }

    /// Discards the pending request, if any.
    pub fn clear(&self) {
        *self.slot() = None;
    }

    /// Reports whether a request is currently parked.
    pub fn is_pending(&self) -> bool {
        self.slot().is_some()
    }
}
/// Resolves the default on-disk state directory, `<home>/.car/coder`.
///
/// The home directory is taken from `HOME`, falling back to `USERPROFILE`.
/// A variable that is set but empty is treated as unset: joining onto an empty
/// path would otherwise silently produce `.car/coder` relative to the current
/// working directory.
///
/// # Errors
/// Returns an error when neither variable holds a non-empty value.
pub fn default_state_dir() -> Result<PathBuf, String> {
    // `find` is lazy, so `USERPROFILE` is only consulted when `HOME` is
    // missing or empty — same precedence as before.
    let home = ["HOME", "USERPROFILE"]
        .iter()
        .filter_map(|key| std::env::var_os(key))
        .find(|value| !value.is_empty())
        .ok_or("cannot resolve home directory (HOME/USERPROFILE unset)")?;
    Ok(PathBuf::from(home).join(".car").join("coder"))
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
/// Lifecycle state of a coder session.
///
/// Serialized in `snake_case` (e.g. `contract_proposed`). The legal moves
/// between states are defined by `can_transition`; `Merged`, `Failed` and
/// `Abandoned` are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CoderState {
/// Initial state of a freshly constructed session.
Created,
/// A contract has been proposed; may be re-proposed or confirmed.
ContractProposed,
/// The contract was confirmed; the only forward step is `Running`.
ContractConfirmed,
/// The session is running; the only forward step is `NeedsApproval`.
Running,
/// Waiting for approval; the only forward step is `Merged`.
NeedsApproval,
/// Terminal: reached from `NeedsApproval`.
Merged,
/// Terminal: reachable from any non-terminal state.
Failed,
/// Terminal: reachable from any non-terminal state.
Abandoned,
}
impl CoderState {
    /// Returns `true` for the states no transition can leave:
    /// `Merged`, `Failed` and `Abandoned`.
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Merged | Self::Failed | Self::Abandoned => true,
            Self::Created
            | Self::ContractProposed
            | Self::ContractConfirmed
            | Self::Running
            | Self::NeedsApproval => false,
        }
    }

    /// Stable `snake_case` name of the state, as used in events and error
    /// messages.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Created => "created",
            Self::ContractProposed => "contract_proposed",
            Self::ContractConfirmed => "contract_confirmed",
            Self::Running => "running",
            Self::NeedsApproval => "needs_approval",
            Self::Merged => "merged",
            Self::Failed => "failed",
            Self::Abandoned => "abandoned",
        }
    }
}
/// Decides whether a session may move from `from` to `to`.
///
/// Rules, in order of precedence:
/// 1. nothing leaves a terminal state;
/// 2. any non-terminal state may go to `Failed` or `Abandoned`;
/// 3. otherwise only the forward steps of the happy path are allowed, plus
///    re-proposing a contract (`ContractProposed` → `ContractProposed`).
pub fn can_transition(from: CoderState, to: CoderState) -> bool {
    use CoderState::*;
    match (from, to) {
        _ if from.is_terminal() => false,
        (_, Failed | Abandoned) => true,
        (Created, ContractProposed)
        | (ContractProposed, ContractProposed | ContractConfirmed)
        | (ContractConfirmed, Running)
        | (Running, NeedsApproval)
        | (NeedsApproval, Merged) => true,
        _ => false,
    }
}
/// One event emitted by a coder session.
///
/// `kind` is flattened on serialization, so the JSON object carries the
/// variant's `type` tag and payload fields next to `session_id`, `seq` and
/// `ts` rather than in a nested object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoderEvent {
/// Id of the session the emitting sink was created for.
pub session_id: String,
/// Per-sink sequence number; `EventSink::emit` assigns it from a counter
/// that starts at 0 and increases by one per event.
pub seq: u64,
/// Emission time in whole seconds since the Unix epoch.
pub ts: u64,
/// The event payload.
#[serde(flatten)]
pub kind: CoderEventKind,
}
/// Payload of a [`CoderEvent`].
///
/// Serialized as an internally tagged enum: the variant name goes into a
/// `type` field in `snake_case` (e.g. `check_started`) alongside the variant's
/// own fields.
///
/// Only `StateChanged`, `ToolCall`, `ToolResult`, `CheckCompleted`, `Error`
/// and `MergeCompleted` are written to the audit journal by `EventSink`; the
/// remaining variants are forwarded to the emitter only.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CoderEventKind {
/// The session moved between states; both names come from `CoderState::as_str`.
StateChanged { from: String, to: String },
ContractProposed { contract: OutcomeContract },
EngineSelected { engine: String, reason: String },
EngineFallback { from: String, to: String, reason: String },
IterationStarted { n: u32, max: u32 },
PlanText { text: String },
ToolCall { tool: String, params_preview: String },
/// Journaled as a success or a failure depending on `ok`.
ToolResult { tool: String, ok: bool, preview: String },
CheckStarted { name: String },
/// Journaled as a success or a failure depending on `result.passed`.
CheckCompleted { result: CheckResult },
ExternalEvent { raw: Value },
DiffReady { stat: String, patch_truncated: String },
UserInputRequested { prompt: String },
MergeCompleted { branch: String },
/// Always journaled as a failure.
Error { message: String },
}
/// Stamps, journals and forwards the events of a single session.
pub struct EventSink {
/// Session id copied into every emitted event and journal entry.
session_id: String,
/// Next sequence number to hand out; starts at 0.
seq: AtomicU64,
/// Optional callback that receives a clone of every emitted event.
emitter: Option<EventEmitter>,
/// Optional audit journal; when absent, `audit` is a no-op.
journal: Option<Mutex<EventLog>>,
}
impl EventSink {
pub fn new(
session_id: impl Into<String>,
emitter: Option<EventEmitter>,
journal_path: Option<PathBuf>,
) -> Self {
Self {
session_id: session_id.into(),
seq: AtomicU64::new(0),
emitter,
journal: journal_path.map(|p| Mutex::new(EventLog::with_journal(p))),
}
}
pub fn test_sink() -> Self {
Self::new("coder-test", None, None)
}
pub fn collecting(session_id: &str) -> (Self, Arc<Mutex<Vec<CoderEvent>>>) {
let collected: Arc<Mutex<Vec<CoderEvent>>> = Arc::new(Mutex::new(Vec::new()));
let sink_copy = collected.clone();
let emitter: EventEmitter = Arc::new(move |e| {
sink_copy.lock().expect("collector poisoned").push(e);
});
(Self::new(session_id, Some(emitter), None), collected)
}
pub fn emit(&self, kind: CoderEventKind) -> CoderEvent {
let event = CoderEvent {
session_id: self.session_id.clone(),
seq: self.seq.fetch_add(1, Ordering::SeqCst),
ts: now_secs(),
kind,
};
self.audit(&event);
if let Some(emitter) = &self.emitter {
emitter(event.clone());
}
event
}
fn audit(&self, event: &CoderEvent) {
let Some(journal) = &self.journal else { return };
let (kind, mut data): (EventKind, HashMap<String, Value>) = match &event.kind {
CoderEventKind::StateChanged { from, to } => (
EventKind::StateChanged,
HashMap::from([
("from".to_string(), Value::String(from.clone())),
("to".to_string(), Value::String(to.clone())),
]),
),
CoderEventKind::ToolCall {
tool,
params_preview,
} => (
EventKind::ActionExecuting,
HashMap::from([
("tool".to_string(), Value::String(tool.clone())),
("params".to_string(), Value::String(params_preview.clone())),
]),
),
CoderEventKind::ToolResult { tool, ok, preview } => (
if *ok {
EventKind::ActionSucceeded
} else {
EventKind::ActionFailed
},
HashMap::from([
("tool".to_string(), Value::String(tool.clone())),
("result".to_string(), Value::String(preview.clone())),
]),
),
CoderEventKind::CheckCompleted { result } => (
if result.passed {
EventKind::ActionSucceeded
} else {
EventKind::ActionFailed
},
HashMap::from([
("check".to_string(), Value::String(result.name.clone())),
(
"exit_code".to_string(),
result.exit_code.map(Value::from).unwrap_or(Value::Null),
),
]),
),
CoderEventKind::Error { message } => (
EventKind::ActionFailed,
HashMap::from([("error".to_string(), Value::String(message.clone()))]),
),
CoderEventKind::MergeCompleted { branch } => (
EventKind::ActionSucceeded,
HashMap::from([("branch".to_string(), Value::String(branch.clone()))]),
),
_ => return,
};
data.insert(
"coder_event".to_string(),
Value::String(coder_event_name(&event.kind).to_string()),
);
data.insert("seq".to_string(), Value::from(event.seq));
if let Ok(mut log) = journal.lock() {
log.append(kind, Some(&self.session_id), None, data);
}
}
}
fn coder_event_name(kind: &CoderEventKind) -> &'static str {
match kind {
CoderEventKind::StateChanged { .. } => "coder.state_changed",
CoderEventKind::ContractProposed { .. } => "coder.contract_proposed",
CoderEventKind::EngineSelected { .. } => "coder.engine_selected",
CoderEventKind::EngineFallback { .. } => "coder.engine_fallback",
CoderEventKind::IterationStarted { .. } => "coder.iteration_started",
CoderEventKind::PlanText { .. } => "coder.plan_text",
CoderEventKind::ToolCall { .. } => "coder.tool_call",
CoderEventKind::ToolResult { .. } => "coder.tool_result",
CoderEventKind::CheckStarted { .. } => "coder.check_started",
CoderEventKind::CheckCompleted { .. } => "coder.check_completed",
CoderEventKind::ExternalEvent { .. } => "coder.external_event",
CoderEventKind::DiffReady { .. } => "coder.diff_ready",
CoderEventKind::UserInputRequested { .. } => "coder.user_input_requested",
CoderEventKind::MergeCompleted { .. } => "coder.merge_completed",
CoderEventKind::Error { .. } => "coder.error",
}
}
/// A coder session together with its persisted snapshot shape.
///
/// Everything except `workspace` and `state_dir` is written to
/// `<state_dir>/<id>.json` by `persist`; those two fields are runtime-only
/// and come back as `None` from `load`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CoderSession {
/// Unique id; `new` generates it as `coder-<uuid>`.
pub id: String,
/// Repository the session works on; worktrees are provisioned from it.
pub repo: PathBuf,
pub intent: String,
pub engine: EngineChoice,
/// Current lifecycle state; change it through `transition` so the move is
/// validated, announced and persisted.
pub state: CoderState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub contract: Option<OutcomeContract>,
/// Path of the provisioned worktree, recorded by `provision_workspace`.
/// Unlike `workspace`, this survives a snapshot round trip.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workspace_path: Option<PathBuf>,
/// Project slug, set together with `project_kind` by `with_project`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub project: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub project_kind: Option<super::project::ProjectKind>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub built_agent: Option<car_registry::declarative::DeclarativeAgentSpec>,
pub iterations: u32,
/// Iteration budget; `new` clamps it to at least 1.
pub max_iterations: u32,
/// When set, a transition to `Failed` leaves the worktree on disk instead
/// of letting the workspace handle clean it up.
#[serde(default)]
pub keep_workspace_on_failure: bool,
#[serde(default)]
pub last_check_results: Vec<CheckResult>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result_branch: Option<String>,
/// Creation time, seconds since the Unix epoch.
pub created_at: u64,
/// Time of the last state change, seconds since the Unix epoch; `list`
/// sorts on it, newest first.
pub updated_at: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Live worktree handle; never serialized.
#[serde(skip)]
pub workspace: Option<AgentWorkspace>,
/// Directory snapshots are written to; never serialized. With `None`,
/// `persist` does nothing and `provision_workspace` fails.
#[serde(skip)]
pub state_dir: Option<PathBuf>,
}
impl CoderSession {
    /// Creates a session in [`CoderState::Created`] with a freshly generated
    /// `coder-<uuid>` id.
    ///
    /// `max_iterations` is clamped to at least 1. `state_dir` is where
    /// snapshots and worktrees live; pass `None` for a purely in-memory
    /// session (nothing is persisted and no worktree can be provisioned).
    pub fn new(
        repo: impl Into<PathBuf>,
        intent: impl Into<String>,
        engine: EngineChoice,
        max_iterations: u32,
        state_dir: Option<PathBuf>,
    ) -> Self {
        let now = now_secs();
        Self {
            id: format!("coder-{}", uuid::Uuid::new_v4().simple()),
            repo: repo.into(),
            intent: intent.into(),
            engine,
            state: CoderState::Created,
            contract: None,
            workspace_path: None,
            project: None,
            project_kind: None,
            built_agent: None,
            iterations: 0,
            max_iterations: max_iterations.max(1),
            keep_workspace_on_failure: false,
            last_check_results: Vec::new(),
            result_branch: None,
            created_at: now,
            updated_at: now,
            error: None,
            workspace: None,
            state_dir,
        }
    }

    /// Builder-style setter that attaches a project slug and kind.
    pub fn with_project(
        mut self,
        slug: impl Into<String>,
        kind: super::project::ProjectKind,
    ) -> Self {
        self.project = Some(slug.into());
        self.project_kind = Some(kind);
        self
    }

    /// Returns the trailing (at most) 8 bytes of the id, or the whole id when
    /// it is shorter.
    ///
    /// Ids produced by [`CoderSession::new`] are ASCII, but `id` is a public
    /// field and is also read back from snapshots on disk, so the cut point
    /// is moved forward to the next character boundary instead of slicing
    /// blindly (which would panic inside a multi-byte character).
    pub fn short_id(&self) -> &str {
        let mut start = self.id.len().saturating_sub(8);
        // `is_char_boundary(len)` is always true, so this terminates.
        while !self.id.is_char_boundary(start) {
            start += 1;
        }
        &self.id[start..]
    }

    /// Provisions a git worktree for this session under
    /// `<state_dir>/worktrees` and records both the live handle and its path.
    ///
    /// # Errors
    /// Fails when the session has no state dir or when provisioning fails.
    pub fn provision_workspace(&mut self) -> Result<PathBuf, String> {
        let state_dir = self
            .state_dir
            .clone()
            .ok_or("session has no state dir; cannot provision a worktree")?;
        let config = WorkspaceConfig::git_worktree_at(&self.repo, state_dir.join("worktrees"));
        let workspace = AgentWorkspace::provision(&config, &self.id)?;
        let path = workspace.path().to_path_buf();
        self.workspace_path = Some(path.clone());
        self.workspace = Some(workspace);
        Ok(path)
    }

    /// Moves the session to `to`, emits a `StateChanged` event on `sink`,
    /// releases the workspace on terminal states and persists a snapshot.
    ///
    /// A failed snapshot write is logged but does not fail the transition.
    ///
    /// # Errors
    /// Returns an error, leaving the session untouched, when
    /// [`can_transition`] rejects the move.
    pub fn transition(&mut self, to: CoderState, sink: &EventSink) -> Result<(), String> {
        if !can_transition(self.state, to) {
            return Err(format!(
                "illegal coder transition {} → {}",
                self.state.as_str(),
                to.as_str()
            ));
        }
        let from = self.state;
        self.state = to;
        self.updated_at = now_secs();
        sink.emit(CoderEventKind::StateChanged {
            from: from.as_str().to_string(),
            to: to.as_str().to_string(),
        });
        if to.is_terminal() {
            if to == CoderState::Failed && self.keep_workspace_on_failure {
                // Forget the handle instead of dropping it so its cleanup
                // never runs and the worktree stays on disk for inspection.
                if let Some(ws) = self.workspace.take() {
                    std::mem::forget(ws);
                }
            } else {
                // Dropping the handle lets it clean up the worktree.
                self.workspace = None;
            }
        }
        if let Err(e) = self.persist() {
            tracing::warn!(session = %self.id, "coder snapshot persist failed: {e}");
        }
        Ok(())
    }

    /// Writes the session snapshot to `<state_dir>/<id>.json`; a no-op when
    /// the session has no state dir.
    ///
    /// The JSON is first written to a sibling temporary file and then renamed
    /// over the destination, so a crash mid-write can never leave a truncated
    /// snapshot behind (which [`CoderSession::load`] would reject and
    /// [`CoderSession::list`] would silently drop). The temporary file ends
    /// in `.tmp`, so a stale one is ignored by the `.json` filters.
    ///
    /// # Errors
    /// Returns a message naming the path when the directory cannot be
    /// created, serialization fails, or the write or rename fails.
    pub fn persist(&self) -> Result<(), String> {
        let Some(dir) = &self.state_dir else {
            return Ok(());
        };
        std::fs::create_dir_all(dir).map_err(|e| format!("create {}: {e}", dir.display()))?;
        let path = dir.join(format!("{}.json", self.id));
        let json = serde_json::to_string_pretty(self).map_err(|e| e.to_string())?;
        // The process id keeps two processes from sharing a temp file.
        let tmp = dir.join(format!(".{}.json.{}.tmp", self.id, std::process::id()));
        std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
        std::fs::rename(&tmp, &path).map_err(|e| {
            // Best effort: do not leave the temp file behind on failure.
            let _ = std::fs::remove_file(&tmp);
            format!("rename {} to {}: {e}", tmp.display(), path.display())
        })
    }

    /// Reads a snapshot back from `path`.
    ///
    /// The `workspace` handle and `state_dir` are not part of the snapshot
    /// and are `None` on the returned session.
    ///
    /// # Errors
    /// Returns a message naming the path when reading or parsing fails.
    pub fn load(path: &Path) -> Result<Self, String> {
        let text =
            std::fs::read_to_string(path).map_err(|e| format!("read {}: {e}", path.display()))?;
        serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))
    }

    /// Loads every readable `*.json` snapshot in `state_dir`, most recently
    /// updated first.
    ///
    /// An unreadable directory yields an empty list; unreadable or
    /// unparsable files are skipped.
    pub fn list(state_dir: &Path) -> Vec<CoderSession> {
        let Ok(entries) = std::fs::read_dir(state_dir) else {
            return Vec::new();
        };
        let mut sessions: Vec<CoderSession> = entries
            .flatten()
            .filter(|e| e.path().extension().is_some_and(|x| x == "json"))
            .filter_map(|e| Self::load(&e.path()).ok())
            .collect();
        sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
        sessions
    }
}
/// What `adopt_orphaned_sessions` did with a non-terminal snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdoptionOutcome {
/// The snapshot was rewritten with state `Failed` and an explanatory error.
Failed,
/// The snapshot was left untouched: it was in `NeedsApproval` and its
/// recorded worktree directory still exists.
Preserved,
}
pub fn adopt_orphaned_sessions(state_dir: &Path) -> Vec<(String, AdoptionOutcome)> {
let Ok(entries) = std::fs::read_dir(state_dir) else {
return Vec::new();
};
let mut outcomes = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
if path.extension().is_none_or(|x| x != "json") {
continue;
}
let Ok(mut session) = CoderSession::load(&path) else {
continue;
};
if session.state.is_terminal() {
continue;
}
let worktree_alive = session.state == CoderState::NeedsApproval
&& session
.workspace_path
.as_ref()
.is_some_and(|p| p.is_dir());
if worktree_alive {
outcomes.push((session.id.clone(), AdoptionOutcome::Preserved));
continue;
}
session.state = CoderState::Failed;
session.error = Some("daemon restarted mid-session".to_string());
session.updated_at = now_secs();
session.state_dir = Some(state_dir.to_path_buf());
if session.persist().is_ok() {
outcomes.push((session.id.clone(), AdoptionOutcome::Failed));
}
}
outcomes
}
// Unit tests for the state machine, event sink, snapshot persistence and
// orphan adoption. The worktree tests shell out to a real `git` binary.
#[cfg(test)]
mod tests {
use super::*;
// In-memory session (no state dir) paired with a sink that has neither
// emitter nor journal.
fn session() -> (CoderSession, EventSink) {
(
CoderSession::new("/tmp/repo", "do it", EngineChoice::Native, 8, None),
EventSink::test_sink(),
)
}
// Initializes a git repository at `dir` with a single empty commit on `main`.
fn init_repo(dir: &Path) {
for args in [
vec!["init", "-q", "-b", "main"],
vec![
"-c", "user.name=t", "-c", "user.email=t@t", "commit", "-q",
"--allow-empty", "-m", "init",
],
] {
let out = std::process::Command::new("git")
.arg("-C")
.arg(dir)
.args(&args)
.output()
.unwrap();
assert!(out.status.success(), "{}", String::from_utf8_lossy(&out.stderr));
}
}
// With `keep_workspace_on_failure`, failing drops the handle from the session
// but leaves the worktree directory and the recorded path in place.
#[test]
fn failed_with_keep_flag_retains_worktree() {
let repo = tempfile::tempdir().unwrap();
init_repo(repo.path());
let state_dir = tempfile::tempdir().unwrap();
let mut s = CoderSession::new(
repo.path(),
"x",
EngineChoice::Native,
2,
Some(state_dir.path().to_path_buf()),
);
s.keep_workspace_on_failure = true;
let worktree = s.provision_workspace().unwrap();
assert!(worktree.is_dir());
let sink = EventSink::test_sink();
s.transition(CoderState::Failed, &sink).unwrap();
assert!(s.workspace.is_none());
assert!(worktree.is_dir(), "worktree should be retained for postmortem");
assert_eq!(s.workspace_path.as_deref(), Some(worktree.as_path()));
// Manual cleanup: the handle was forgotten, so nothing else removes it.
let _ = std::process::Command::new("git")
.arg("-C")
.arg(repo.path())
.args(["worktree", "remove", "--force"])
.arg(&worktree)
.output();
}
// Without the keep flag, failing removes the worktree from disk.
#[test]
fn failed_without_keep_flag_reaps_worktree() {
let repo = tempfile::tempdir().unwrap();
init_repo(repo.path());
let state_dir = tempfile::tempdir().unwrap();
let mut s = CoderSession::new(
repo.path(),
"x",
EngineChoice::Native,
2,
Some(state_dir.path().to_path_buf()),
);
let worktree = s.provision_workspace().unwrap();
assert!(worktree.is_dir());
let sink = EventSink::test_sink();
s.transition(CoderState::Failed, &sink).unwrap();
assert!(s.workspace.is_none());
assert!(!worktree.exists(), "worktree should be reaped on failure by default");
}
// Walks the full happy path, including one contract re-proposal.
#[test]
fn happy_path_transitions_are_legal() {
let (mut s, sink) = session();
for to in [
CoderState::ContractProposed,
CoderState::ContractProposed, CoderState::ContractConfirmed,
CoderState::Running,
CoderState::NeedsApproval,
CoderState::Merged,
] {
s.transition(to, &sink).unwrap();
}
assert!(s.state.is_terminal());
}
// Skipping ahead from `Created` is rejected and leaves the state unchanged.
#[test]
fn illegal_jumps_are_rejected() {
let (mut s, sink) = session();
assert!(s.transition(CoderState::Running, &sink).is_err());
assert!(s.transition(CoderState::Merged, &sink).is_err());
assert!(s.transition(CoderState::NeedsApproval, &sink).is_err());
assert_eq!(s.state, CoderState::Created);
}
// From `ContractProposed`, both `Failed` and `Abandoned` are reachable, and
// nothing is reachable afterwards.
#[test]
fn any_non_terminal_state_can_fail_or_abandon() {
for terminal in [CoderState::Failed, CoderState::Abandoned] {
let (mut s, sink) = session();
s.transition(CoderState::ContractProposed, &sink).unwrap();
s.transition(terminal, &sink).unwrap();
assert!(s.transition(CoderState::Running, &sink).is_err());
assert!(s.transition(CoderState::Failed, &sink).is_err());
}
}
// Sequence numbers start at 0 and increase by one; every event carries the
// sink's session id.
#[test]
fn event_seq_is_monotonic_and_session_tagged() {
let (sink, collected) = EventSink::collecting("coder-seq");
for _ in 0..5 {
sink.emit(CoderEventKind::PlanText { text: "x".into() });
}
let events = collected.lock().unwrap();
assert_eq!(events.len(), 5);
for (i, e) in events.iter().enumerate() {
assert_eq!(e.seq, i as u64);
assert_eq!(e.session_id, "coder-seq");
}
}
// persist -> load -> list round trip; the live workspace handle is not part
// of the snapshot.
#[test]
fn snapshot_round_trips_without_workspace_handle() {
let dir = tempfile::tempdir().unwrap();
let mut s = CoderSession::new(
"/tmp/repo",
"intent",
EngineChoice::Auto,
4,
Some(dir.path().to_path_buf()),
);
s.contract = Some(OutcomeContract {
description: "d".into(),
checks: vec![],
});
s.persist().unwrap();
let loaded = CoderSession::load(&dir.path().join(format!("{}.json", s.id))).unwrap();
assert_eq!(loaded.id, s.id);
assert_eq!(loaded.state, CoderState::Created);
assert!(loaded.workspace.is_none());
assert!(loaded.contract.is_some());
let listed = CoderSession::list(dir.path());
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].id, s.id);
}
// The event kind is flattened: `type` and payload fields sit at the top level
// of the JSON object.
#[test]
fn event_json_shape_is_ws_friendly() {
let e = CoderEvent {
session_id: "coder-x".into(),
seq: 3,
ts: 1,
kind: CoderEventKind::CheckStarted { name: "tests".into() },
};
let v = serde_json::to_value(&e).unwrap();
assert_eq!(v["type"], "check_started");
assert_eq!(v["name"], "tests");
assert_eq!(v["seq"], 3);
}
// Persists a snapshot in `dir` with the given state and worktree path,
// bypassing `transition`, and returns the session id.
fn write_snapshot(
dir: &Path,
state: CoderState,
workspace_path: Option<PathBuf>,
) -> String {
let mut s = CoderSession::new(
"/tmp/repo",
"intent",
EngineChoice::Native,
4,
Some(dir.to_path_buf()),
);
s.state = state;
s.workspace_path = workspace_path;
s.persist().unwrap();
s.id
}
// Loads the snapshot for `id` back from `dir`.
fn reload(dir: &Path, id: &str) -> CoderSession {
CoderSession::load(&dir.join(format!("{id}.json"))).unwrap()
}
// Every non-terminal state other than a live `NeedsApproval` is marked failed.
#[test]
fn adoption_fails_running_and_confirmed_orphans() {
let dir = tempfile::tempdir().unwrap();
let running = write_snapshot(dir.path(), CoderState::Running, None);
let confirmed = write_snapshot(dir.path(), CoderState::ContractConfirmed, None);
let created = write_snapshot(dir.path(), CoderState::Created, None);
let proposed = write_snapshot(dir.path(), CoderState::ContractProposed, None);
let outcomes = adopt_orphaned_sessions(dir.path());
assert_eq!(outcomes.len(), 4);
assert!(outcomes.iter().all(|(_, o)| *o == AdoptionOutcome::Failed));
for id in [&running, &confirmed, &created, &proposed] {
let s = reload(dir.path(), id);
assert_eq!(s.state, CoderState::Failed, "{id} should be failed");
assert_eq!(s.error.as_deref(), Some("daemon restarted mid-session"));
}
}
// `NeedsApproval` with an existing worktree directory is left untouched.
#[test]
fn adoption_preserves_needs_approval_with_live_worktree() {
let dir = tempfile::tempdir().unwrap();
let worktree = dir.path().join("worktrees").join("wt-1");
std::fs::create_dir_all(&worktree).unwrap();
let id = write_snapshot(dir.path(), CoderState::NeedsApproval, Some(worktree.clone()));
let outcomes = adopt_orphaned_sessions(dir.path());
assert_eq!(outcomes, vec![(id.clone(), AdoptionOutcome::Preserved)]);
let s = reload(dir.path(), &id);
assert_eq!(s.state, CoderState::NeedsApproval);
assert!(s.error.is_none());
assert_eq!(s.workspace_path.as_deref(), Some(worktree.as_path()));
}
// `NeedsApproval` whose worktree directory no longer exists is marked failed.
#[test]
fn adoption_fails_needs_approval_when_worktree_gone() {
let dir = tempfile::tempdir().unwrap();
let gone = dir.path().join("worktrees").join("vanished");
let id = write_snapshot(dir.path(), CoderState::NeedsApproval, Some(gone));
let outcomes = adopt_orphaned_sessions(dir.path());
assert_eq!(outcomes, vec![(id.clone(), AdoptionOutcome::Failed)]);
let s = reload(dir.path(), &id);
assert_eq!(s.state, CoderState::Failed);
assert_eq!(s.error.as_deref(), Some("daemon restarted mid-session"));
}
// Snapshots already in a terminal state are neither reported nor rewritten.
#[test]
fn adoption_leaves_terminal_snapshots_alone() {
let dir = tempfile::tempdir().unwrap();
let merged = write_snapshot(dir.path(), CoderState::Merged, None);
let failed = write_snapshot(dir.path(), CoderState::Failed, None);
let abandoned = write_snapshot(dir.path(), CoderState::Abandoned, None);
let outcomes = adopt_orphaned_sessions(dir.path());
assert!(outcomes.is_empty(), "terminal snapshots must not be adopted");
assert_eq!(reload(dir.path(), &merged).state, CoderState::Merged);
assert_eq!(reload(dir.path(), &failed).state, CoderState::Failed);
assert_eq!(reload(dir.path(), &abandoned).state, CoderState::Abandoned);
}
// A state directory that does not exist yields no outcomes and no error.
#[test]
fn adoption_is_a_noop_on_missing_dir() {
let dir = tempfile::tempdir().unwrap();
let missing = dir.path().join("never-created");
assert!(adopt_orphaned_sessions(&missing).is_empty());
}
}