#![allow(dead_code)]
mod factory;
mod jsonl_writer;
mod snapshot;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use std::time::Duration;
pub use crate::agent::AgentEvent;
pub use factory::*;
pub use snapshot::*;
use chrono::Utc;
use thiserror::Error;
use tokio::fs::{self, File};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::{ChildStdout, Command};
use tokio::sync::watch;
use uuid::Uuid;
use crate::agent::{AgentAdapter, AgentCommand, AgentStdin, get_adapter};
use crate::config::{AgentProfileSchema, IssueStage};
use crate::context::Issue;
use crate::shell::{Child, CommandExecError, CommandExt};
use crate::template::{Context as TemplateContext, PromptRenderer, StageContext, TemplateError};
use crate::workflow::Workflow;
use self::jsonl_writer::JsonlWriter;
/// Errors surfaced while preparing or launching a [`Session`].
#[derive(Debug, Error)]
pub enum SessionError {
/// The named agent profile is not known.
///
/// NOTE(review): not constructed anywhere in this part of the file; presumably
/// raised by the code that resolves profiles before calling `Session::spawn`.
#[error("unknown agent profile `{profile}`")]
ProfileNotFound { profile: String },
/// Launching the agent command failed. Also used when writing the prompt to the
/// agent's piped stdin fails (wrapped as `CommandExecError::Spawn`).
#[error(transparent)]
AgentSpawn(#[from] CommandExecError),
/// Rendering the prompt template failed. I/O errors while opening or reading the
/// prompt file are reported here as `TemplateError::Io`.
#[error(transparent)]
TemplateRender(#[from] TemplateError),
/// The stage's prompt file path could not be resolved by the workflow.
#[error("prompt path `{0}` could not be resolved")]
PromptPath(PathBuf),
/// An I/O error converted via `?`. Despite the name this covers more than log
/// writes: creating the log directory, opening the JSONL log, and a spawned child
/// without a bound stdout all end up here.
#[error(transparent)]
WriteLog(#[from] std::io::Error),
}
/// Immutable inputs of a session, fixed when the session is spawned and shared
/// between clones of [`Session`] behind an `Arc`.
struct SessionOptions {
/// Workflow the session runs under; used to resolve the prompt file and the
/// workspace directories.
workflow: Arc<Workflow>,
/// Path of the JSONL file that receives the agent's events.
log_file: PathBuf,
/// Directory the agent process is started in.
issue_workdir: PathBuf,
/// Issue the session works on.
issue: Issue,
/// Name of the stage being executed.
stage_name: String,
/// Configuration of the stage being executed (prompt file, agent, trigger state).
stage: IssueStage,
/// Agent profile used to pick the adapter and build the agent command.
profile: AgentProfileSchema,
}
/// Handle to one agent run for an issue stage.
///
/// Cloning is cheap: the options and the mutable state sit behind `Arc`s, so all
/// clones observe and control the same underlying session.
#[derive(Clone)]
pub struct Session {
/// Adapter that builds the agent command and maps its output to `AgentEvent`s.
agent: Arc<dyn AgentAdapter>,
/// Immutable inputs the session was created with.
opts: Arc<SessionOptions>,
/// Mutable state shared with the background task that reads the agent's stdout.
inner: Arc<Mutex<SessionInner>>,
/// Watch channel on which the current `SessionState` is published.
state_notifier: watch::Sender<SessionState>,
}
/// Mutable part of a session, guarded by the mutex in [`Session`].
struct SessionInner {
/// Latest observable view of the session (state, tokens, last message, ...).
snapshot: SessionSnapshot,
/// JSONL log writer; `None` until output streaming has been set up.
writer: Option<JsonlWriter>,
/// Handle of the agent process; `None` until the process has been spawned.
child: Option<Child>,
}
impl Session {
pub(super) async fn spawn(
workflow: Arc<Workflow>,
issue: Issue,
stage_name: String,
stage: IssueStage,
issue_workdir: PathBuf,
profile: AgentProfileSchema,
) -> Result<Self, SessionError> {
let now = Utc::now();
let log_file = workflow
.workspace()
.issue_sessions_dir(&issue.id)
.join(session_log_file_name(&issue.state, Uuid::now_v7()));
let opts = SessionOptions {
workflow,
log_file,
issue_workdir,
issue,
stage_name,
stage,
profile,
};
let snapshot = SessionSnapshot {
started_at: now,
..Default::default()
};
let agent = get_adapter(opts.profile.runtime);
let (state_notifier, _) = watch::channel(snapshot.state);
let session = Self {
agent,
opts: Arc::new(opts),
inner: Arc::new(Mutex::new(SessionInner {
snapshot,
writer: None,
child: None,
})),
state_notifier,
};
let child = session.spawn_inner().await?;
session.inner.lock().expect("session mutex never poisoned").child = Some(child);
Ok(session)
}
pub fn id(&self) -> &str {
&self.opts.issue.id
}
pub fn issue(&self) -> &Issue {
&self.opts.issue
}
pub fn stage(&self) -> &IssueStage {
&self.opts.stage
}
pub fn profile(&self) -> &AgentProfileSchema {
&self.opts.profile
}
pub fn snapshot(&self) -> SessionSnapshot {
self.inner.lock().expect("session mutex never poisoned").snapshot.clone()
}
pub fn state(&self) -> SessionState {
self.inner.lock().expect("session mutex never poisoned").snapshot.state
}
pub fn log_file(&self) -> &PathBuf {
&self.opts.log_file
}
pub fn terminated(&self) -> bool {
self.state().is_terminated()
}
pub fn subscribe_state(&self) -> watch::Receiver<SessionState> {
self.state_notifier.subscribe()
}
pub fn cancel(&self) {
let mut inner = self.inner.lock().expect("session mutex never poisoned");
if let Some(child) = &inner.child {
child.cancel();
inner.set_state(SessionState::Cancelled, &self.state_notifier);
}
}
pub async fn wait(&self) -> SessionSnapshot {
let mut state_rx = self.state_notifier.subscribe();
loop {
if self.terminated() {
return self.snapshot();
}
if state_rx.changed().await.is_err() {
return self.snapshot();
}
}
}
async fn spawn_inner(&self) -> Result<Child, SessionError> {
let agent_command = self.prepare().await?;
self.set_state(SessionState::Running);
let mut command = Command::new(&agent_command.program);
command
.current_dir(&self.opts.issue_workdir)
.args(agent_command.args)
.stdout(Stdio::piped())
.stderr(Stdio::null());
match &agent_command.stdin {
AgentStdin::None => {
command.stdin(Stdio::null());
},
AgentStdin::Inherit => {},
AgentStdin::Pipe(_) => {
command.stdin(Stdio::piped());
},
}
let mut child = command
.timeout( Duration::from_hours(1))
.spawn()?;
if let AgentStdin::Pipe(input) = agent_command.stdin
&& let Some(mut stdin) = child.stdin.take()
{
stdin
.write_all(input.as_bytes())
.await
.map_err(|err| SessionError::AgentSpawn(CommandExecError::Spawn(err)))?;
}
let stdout = child
.stdout
.take()
.ok_or_else(|| std::io::Error::other("Stdout was not bound to spawned agent process"))?;
self.stream_agent_output(stdout)?;
Ok(child)
}
async fn prepare(&self) -> Result<AgentCommand, SessionError> {
self.set_state(SessionState::Preparing);
if let Some(parent) = self.log_file().parent() {
fs::create_dir_all(parent).await?;
}
let prompt = self.render_prompt().await?;
Ok(self.agent.build_command(self.profile(), prompt))
}
async fn render_prompt(&self) -> Result<String, SessionError> {
let renderer = PromptRenderer::new();
let prompt_file = self
.opts
.workflow
.resolve_path(&self.stage().prompt_file)
.ok_or_else(|| SessionError::PromptPath(self.stage().prompt_file.clone()))?;
let mut file = File::open(&prompt_file)
.await
.map_err(|err| SessionError::TemplateRender(TemplateError::Io(err)))?;
let mut template = String::new();
file
.read_to_string(&mut template)
.await
.map_err(|err| SessionError::TemplateRender(TemplateError::Io(err)))?;
let context = render_context(self);
Ok(renderer.render(&template, &context).await?)
}
fn set_state(&self, state: SessionState) {
self
.inner
.lock()
.expect("session mutex never poisoned")
.set_state(state, &self.state_notifier);
}
fn stream_agent_output(&self, stdout: ChildStdout) -> Result<(), SessionError> {
let writer = JsonlWriter::open(self.log_file())?;
self.inner.lock().expect("session mutex never poisoned").writer = Some(writer);
let agent = self.agent.clone();
let inner = Arc::clone(&self.inner);
let state_notifier = self.state_notifier.clone();
tokio::spawn(stream_agent_events(stdout, agent, inner, state_notifier));
Ok(())
}
}
impl SessionInner {
fn finish_output_stream(&mut self, state_notifier: &watch::Sender<SessionState>) {
if matches!(self.snapshot.state, SessionState::Running) {
self.set_state(SessionState::Failed, state_notifier);
}
}
fn apply_event(&mut self, event: AgentEvent, state_notifier: &watch::Sender<SessionState>) {
if let Some(writer) = &mut self.writer
&& let Err(err) = writer.write(&event)
{
tracing::error!("session jsonl write failed: {err}");
}
self.snapshot.last_event_at = Some(Utc::now());
match event {
AgentEvent::SessionStarted { session_id } => {
self.snapshot.agent_session_id = Some(session_id);
},
AgentEvent::Message { text } => {
self.snapshot.last_message = Some(text);
},
AgentEvent::TokenUsage {
input,
output,
cache_read,
} => {
self.snapshot.tokens.input = self.snapshot.tokens.input.saturating_add(input);
self.snapshot.tokens.output = self.snapshot.tokens.output.saturating_add(output);
self.snapshot.tokens.cache_read = self.snapshot.tokens.cache_read.saturating_add(cache_read);
},
AgentEvent::RateLimit {
scope,
remaining,
reset_at,
observed_at,
} => {
let keep = match self.snapshot.rate_limits.get(&scope) {
Some(existing) => observed_at >= existing.observed_at,
None => true,
};
if keep {
self.snapshot.rate_limits.insert(
scope,
RateLimitObservation {
remaining,
reset_at,
observed_at,
},
);
}
},
AgentEvent::Completed => {
self.set_state(SessionState::Completed, state_notifier);
},
AgentEvent::Error { detail: _ } => {
self.set_state(SessionState::Failed, state_notifier);
},
}
state_notifier.send_replace(self.snapshot.state);
}
fn set_state(&mut self, state: SessionState, state_notifier: &watch::Sender<SessionState>) {
if self.snapshot.state.is_terminated() {
return;
}
self.snapshot.state = state;
state_notifier.send_replace(state);
}
}
async fn stream_agent_events(
stdout: ChildStdout,
agent: Arc<dyn AgentAdapter>,
inner: Arc<Mutex<SessionInner>>,
state_notifier: watch::Sender<SessionState>,
) {
let mut lines = BufReader::new(stdout).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => {
let events = match serde_json::from_str(&line) {
Ok(value) => agent.map_event(value),
Err(err) => vec![AgentEvent::Error {
detail: err.to_string(),
}],
};
for event in events {
apply_agent_event(&inner, &state_notifier, event);
}
},
Ok(None) => break,
Err(err) => {
apply_agent_event(
&inner,
&state_notifier,
AgentEvent::Error {
detail: err.to_string(),
},
);
break;
},
}
}
inner
.lock()
.expect("session mutex never poisoned")
.finish_output_stream(&state_notifier);
}
/// Locks the shared session state and applies a single agent event to it.
///
/// # Panics
///
/// Panics if the session mutex is poisoned.
fn apply_agent_event(
    inner: &Arc<Mutex<SessionInner>>,
    state_notifier: &watch::Sender<SessionState>,
    event: AgentEvent,
) {
    let mut guard = inner.lock().expect("session mutex never poisoned");
    guard.apply_event(event, state_notifier);
}
/// Assembles the template context used to render the stage prompt.
///
/// Entries are added in this order: the issue's extra payload (string keys only),
/// the values contributed by `StageContext::apply`, and finally the `workflow`,
/// `loop`, `stage` and `profile` entries.
fn render_context(session: &Session) -> TemplateContext {
    let opts = &session.opts;
    let mut context = TemplateContext::new();

    for (key, value) in &opts.issue.extra_payload {
        // Payload entries whose key is not a string are skipped.
        if let Some(key) = key.as_str() {
            // A value that cannot be converted to JSON is exposed as null.
            let json = serde_json::to_value(value).unwrap_or(serde_json::Value::Null);
            context.with(key, json);
        }
    }

    let workspace_root = opts.workflow.workspace().root().to_path_buf();
    StageContext {
        issue: &opts.issue,
        stage_name: &opts.stage_name,
        agent_profile: &opts.stage.agent,
        stage_state: &opts.stage.when.state,
        issue_workdir: &opts.issue_workdir,
        workspace_root: &workspace_root,
    }
    .apply(&mut context);

    let schema = opts.workflow.schema();
    context.with("workflow", schema);
    context.with("loop", &schema.loop_);
    context.with("stage", stage_context_value(&opts.stage, &opts.stage_name));
    context.with("profile", &opts.profile);
    context
}
/// Serializes `stage` to JSON and, when the result is an object, adds a `name`
/// entry (the stage name) and a `state` entry (the stage's `when.state`).
///
/// A stage that fails to serialize yields JSON null.
fn stage_context_value(stage: &IssueStage, stage_name: &str) -> serde_json::Value {
    let mut value = serde_json::to_value(stage).unwrap_or(serde_json::Value::Null);
    if let Some(fields) = value.as_object_mut() {
        fields.insert("name".to_owned(), serde_json::Value::from(stage_name));
        fields.insert(
            "state".to_owned(),
            serde_json::Value::from(stage.when.state.as_str()),
        );
    }
    value
}
/// Builds the JSONL log file name for a session: `<issue_state>-<session_id>.jsonl`.
///
/// The result is joined onto the sessions directory, so it must be a single path
/// component. Path separators (`/` and `\`) in `issue_state` are replaced with `_`;
/// left as-is they would place the log in a nested directory or, for a state
/// starting with a separator, replace the base directory entirely.
fn session_log_file_name(issue_state: &str, session_id: Uuid) -> String {
    let safe_state = issue_state.replace(['/', '\\'], "_");
    format!("{safe_state}-{session_id}.jsonl")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The log file name starts with the issue state, followed by the session id
    /// and the `.jsonl` extension.
    #[test]
    fn session_log_file_name_puts_issue_state_before_uuid() {
        let expected = "todo-00000000-0000-0000-0000-000000000000.jsonl";
        let actual = session_log_file_name("todo", Uuid::nil());
        assert_eq!(actual, expected);
    }
}