brainwires_network/remote/bridge/
core.rs1use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Result, bail};
8use tokio::sync::RwLock;
9
10use super::types::{
11 AgentSubscription, BridgeConfig, BridgeState, ConnectionMode, RealtimeCredentials,
12};
13use crate::remote::attachments::AttachmentReceiver;
14use crate::remote::heartbeat::HeartbeatCollector;
15use crate::remote::protocol::{
16 NegotiatedProtocol, ProtocolCapability, RemoteMessage, StreamChunkType,
17};
18use crate::traits::AgentSpawner;
19
/// Timeout, in seconds, configured on the HTTP client that `RemoteBridge::new` builds.
const REMOTE_BRIDGE_TIMEOUT_SECS: u64 = 30;
21
22#[derive(Clone)]
27pub struct RemoteBridge {
28 pub(super) config: BridgeConfig,
29 pub(super) http_client: reqwest::Client,
30 pub state: Arc<RwLock<BridgeState>>,
32 pub(super) connection_mode: Arc<RwLock<ConnectionMode>>,
33 pub(super) session_token: Arc<RwLock<Option<String>>>,
34 pub(super) user_id: Arc<RwLock<Option<String>>>,
35 pub(super) realtime_credentials: Arc<RwLock<Option<RealtimeCredentials>>>,
36 pub(super) subscriptions: Arc<RwLock<HashSet<String>>>,
37 pub(super) subscription_tasks: Arc<RwLock<HashMap<String, AgentSubscription>>>,
38 pub(super) heartbeat_collector: Arc<RwLock<HeartbeatCollector>>,
39 pub(super) command_result_queue: Arc<RwLock<Vec<RemoteMessage>>>,
40 #[allow(clippy::type_complexity)]
41 pub(super) stream_tx:
42 Arc<RwLock<Option<tokio::sync::mpsc::Sender<(String, StreamChunkType, String)>>>>,
43 pub(super) sync_trigger_tx: Arc<RwLock<Option<tokio::sync::mpsc::Sender<()>>>>,
44 pub(super) shutdown_tx: Option<tokio::sync::broadcast::Sender<()>>,
45 pub(super) negotiated_protocol: Arc<RwLock<NegotiatedProtocol>>,
46 pub(super) attachment_receiver: AttachmentReceiver,
47 pub(super) agent_spawner: Option<Arc<dyn AgentSpawner>>,
49 pub device_status: Arc<RwLock<Option<crate::remote::protocol::DeviceStatus>>>,
51 pub org_policies: Arc<RwLock<Option<crate::remote::protocol::OrgPolicies>>>,
53 pub permission_relay: crate::remote::permission_relay::PermissionRelay,
55 #[cfg(feature = "telemetry")]
57 pub(super) analytics_collector:
58 Option<std::sync::Arc<brainwires_telemetry::AnalyticsCollector>>,
59}
60
61impl RemoteBridge {
62 pub fn new(config: BridgeConfig, agent_spawner: Option<Arc<dyn AgentSpawner>>) -> Self {
68 let http_client = reqwest::Client::builder()
69 .timeout(Duration::from_secs(REMOTE_BRIDGE_TIMEOUT_SECS))
70 .build()
71 .expect("Failed to create HTTP client");
72
73 let heartbeat_collector =
74 HeartbeatCollector::new(config.sessions_dir.clone(), config.version.clone());
75
76 let attachment_receiver = AttachmentReceiver::new(config.attachment_dir.clone());
77
78 Self {
79 config,
80 http_client,
81 state: Arc::new(RwLock::new(BridgeState::Disconnected)),
82 connection_mode: Arc::new(RwLock::new(ConnectionMode::Polling)),
83 session_token: Arc::new(RwLock::new(None)),
84 user_id: Arc::new(RwLock::new(None)),
85 realtime_credentials: Arc::new(RwLock::new(None)),
86 subscriptions: Arc::new(RwLock::new(HashSet::new())),
87 subscription_tasks: Arc::new(RwLock::new(HashMap::new())),
88 heartbeat_collector: Arc::new(RwLock::new(heartbeat_collector)),
89 command_result_queue: Arc::new(RwLock::new(Vec::new())),
90 stream_tx: Arc::new(RwLock::new(None)),
91 sync_trigger_tx: Arc::new(RwLock::new(None)),
92 shutdown_tx: None,
93 negotiated_protocol: Arc::new(RwLock::new(NegotiatedProtocol::default())),
94 attachment_receiver,
95 agent_spawner,
96 device_status: Arc::new(RwLock::new(None)),
97 org_policies: Arc::new(RwLock::new(None)),
98 permission_relay: crate::remote::permission_relay::PermissionRelay::new(),
99 #[cfg(feature = "telemetry")]
100 analytics_collector: None,
101 }
102 }
103
/// Attaches an analytics collector and returns the bridge, builder-style.
///
/// Any collector attached earlier is replaced.
#[cfg(feature = "telemetry")]
pub fn with_analytics(
    self,
    collector: std::sync::Arc<brainwires_telemetry::AnalyticsCollector>,
) -> Self {
    let mut bridge = self;
    bridge.analytics_collector = Some(collector);
    bridge
}
113
114 pub async fn connection_mode(&self) -> ConnectionMode {
116 *self.connection_mode.read().await
117 }
118
119 pub async fn state(&self) -> BridgeState {
121 *self.state.read().await
122 }
123
124 pub async fn is_ready(&self) -> bool {
126 *self.state.read().await == BridgeState::Authenticated
127 }
128
129 pub async fn user_id(&self) -> Option<String> {
131 self.user_id.read().await.clone()
132 }
133
134 pub async fn protocol_version(&self) -> String {
136 self.negotiated_protocol.read().await.version.clone()
137 }
138
139 pub async fn has_capability(&self, cap: ProtocolCapability) -> bool {
141 self.negotiated_protocol.read().await.has_capability(cap)
142 }
143
144 pub async fn enabled_capabilities(&self) -> Vec<ProtocolCapability> {
146 self.negotiated_protocol.read().await.capabilities.clone()
147 }
148
149 pub async fn send_permission_request(
154 &self,
155 agent_id: &str,
156 tool_name: &str,
157 action: &str,
158 details: serde_json::Value,
159 ) -> Result<crate::remote::permission_relay::PermissionDecision> {
160 use crate::remote::permission_relay::PermissionDecision;
161
162 if self.permission_relay.is_session_allowed(tool_name).await {
164 return Ok(PermissionDecision {
165 approved: true,
166 remember_for_session: true,
167 always_allow: true,
168 });
169 }
170
171 let request_id = uuid::Uuid::new_v4().to_string();
172 let timeout = self.permission_relay.default_timeout().await;
173
174 let rx = self
176 .permission_relay
177 .register_request(request_id.clone())
178 .await;
179
180 let msg = RemoteMessage::PermissionRequest {
182 request_id: request_id.clone(),
183 agent_id: agent_id.to_string(),
184 tool_name: tool_name.to_string(),
185 action: action.to_string(),
186 details,
187 timeout_secs: timeout.as_secs() as u32,
188 };
189
190 self.command_result_queue.write().await.push(msg);
192
193 match tokio::time::timeout(timeout, rx).await {
195 Ok(Ok(decision)) => Ok(decision),
196 Ok(Err(_)) => {
197 Ok(PermissionDecision {
199 approved: false,
200 remember_for_session: false,
201 always_allow: false,
202 })
203 }
204 Err(_) => {
205 self.permission_relay.cancel(&request_id).await;
207 Ok(PermissionDecision {
208 approved: false,
209 remember_for_session: false,
210 always_allow: false,
211 })
212 }
213 }
214 }
215
216 pub fn set_shutdown_tx(&mut self, tx: tokio::sync::broadcast::Sender<()>) {
218 self.shutdown_tx = Some(tx);
219 }
220
221 pub async fn run(&mut self) -> Result<()> {
223 let shutdown_tx = self.shutdown_tx.clone().unwrap_or_else(|| {
224 let (tx, _) = tokio::sync::broadcast::channel(1);
225 self.shutdown_tx = Some(tx.clone());
226 tx
227 });
228
229 let mut reconnect_attempts = 0;
230
231 loop {
232 if *self.state.read().await == BridgeState::ShuttingDown {
233 tracing::info!("Remote bridge shutting down");
234 break;
235 }
236
237 *self.state.write().await = BridgeState::Connecting;
238
239 match self.register_with_backend().await {
240 Ok(()) => {
241 reconnect_attempts = 0;
242 *self.state.write().await = BridgeState::Authenticated;
243
244 let realtime_creds = self.realtime_credentials.read().await.clone();
245
246 if let Some(creds) = realtime_creds {
247 *self.connection_mode.write().await = ConnectionMode::Realtime;
248 tracing::info!("Using Supabase Realtime for communication");
249
250 if let Err(e) = self.run_realtime_loop(shutdown_tx.subscribe(), creds).await
251 {
252 tracing::error!("Remote bridge Realtime error: {:?}", e);
253 }
254 } else {
255 *self.connection_mode.write().await = ConnectionMode::Polling;
256 tracing::info!(
257 "Using HTTP polling for communication (Realtime not available)"
258 );
259
260 if let Err(e) = self.run_polling_loop(shutdown_tx.subscribe()).await {
261 tracing::error!("Remote bridge polling error: {}", e);
262 }
263 }
264 }
265 Err(e) => {
266 tracing::error!("Failed to register with backend: {}", e);
267 reconnect_attempts += 1;
268
269 if self.config.max_reconnect_attempts > 0
270 && reconnect_attempts >= self.config.max_reconnect_attempts
271 {
272 bail!(
273 "Max reconnect attempts ({}) reached",
274 self.config.max_reconnect_attempts
275 );
276 }
277 }
278 }
279
280 *self.state.write().await = BridgeState::Disconnected;
282 *self.connection_mode.write().await = ConnectionMode::Polling;
283 *self.session_token.write().await = None;
284 *self.realtime_credentials.write().await = None;
285 self.subscriptions.write().await.clear();
286 self.command_result_queue.write().await.clear();
287
288 if *self.state.read().await != BridgeState::ShuttingDown {
290 tracing::info!(
291 "Reconnecting in {} seconds...",
292 self.config.reconnect_delay_secs
293 );
294 tokio::time::sleep(Duration::from_secs(self.config.reconnect_delay_secs as u64))
295 .await;
296 }
297 }
298
299 Ok(())
300 }
301
302 pub async fn shutdown(&mut self) {
304 *self.state.write().await = BridgeState::ShuttingDown;
305
306 if let Some(tx) = &self.shutdown_tx {
307 let _ = tx.send(());
308 }
309 }
310
311 pub(super) async fn queue_command_result_msg(&self, msg: RemoteMessage) -> Result<()> {
313 self.command_result_queue.write().await.push(msg);
314 Ok(())
315 }
316}