bamboo_server/session_app/resume.rs
1//! Resume execution use case.
2//!
3//! Provides the application-layer logic for resuming agent execution on an
4//! existing session (e.g. after a user responds to a pending question).
5//! The server layer implements `ResumeExecutionPort` to supply the
6//! infrastructure operations.
7
8use async_trait::async_trait;
9use bamboo_agent_core::AgentEvent;
10use bamboo_domain::Session;
11use tokio::sync::broadcast;
12use tokio_util::sync::CancellationToken;
13
14use super::execute::{consume_pending_conclusion_with_options_resume, has_pending_user_message};
15use super::types::{ResumeConfigSnapshot, ResumeOutcome};
16
17// ---------------------------------------------------------------------------
18// Port trait
19// ---------------------------------------------------------------------------
20
21/// Adapter trait for resume execution infrastructure.
22///
23/// Implementations bridge the use case to server-specific concerns
24/// (storage, runner lifecycle, agent spawning).
25#[async_trait]
26pub trait ResumeExecutionPort: Send + Sync {
27 /// Load a session by ID. Returns `None` if not found.
28 async fn load_session(&self, session_id: &str) -> Option<Session>;
29
30 /// Persist a session and update any caches.
31 async fn save_and_cache_session(&self, session: &Session);
32
33 /// Try to reserve a runner slot for the given session.
34 /// Returns `None` if a runner is already active.
35 async fn try_reserve_runner(
36 &self,
37 session_id: &str,
38 event_sender: &broadcast::Sender<AgentEvent>,
39 ) -> Option<CancellationToken>;
40
41 /// Get or create the long-lived broadcast sender for session events.
42 async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent>;
43
44 /// Spawn the resume execution loop in the background.
45 ///
46 /// The adapter creates the mpsc channel, spawns the event forwarder,
47 /// and calls the server's agent execution spawner.
48 async fn spawn_resume_execution(&self, request: ResumeSpawnRequest);
49}
50
51// ---------------------------------------------------------------------------
52// Value types
53// ---------------------------------------------------------------------------
54
55/// Request captured for the adapter to spawn a resumed agent execution.
56///
57/// This bundles everything the server-side spawner needs, keeping the
58/// application layer free of `AppState` and server-specific types.
59pub struct ResumeSpawnRequest {
60 pub session_id: String,
61 pub session: Session,
62 pub cancel_token: CancellationToken,
63 pub event_sender: broadcast::Sender<AgentEvent>,
64 pub config: ResumeConfigSnapshot,
65}
66
67// ---------------------------------------------------------------------------
68// Use case
69// ---------------------------------------------------------------------------
70
71/// Resume agent execution on an existing session.
72///
73/// Returns the outcome of the resume attempt:
74/// - `Started` — execution spawned successfully
75/// - `AlreadyRunning` — a runner is already active
76/// - `Completed` — no pending user message
77/// - `NotFound` — session not found
78pub async fn resume_session_execution(
79 port: &dyn ResumeExecutionPort,
80 session_id: &str,
81 config: ResumeConfigSnapshot,
82) -> ResumeOutcome {
83 // Load session.
84 let Some(mut session) = port.load_session(session_id).await else {
85 return ResumeOutcome::NotFound;
86 };
87
88 if !has_pending_user_message(&session) {
89 return ResumeOutcome::Completed;
90 }
91
92 // Reserve runner slot.
93 let event_sender = port.get_or_create_event_sender(session_id).await;
94 let Some(cancel_token) = port.try_reserve_runner(session_id, &event_sender).await else {
95 return ResumeOutcome::AlreadyRunning;
96 };
97
98 consume_pending_conclusion_with_options_resume(&mut session);
99 port.save_and_cache_session(&session).await;
100
101 port.spawn_resume_execution(ResumeSpawnRequest {
102 session_id: session_id.to_string(),
103 session,
104 cancel_token,
105 event_sender,
106 config,
107 })
108 .await;
109
110 ResumeOutcome::Started
111}