use crate::bridge::SDKMessage;
use crate::bridge::env_less_bridge_config::get_env_less_bridge_config;
use crate::bridge::repl_bridge_handle::{BridgeControlRequest, BridgeControlResponse, BridgeState};
use crate::bridge::repl_bridge_transport::ReplBridgeTransport;
use crate::error::AgentError;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// Value placed in the `anthropic-version` header by `oauth_headers`.
const ANTHROPIC_VERSION: &str = "2023-06-01";
/// Configuration and callbacks supplied by the caller when starting an
/// env-less bridge.
///
/// Every callback is stored behind an `Arc`, so cloning the struct shares the
/// same closures rather than duplicating them.
// NOTE(review): most fields are not read anywhere in this part of the file;
// their descriptions below are limited to what the signatures show.
#[derive(Clone)]
pub struct EnvLessBridgeParams {
/// Base URL string. NOTE(review): presumably the API base — confirm with the caller.
pub base_url: String,
/// Organization UUID, as a string.
pub org_uuid: String,
/// Title string. NOTE(review): presumably the session title — confirm.
pub title: String,
/// Returns the current access token, or `None` when there is none.
/// `init_env_less_bridge_core` returns `Ok(None)` when this yields `None`.
pub get_access_token: Arc<dyn Fn() -> Option<String> + Send + Sync>,
/// Optional async callback taking a `String` and resolving to
/// `Result<bool, AgentError>`.
// NOTE(review): by its name this handles HTTP 401 recovery; the meaning of the
// `String` argument and of the returned `bool` is not visible here — confirm.
pub on_auth_401: Option<
Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<bool, AgentError>> + Send + Sync>,
>,
/// Converts internal messages into `SDKMessage`s.
pub to_sdk_messages: Arc<dyn Fn(Vec<crate::types::Message>) -> Vec<SDKMessage> + Send + Sync>,
/// NOTE(review): presumably the maximum number of initial history messages — confirm.
pub initial_history_cap: u32,
/// Optional messages available at start-up.
pub initial_messages: Option<Vec<crate::types::Message>>,
/// Optional callback receiving an `SDKMessage`.
pub on_inbound_message: Option<Arc<dyn Fn(SDKMessage) + Send + Sync>>,
/// Optional callback receiving two strings and returning a `bool`.
// NOTE(review): the meaning of the two strings and of the result is not shown here.
pub on_user_message: Option<Arc<dyn Fn(String, String) -> bool + Send + Sync>>,
/// Optional callback receiving a `BridgeControlResponse`.
pub on_permission_response: Option<Arc<dyn Fn(BridgeControlResponse) + Send + Sync>>,
/// Optional callback taking no arguments.
pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
/// Optional callback receiving an optional model name.
pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
/// Optional callback receiving an optional token count.
pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
/// Optional callback receiving a `PermissionMode`; it may reject the change
/// by returning `Err` with a message.
pub on_set_permission_mode:
Option<Arc<dyn Fn(crate::permission::PermissionMode) -> Result<(), String> + Send + Sync>>,
/// Optional callback receiving a `BridgeState` and an optional string.
/// `init_env_less_bridge_core` invokes it with `(BridgeState::Ready, None)`.
pub on_state_change: Option<Arc<dyn Fn(BridgeState, Option<String>) + Send + Sync>>,
/// NOTE(review): presumably disables inbound handling when `Some(true)` — confirm.
pub outbound_only: Option<bool>,
/// Optional list of tag strings.
pub tags: Option<Vec<String>>,
}
/// Serializable credential record for a remote worker.
///
/// Each field is (de)serialized under the snake_case key given in its
/// `serde(rename)` attribute, which matches the field name.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RemoteCredentials {
/// Worker JWT string.
#[serde(rename = "worker_jwt")]
pub worker_jwt: String,
/// Credential lifetime. NOTE(review): presumably in seconds — confirm against the API.
#[serde(rename = "expires_in")]
pub expires_in: u64,
/// API base URL string carried with the credentials.
#[serde(rename = "api_base_url")]
pub api_base_url: String,
/// Worker epoch number.
#[serde(rename = "worker_epoch")]
pub worker_epoch: u64,
}
/// Handle to an env-less bridge session.
///
/// All mutable state sits behind its own async `RwLock`, so the handle can be
/// used through a shared reference.
pub struct EnvLessBridgeHandle {
/// Session id; cloned into every control/result message that is sent.
pub session_id: RwLock<String>,
/// Environment id supplied at construction.
pub environment_id: RwLock<String>,
/// Session ingress URL supplied at construction.
pub session_ingress_url: RwLock<String>,
/// Active transport. Starts as `None`; `teardown` takes it out and closes it.
/// While it is `None` every write/send method is a no-op.
pub transport: RwLock<Option<Box<dyn ReplBridgeTransport>>>,
/// Remote credentials; starts as `None`.
pub credentials: RwLock<Option<RemoteCredentials>>,
/// Set to `true` by the first call to `teardown`; later calls return early.
pub torn_down: RwLock<bool>,
/// Starts as `false`.
// NOTE(review): presumably guards against concurrent auth recovery — it is not
// read or written in the visible part of the file.
pub auth_recovery_in_flight: RwLock<bool>,
/// Parameters the handle was created with.
params: EnvLessBridgeParams,
}
impl EnvLessBridgeHandle {
pub fn new(
session_id: String,
environment_id: String,
session_ingress_url: String,
params: EnvLessBridgeParams,
) -> Self {
Self {
session_id: RwLock::new(session_id),
environment_id: RwLock::new(environment_id),
session_ingress_url: RwLock::new(session_ingress_url),
transport: RwLock::new(None),
credentials: RwLock::new(None),
torn_down: RwLock::new(false),
auth_recovery_in_flight: RwLock::new(false),
params,
}
}
pub async fn write_messages(&self, messages: Vec<SDKMessage>) {
let transport = self.transport.read().await;
if let Some(t) = transport.as_ref() {
t.write_batch(messages).await;
}
}
pub async fn write_sdk_messages(&self, messages: Vec<SDKMessage>) {
let transport = self.transport.read().await;
if let Some(t) = transport.as_ref() {
t.write_batch(messages).await;
}
}
pub async fn send_control_request(&self, request: crate::bridge::BridgeControlRequest) {
let transport = self.transport.read().await;
if let Some(t) = transport.as_ref() {
t.write(remote_bridge_message_from_control_request(
request,
self.session_id.read().await.clone(),
))
.await;
}
}
pub async fn send_control_response(&self, response: BridgeControlResponse) {
let transport = self.transport.read().await;
if let Some(t) = transport.as_ref() {
t.write(remote_bridge_message_from_control_response(
response,
self.session_id.read().await.clone(),
))
.await;
}
}
pub async fn send_control_cancel_request(&self, request_id: &str) {
let transport = self.transport.read().await;
if let Some(t) = transport.as_ref() {
let msg = remote_bridge_control_cancel_request(
request_id.to_string(),
self.session_id.read().await.clone(),
);
t.write(msg).await;
}
}
pub async fn send_result(&self) {
let transport = self.transport.read().await;
if let Some(t) = transport.as_ref() {
let msg = remote_bridge_result_message(self.session_id.read().await.clone());
t.write(msg).await;
}
}
pub async fn teardown(&self) {
let mut torn_down = self.torn_down.write().await;
if *torn_down {
return;
}
*torn_down = true;
drop(torn_down);
let mut transport = self.transport.write().await;
if let Some(t) = transport.take() {
t.close();
}
}
}
/// Starts the env-less bridge.
///
/// Loads the env-less bridge configuration, then asks `params.get_access_token`
/// for a token. Without a token the function returns `Ok(None)` right away.
/// With a token it notifies `params.on_state_change` (when set) with
/// `BridgeState::Ready` and no detail string.
///
/// # Errors
///
/// No code path in the current body produces an `Err`.
// NOTE(review): the function reports `Ready` but still returns `Ok(None)` and
// never uses the configuration or the token — this looks like an unfinished
// implementation; confirm the intended behaviour.
pub async fn init_env_less_bridge_core(
    params: EnvLessBridgeParams,
) -> Result<Option<EnvLessBridgeHandle>, AgentError> {
    // Loaded but not consumed yet.
    let _cfg = get_env_less_bridge_config().await;

    // Obtained but not consumed yet; its absence aborts the start-up.
    let _access_token = match (params.get_access_token)() {
        Some(token) => token,
        None => return Ok(None),
    };

    if let Some(notify) = params.on_state_change.as_ref() {
        notify(BridgeState::Ready, None);
    }

    Ok(None)
}
/// Builds the HTTP headers for an OAuth-authenticated JSON request.
///
/// The returned map holds exactly three entries:
/// `Authorization: Bearer <access_token>`, `Content-Type: application/json`
/// and `anthropic-version` set to `ANTHROPIC_VERSION`.
pub fn oauth_headers(access_token: &str) -> std::collections::HashMap<String, String> {
    let entries = vec![
        ("Authorization", format!("Bearer {}", access_token)),
        ("Content-Type", String::from("application/json")),
        ("anthropic-version", String::from(ANTHROPIC_VERSION)),
    ];
    entries
        .into_iter()
        .map(|(name, value)| (name.to_owned(), value))
        .collect()
}
/// Runs `fn_` until it succeeds or `max_attempts` attempts have failed,
/// sleeping with exponential backoff and jitter between attempts.
///
/// * `max_attempts` – total number of attempts; `fn_` is always called at
///   least once, even when this is `0`.
/// * `base_delay_ms` – backoff before the second attempt; it doubles after
///   every further failure.
/// * `jitter_fraction` – the backoff is shifted by up to this fraction of
///   itself, in either direction.
/// * `max_delay_ms` – upper bound for a single sleep.
/// * `fn_` – produces a fresh future for every attempt.
///
/// # Errors
///
/// Returns the error of the last attempt once `max_attempts` is reached.
pub async fn with_retry<T, F, E>(
    max_attempts: u32,
    base_delay_ms: u64,
    jitter_fraction: f64,
    max_delay_ms: u64,
    mut fn_: F,
) -> Result<T, E>
where
    F: FnMut() -> future::BoxFuture<'static, Result<T, E>>,
    E: std::fmt::Debug,
{
    let mut attempt: u32 = 0;
    loop {
        attempt += 1;
        match fn_().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                // `attempt >= 1` here, so the exponent cannot underflow. Both
                // the power and the product saturate: with many attempts a
                // plain `*` would overflow (panic in debug, wrap in release).
                let backoff_ms = base_delay_ms.saturating_mul(2u64.saturating_pow(attempt - 1));
                let jitter_ms = backoff_ms as f64 * jitter_fraction * (2.0 * rand_f64() - 1.0);
                // The float-to-int cast saturates, so a negative sum becomes 0.
                let delay_ms = (backoff_ms as f64 + jitter_ms).min(max_delay_ms as f64) as u64;
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            }
        }
    }
}
/// Returns a cheap pseudo-random value in `[0, 1)`.
///
/// The value is the sub-second part of the current wall-clock time expressed
/// as a fraction of a second. It is only suitable for spreading retry delays,
/// not for anything that needs real randomness. Returns `0.0` when the system
/// clock is set before the Unix epoch.
fn rand_f64() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.subsec_nanos())
        .unwrap_or(0);
    fraction_of_second(nanos)
}

