Skip to main content

bamboo_server/session_app/
resume.rs

1//! Resume execution use case.
2//!
3//! Provides the application-layer logic for resuming agent execution on an
4//! existing session (e.g. after a user responds to a pending question).
5//! The server layer implements `ResumeExecutionPort` to supply the
6//! infrastructure operations.
7
8use async_trait::async_trait;
9use bamboo_agent_core::AgentEvent;
10use bamboo_domain::Session;
11use tokio::sync::broadcast;
12use tokio_util::sync::CancellationToken;
13
14use super::execute::{consume_pending_conclusion_with_options_resume, has_pending_user_message};
15use super::types::{ResumeConfigSnapshot, ResumeOutcome};
16
17// ---------------------------------------------------------------------------
18// Port trait
19// ---------------------------------------------------------------------------
20
21/// Adapter trait for resume execution infrastructure.
22///
23/// Implementations bridge the use case to server-specific concerns
24/// (storage, runner lifecycle, agent spawning).
25#[async_trait]
26pub trait ResumeExecutionPort: Send + Sync {
27    /// Load a session by ID. Returns `None` if not found.
28    async fn load_session(&self, session_id: &str) -> Option<Session>;
29
30    /// Persist a session and update any caches.
31    async fn save_and_cache_session(&self, session: &Session);
32
33    /// Try to reserve a runner slot for the given session.
34    /// Returns `None` if a runner is already active.
35    async fn try_reserve_runner(
36        &self,
37        session_id: &str,
38        event_sender: &broadcast::Sender<AgentEvent>,
39    ) -> Option<CancellationToken>;
40
41    /// Get or create the long-lived broadcast sender for session events.
42    async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent>;
43
44    /// Spawn the resume execution loop in the background.
45    ///
46    /// The adapter creates the mpsc channel, spawns the event forwarder,
47    /// and calls the server's agent execution spawner.
48    async fn spawn_resume_execution(&self, request: ResumeSpawnRequest);
49}
50
51// ---------------------------------------------------------------------------
52// Value types
53// ---------------------------------------------------------------------------
54
55/// Request captured for the adapter to spawn a resumed agent execution.
56///
57/// This bundles everything the server-side spawner needs, keeping the
58/// application layer free of `AppState` and server-specific types.
59pub struct ResumeSpawnRequest {
60    pub session_id: String,
61    pub session: Session,
62    pub cancel_token: CancellationToken,
63    pub event_sender: broadcast::Sender<AgentEvent>,
64    pub config: ResumeConfigSnapshot,
65}
66
67// ---------------------------------------------------------------------------
68// Use case
69// ---------------------------------------------------------------------------
70
71/// Resume agent execution on an existing session.
72///
73/// Returns the outcome of the resume attempt:
74/// - `Started` — execution spawned successfully
75/// - `AlreadyRunning` — a runner is already active
76/// - `Completed` — no pending user message
77/// - `NotFound` — session not found
78pub async fn resume_session_execution(
79    port: &dyn ResumeExecutionPort,
80    session_id: &str,
81    config: ResumeConfigSnapshot,
82) -> ResumeOutcome {
83    // Load session.
84    let Some(mut session) = port.load_session(session_id).await else {
85        return ResumeOutcome::NotFound;
86    };
87
88    if !has_pending_user_message(&session) {
89        return ResumeOutcome::Completed;
90    }
91
92    // Reserve runner slot.
93    let event_sender = port.get_or_create_event_sender(session_id).await;
94    let Some(cancel_token) = port.try_reserve_runner(session_id, &event_sender).await else {
95        return ResumeOutcome::AlreadyRunning;
96    };
97
98    consume_pending_conclusion_with_options_resume(&mut session);
99    port.save_and_cache_session(&session).await;
100
101    port.spawn_resume_execution(ResumeSpawnRequest {
102        session_id: session_id.to_string(),
103        session,
104        cancel_token,
105        event_sender,
106        config,
107    })
108    .await;
109
110    ResumeOutcome::Started
111}