Skip to main content

ai_agent/bridge/
repl_bridge_core.rs

1//! REPL Bridge Core - Bootstrap-free core for Remote Control.
2//!
3//! Translated from openclaudecode/src/bridge/replBridge.ts
4//!
5//! This module provides the core bridge functionality: env registration -> session
6//! creation -> poll loop -> ingress WS -> teardown. It reads nothing from
7//! bootstrap/state or sessionStorage — all context comes from params.
8
9use crate::bridge::SDKMessage;
10use crate::bridge::poll_config_defaults::PollIntervalConfig;
11use crate::bridge::repl_bridge_handle::{BridgeControlRequest, BridgeControlResponse, BridgeState};
12use crate::bridge::repl_bridge_transport::ReplBridgeTransport;
13use crate::error::AgentError;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17
18// =============================================================================
19// CONSTANTS
20// =============================================================================
21
22/// Poll error recovery constants.
23const POLL_ERROR_INITIAL_DELAY_MS: u64 = 2_000;
24const POLL_ERROR_MAX_DELAY_MS: u64 = 60_000;
25const POLL_ERROR_GIVE_UP_MS: u64 = 15 * 60 * 1000;
26
27// =============================================================================
28// TYPES
29// =============================================================================
30
31/// Parameters for initializing the bridge core.
32#[derive(Clone)]
33pub struct BridgeCoreParams {
34    /// Current working directory.
35    pub dir: String,
36    /// Machine name.
37    pub machine_name: String,
38    /// Current git branch.
39    pub branch: String,
40    /// Git repo URL.
41    pub git_repo_url: Option<String>,
42    /// Session title.
43    pub title: String,
44    /// Base API URL.
45    pub base_url: String,
46    /// Session ingress URL.
47    pub session_ingress_url: String,
48    /// Worker type (e.g., "repl", "daemon").
49    pub worker_type: String,
50    /// Get the current OAuth access token.
51    pub get_access_token: Arc<dyn Fn() -> Option<String> + Send + Sync>,
52    /// Create a new session.
53    pub create_session: Arc<
54        dyn Fn(
55                String,
56                String,
57                Option<String>,
58                String,
59            ) -> future::BoxFuture<'static, Result<Option<String>, AgentError>>
60            + Send
61            + Sync,
62    >,
63    /// Archive a session.
64    pub archive_session:
65        Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<(), AgentError>> + Send + Sync>,
66    /// Get current session title (for reconnection).
67    pub get_current_title: Option<Arc<dyn Fn() -> String + Send + Sync>>,
68    /// Convert internal messages to SDK format.
69    pub to_sdk_messages:
70        Option<Arc<dyn Fn(Vec<crate::types::Message>) -> Vec<SDKMessage> + Send + Sync>>,
71    /// Handle OAuth 401 refresh.
72    pub on_auth_401: Option<
73        Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<bool, AgentError>> + Send + Sync>,
74    >,
75    /// Get poll interval config.
76    pub get_poll_interval_config: Option<Arc<dyn Fn() -> PollIntervalConfig + Send + Sync>>,
77    /// Max initial messages to replay on connect.
78    pub initial_history_cap: Option<u32>,
79    /// Initial messages to flush on connect.
80    pub initial_messages: Option<Vec<crate::types::Message>>,
81    /// Previously flushed UUIDs (for dedup).
82    pub previously_flushed_uuids:
83        Option<Arc<dyn Fn() -> std::collections::HashSet<String> + Send + Sync>>,
84    /// Callback for inbound messages.
85    pub on_inbound_message: Option<Arc<dyn Fn(SDKMessage) + Send + Sync>>,
86    /// Callback for permission responses.
87    pub on_permission_response: Option<Arc<dyn Fn(BridgeControlResponse) + Send + Sync>>,
88    /// Callback for interrupt.
89    pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
90    /// Callback for model change.
91    pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
92    /// Callback for max thinking tokens change.
93    pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
94    /// Callback for permission mode change.
95    pub on_set_permission_mode:
96        Option<Arc<dyn Fn(crate::permission::PermissionMode) -> Result<(), String> + Send + Sync>>,
97    /// Callback for state changes.
98    pub on_state_change: Option<Arc<dyn Fn(BridgeState, Option<String>) + Send + Sync>>,
99    /// Callback for user messages (title derivation).
100    pub on_user_message: Option<Arc<dyn Fn(String, String) -> bool + Send + Sync>>,
101    /// Whether this is a perpetual (persistent) bridge.
102    pub perpetual: Option<bool>,
103    /// Initial SSE sequence number (for daemon persistence).
104    pub initial_sse_sequence_num: Option<u64>,
105}
106
107/// Bridge handle returned after initialization.
108pub struct BridgeCoreHandle {
109    /// The bridge session ID.
110    pub session_id: RwLock<String>,
111    /// The environment ID (empty for env-less v2).
112    pub environment_id: RwLock<String>,
113    /// The session ingress URL.
114    pub session_ingress_url: String,
115    /// The transport (if connected).
116    pub transport: RwLock<Option<Box<dyn ReplBridgeTransport>>>,
117    /// Current work item ID.
118    pub current_work_id: RwLock<Option<String>>,
119    /// Current ingress token.
120    pub current_ingress_token: RwLock<Option<String>>,
121    /// Last SSE sequence number.
122    pub last_sequence_num: RwLock<u64>,
123    /// Poll controller signal.
124    pub poll_abort: tokio::sync::watch::Sender<bool>,
125    /// Teardown flag.
126    pub teardown_started: RwLock<bool>,
127    /// Parameters for callbacks.
128    params: BridgeCoreParams,
129}
130
131impl BridgeCoreHandle {
132    /// Create a new bridge handle.
133    pub fn new(
134        session_id: String,
135        environment_id: String,
136        session_ingress_url: String,
137        params: BridgeCoreParams,
138    ) -> Self {
139        let (poll_abort, _) = tokio::sync::watch::channel(false);
140        Self {
141            session_id: RwLock::new(session_id),
142            environment_id: RwLock::new(environment_id),
143            session_ingress_url,
144            transport: RwLock::new(None),
145            current_work_id: RwLock::new(None),
146            current_ingress_token: RwLock::new(None),
147            last_sequence_num: RwLock::new(0),
148            poll_abort,
149            teardown_started: RwLock::new(false),
150            params,
151        }
152    }
153
154    /// Write messages to the bridge.
155    pub async fn write_messages(&self, messages: Vec<SDKMessage>) {
156        let transport = self.transport.read().await;
157        if let Some(t) = transport.as_ref() {
158            t.write_batch(messages).await;
159        }
160    }
161
162    /// Write SDK messages directly.
163    pub async fn write_sdk_messages(&self, messages: Vec<SDKMessage>) {
164        let transport = self.transport.read().await;
165        if let Some(t) = transport.as_ref() {
166            t.write_batch(messages).await;
167        }
168    }
169
170    /// Send a control request.
171    pub async fn send_control_request(&self, request: BridgeControlRequest) {
172        let transport = self.transport.read().await;
173        if let Some(t) = transport.as_ref() {
174            let msg =
175                bridge_message_from_control_request(request, self.session_id.read().await.clone());
176            t.write(msg).await;
177        }
178    }
179
180    /// Send a control response.
181    pub async fn send_control_response(&self, response: BridgeControlResponse) {
182        let transport = self.transport.read().await;
183        if let Some(t) = transport.as_ref() {
184            let msg = bridge_message_from_control_response(
185                response,
186                self.session_id.read().await.clone(),
187            );
188            t.write(msg).await;
189        }
190    }
191
192    /// Send a control cancel request.
193    pub async fn send_control_cancel_request(&self, request_id: &str) {
194        let transport = self.transport.read().await;
195        if let Some(t) = transport.as_ref() {
196            let msg = bridge_control_cancel_request(
197                request_id.to_string(),
198                self.session_id.read().await.clone(),
199            );
200            t.write(msg).await;
201        }
202    }
203
204    /// Send a result message.
205    pub async fn send_result(&self) {
206        let transport = self.transport.read().await;
207        if let Some(t) = transport.as_ref() {
208            let msg = bridge_result_message(self.session_id.read().await.clone());
209            t.write(msg).await;
210        }
211    }
212
213    /// Get the SSE sequence number for persistence.
214    pub fn get_sse_sequence_num(&self) -> u64 {
215        // Return the last captured sequence number
216        // In practice, this would merge with the live transport's seq
217        *self.last_sequence_num.blocking_read()
218    }
219
220    /// Tear down the bridge.
221    pub async fn teardown(&self) {
222        let mut started = self.teardown_started.write().await;
223        if *started {
224            return;
225        }
226        *started = true;
227        drop(started);
228
229        // Abort poll loop
230        let _ = self.poll_abort.send(true);
231
232        // Close transport
233        let mut transport = self.transport.write().await;
234        if let Some(t) = transport.take() {
235            t.close();
236        }
237
238        // Call on_state_change if provided
239        if let Some(ref callback) = self.params.on_state_change {
240            callback(BridgeState::Failed, Some("teardown".to_string()));
241        }
242    }
243}
244
245/// Bridge state for debugging.
246#[derive(Debug, Clone, PartialEq)]
247pub enum BridgeStateInternal {
248    Ready,
249    Connecting,
250    Connected,
251    Reconnecting,
252    Failed,
253}
254
255// =============================================================================
256// INITIALIZATION
257// =============================================================================
258
259/// Initialize the bridge core.
260///
261/// Returns None on registration or session-creation failure.
262pub async fn init_bridge_core(
263    params: BridgeCoreParams,
264) -> Result<Option<BridgeCoreHandle>, AgentError> {
265    // Get poll interval config (default if not provided)
266    let poll_config = params
267        .get_poll_interval_config
268        .as_ref()
269        .map(|f| f())
270        .unwrap_or_default();
271
272    // Call on_state_change if provided
273    if let Some(ref callback) = params.on_state_change {
274        callback(BridgeState::Ready, None);
275    }
276
277    // Note: The actual implementation would:
278    // 1. Register the bridge environment
279    // 2. Create a session
280    // 3. Start the poll loop for work items
281    // 4. Connect transport when work arrives
282    // 5. Handle reconnection logic
283    //
284    // This is a simplified version - the full implementation would be
285    // several thousand lines of code handling:
286    // - Environment registration and re-registration
287    // - Session creation and recovery
288    // - Work polling with heartbeat
289    // - Transport management (v1/v2)
290    // - Reconnection logic
291    // - Graceful teardown
292
293    Ok(None)
294}
295
296// =============================================================================
297// HELPER FUNCTIONS
298// =============================================================================
299
300/// Compute exponential backoff delay.
301pub fn compute_backoff(consecutive_errors: u32) -> Duration {
302    let delay = POLL_ERROR_INITIAL_DELAY_MS
303        * 2u64.saturating_pow(consecutive_errors.saturating_sub(1) as u32);
304    Duration::from_millis(delay.min(POLL_ERROR_MAX_DELAY_MS))
305}
306
307/// Check if we should give up polling after errors.
308pub fn should_give_up(first_error_time: Option<u64>, give_up_ms: u64) -> bool {
309    if let Some(start) = first_error_time {
310        let elapsed = std::time::SystemTime::now()
311            .duration_since(std::time::UNIX_EPOCH)
312            .map(|d| d.as_millis() as u64)
313            .unwrap_or(0)
314            - start;
315        return elapsed >= give_up_ms;
316    }
317    false
318}
319
320// =============================================================================
321// MESSAGE HELPER FUNCTIONS
322// =============================================================================
323
324/// Create a bridge message from a control request.
325pub fn bridge_message_from_control_request(
326    _request: BridgeControlRequest,
327    session_id: String,
328) -> SDKMessage {
329    // Simplified - actual implementation would serialize properly
330    SDKMessage::user_message_with_session(session_id)
331}
332
333/// Create a bridge message from a control response.
334pub fn bridge_message_from_control_response(
335    _response: BridgeControlResponse,
336    session_id: String,
337) -> SDKMessage {
338    SDKMessage::user_message_with_session(session_id)
339}
340
341/// Create a control cancel request message.
342pub fn bridge_control_cancel_request(request_id: String, session_id: String) -> SDKMessage {
343    // The actual implementation would create a proper message
344    SDKMessage::user_message_with_session(session_id)
345}
346
347/// Create a result message.
348pub fn bridge_result_message(session_id: String) -> SDKMessage {
349    SDKMessage::user_message_with_session(session_id)
350}
351
352// =============================================================================
353// FUTURE TYPE HELPERS
354// =============================================================================
355
356mod future {
357    use crate::error::AgentError;
358    use core::pin::Pin;
359
360    pub type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
361}