Skip to main content

ai_agent/bridge/
remote_bridge_core.rs

1//! Remote Bridge Core - Env-less Remote Control bridge.
2//!
3//! Translated from openclaudecode/src/bridge/remoteBridgeCore.ts
4//!
5//! "Env-less" = no Environments API layer. Distinct from "CCR v2" (the
6//! /worker/* transport protocol) — the env-based path can also use CCR v2
7//! transport. This file is about removing the poll/dispatch layer.
8//!
9//! Unlike initBridgeCore (env-based), this connects directly to the
10//! session-ingress layer without the Environments API work-dispatch layer:
11//!
12//!   1. POST /v1/code/sessions (OAuth, no env_id) -> session.id
13//!   2. POST /v1/code/sessions/{id}/bridge (OAuth) -> {worker_jwt, expires_in, api_base_url, worker_epoch}
14//!   3. createV2ReplTransport(worker_jwt, worker_epoch) -> SSE + CCRClient
15//!   4. createTokenRefreshScheduler -> proactive /bridge re-call (new JWT + new epoch)
16//!   5. 401 on SSE -> rebuild transport with fresh /bridge credentials (same seq-num)
17//!
18//! No register/poll/ack/stop/heartbeat/deregister environment lifecycle.
19
20use crate::bridge::SDKMessage;
21use crate::bridge::env_less_bridge_config::get_env_less_bridge_config;
22use crate::bridge::repl_bridge_handle::{BridgeControlRequest, BridgeControlResponse, BridgeState};
23use crate::bridge::repl_bridge_transport::ReplBridgeTransport;
24use crate::error::AgentError;
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::RwLock;
28
29// =============================================================================
30// CONSTANTS
31// =============================================================================
32
/// Value sent in the `anthropic-version` request header by [`oauth_headers`].
const ANTHROPIC_VERSION: &str = "2023-06-01";
34
35// =============================================================================
36// TYPES
37// =============================================================================
38
39/// Parameters for initializing the env-less bridge.
40#[derive(Clone)]
41pub struct EnvLessBridgeParams {
42    /// Base API URL.
43    pub base_url: String,
44    /// Organization UUID.
45    pub org_uuid: String,
46    /// Session title.
47    pub title: String,
48    /// Get the current OAuth access token.
49    pub get_access_token: Arc<dyn Fn() -> Option<String> + Send + Sync>,
50    /// Handle OAuth 401 refresh.
51    pub on_auth_401: Option<
52        Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<bool, AgentError>> + Send + Sync>,
53    >,
54    /// Convert internal messages to SDK format.
55    pub to_sdk_messages: Arc<dyn Fn(Vec<crate::types::Message>) -> Vec<SDKMessage> + Send + Sync>,
56    /// Max initial messages to replay on connect.
57    pub initial_history_cap: u32,
58    /// Initial messages to flush on connect.
59    pub initial_messages: Option<Vec<crate::types::Message>>,
60    /// Callback for inbound messages.
61    pub on_inbound_message: Option<Arc<dyn Fn(SDKMessage) + Send + Sync>>,
62    /// Callback for user messages (title derivation).
63    pub on_user_message: Option<Arc<dyn Fn(String, String) -> bool + Send + Sync>>,
64    /// Callback for permission responses.
65    pub on_permission_response: Option<Arc<dyn Fn(BridgeControlResponse) + Send + Sync>>,
66    /// Callback for interrupt.
67    pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
68    /// Callback for model change.
69    pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
70    /// Callback for max thinking tokens change.
71    pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
72    /// Callback for permission mode change.
73    pub on_set_permission_mode:
74        Option<Arc<dyn Fn(crate::permission::PermissionMode) -> Result<(), String> + Send + Sync>>,
75    /// Callback for state changes.
76    pub on_state_change: Option<Arc<dyn Fn(BridgeState, Option<String>) + Send + Sync>>,
77    /// When true, skip opening the SSE read stream.
78    pub outbound_only: Option<bool>,
79    /// Free-form tags for session categorization.
80    pub tags: Option<Vec<String>>,
81}
82
83/// Remote credentials returned from /bridge endpoint.
84#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
85pub struct RemoteCredentials {
86    /// Worker JWT for CCR v2 transport.
87    #[serde(rename = "worker_jwt")]
88    pub worker_jwt: String,
89    /// Seconds until JWT expires.
90    #[serde(rename = "expires_in")]
91    pub expires_in: u64,
92    /// Base URL for CCR v2 API.
93    #[serde(rename = "api_base_url")]
94    pub api_base_url: String,
95    /// Worker epoch (incremented on each /bridge call).
96    #[serde(rename = "worker_epoch")]
97    pub worker_epoch: u64,
98}
99
100/// Handle for the env-less bridge.
101pub struct EnvLessBridgeHandle {
102    /// The bridge session ID (cse_* form).
103    pub session_id: RwLock<String>,
104    /// The environment ID (empty for env-less).
105    pub environment_id: RwLock<String>,
106    /// The session ingress URL.
107    pub session_ingress_url: RwLock<String>,
108    /// The transport.
109    pub transport: RwLock<Option<Box<dyn ReplBridgeTransport>>>,
110    /// Current credentials (for refresh).
111    pub credentials: RwLock<Option<RemoteCredentials>>,
112    /// Teardown flag.
113    pub torn_down: RwLock<bool>,
114    /// Auth recovery in flight flag.
115    pub auth_recovery_in_flight: RwLock<bool>,
116    /// Parameters for callbacks.
117    params: EnvLessBridgeParams,
118}
119
120impl EnvLessBridgeHandle {
121    /// Create a new env-less bridge handle.
122    pub fn new(
123        session_id: String,
124        environment_id: String,
125        session_ingress_url: String,
126        params: EnvLessBridgeParams,
127    ) -> Self {
128        Self {
129            session_id: RwLock::new(session_id),
130            environment_id: RwLock::new(environment_id),
131            session_ingress_url: RwLock::new(session_ingress_url),
132            transport: RwLock::new(None),
133            credentials: RwLock::new(None),
134            torn_down: RwLock::new(false),
135            auth_recovery_in_flight: RwLock::new(false),
136            params,
137        }
138    }
139
140    /// Write messages to the bridge.
141    pub async fn write_messages(&self, messages: Vec<SDKMessage>) {
142        let transport = self.transport.read().await;
143        if let Some(t) = transport.as_ref() {
144            t.write_batch(messages).await;
145        }
146    }
147
148    /// Write SDK messages directly.
149    pub async fn write_sdk_messages(&self, messages: Vec<SDKMessage>) {
150        let transport = self.transport.read().await;
151        if let Some(t) = transport.as_ref() {
152            t.write_batch(messages).await;
153        }
154    }
155
156    /// Send a control request.
157    pub async fn send_control_request(&self, request: crate::bridge::BridgeControlRequest) {
158        let transport = self.transport.read().await;
159        if let Some(t) = transport.as_ref() {
160            // Convert to SDKMessage and send
161            t.write(remote_bridge_message_from_control_request(
162                request,
163                self.session_id.read().await.clone(),
164            ))
165            .await;
166        }
167    }
168
169    /// Send a control response.
170    pub async fn send_control_response(&self, response: BridgeControlResponse) {
171        let transport = self.transport.read().await;
172        if let Some(t) = transport.as_ref() {
173            t.write(remote_bridge_message_from_control_response(
174                response,
175                self.session_id.read().await.clone(),
176            ))
177            .await;
178        }
179    }
180
181    /// Send a control cancel request.
182    pub async fn send_control_cancel_request(&self, request_id: &str) {
183        let transport = self.transport.read().await;
184        if let Some(t) = transport.as_ref() {
185            let msg = remote_bridge_control_cancel_request(
186                request_id.to_string(),
187                self.session_id.read().await.clone(),
188            );
189            t.write(msg).await;
190        }
191    }
192
193    /// Send a result message.
194    pub async fn send_result(&self) {
195        let transport = self.transport.read().await;
196        if let Some(t) = transport.as_ref() {
197            let msg = remote_bridge_result_message(self.session_id.read().await.clone());
198            t.write(msg).await;
199        }
200    }
201
202    /// Tear down the bridge.
203    pub async fn teardown(&self) {
204        let mut torn_down = self.torn_down.write().await;
205        if *torn_down {
206            return;
207        }
208        *torn_down = true;
209        drop(torn_down);
210
211        // Close transport
212        let mut transport = self.transport.write().await;
213        if let Some(t) = transport.take() {
214            t.close();
215        }
216    }
217}
218
219// =============================================================================
220// INITIALIZATION
221// =============================================================================
222
223/// Create a session, fetch a worker JWT, connect the v2 transport.
224///
225/// Returns None on any pre-flight failure.
226pub async fn init_env_less_bridge_core(
227    params: EnvLessBridgeParams,
228) -> Result<Option<EnvLessBridgeHandle>, AgentError> {
229    let cfg = get_env_less_bridge_config().await;
230
231    // 1. Create session (POST /v1/code/sessions, no env_id)
232    let access_token = (params.get_access_token)();
233    if access_token.is_none() {
234        return Ok(None);
235    }
236    let access_token = access_token.unwrap();
237
238    // Create session using the provided function (SDK would need to provide this)
239    // For now, return None as we can't create sessions without the actual API
240    let _created_session_id: Option<String> = None;
241
242    // Note: The actual implementation would:
243    // 1. POST /v1/code/sessions to create the session
244    // 2. POST /v1/code/sessions/{id}/bridge to get worker credentials
245    // 3. Build the v2 transport with those credentials
246    // 4. Wire up callbacks for connect/data/close
247    // 5. Set up JWT refresh scheduler
248    // 6. Connect and start the session
249
250    // Call on_state_change if provided
251    if let Some(ref callback) = params.on_state_change {
252        callback(BridgeState::Ready, None);
253    }
254
255    Ok(None)
256}
257
258// =============================================================================
259// HELPER FUNCTIONS
260// =============================================================================
261
262/// Build OAuth headers for API requests.
263pub fn oauth_headers(access_token: &str) -> std::collections::HashMap<String, String> {
264    let mut headers = std::collections::HashMap::new();
265    headers.insert(
266        "Authorization".to_string(),
267        format!("Bearer {}", access_token),
268    );
269    headers.insert("Content-Type".to_string(), "application/json".to_string());
270    headers.insert(
271        "anthropic-version".to_string(),
272        ANTHROPIC_VERSION.to_string(),
273    );
274    headers
275}
276
277/// Retry an async init call with exponential backoff + jitter.
278pub async fn with_retry<T, F, E>(
279    mut max_attempts: u32,
280    mut base_delay_ms: u64,
281    jitter_fraction: f64,
282    max_delay_ms: u64,
283    mut fn_: F,
284) -> Result<T, E>
285where
286    F: FnMut() -> future::BoxFuture<'static, Result<T, E>>,
287    E: std::fmt::Debug,
288{
289    let mut attempt = 0;
290    loop {
291        attempt += 1;
292        match fn_().await {
293            Ok(result) => return Ok(result),
294            Err(e) if attempt >= max_attempts => return Err(e),
295            Err(_) => {
296                if attempt < max_attempts {
297                    let base =
298                        base_delay_ms * 2u64.saturating_pow(attempt.saturating_sub(1) as u32);
299                    let jitter = base as f64 * jitter_fraction * (2.0 * rand_f64() - 1.0);
300                    let delay = (base as f64 + jitter).min(max_delay_ms as f64) as u64;
301                    tokio::time::sleep(Duration::from_millis(delay)).await;
302                }
303            }
304        }
305    }
306}
307
/// Cheap clock-derived pseudo-random `f64` in the half-open range `[0, 1)`.
///
/// The value is the sub-second part of the current wall-clock time expressed
/// as a fraction of one second. It is only meant for spreading retry delays:
/// it is not cryptographically secure, and calls made in quick succession
/// return nearly identical values.
///
/// Returns `0.0` if the system clock reports a time before the Unix epoch.
fn rand_f64() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    // `subsec_nanos()` is always below one billion, so dividing by exactly
    // that amount maps it onto [0, 1). Dividing by `u32::MAX` (~4.29e9) would
    // confine the result to roughly [0, 0.233).
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.subsec_nanos());
    f64::from(nanos) / NANOS_PER_SEC
}
317
318// =============================================================================
319// MESSAGE HELPER FUNCTIONS
320// =============================================================================
321
322/// Create a bridge message from a control request.
323pub fn remote_bridge_message_from_control_request(
324    _request: BridgeControlRequest,
325    session_id: String,
326) -> SDKMessage {
327    SDKMessage::user_message_with_session(session_id)
328}
329
330/// Create a bridge message from a control response.
331pub fn remote_bridge_message_from_control_response(
332    _response: BridgeControlResponse,
333    session_id: String,
334) -> SDKMessage {
335    SDKMessage::user_message_with_session(session_id)
336}
337
338/// Create a control cancel request message.
339pub fn remote_bridge_control_cancel_request(request_id: String, session_id: String) -> SDKMessage {
340    SDKMessage::user_message_with_session(session_id)
341}
342
343/// Create a result message.
344pub fn remote_bridge_result_message(session_id: String) -> SDKMessage {
345    SDKMessage::user_message_with_session(session_id)
346}
347
348// =============================================================================
349// FUTURE TYPE HELPERS
350// =============================================================================
351
/// Local future type aliases used by the callback and retry signatures in
/// this file.
mod future {
    use std::future::Future;
    use std::pin::Pin;

    /// An owned, heap-allocated, `Send` future that resolves to `T` and may
    /// borrow data for the lifetime `'a`.
    ///
    /// Same shape as `futures::future::BoxFuture`.
    pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
}