/// Maps a sub-second nanosecond count (`0..1_000_000_000`) onto `[0, 1)`.
fn fraction_of_second(nanos: u32) -> f64 {
    // Dividing by `u32::MAX` instead would cap the result near 0.233, because
    // sub-second nanoseconds never reach 1e9.
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;
    f64::from(nanos) / NANOS_PER_SEC
}
/// Builds the `SDKMessage` sent for a control request.
///
/// The message is created with `SDKMessage::user_message_with_session` from
/// `session_id` alone; `_request` is currently ignored.
// NOTE(review): dropping the request payload looks like a placeholder — confirm.
pub fn remote_bridge_message_from_control_request(
_request: BridgeControlRequest,
session_id: String,
) -> SDKMessage {
SDKMessage::user_message_with_session(session_id)
}
/// Builds the `SDKMessage` sent for a control response.
///
/// The message is created with `SDKMessage::user_message_with_session` from
/// `session_id` alone; `_response` is currently ignored.
// NOTE(review): dropping the response payload looks like a placeholder — confirm.
pub fn remote_bridge_message_from_control_response(
_response: BridgeControlResponse,
session_id: String,
) -> SDKMessage {
SDKMessage::user_message_with_session(session_id)
}
/// Builds the `SDKMessage` sent to cancel a control request.
///
/// The message is created with `SDKMessage::user_message_with_session` from
/// `session_id` alone; `request_id` is currently unused.
// NOTE(review): the cancelled request id never reaches the message — confirm
// whether this is a placeholder.
pub fn remote_bridge_control_cancel_request(request_id: String, session_id: String) -> SDKMessage {
SDKMessage::user_message_with_session(session_id)
}
/// Builds the `SDKMessage` sent as the result message for `session_id`.
///
/// The message is created with `SDKMessage::user_message_with_session`.
pub fn remote_bridge_result_message(session_id: String) -> SDKMessage {
SDKMessage::user_message_with_session(session_id)
}
/// Minimal local stand-in for the boxed-future alias used by this file.
mod future {
    use std::future::Future;
    use std::pin::Pin;

    /// An owned, heap-allocated, `Send` future that resolves to `T` and may
    /// borrow data for `'a`.
    pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
}