Skip to main content

bamboo_server/session_app/
resume.rs

1//! Resume execution use case.
2//!
3//! Provides the application-layer logic for resuming agent execution on an
4//! existing session (e.g. after a user responds to a pending question).
5//! The server layer implements `ResumeExecutionPort` to supply the
6//! infrastructure operations.
7
8use async_trait::async_trait;
9use bamboo_agent_core::AgentEvent;
10use bamboo_domain::Session;
11use tokio::sync::broadcast;
12use tokio_util::sync::CancellationToken;
13
14use super::execute::{consume_pending_conclusion_with_options_resume, has_pending_user_message};
15use super::types::{ResumeConfigSnapshot, ResumeOutcome};
16use bamboo_engine::execution::runner_lifecycle::RunnerReservation;
17
18// ---------------------------------------------------------------------------
19// Port trait
20// ---------------------------------------------------------------------------
21
22/// Adapter trait for resume execution infrastructure.
23///
24/// Implementations bridge the use case to server-specific concerns
25/// (storage, runner lifecycle, agent spawning).
26#[async_trait]
27pub trait ResumeExecutionPort: Send + Sync {
28    /// Load a session by ID. Returns `None` if not found.
29    async fn load_session(&self, session_id: &str) -> Option<Session>;
30
31    /// Persist a session and update any caches.
32    ///
33    /// Implementations may merge concurrent UI edits to title/pinned/title_version
34    /// from disk back into `session` (which is why this takes `&mut`).
35    async fn save_and_cache_session(&self, session: &mut Session);
36
37    /// Try to reserve a runner slot for the given session.
38    /// Returns `None` if a runner is already active.
39    async fn try_reserve_runner(
40        &self,
41        session_id: &str,
42        event_sender: &broadcast::Sender<AgentEvent>,
43    ) -> Option<RunnerReservation>;
44
45    /// Get or create the long-lived broadcast sender for session events.
46    async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent>;
47
48    /// Spawn the resume execution loop in the background.
49    ///
50    /// The adapter creates the mpsc channel, spawns the event forwarder,
51    /// and calls the server's agent execution spawner.
52    async fn spawn_resume_execution(&self, request: ResumeSpawnRequest);
53}
54
55// ---------------------------------------------------------------------------
56// Value types
57// ---------------------------------------------------------------------------
58
59/// Request captured for the adapter to spawn a resumed agent execution.
60///
61/// This bundles everything the server-side spawner needs, keeping the
62/// application layer free of `AppState` and server-specific types.
63pub struct ResumeSpawnRequest {
64    pub session_id: String,
65    pub session: Session,
66    pub cancel_token: CancellationToken,
67    pub run_id: String,
68    pub event_sender: broadcast::Sender<AgentEvent>,
69    pub config: ResumeConfigSnapshot,
70}
71
72// ---------------------------------------------------------------------------
73// Use case
74// ---------------------------------------------------------------------------
75
76/// Resume agent execution on an existing session.
77///
78/// Returns the outcome of the resume attempt:
79/// - `Started` — execution spawned successfully
80/// - `AlreadyRunning` — a runner is already active
81/// - `Completed` — no pending user message
82/// - `NotFound` — session not found
83pub async fn resume_session_execution(
84    port: &dyn ResumeExecutionPort,
85    session_id: &str,
86    config: ResumeConfigSnapshot,
87) -> ResumeOutcome {
88    // Load session.
89    let Some(mut session) = port.load_session(session_id).await else {
90        return ResumeOutcome::NotFound;
91    };
92
93    if !has_pending_user_message(&session) {
94        return ResumeOutcome::Completed;
95    }
96
97    // Reserve runner slot.
98    let event_sender = port.get_or_create_event_sender(session_id).await;
99    let Some(reservation) = port.try_reserve_runner(session_id, &event_sender).await else {
100        return ResumeOutcome::AlreadyRunning;
101    };
102
103    consume_pending_conclusion_with_options_resume(&mut session);
104    port.save_and_cache_session(&mut session).await;
105
106    port.spawn_resume_execution(ResumeSpawnRequest {
107        session_id: session_id.to_string(),
108        session,
109        cancel_token: reservation.cancel_token,
110        run_id: reservation.run_id,
111        event_sender,
112        config,
113    })
114    .await;
115
116    ResumeOutcome::Started
117}