use std::sync::Arc;
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use crate::context::IssueStage;
use crate::hooks::HookError;
use crate::logging::stage_span;
use crate::session::{SessionError, SessionFactory, SessionState};
use crate::workflow::Workflow;
use super::event::EventProducer;
use super::monitor::SessionMonitor;
/// Starts an issue stage on a background Tokio task and drives it to a
/// terminal state, reporting progress through an [`EventProducer`].
///
/// The launcher is `Clone`; `launch` clones it so each spawned task owns its
/// own copy.
#[derive(Clone)]
pub(super) struct StageLauncher {
// Shared workflow; its `hooks()` are invoked before and after each stage run.
workflow: Arc<Workflow>,
// Used to spawn the session for an issue stage (`spawn_stage`).
factory: SessionFactory,
// Receives stage started / failed / terminal notifications; a clone is also
// handed to the `SessionMonitor` created for each running session.
producer: EventProducer,
}
impl StageLauncher {
pub(super) fn new(workflow: Arc<Workflow>, factory: SessionFactory, producer: EventProducer) -> Self {
Self {
workflow,
factory,
producer,
}
}
pub(super) fn launch(&self, issue_stage: IssueStage, shutdown: CancellationToken) {
let span = stage_span(issue_stage.stage_name(), &issue_stage.stage().agent);
let launcher = self.clone();
tokio::spawn(async move { launcher.run(issue_stage, shutdown).await }.instrument(span));
}
async fn run(self, issue_stage: IssueStage, shutdown: CancellationToken) {
if shutdown.is_cancelled() {
return;
}
let key = issue_stage.key();
let session = match self.start_session(&issue_stage).await {
Ok(session) => session,
Err(error) => {
tracing::error!(error = %error, "Failed to start session for issue stage");
self.producer.stage_failed(key, error).await;
return;
},
};
self.producer.stage_started(issue_stage.clone(), session.clone()).await;
let monitor = SessionMonitor::new(key.clone(), session.clone(), self.producer.clone());
let terminal = tokio::select! {
snapshot = monitor.watch() => snapshot,
_ = shutdown.cancelled() => {
session.cancel();
session.wait().await
}
};
if !matches!(terminal.state, SessionState::Cancelled)
&& let Err(error) = self
.workflow
.hooks()
.after_issue_stage_run(&issue_stage, &issue_stage.stage().hooks.after_run)
.await
{
tracing::error!(
error = %error,
"issue stage after_run hook failed",
);
}
self.producer.stage_terminal(key, terminal).await;
}
async fn start_session(&self, issue_stage: &IssueStage) -> Result<crate::session::Session, StageLaunchError> {
if let Err(err) = self
.workflow
.hooks()
.before_issue_stage_run(issue_stage, &issue_stage.stage().hooks.before_run)
.await
{
tracing::error!(
error = %err,
"issue stage before_run hook failed",
);
return Err(StageLaunchError::Hook(err));
}
Ok(self.factory.spawn_stage(issue_stage.clone()).await?)
}
}
/// Reasons an issue stage could not be started.
///
/// Both variants are `#[error(transparent)]`, so `Display` and `source()`
/// are forwarded to the wrapped error, and `#[from]` provides the `From`
/// conversions used with `?`.
#[derive(Debug, Error)]
enum StageLaunchError {
// The stage's `before_run` hook returned an error.
#[error(transparent)]
Hook(#[from] HookError),
// The session factory failed to spawn the stage's session.
#[error(transparent)]
Session(#[from] SessionError),
}