bamboo_server/session_app/resume.rs
1//! Resume execution use case.
2//!
3//! Provides the application-layer logic for resuming agent execution on an
4//! existing session (e.g. after a user responds to a pending question).
5//! The server layer implements `ResumeExecutionPort` to supply the
6//! infrastructure operations.
7
8use async_trait::async_trait;
9use bamboo_agent_core::AgentEvent;
10use bamboo_domain::Session;
11use tokio::sync::broadcast;
12use tokio_util::sync::CancellationToken;
13
14use super::execute::{consume_pending_conclusion_with_options_resume, has_pending_user_message};
15use super::types::{ResumeConfigSnapshot, ResumeOutcome};
16use bamboo_engine::execution::runner_lifecycle::RunnerReservation;
17
18// ---------------------------------------------------------------------------
19// Port trait
20// ---------------------------------------------------------------------------
21
22/// Adapter trait for resume execution infrastructure.
23///
24/// Implementations bridge the use case to server-specific concerns
25/// (storage, runner lifecycle, agent spawning).
26#[async_trait]
27pub trait ResumeExecutionPort: Send + Sync {
28 /// Load a session by ID. Returns `None` if not found.
29 async fn load_session(&self, session_id: &str) -> Option<Session>;
30
31 /// Persist a session and update any caches.
32 ///
33 /// Implementations may merge concurrent UI edits to title/pinned/title_version
34 /// from disk back into `session` (which is why this takes `&mut`).
35 async fn save_and_cache_session(&self, session: &mut Session);
36
37 /// Try to reserve a runner slot for the given session.
38 /// Returns `None` if a runner is already active.
39 async fn try_reserve_runner(
40 &self,
41 session_id: &str,
42 event_sender: &broadcast::Sender<AgentEvent>,
43 ) -> Option<RunnerReservation>;
44
45 /// Get or create the long-lived broadcast sender for session events.
46 async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent>;
47
48 /// Spawn the resume execution loop in the background.
49 ///
50 /// The adapter creates the mpsc channel, spawns the event forwarder,
51 /// and calls the server's agent execution spawner.
52 async fn spawn_resume_execution(&self, request: ResumeSpawnRequest);
53}
54
55// ---------------------------------------------------------------------------
56// Value types
57// ---------------------------------------------------------------------------
58
59/// Request captured for the adapter to spawn a resumed agent execution.
60///
61/// This bundles everything the server-side spawner needs, keeping the
62/// application layer free of `AppState` and server-specific types.
63pub struct ResumeSpawnRequest {
64 pub session_id: String,
65 pub session: Session,
66 pub cancel_token: CancellationToken,
67 pub run_id: String,
68 pub event_sender: broadcast::Sender<AgentEvent>,
69 pub config: ResumeConfigSnapshot,
70}
71
72// ---------------------------------------------------------------------------
73// Use case
74// ---------------------------------------------------------------------------
75
76/// Resume agent execution on an existing session.
77///
78/// Returns the outcome of the resume attempt:
79/// - `Started` — execution spawned successfully
80/// - `AlreadyRunning` — a runner is already active
81/// - `Completed` — no pending user message
82/// - `NotFound` — session not found
83pub async fn resume_session_execution(
84 port: &dyn ResumeExecutionPort,
85 session_id: &str,
86 config: ResumeConfigSnapshot,
87) -> ResumeOutcome {
88 // Load session.
89 let Some(mut session) = port.load_session(session_id).await else {
90 return ResumeOutcome::NotFound;
91 };
92
93 if !has_pending_user_message(&session) {
94 return ResumeOutcome::Completed;
95 }
96
97 // Reserve runner slot.
98 let event_sender = port.get_or_create_event_sender(session_id).await;
99 let Some(reservation) = port.try_reserve_runner(session_id, &event_sender).await else {
100 return ResumeOutcome::AlreadyRunning;
101 };
102
103 consume_pending_conclusion_with_options_resume(&mut session);
104 port.save_and_cache_session(&mut session).await;
105
106 port.spawn_resume_execution(ResumeSpawnRequest {
107 session_id: session_id.to_string(),
108 session,
109 cancel_token: reservation.cancel_token,
110 run_id: reservation.run_id,
111 event_sender,
112 config,
113 })
114 .await;
115
116 ResumeOutcome::Started
117}