1use crate::bridge::SDKMessage;
21use crate::bridge::env_less_bridge_config::get_env_less_bridge_config;
22use crate::bridge::repl_bridge_handle::{BridgeControlRequest, BridgeControlResponse, BridgeState};
23use crate::bridge::repl_bridge_transport::ReplBridgeTransport;
24use crate::error::AgentError;
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::RwLock;
28
29const ANTHROPIC_VERSION: &str = "2023-06-01";
34
35#[derive(Clone)]
41pub struct EnvLessBridgeParams {
42 pub base_url: String,
44 pub org_uuid: String,
46 pub title: String,
48 pub get_access_token: Arc<dyn Fn() -> Option<String> + Send + Sync>,
50 pub on_auth_401: Option<
52 Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<bool, AgentError>> + Send + Sync>,
53 >,
54 pub to_sdk_messages: Arc<dyn Fn(Vec<crate::types::Message>) -> Vec<SDKMessage> + Send + Sync>,
56 pub initial_history_cap: u32,
58 pub initial_messages: Option<Vec<crate::types::Message>>,
60 pub on_inbound_message: Option<Arc<dyn Fn(SDKMessage) + Send + Sync>>,
62 pub on_user_message: Option<Arc<dyn Fn(String, String) -> bool + Send + Sync>>,
64 pub on_permission_response: Option<Arc<dyn Fn(BridgeControlResponse) + Send + Sync>>,
66 pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
68 pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
70 pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
72 pub on_set_permission_mode:
74 Option<Arc<dyn Fn(crate::permission::PermissionMode) -> Result<(), String> + Send + Sync>>,
75 pub on_state_change: Option<Arc<dyn Fn(BridgeState, Option<String>) + Send + Sync>>,
77 pub outbound_only: Option<bool>,
79 pub tags: Option<Vec<String>>,
81}
82
83#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
85pub struct RemoteCredentials {
86 #[serde(rename = "worker_jwt")]
88 pub worker_jwt: String,
89 #[serde(rename = "expires_in")]
91 pub expires_in: u64,
92 #[serde(rename = "api_base_url")]
94 pub api_base_url: String,
95 #[serde(rename = "worker_epoch")]
97 pub worker_epoch: u64,
98}
99
100pub struct EnvLessBridgeHandle {
102 pub session_id: RwLock<String>,
104 pub environment_id: RwLock<String>,
106 pub session_ingress_url: RwLock<String>,
108 pub transport: RwLock<Option<Box<dyn ReplBridgeTransport>>>,
110 pub credentials: RwLock<Option<RemoteCredentials>>,
112 pub torn_down: RwLock<bool>,
114 pub auth_recovery_in_flight: RwLock<bool>,
116 params: EnvLessBridgeParams,
118}
119
120impl EnvLessBridgeHandle {
121 pub fn new(
123 session_id: String,
124 environment_id: String,
125 session_ingress_url: String,
126 params: EnvLessBridgeParams,
127 ) -> Self {
128 Self {
129 session_id: RwLock::new(session_id),
130 environment_id: RwLock::new(environment_id),
131 session_ingress_url: RwLock::new(session_ingress_url),
132 transport: RwLock::new(None),
133 credentials: RwLock::new(None),
134 torn_down: RwLock::new(false),
135 auth_recovery_in_flight: RwLock::new(false),
136 params,
137 }
138 }
139
140 pub async fn write_messages(&self, messages: Vec<SDKMessage>) {
142 let transport = self.transport.read().await;
143 if let Some(t) = transport.as_ref() {
144 t.write_batch(messages).await;
145 }
146 }
147
148 pub async fn write_sdk_messages(&self, messages: Vec<SDKMessage>) {
150 let transport = self.transport.read().await;
151 if let Some(t) = transport.as_ref() {
152 t.write_batch(messages).await;
153 }
154 }
155
156 pub async fn send_control_request(&self, request: crate::bridge::BridgeControlRequest) {
158 let transport = self.transport.read().await;
159 if let Some(t) = transport.as_ref() {
160 t.write(remote_bridge_message_from_control_request(
162 request,
163 self.session_id.read().await.clone(),
164 ))
165 .await;
166 }
167 }
168
169 pub async fn send_control_response(&self, response: BridgeControlResponse) {
171 let transport = self.transport.read().await;
172 if let Some(t) = transport.as_ref() {
173 t.write(remote_bridge_message_from_control_response(
174 response,
175 self.session_id.read().await.clone(),
176 ))
177 .await;
178 }
179 }
180
181 pub async fn send_control_cancel_request(&self, request_id: &str) {
183 let transport = self.transport.read().await;
184 if let Some(t) = transport.as_ref() {
185 let msg = remote_bridge_control_cancel_request(
186 request_id.to_string(),
187 self.session_id.read().await.clone(),
188 );
189 t.write(msg).await;
190 }
191 }
192
193 pub async fn send_result(&self) {
195 let transport = self.transport.read().await;
196 if let Some(t) = transport.as_ref() {
197 let msg = remote_bridge_result_message(self.session_id.read().await.clone());
198 t.write(msg).await;
199 }
200 }
201
202 pub async fn teardown(&self) {
204 let mut torn_down = self.torn_down.write().await;
205 if *torn_down {
206 return;
207 }
208 *torn_down = true;
209 drop(torn_down);
210
211 let mut transport = self.transport.write().await;
213 if let Some(t) = transport.take() {
214 t.close();
215 }
216 }
217}
218
219pub async fn init_env_less_bridge_core(
227 params: EnvLessBridgeParams,
228) -> Result<Option<EnvLessBridgeHandle>, AgentError> {
229 let cfg = get_env_less_bridge_config().await;
230
231 let access_token = (params.get_access_token)();
233 if access_token.is_none() {
234 return Ok(None);
235 }
236 let access_token = access_token.unwrap();
237
238 let _created_session_id: Option<String> = None;
241
242 if let Some(ref callback) = params.on_state_change {
252 callback(BridgeState::Ready, None);
253 }
254
255 Ok(None)
256}
257
258pub fn oauth_headers(access_token: &str) -> std::collections::HashMap<String, String> {
264 let mut headers = std::collections::HashMap::new();
265 headers.insert(
266 "Authorization".to_string(),
267 format!("Bearer {}", access_token),
268 );
269 headers.insert("Content-Type".to_string(), "application/json".to_string());
270 headers.insert(
271 "anthropic-version".to_string(),
272 ANTHROPIC_VERSION.to_string(),
273 );
274 headers
275}
276
277pub async fn with_retry<T, F, E>(
279 mut max_attempts: u32,
280 mut base_delay_ms: u64,
281 jitter_fraction: f64,
282 max_delay_ms: u64,
283 mut fn_: F,
284) -> Result<T, E>
285where
286 F: FnMut() -> future::BoxFuture<'static, Result<T, E>>,
287 E: std::fmt::Debug,
288{
289 let mut attempt = 0;
290 loop {
291 attempt += 1;
292 match fn_().await {
293 Ok(result) => return Ok(result),
294 Err(e) if attempt >= max_attempts => return Err(e),
295 Err(_) => {
296 if attempt < max_attempts {
297 let base =
298 base_delay_ms * 2u64.saturating_pow(attempt.saturating_sub(1) as u32);
299 let jitter = base as f64 * jitter_fraction * (2.0 * rand_f64() - 1.0);
300 let delay = (base as f64 + jitter).min(max_delay_ms as f64) as u64;
301 tokio::time::sleep(Duration::from_millis(delay)).await;
302 }
303 }
304 }
305 }
306}
307
308fn rand_f64() -> f64 {
310 use std::time::{SystemTime, UNIX_EPOCH};
311 let nanos = SystemTime::now()
312 .duration_since(UNIX_EPOCH)
313 .map(|d| d.subsec_nanos())
314 .unwrap_or(0);
315 (nanos as f64) / (u32::MAX as f64)
316}
317
318pub fn remote_bridge_message_from_control_request(
324 _request: BridgeControlRequest,
325 session_id: String,
326) -> SDKMessage {
327 SDKMessage::user_message_with_session(session_id)
328}
329
330pub fn remote_bridge_message_from_control_response(
332 _response: BridgeControlResponse,
333 session_id: String,
334) -> SDKMessage {
335 SDKMessage::user_message_with_session(session_id)
336}
337
338pub fn remote_bridge_control_cancel_request(request_id: String, session_id: String) -> SDKMessage {
340 SDKMessage::user_message_with_session(session_id)
341}
342
343pub fn remote_bridge_result_message(session_id: String) -> SDKMessage {
345 SDKMessage::user_message_with_session(session_id)
346}
347
348mod future {
353 use crate::error::AgentError;
354 use core::pin::Pin;
355
356 pub type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
357}