use async_trait::async_trait;
use bamboo_agent_core::AgentEvent;
use bamboo_domain::Session;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use super::execute::{consume_pending_clarification_resume, has_pending_user_message};
use super::types::{ResumeConfigSnapshot, ResumeOutcome};
use bamboo_engine::execution::runner_lifecycle::RunnerReservation;
/// Abstraction over the collaborators that [`resume_session_execution`] needs.
///
/// Implementors must be `Send + Sync`. The notes on each method describe how
/// the resume flow in this file uses the result; anything beyond that is up to
/// the implementation.
#[async_trait]
pub trait ResumeExecutionPort: Send + Sync {
    /// Looks up the session identified by `session_id`.
    ///
    /// The resume flow treats `None` as "session not found".
    async fn load_session(&self, session_id: &str) -> Option<Session>;

    /// Saves `session`; judging by the name it also refreshes a cache
    /// (NOTE(review): confirm against the implementation).
    ///
    /// The session is passed mutably, so an implementation may modify it.
    async fn save_and_cache_session(&self, session: &mut Session);

    /// Attempts to reserve a runner for `session_id`, handing over the
    /// session's `event_sender`.
    ///
    /// The resume flow treats `None` as "a run is already in progress".
    async fn try_reserve_runner(
        &self,
        session_id: &str,
        event_sender: &broadcast::Sender<AgentEvent>,
    ) -> Option<RunnerReservation>;

    /// Returns the run id of the runner that already exists for
    /// `session_id`, or `None` when no id is available.
    async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String>;

    /// Returns the broadcast sender used for `session_id`'s agent events.
    ///
    /// Presumably creates the sender on first use, as the name suggests —
    /// verify against the implementation.
    async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent>;

    /// Starts the resumed execution described by `request`.
    async fn spawn_resume_execution(&self, request: ResumeSpawnRequest);
}
/// Everything handed to [`ResumeExecutionPort::spawn_resume_execution`] in
/// order to start a resumed run.
pub struct ResumeSpawnRequest {
    /// Identifier of the session being resumed.
    pub session_id: String,

    /// The session to execute. `resume_session_execution` passes the session
    /// after it has been saved through the port.
    pub session: Session,

    /// Cancellation token for the run; `resume_session_execution` takes it
    /// from the runner reservation.
    pub cancel_token: CancellationToken,

    /// Run id for this execution; `resume_session_execution` copies it from
    /// the runner reservation.
    pub run_id: String,

    /// Broadcast sender for the session's [`AgentEvent`]s.
    pub event_sender: broadcast::Sender<AgentEvent>,

    /// Configuration snapshot supplied by the caller of the resume flow.
    pub config: ResumeConfigSnapshot,
}
/// Resumes execution for `session_id` when the session still has a pending
/// user message.
///
/// Steps, in order:
///
/// 1. Load the session through `port`; a missing session yields
///    `ResumeOutcome::NotFound`.
/// 2. If `has_pending_user_message` reports nothing pending, yield
///    `ResumeOutcome::Completed` without touching the runner.
/// 3. Fetch the session's event sender and try to reserve a runner. When the
///    reservation is refused, yield `ResumeOutcome::AlreadyRunning` carrying
///    the existing runner's run id, or an empty string when the port reports
///    none.
/// 4. Call `consume_pending_clarification_resume` on the session, save it
///    through the port, spawn the execution, and yield
///    `ResumeOutcome::Started` with the reserved run id.
pub async fn resume_session_execution(
    port: &dyn ResumeExecutionPort,
    session_id: &str,
    config: ResumeConfigSnapshot,
) -> ResumeOutcome {
    let mut session = match port.load_session(session_id).await {
        Some(loaded) => loaded,
        None => return ResumeOutcome::NotFound,
    };

    if !has_pending_user_message(&session) {
        return ResumeOutcome::Completed;
    }

    let event_sender = port.get_or_create_event_sender(session_id).await;

    let reservation = match port.try_reserve_runner(session_id, &event_sender).await {
        Some(reserved) => reserved,
        None => {
            // Reservation refused: report whichever run id the port can give us.
            let run_id = port
                .get_existing_runner_run_id(session_id)
                .await
                .unwrap_or_default();
            return ResumeOutcome::AlreadyRunning { run_id };
        }
    };

    // The runner is reserved; update the session and save it before spawning.
    consume_pending_clarification_resume(&mut session);
    port.save_and_cache_session(&mut session).await;

    let run_id = reservation.run_id;
    let request = ResumeSpawnRequest {
        session_id: session_id.to_string(),
        session,
        cancel_token: reservation.cancel_token,
        run_id: run_id.clone(),
        event_sender,
        config,
    };
    port.spawn_resume_execution(request).await;

    ResumeOutcome::Started { run_id }
}