#![allow(dead_code)]
mod factory;
mod jsonl_writer;
mod snapshot;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use std::time::Duration;
pub use crate::agent::AgentEvent;
use crate::logging::session_span;
pub use factory::*;
pub use snapshot::*;
use chrono::Utc;
use thiserror::Error;
use tokio::fs::{self, File};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::{ChildStdout, Command};
use tokio::sync::watch;
use tracing::Instrument;
use crate::agent::{AgentAdapter, AgentCommand, AgentStdin, get_adapter};
use crate::config::AgentProfileSchema;
use crate::context::{Issue, IssueStage};
use crate::shell::{Child, CommandExecError, CommandExt};
use crate::template::{PromptRenderer, TemplateError};
use self::jsonl_writer::JsonlWriter;
/// Errors that can occur while preparing and launching an agent session.
#[derive(Debug, Error)]
pub enum SessionError {
/// The named agent profile is not known.
/// NOTE(review): not constructed in this file — presumably raised by the session factory.
#[error("unknown agent profile `{profile}`")]
ProfileNotFound { profile: String },
/// Spawning the agent process failed, or (in `spawn_inner`) writing the
/// piped prompt to its stdin failed.
#[error(transparent)]
AgentSpawn(#[from] CommandExecError),
/// Reading the prompt template file or rendering it failed.
#[error(transparent)]
TemplateRender(#[from] TemplateError),
/// The stage's prompt file path could not be resolved by the workflow.
#[error("prompt path `{0}` could not be resolved")]
PromptPath(PathBuf),
/// An I/O error converted via `?`; in this file it comes from creating the
/// log directory and from a missing child stdout handle.
/// NOTE(review): `JsonlWriter::open` failures presumably land here too — confirm its error type.
#[error(transparent)]
WriteLog(#[from] std::io::Error),
}
/// Handle to one agent run for a single issue stage.
///
/// Cloning is shallow with respect to state: clones share the same
/// `inner` (behind an `Arc`) and a clone of the same watch sender.
#[derive(Clone)]
pub struct Session {
// Issue stage this session runs for; source of the issue, workdir, log file and workflow.
stage: IssueStage,
// Agent profile handed to the adapter when building the command.
profile: AgentProfileSchema,
// Adapter that builds the agent command and maps its JSON output to `AgentEvent`s.
agent: Arc<dyn AgentAdapter>,
// Mutable session state, shared with the spawned output-streaming task.
inner: Arc<Mutex<SessionInner>>,
// Watch channel sender used to publish `SessionState` updates to subscribers.
state_notifier: watch::Sender<SessionState>,
}
/// Mutable state of a session, guarded by the mutex in `Session::inner`.
struct SessionInner {
// Latest observable view of the session (state, tokens, rate limits, ...).
snapshot: SessionSnapshot,
// JSONL event log writer; `None` until output streaming is started.
writer: Option<JsonlWriter>,
// Handle of the spawned agent process; `None` until `Session::spawn` stores it.
child: Option<Child>,
}
impl Session {
    /// Creates a session for `stage` and launches the agent described by `profile`.
    ///
    /// The snapshot starts from `SessionSnapshot::default()` with `started_at`
    /// set to the current UTC time. The spawned child handle is stored in the
    /// shared state before the session is returned.
    ///
    /// # Errors
    ///
    /// Returns any [`SessionError`] produced while preparing the prompt,
    /// spawning the process, or starting the output stream.
    pub(super) async fn spawn(stage: IssueStage, profile: AgentProfileSchema) -> Result<Self, SessionError> {
        let _span = session_span(profile.runtime.as_ref());
        let now = Utc::now();
        let snapshot = SessionSnapshot {
            started_at: now,
            ..Default::default()
        };
        let agent = get_adapter(profile.runtime);
        // The initial receiver is discarded; subscribers are created on demand
        // through `subscribe_state` / `wait`.
        let (state_notifier, _) = watch::channel(snapshot.state);
        let session = Self {
            stage,
            profile,
            agent,
            inner: Arc::new(Mutex::new(SessionInner {
                snapshot,
                writer: None,
                child: None,
            })),
            state_notifier,
        };
        let child = session.spawn_inner().in_current_span().await?;
        session.inner.lock().expect("session mutex never poisoned").child = Some(child);
        Ok(session)
    }

    /// Returns the id of the issue this session belongs to.
    pub fn id(&self) -> &str {
        &self.stage.issue().id
    }

    /// Returns the issue this session belongs to.
    pub fn issue(&self) -> &Issue {
        self.stage.issue()
    }

    /// Returns the issue stage this session runs for.
    pub fn stage(&self) -> &IssueStage {
        &self.stage
    }

    /// Returns a clone of the current session snapshot.
    pub fn snapshot(&self) -> SessionSnapshot {
        self.inner.lock().expect("session mutex never poisoned").snapshot.clone()
    }

    /// Returns the current session state.
    pub fn state(&self) -> SessionState {
        self.inner.lock().expect("session mutex never poisoned").snapshot.state
    }

    /// Returns the path of the log file for this session's stage.
    pub fn log_file(&self) -> &Path {
        self.stage.log_file()
    }

    /// Returns `true` once the session state reports itself as terminated.
    pub fn terminated(&self) -> bool {
        self.state().is_terminated()
    }

    /// Returns a new receiver for session state updates.
    pub fn subscribe_state(&self) -> watch::Receiver<SessionState> {
        self.state_notifier.subscribe()
    }

    /// Cancels the child process, if one is stored, and moves the session to
    /// `Cancelled`.
    ///
    /// The state change is ignored when the session is already terminated
    /// (see `SessionInner::set_state`). Does nothing when no child is stored.
    pub fn cancel(&self) {
        let mut inner = self.inner.lock().expect("session mutex never poisoned");
        if let Some(child) = &inner.child {
            child.cancel();
            inner.set_state(SessionState::Cancelled, &self.state_notifier);
        }
    }

    /// Waits until the session is terminated and returns its snapshot.
    ///
    /// Also returns the current snapshot if the state channel is closed.
    pub async fn wait(&self) -> SessionSnapshot {
        // Subscribe before the first check so a change that happens between
        // the check and the `changed().await` is still observed.
        let mut state_rx = self.state_notifier.subscribe();
        loop {
            if self.terminated() {
                return self.snapshot();
            }
            if state_rx.changed().await.is_err() {
                return self.snapshot();
            }
        }
    }

    /// Prepares the agent command, spawns the process, and wires up its
    /// stdin and stdout.
    ///
    /// stdout is piped and streamed by a background task; stderr is discarded.
    /// stdin is null, inherited, or piped according to the command's
    /// `AgentStdin` setting.
    ///
    /// # Errors
    ///
    /// Fails if preparation fails, the process cannot be spawned, the stdout
    /// handle is missing, the log writer cannot be opened, or the piped prompt
    /// cannot be written.
    async fn spawn_inner(&self) -> Result<Child, SessionError> {
        let agent_command = self.prepare().await?;
        self.set_state(SessionState::Running);
        let mut command = Command::new(&agent_command.program);
        command
            .current_dir(self.stage.workdir())
            .args(agent_command.args)
            .stdout(Stdio::piped())
            .stderr(Stdio::null());
        match &agent_command.stdin {
            AgentStdin::None => {
                command.stdin(Stdio::null());
            },
            // Nothing to configure: the child's stdin is left at its default.
            AgentStdin::Inherit => {},
            AgentStdin::Pipe(_) => {
                command.stdin(Stdio::piped());
            },
        }
        // NOTE(review): one-hour timeout handed to the `crate::shell` wrapper;
        // presumably bounds the agent's total runtime — confirm there.
        let mut child = command
            .timeout(Duration::from_hours(1))
            .spawn()?;
        // Start draining stdout BEFORE feeding stdin. If the prompt is larger
        // than the pipe buffer and the agent emits enough output before it has
        // consumed its input, writing first would leave both sides blocked on
        // a full pipe. With the reader task already running, stdout is always
        // being consumed while the write below is in progress.
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| std::io::Error::other("Stdout was not bound to spawned agent process"))?;
        self.stream_agent_output(stdout)?;
        if let AgentStdin::Pipe(input) = agent_command.stdin
            && let Some(mut stdin) = child.stdin.take()
        {
            stdin
                .write_all(input.as_bytes())
                .await
                .map_err(|err| SessionError::AgentSpawn(CommandExecError::Spawn(err)))?;
            // `stdin` is dropped at the end of this block, releasing our end
            // of the pipe.
        }
        Ok(child)
    }

    /// Moves the session to `Preparing`, makes sure the log file's parent
    /// directory exists, renders the prompt, and asks the adapter to build the
    /// agent command from the profile and prompt.
    async fn prepare(&self) -> Result<AgentCommand, SessionError> {
        self.set_state(SessionState::Preparing);
        if let Some(parent) = self.log_file().parent() {
            fs::create_dir_all(parent).await?;
        }
        let prompt = self.render_prompt().await?;
        Ok(self.agent.build_command(&self.profile, prompt))
    }

    /// Loads the stage's prompt template and renders it against the stage.
    ///
    /// # Errors
    ///
    /// Returns `SessionError::PromptPath` when the workflow cannot resolve the
    /// prompt file, and `SessionError::TemplateRender` when the file cannot be
    /// read or rendering fails.
    async fn render_prompt(&self) -> Result<String, SessionError> {
        let renderer = PromptRenderer::new();
        let prompt_file = self
            .stage
            .workflow()
            .resolve_path(&self.stage().stage().prompt_file)
            .ok_or_else(|| SessionError::PromptPath(self.stage().stage().prompt_file.clone()))?;
        let mut file = File::open(&prompt_file)
            .await
            .map_err(|err| SessionError::TemplateRender(TemplateError::Io(err)))?;
        let mut template = String::new();
        file
            .read_to_string(&mut template)
            .await
            .map_err(|err| SessionError::TemplateRender(TemplateError::Io(err)))?;
        Ok(renderer.render(&template, &self.stage).await?)
    }

    /// Applies a state transition under the session lock and publishes it.
    fn set_state(&self, state: SessionState) {
        self
            .inner
            .lock()
            .expect("session mutex never poisoned")
            .set_state(state, &self.state_notifier);
    }

    /// Opens the JSONL log writer for this session, stores it in the shared
    /// state, and spawns a background task that turns the child's stdout into
    /// agent events.
    fn stream_agent_output(&self, stdout: ChildStdout) -> Result<(), SessionError> {
        let writer = JsonlWriter::open(self.log_file())?;
        self.inner.lock().expect("session mutex never poisoned").writer = Some(writer);
        let agent = self.agent.clone();
        let inner = Arc::clone(&self.inner);
        let state_notifier = self.state_notifier.clone();
        // The task is detached: it ends on its own when stdout reaches EOF or
        // a read error occurs.
        tokio::spawn(stream_agent_events(stdout, agent, inner, state_notifier));
        Ok(())
    }
}
impl SessionInner {
    /// Finalizes the session once the agent's output stream has ended.
    ///
    /// A session that is still `Running` at this point never reported
    /// completion, so it is marked `Failed`. The JSONL writer is then dropped:
    /// no further events are applied after the stream ends, and keeping it
    /// would hold the log handle open for as long as any `Session` clone
    /// lives.
    fn finish_output_stream(&mut self, state_notifier: &watch::Sender<SessionState>) {
        if matches!(self.snapshot.state, SessionState::Running) {
            self.set_state(SessionState::Failed, state_notifier);
        }
        self.writer = None;
    }

    /// Records `event` in the JSONL log (when a writer is present) and folds
    /// it into the snapshot.
    ///
    /// A failed log write is reported through `tracing` and does not stop the
    /// event from being applied. The current state is re-published on the
    /// watch channel after every event.
    fn apply_event(&mut self, event: AgentEvent, state_notifier: &watch::Sender<SessionState>) {
        if let Some(writer) = &mut self.writer
            && let Err(err) = writer.write(&event)
        {
            tracing::error!("session jsonl write failed: {err}");
        }
        self.snapshot.last_event_at = Some(Utc::now());
        match event {
            AgentEvent::SessionStarted { session_id } => {
                self.snapshot.agent_session_id = Some(session_id);
            },
            AgentEvent::Message { text } => {
                self.snapshot.last_message = Some(text);
            },
            AgentEvent::TokenUsage {
                input,
                output,
                cache_read,
            } => {
                // Counters accumulate across events and saturate instead of
                // overflowing.
                self.snapshot.tokens.input = self.snapshot.tokens.input.saturating_add(input);
                self.snapshot.tokens.output = self.snapshot.tokens.output.saturating_add(output);
                self.snapshot.tokens.cache_read = self.snapshot.tokens.cache_read.saturating_add(cache_read);
            },
            AgentEvent::RateLimit {
                scope,
                remaining,
                reset_at,
                observed_at,
            } => {
                // Keep only the most recent observation per scope; an older
                // observation never overwrites a newer one.
                let keep = match self.snapshot.rate_limits.get(&scope) {
                    Some(existing) => observed_at >= existing.observed_at,
                    None => true,
                };
                if keep {
                    self.snapshot.rate_limits.insert(
                        scope,
                        RateLimitObservation {
                            remaining,
                            reset_at,
                            observed_at,
                        },
                    );
                }
            },
            AgentEvent::Completed => {
                self.set_state(SessionState::Completed, state_notifier);
            },
            AgentEvent::Error { detail: _ } => {
                self.set_state(SessionState::Failed, state_notifier);
            },
        }
        state_notifier.send_replace(self.snapshot.state);
    }

    /// Moves the session to `state` and publishes it on the watch channel.
    ///
    /// Terminated states are final: once the current state is terminated the
    /// call is a no-op.
    fn set_state(&mut self, state: SessionState, state_notifier: &watch::Sender<SessionState>) {
        if self.snapshot.state.is_terminated() {
            return;
        }
        self.snapshot.state = state;
        state_notifier.send_replace(state);
    }
}
/// Reads the agent's stdout line by line and applies the resulting events to
/// the shared session state until the stream ends.
///
/// Each non-blank line is parsed as JSON and handed to the adapter's
/// `map_event`, which yields the events to apply. A line that is not valid
/// JSON, or a read error on the stream, is recorded as an
/// `AgentEvent::Error`; a read error also stops the loop. Blank lines carry
/// no event and are skipped. When the loop ends the session is finalized via
/// `SessionInner::finish_output_stream`.
async fn stream_agent_events(
    stdout: ChildStdout,
    agent: Arc<dyn AgentAdapter>,
    inner: Arc<Mutex<SessionInner>>,
    state_notifier: watch::Sender<SessionState>,
) {
    let mut lines = BufReader::new(stdout).lines();
    loop {
        match lines.next_line().await {
            Ok(Some(line)) => {
                // An empty or whitespace-only line is not an event. Parsing it
                // would fail and permanently mark the session as failed, so
                // skip it instead.
                if line.trim().is_empty() {
                    continue;
                }
                let events = match serde_json::from_str(&line) {
                    Ok(value) => agent.map_event(value),
                    Err(err) => vec![AgentEvent::Error {
                        detail: err.to_string(),
                    }],
                };
                // The guard lives only for this arm, so the lock is never held
                // across the next `.await`.
                let mut guard = inner.lock().expect("session mutex never poisoned");
                for event in events {
                    guard.apply_event(event, &state_notifier);
                }
            },
            // End of stream.
            Ok(None) => break,
            Err(err) => {
                inner.lock().expect("session mutex never poisoned").apply_event(
                    AgentEvent::Error {
                        detail: err.to_string(),
                    },
                    &state_notifier,
                );
                break;
            },
        }
    }
    inner
        .lock()
        .expect("session mutex never poisoned")
        .finish_output_stream(&state_notifier);
}