bamboo_engine/session_app/resume.rs
1//! Resume execution use case.
2//!
3//! Provides the application-layer logic for resuming agent execution on an
4//! existing session (e.g. after a user responds to a pending question).
5//! The server layer implements `ResumeExecutionPort` to supply the
6//! infrastructure operations.
7
8use async_trait::async_trait;
9use bamboo_agent_core::AgentEvent;
10use bamboo_domain::Session;
11use tokio::sync::broadcast;
12use tokio_util::sync::CancellationToken;
13
14use super::execute::{consume_pending_clarification_resume, has_pending_user_message};
15use super::types::{ResumeConfigSnapshot, ResumeOutcome};
16use crate::execution::runner_lifecycle::RunnerReservation;
17
18// ---------------------------------------------------------------------------
19// Port trait
20// ---------------------------------------------------------------------------
21
22/// Adapter trait for resume execution infrastructure.
23///
24/// Implementations bridge the use case to server-specific concerns
25/// (storage, runner lifecycle, agent spawning).
26#[async_trait]
27pub trait ResumeExecutionPort: Send + Sync {
28 /// Load a session by ID. Returns `None` if not found.
29 async fn load_session(&self, session_id: &str) -> Option<Session>;
30
31 /// Persist a session and update any caches.
32 ///
33 /// Implementations may merge concurrent UI edits to title/pinned/title_version
34 /// from disk back into `session` (which is why this takes `&mut`).
35 async fn save_and_cache_session(&self, session: &mut Session);
36
37 /// Try to reserve a runner slot for the given session.
38 /// Returns `None` if a runner is already active.
39 async fn try_reserve_runner(
40 &self,
41 session_id: &str,
42 event_sender: &broadcast::Sender<AgentEvent>,
43 ) -> Option<RunnerReservation>;
44
45 /// Get the run_id of an existing runner if one exists.
46 async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String>;
47
48 /// Get or create the long-lived broadcast sender for session events.
49 async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent>;
50
51 /// Spawn the resume execution loop in the background.
52 ///
53 /// The adapter creates the mpsc channel, spawns the event forwarder,
54 /// and calls the server's agent execution spawner.
55 async fn spawn_resume_execution(&self, request: ResumeSpawnRequest);
56}
57
58// ---------------------------------------------------------------------------
59// Value types
60// ---------------------------------------------------------------------------
61
62/// Request captured for the adapter to spawn a resumed agent execution.
63///
64/// This bundles everything the server-side spawner needs, keeping the
65/// application layer free of `AppState` and server-specific types.
66pub struct ResumeSpawnRequest {
67 pub session_id: String,
68 pub session: Session,
69 pub cancel_token: CancellationToken,
70 pub run_id: String,
71 pub event_sender: broadcast::Sender<AgentEvent>,
72 pub config: ResumeConfigSnapshot,
73}
74
75// ---------------------------------------------------------------------------
76// Use case
77// ---------------------------------------------------------------------------
78
79/// Resume agent execution on an existing session.
80///
81/// Returns the outcome of the resume attempt:
82/// - `Started` — execution spawned successfully
83/// - `AlreadyRunning` — a runner is already active
84/// - `Completed` — no pending user message
85/// - `NotFound` — session not found
86pub async fn resume_session_execution(
87 port: &dyn ResumeExecutionPort,
88 session_id: &str,
89 config: ResumeConfigSnapshot,
90) -> ResumeOutcome {
91 // Load session.
92 let Some(mut session) = port.load_session(session_id).await else {
93 return ResumeOutcome::NotFound;
94 };
95
96 if !has_pending_user_message(&session) {
97 return ResumeOutcome::Completed;
98 }
99
100 // Reserve runner slot.
101 let event_sender = port.get_or_create_event_sender(session_id).await;
102 let Some(reservation) = port.try_reserve_runner(session_id, &event_sender).await else {
103 // If already running, try to get the existing runner's run_id
104 let existing_run_id = port.get_existing_runner_run_id(session_id).await;
105 return ResumeOutcome::AlreadyRunning {
106 run_id: existing_run_id.unwrap_or_default(),
107 };
108 };
109
110 consume_pending_clarification_resume(&mut session);
111 port.save_and_cache_session(&mut session).await;
112
113 port.spawn_resume_execution(ResumeSpawnRequest {
114 session_id: session_id.to_string(),
115 session,
116 cancel_token: reservation.cancel_token,
117 run_id: reservation.run_id.clone(),
118 event_sender,
119 config,
120 })
121 .await;
122
123 ResumeOutcome::Started {
124 run_id: reservation.run_id,
125 }
126}