Skip to main content

bamboo_server/session_app/
resume.rs

1//! Resume execution use case.
2//!
3//! Provides the application-layer logic for resuming agent execution on an
4//! existing session (e.g. after a user responds to a pending question).
5//! The server layer implements `ResumeExecutionPort` to supply the
6//! infrastructure operations.
7
8use async_trait::async_trait;
9use bamboo_agent_core::AgentEvent;
10use bamboo_domain::Session;
11use tokio::sync::broadcast;
12use tokio_util::sync::CancellationToken;
13
14use super::execute::{consume_pending_conclusion_with_options_resume, has_pending_user_message};
15use super::types::{ResumeConfigSnapshot, ResumeOutcome};
16use bamboo_engine::execution::runner_lifecycle::RunnerReservation;
17
18// ---------------------------------------------------------------------------
19// Port trait
20// ---------------------------------------------------------------------------
21
22/// Adapter trait for resume execution infrastructure.
23///
24/// Implementations bridge the use case to server-specific concerns
25/// (storage, runner lifecycle, agent spawning).
26#[async_trait]
27pub trait ResumeExecutionPort: Send + Sync {
28    /// Load a session by ID. Returns `None` if not found.
29    async fn load_session(&self, session_id: &str) -> Option<Session>;
30
31    /// Persist a session and update any caches.
32    ///
33    /// Implementations may merge concurrent UI edits to title/pinned/title_version
34    /// from disk back into `session` (which is why this takes `&mut`).
35    async fn save_and_cache_session(&self, session: &mut Session);
36
37    /// Try to reserve a runner slot for the given session.
38    /// Returns `None` if a runner is already active.
39    async fn try_reserve_runner(
40        &self,
41        session_id: &str,
42        event_sender: &broadcast::Sender<AgentEvent>,
43    ) -> Option<RunnerReservation>;
44
45    /// Get the run_id of an existing runner if one exists.
46    async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String>;
47
48    /// Get or create the long-lived broadcast sender for session events.
49    async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent>;
50
51    /// Spawn the resume execution loop in the background.
52    ///
53    /// The adapter creates the mpsc channel, spawns the event forwarder,
54    /// and calls the server's agent execution spawner.
55    async fn spawn_resume_execution(&self, request: ResumeSpawnRequest);
56}
57
58// ---------------------------------------------------------------------------
59// Value types
60// ---------------------------------------------------------------------------
61
62/// Request captured for the adapter to spawn a resumed agent execution.
63///
64/// This bundles everything the server-side spawner needs, keeping the
65/// application layer free of `AppState` and server-specific types.
66pub struct ResumeSpawnRequest {
67    pub session_id: String,
68    pub session: Session,
69    pub cancel_token: CancellationToken,
70    pub run_id: String,
71    pub event_sender: broadcast::Sender<AgentEvent>,
72    pub config: ResumeConfigSnapshot,
73}
74
75// ---------------------------------------------------------------------------
76// Use case
77// ---------------------------------------------------------------------------
78
79/// Resume agent execution on an existing session.
80///
81/// Returns the outcome of the resume attempt:
82/// - `Started` — execution spawned successfully
83/// - `AlreadyRunning` — a runner is already active
84/// - `Completed` — no pending user message
85/// - `NotFound` — session not found
86pub async fn resume_session_execution(
87    port: &dyn ResumeExecutionPort,
88    session_id: &str,
89    config: ResumeConfigSnapshot,
90) -> ResumeOutcome {
91    // Load session.
92    let Some(mut session) = port.load_session(session_id).await else {
93        return ResumeOutcome::NotFound;
94    };
95
96    if !has_pending_user_message(&session) {
97        return ResumeOutcome::Completed;
98    }
99
100    // Reserve runner slot.
101    let event_sender = port.get_or_create_event_sender(session_id).await;
102    let Some(reservation) = port.try_reserve_runner(session_id, &event_sender).await else {
103        // If already running, try to get the existing runner's run_id
104        let existing_run_id = port.get_existing_runner_run_id(session_id).await;
105        return ResumeOutcome::AlreadyRunning {
106            run_id: existing_run_id.unwrap_or_default(),
107        };
108    };
109
110    consume_pending_conclusion_with_options_resume(&mut session);
111    port.save_and_cache_session(&mut session).await;
112
113    port.spawn_resume_execution(ResumeSpawnRequest {
114        session_id: session_id.to_string(),
115        session,
116        cancel_token: reservation.cancel_token,
117        run_id: reservation.run_id.clone(),
118        event_sender,
119        config,
120    })
121    .await;
122
123    ResumeOutcome::Started {
124        run_id: reservation.run_id,
125    }
126}