Skip to main content

brainwires_network/remote/bridge/
core.rs

1//! Core RemoteBridge struct definition, constructor, accessors, and top-level run loop.
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Result, bail};
8use tokio::sync::RwLock;
9
10use super::types::{
11    AgentSubscription, BridgeConfig, BridgeState, ConnectionMode, RealtimeCredentials,
12};
13use crate::remote::attachments::AttachmentReceiver;
14use crate::remote::heartbeat::HeartbeatCollector;
15use crate::remote::protocol::{
16    NegotiatedProtocol, ProtocolCapability, RemoteMessage, StreamChunkType,
17};
18use crate::traits::AgentSpawner;
19
20const REMOTE_BRIDGE_TIMEOUT_SECS: u64 = 30;
21
22/// Remote control bridge
23///
24/// Maintains communication with the backend using either Supabase Realtime
25/// (preferred) or HTTP polling (fallback).
26#[derive(Clone)]
27pub struct RemoteBridge {
28    pub(super) config: BridgeConfig,
29    pub(super) http_client: reqwest::Client,
30    /// Current bridge connection state.
31    pub state: Arc<RwLock<BridgeState>>,
32    pub(super) connection_mode: Arc<RwLock<ConnectionMode>>,
33    pub(super) session_token: Arc<RwLock<Option<String>>>,
34    pub(super) user_id: Arc<RwLock<Option<String>>>,
35    pub(super) realtime_credentials: Arc<RwLock<Option<RealtimeCredentials>>>,
36    pub(super) subscriptions: Arc<RwLock<HashSet<String>>>,
37    pub(super) subscription_tasks: Arc<RwLock<HashMap<String, AgentSubscription>>>,
38    pub(super) heartbeat_collector: Arc<RwLock<HeartbeatCollector>>,
39    pub(super) command_result_queue: Arc<RwLock<Vec<RemoteMessage>>>,
40    #[allow(clippy::type_complexity)]
41    pub(super) stream_tx:
42        Arc<RwLock<Option<tokio::sync::mpsc::Sender<(String, StreamChunkType, String)>>>>,
43    pub(super) sync_trigger_tx: Arc<RwLock<Option<tokio::sync::mpsc::Sender<()>>>>,
44    pub(super) shutdown_tx: Option<tokio::sync::broadcast::Sender<()>>,
45    pub(super) negotiated_protocol: Arc<RwLock<NegotiatedProtocol>>,
46    pub(super) attachment_receiver: AttachmentReceiver,
47    /// Agent spawner for creating new agent processes (injected trait)
48    pub(super) agent_spawner: Option<Arc<dyn AgentSpawner>>,
49    /// Device allowlist status from last authentication.
50    pub device_status: Arc<RwLock<Option<crate::remote::protocol::DeviceStatus>>>,
51    /// Organization policies from last authentication.
52    pub org_policies: Arc<RwLock<Option<crate::remote::protocol::OrgPolicies>>>,
53    /// Permission relay for remote tool-approval prompts.
54    pub permission_relay: crate::remote::permission_relay::PermissionRelay,
55    /// Analytics collector for NetworkMessage events.
56    #[cfg(feature = "telemetry")]
57    pub(super) analytics_collector:
58        Option<std::sync::Arc<brainwires_telemetry::AnalyticsCollector>>,
59}
60
61impl RemoteBridge {
62    /// Create a new remote bridge
63    ///
64    /// # Arguments
65    /// * `config` - Bridge configuration with all injected platform values
66    /// * `agent_spawner` - Optional agent spawner for remote agent creation
67    pub fn new(config: BridgeConfig, agent_spawner: Option<Arc<dyn AgentSpawner>>) -> Self {
68        let http_client = reqwest::Client::builder()
69            .timeout(Duration::from_secs(REMOTE_BRIDGE_TIMEOUT_SECS))
70            .build()
71            .expect("Failed to create HTTP client");
72
73        let heartbeat_collector =
74            HeartbeatCollector::new(config.sessions_dir.clone(), config.version.clone());
75
76        let attachment_receiver = AttachmentReceiver::new(config.attachment_dir.clone());
77
78        Self {
79            config,
80            http_client,
81            state: Arc::new(RwLock::new(BridgeState::Disconnected)),
82            connection_mode: Arc::new(RwLock::new(ConnectionMode::Polling)),
83            session_token: Arc::new(RwLock::new(None)),
84            user_id: Arc::new(RwLock::new(None)),
85            realtime_credentials: Arc::new(RwLock::new(None)),
86            subscriptions: Arc::new(RwLock::new(HashSet::new())),
87            subscription_tasks: Arc::new(RwLock::new(HashMap::new())),
88            heartbeat_collector: Arc::new(RwLock::new(heartbeat_collector)),
89            command_result_queue: Arc::new(RwLock::new(Vec::new())),
90            stream_tx: Arc::new(RwLock::new(None)),
91            sync_trigger_tx: Arc::new(RwLock::new(None)),
92            shutdown_tx: None,
93            negotiated_protocol: Arc::new(RwLock::new(NegotiatedProtocol::default())),
94            attachment_receiver,
95            agent_spawner,
96            device_status: Arc::new(RwLock::new(None)),
97            org_policies: Arc::new(RwLock::new(None)),
98            permission_relay: crate::remote::permission_relay::PermissionRelay::new(),
99            #[cfg(feature = "telemetry")]
100            analytics_collector: None,
101        }
102    }
103
104    /// Attach an analytics collector to record NetworkMessage events.
105    #[cfg(feature = "telemetry")]
106    pub fn with_analytics(
107        mut self,
108        collector: std::sync::Arc<brainwires_telemetry::AnalyticsCollector>,
109    ) -> Self {
110        self.analytics_collector = Some(collector);
111        self
112    }
113
114    /// Get current connection mode
115    pub async fn connection_mode(&self) -> ConnectionMode {
116        *self.connection_mode.read().await
117    }
118
119    /// Get current bridge state
120    pub async fn state(&self) -> BridgeState {
121        *self.state.read().await
122    }
123
124    /// Check if bridge is connected and authenticated
125    pub async fn is_ready(&self) -> bool {
126        *self.state.read().await == BridgeState::Authenticated
127    }
128
129    /// Get the user ID (if authenticated)
130    pub async fn user_id(&self) -> Option<String> {
131        self.user_id.read().await.clone()
132    }
133
134    /// Get the negotiated protocol version
135    pub async fn protocol_version(&self) -> String {
136        self.negotiated_protocol.read().await.version.clone()
137    }
138
139    /// Check if a capability is enabled in the negotiated protocol
140    pub async fn has_capability(&self, cap: ProtocolCapability) -> bool {
141        self.negotiated_protocol.read().await.has_capability(cap)
142    }
143
144    /// Get all enabled capabilities
145    pub async fn enabled_capabilities(&self) -> Vec<ProtocolCapability> {
146        self.negotiated_protocol.read().await.capabilities.clone()
147    }
148
149    /// Send a permission request to the remote user and wait for their decision.
150    ///
151    /// Returns `Ok(decision)` if the user responds within the timeout,
152    /// or `Ok(PermissionDecision { approved: false, .. })` on timeout.
153    pub async fn send_permission_request(
154        &self,
155        agent_id: &str,
156        tool_name: &str,
157        action: &str,
158        details: serde_json::Value,
159    ) -> Result<crate::remote::permission_relay::PermissionDecision> {
160        use crate::remote::permission_relay::PermissionDecision;
161
162        // Check session-allowed list first
163        if self.permission_relay.is_session_allowed(tool_name).await {
164            return Ok(PermissionDecision {
165                approved: true,
166                remember_for_session: true,
167                always_allow: true,
168            });
169        }
170
171        let request_id = uuid::Uuid::new_v4().to_string();
172        let timeout = self.permission_relay.default_timeout().await;
173
174        // Register the pending request
175        let rx = self
176            .permission_relay
177            .register_request(request_id.clone())
178            .await;
179
180        // Send the request message to the backend
181        let msg = RemoteMessage::PermissionRequest {
182            request_id: request_id.clone(),
183            agent_id: agent_id.to_string(),
184            tool_name: tool_name.to_string(),
185            action: action.to_string(),
186            details,
187            timeout_secs: timeout.as_secs() as u32,
188        };
189
190        // Queue the message for sending
191        self.command_result_queue.write().await.push(msg);
192
193        // Wait for response with timeout
194        match tokio::time::timeout(timeout, rx).await {
195            Ok(Ok(decision)) => Ok(decision),
196            Ok(Err(_)) => {
197                // Sender was dropped (request cancelled)
198                Ok(PermissionDecision {
199                    approved: false,
200                    remember_for_session: false,
201                    always_allow: false,
202                })
203            }
204            Err(_) => {
205                // Timeout — auto-deny
206                self.permission_relay.cancel(&request_id).await;
207                Ok(PermissionDecision {
208                    approved: false,
209                    remember_for_session: false,
210                    always_allow: false,
211                })
212            }
213        }
214    }
215
216    /// Set the shutdown signal sender (for external shutdown control)
217    pub fn set_shutdown_tx(&mut self, tx: tokio::sync::broadcast::Sender<()>) {
218        self.shutdown_tx = Some(tx);
219    }
220
221    /// Connect to the backend and run the main communication loop
222    pub async fn run(&mut self) -> Result<()> {
223        let shutdown_tx = self.shutdown_tx.clone().unwrap_or_else(|| {
224            let (tx, _) = tokio::sync::broadcast::channel(1);
225            self.shutdown_tx = Some(tx.clone());
226            tx
227        });
228
229        let mut reconnect_attempts = 0;
230
231        loop {
232            if *self.state.read().await == BridgeState::ShuttingDown {
233                tracing::info!("Remote bridge shutting down");
234                break;
235            }
236
237            *self.state.write().await = BridgeState::Connecting;
238
239            match self.register_with_backend().await {
240                Ok(()) => {
241                    reconnect_attempts = 0;
242                    *self.state.write().await = BridgeState::Authenticated;
243
244                    let realtime_creds = self.realtime_credentials.read().await.clone();
245
246                    if let Some(creds) = realtime_creds {
247                        *self.connection_mode.write().await = ConnectionMode::Realtime;
248                        tracing::info!("Using Supabase Realtime for communication");
249
250                        if let Err(e) = self.run_realtime_loop(shutdown_tx.subscribe(), creds).await
251                        {
252                            tracing::error!("Remote bridge Realtime error: {:?}", e);
253                        }
254                    } else {
255                        *self.connection_mode.write().await = ConnectionMode::Polling;
256                        tracing::info!(
257                            "Using HTTP polling for communication (Realtime not available)"
258                        );
259
260                        if let Err(e) = self.run_polling_loop(shutdown_tx.subscribe()).await {
261                            tracing::error!("Remote bridge polling error: {}", e);
262                        }
263                    }
264                }
265                Err(e) => {
266                    tracing::error!("Failed to register with backend: {}", e);
267                    reconnect_attempts += 1;
268
269                    if self.config.max_reconnect_attempts > 0
270                        && reconnect_attempts >= self.config.max_reconnect_attempts
271                    {
272                        bail!(
273                            "Max reconnect attempts ({}) reached",
274                            self.config.max_reconnect_attempts
275                        );
276                    }
277                }
278            }
279
280            // Clean up state
281            *self.state.write().await = BridgeState::Disconnected;
282            *self.connection_mode.write().await = ConnectionMode::Polling;
283            *self.session_token.write().await = None;
284            *self.realtime_credentials.write().await = None;
285            self.subscriptions.write().await.clear();
286            self.command_result_queue.write().await.clear();
287
288            // Wait before reconnecting
289            if *self.state.read().await != BridgeState::ShuttingDown {
290                tracing::info!(
291                    "Reconnecting in {} seconds...",
292                    self.config.reconnect_delay_secs
293                );
294                tokio::time::sleep(Duration::from_secs(self.config.reconnect_delay_secs as u64))
295                    .await;
296            }
297        }
298
299        Ok(())
300    }
301
302    /// Shutdown the bridge
303    pub async fn shutdown(&mut self) {
304        *self.state.write().await = BridgeState::ShuttingDown;
305
306        if let Some(tx) = &self.shutdown_tx {
307            let _ = tx.send(());
308        }
309    }
310
311    /// Queue a command result to send with the next heartbeat
312    pub(super) async fn queue_command_result_msg(&self, msg: RemoteMessage) -> Result<()> {
313        self.command_result_queue.write().await.push(msg);
314        Ok(())
315    }
316}