1use crate::bridge::SDKMessage;
10use crate::bridge::poll_config_defaults::PollIntervalConfig;
11use crate::bridge::repl_bridge_handle::{BridgeControlRequest, BridgeControlResponse, BridgeState};
12use crate::bridge::repl_bridge_transport::ReplBridgeTransport;
13use crate::error::AgentError;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17
/// Base retry delay in milliseconds; `compute_backoff` doubles it per
/// additional consecutive error.
const POLL_ERROR_INITIAL_DELAY_MS: u64 = 2 * 1_000;
/// Ceiling in milliseconds that `compute_backoff` clamps the retry delay to.
const POLL_ERROR_MAX_DELAY_MS: u64 = 60 * 1_000;
/// Fifteen minutes, in milliseconds.
///
/// NOTE(review): presumably the `give_up_ms` budget handed to
/// `should_give_up`; no use of this constant is visible here — confirm.
const POLL_ERROR_GIVE_UP_MS: u64 = 15 * 60 * 1_000;
26
27#[derive(Clone)]
33pub struct BridgeCoreParams {
34 pub dir: String,
36 pub machine_name: String,
38 pub branch: String,
40 pub git_repo_url: Option<String>,
42 pub title: String,
44 pub base_url: String,
46 pub session_ingress_url: String,
48 pub worker_type: String,
50 pub get_access_token: Arc<dyn Fn() -> Option<String> + Send + Sync>,
52 pub create_session: Arc<
54 dyn Fn(
55 String,
56 String,
57 Option<String>,
58 String,
59 ) -> future::BoxFuture<'static, Result<Option<String>, AgentError>>
60 + Send
61 + Sync,
62 >,
63 pub archive_session:
65 Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<(), AgentError>> + Send + Sync>,
66 pub get_current_title: Option<Arc<dyn Fn() -> String + Send + Sync>>,
68 pub to_sdk_messages:
70 Option<Arc<dyn Fn(Vec<crate::types::Message>) -> Vec<SDKMessage> + Send + Sync>>,
71 pub on_auth_401: Option<
73 Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<bool, AgentError>> + Send + Sync>,
74 >,
75 pub get_poll_interval_config: Option<Arc<dyn Fn() -> PollIntervalConfig + Send + Sync>>,
77 pub initial_history_cap: Option<u32>,
79 pub initial_messages: Option<Vec<crate::types::Message>>,
81 pub previously_flushed_uuids:
83 Option<Arc<dyn Fn() -> std::collections::HashSet<String> + Send + Sync>>,
84 pub on_inbound_message: Option<Arc<dyn Fn(SDKMessage) + Send + Sync>>,
86 pub on_permission_response: Option<Arc<dyn Fn(BridgeControlResponse) + Send + Sync>>,
88 pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
90 pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
92 pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
94 pub on_set_permission_mode:
96 Option<Arc<dyn Fn(crate::permission::PermissionMode) -> Result<(), String> + Send + Sync>>,
97 pub on_state_change: Option<Arc<dyn Fn(BridgeState, Option<String>) + Send + Sync>>,
99 pub on_user_message: Option<Arc<dyn Fn(String, String) -> bool + Send + Sync>>,
101 pub perpetual: Option<bool>,
103 pub initial_sse_sequence_num: Option<u64>,
105}
106
107pub struct BridgeCoreHandle {
109 pub session_id: RwLock<String>,
111 pub environment_id: RwLock<String>,
113 pub session_ingress_url: String,
115 pub transport: RwLock<Option<Box<dyn ReplBridgeTransport>>>,
117 pub current_work_id: RwLock<Option<String>>,
119 pub current_ingress_token: RwLock<Option<String>>,
121 pub last_sequence_num: RwLock<u64>,
123 pub poll_abort: tokio::sync::watch::Sender<bool>,
125 pub teardown_started: RwLock<bool>,
127 params: BridgeCoreParams,
129}
130
131impl BridgeCoreHandle {
132 pub fn new(
134 session_id: String,
135 environment_id: String,
136 session_ingress_url: String,
137 params: BridgeCoreParams,
138 ) -> Self {
139 let (poll_abort, _) = tokio::sync::watch::channel(false);
140 Self {
141 session_id: RwLock::new(session_id),
142 environment_id: RwLock::new(environment_id),
143 session_ingress_url,
144 transport: RwLock::new(None),
145 current_work_id: RwLock::new(None),
146 current_ingress_token: RwLock::new(None),
147 last_sequence_num: RwLock::new(0),
148 poll_abort,
149 teardown_started: RwLock::new(false),
150 params,
151 }
152 }
153
154 pub async fn write_messages(&self, messages: Vec<SDKMessage>) {
156 let transport = self.transport.read().await;
157 if let Some(t) = transport.as_ref() {
158 t.write_batch(messages).await;
159 }
160 }
161
162 pub async fn write_sdk_messages(&self, messages: Vec<SDKMessage>) {
164 let transport = self.transport.read().await;
165 if let Some(t) = transport.as_ref() {
166 t.write_batch(messages).await;
167 }
168 }
169
170 pub async fn send_control_request(&self, request: BridgeControlRequest) {
172 let transport = self.transport.read().await;
173 if let Some(t) = transport.as_ref() {
174 let msg =
175 bridge_message_from_control_request(request, self.session_id.read().await.clone());
176 t.write(msg).await;
177 }
178 }
179
180 pub async fn send_control_response(&self, response: BridgeControlResponse) {
182 let transport = self.transport.read().await;
183 if let Some(t) = transport.as_ref() {
184 let msg = bridge_message_from_control_response(
185 response,
186 self.session_id.read().await.clone(),
187 );
188 t.write(msg).await;
189 }
190 }
191
192 pub async fn send_control_cancel_request(&self, request_id: &str) {
194 let transport = self.transport.read().await;
195 if let Some(t) = transport.as_ref() {
196 let msg = bridge_control_cancel_request(
197 request_id.to_string(),
198 self.session_id.read().await.clone(),
199 );
200 t.write(msg).await;
201 }
202 }
203
204 pub async fn send_result(&self) {
206 let transport = self.transport.read().await;
207 if let Some(t) = transport.as_ref() {
208 let msg = bridge_result_message(self.session_id.read().await.clone());
209 t.write(msg).await;
210 }
211 }
212
213 pub fn get_sse_sequence_num(&self) -> u64 {
215 *self.last_sequence_num.blocking_read()
218 }
219
220 pub async fn teardown(&self) {
222 let mut started = self.teardown_started.write().await;
223 if *started {
224 return;
225 }
226 *started = true;
227 drop(started);
228
229 let _ = self.poll_abort.send(true);
231
232 let mut transport = self.transport.write().await;
234 if let Some(t) = transport.take() {
235 t.close();
236 }
237
238 if let Some(ref callback) = self.params.on_state_change {
240 callback(BridgeState::Failed, Some("teardown".to_string()));
241 }
242 }
243}
244
/// Connection lifecycle states.
///
/// A plain fieldless enum, so it is `Copy` and supports total equality.
///
/// NOTE(review): presumably the internal counterpart of `BridgeState`; no
/// conversion between the two is visible here — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BridgeStateInternal {
    Ready,
    Connecting,
    Connected,
    Reconnecting,
    Failed,
}
254
255pub async fn init_bridge_core(
263 params: BridgeCoreParams,
264) -> Result<Option<BridgeCoreHandle>, AgentError> {
265 let poll_config = params
267 .get_poll_interval_config
268 .as_ref()
269 .map(|f| f())
270 .unwrap_or_default();
271
272 if let Some(ref callback) = params.on_state_change {
274 callback(BridgeState::Ready, None);
275 }
276
277 Ok(None)
294}
295
296pub fn compute_backoff(consecutive_errors: u32) -> Duration {
302 let delay = POLL_ERROR_INITIAL_DELAY_MS
303 * 2u64.saturating_pow(consecutive_errors.saturating_sub(1) as u32);
304 Duration::from_millis(delay.min(POLL_ERROR_MAX_DELAY_MS))
305}
306
/// Reports whether at least `give_up_ms` milliseconds have passed since
/// `first_error_time`.
///
/// `first_error_time` is a wall-clock timestamp in milliseconds since the
/// Unix epoch; `None` (no error recorded) always yields `false`.
///
/// If the timestamp lies ahead of the current clock reading — for example
/// after a clock adjustment, or when the clock cannot be read and `0` is
/// used — the elapsed time is treated as zero instead of underflowing.
pub fn should_give_up(first_error_time: Option<u64>, give_up_ms: u64) -> bool {
    let start = match first_error_time {
        Some(start) => start,
        None => return false,
    };

    // Milliseconds since the epoch fit comfortably in a u64, so the narrowing
    // cast cannot truncate for any realistic clock value.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_millis() as u64)
        .unwrap_or(0);

    now_ms.saturating_sub(start) >= give_up_ms
}
319
320pub fn bridge_message_from_control_request(
326 _request: BridgeControlRequest,
327 session_id: String,
328) -> SDKMessage {
329 SDKMessage::user_message_with_session(session_id)
331}
332
333pub fn bridge_message_from_control_response(
335 _response: BridgeControlResponse,
336 session_id: String,
337) -> SDKMessage {
338 SDKMessage::user_message_with_session(session_id)
339}
340
341pub fn bridge_control_cancel_request(request_id: String, session_id: String) -> SDKMessage {
343 SDKMessage::user_message_with_session(session_id)
345}
346
347pub fn bridge_result_message(session_id: String) -> SDKMessage {
349 SDKMessage::user_message_with_session(session_id)
350}
351
/// Local stand-in for a boxed-future alias, so callback signatures can name
/// an owned, type-erased future without pulling in an extra crate.
mod future {
    use std::pin::Pin;

    /// A heap-allocated, pinned, `Send` future that resolves to `T` and may
    /// borrow data for the lifetime `'a`.
    pub type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
}