Skip to main content

brainwires_network/remote/
realtime.rs

1//! Supabase Realtime WebSocket Client
2//!
3//! Provides bidirectional communication with the brainwires-studio backend
4//! via Supabase Realtime WebSocket channels.
5//!
6//! # Architecture
7//!
8//! - CLI connects to Supabase Realtime WebSocket endpoint
9//! - Subscribes to channel `cli:{userId}` for bidirectional communication
10//! - Server and CLI both send messages via broadcast events
11//! - Eliminates HTTP polling latency for commands
12//!
13//! All CLI-specific dependencies have been removed:
14//! - `crate::ipc::list_agent_sessions_with_metadata()` → uses `sessions_dir` from config
15//! - `env!("CARGO_PKG_VERSION")` → uses `version` from config
16
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::{Context, Result, bail};
22use futures::{SinkExt, StreamExt};
23use serde::{Deserialize, Serialize};
24use tokio::sync::{RwLock, mpsc};
25use tokio_tungstenite::{connect_async, tungstenite::Message};
26use url::Url;
27
28use super::protocol::{BackendCommand, RemoteAgentInfo, StreamChunkType};
29
30/// Phoenix protocol heartbeat interval in seconds (must be < 60s to keep connection alive).
31const PHOENIX_HEARTBEAT_INTERVAL_SECS: u64 = 25;
32
33/// Supabase Realtime connection configuration
34#[derive(Debug, Clone)]
35pub struct RealtimeConfig {
36    /// WebSocket URL (wss://...)
37    pub realtime_url: String,
38    /// Supabase JWT for authentication
39    pub realtime_token: String,
40    /// Channel name to subscribe to (cli:{userId})
41    pub channel_name: String,
42    /// User ID (for message routing)
43    pub user_id: String,
44    /// Session token (for backend tracking)
45    pub session_token: String,
46    /// Supabase anon key (used as apikey param in WebSocket URL for Kong auth)
47    pub supabase_anon_key: String,
48    /// Heartbeat interval in seconds (for sending agent status to frontend)
49    pub heartbeat_interval_secs: u64,
50    /// Sessions directory for agent discovery (injected, replaces PlatformPaths)
51    pub sessions_dir: PathBuf,
52    /// CLI version string (injected, replaces env!("CARGO_PKG_VERSION"))
53    pub version: String,
54}
55
56/// Connection state
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum RealtimeState {
59    /// Not connected to the Realtime server.
60    Disconnected,
61    /// Connection in progress.
62    Connecting,
63    /// WebSocket connected but channel not yet joined.
64    Connected,
65    /// Channel joined and ready for messages.
66    Subscribed,
67    /// Gracefully shutting down.
68    ShuttingDown,
69}
70
71/// Message types for Supabase Realtime protocol
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(tag = "event", rename_all = "snake_case")]
74pub enum PhoenixMessage {
75    /// Join a channel
76    #[serde(rename = "phx_join")]
77    PhxJoin {
78        /// Channel topic to join.
79        topic: String,
80        /// Join parameters.
81        payload: serde_json::Value,
82        /// Message reference ID.
83        #[serde(rename = "ref")]
84        msg_ref: String,
85    },
86    /// Reply to a message
87    #[serde(rename = "phx_reply")]
88    PhxReply {
89        /// Channel topic.
90        topic: String,
91        /// Reply payload with status.
92        payload: PhxReplyPayload,
93        /// Message reference ID.
94        #[serde(rename = "ref")]
95        msg_ref: String,
96    },
97    /// Heartbeat (keep-alive)
98    #[serde(rename = "heartbeat")]
99    Heartbeat {
100        /// Channel topic (usually "phoenix").
101        topic: String,
102        /// Heartbeat payload.
103        payload: serde_json::Value,
104        /// Message reference ID.
105        #[serde(rename = "ref")]
106        msg_ref: String,
107    },
108    /// Broadcast message
109    #[serde(rename = "broadcast")]
110    Broadcast {
111        /// Channel topic.
112        topic: String,
113        /// Broadcast payload containing the event data.
114        payload: BroadcastPayload,
115        /// Optional message reference ID.
116        #[serde(rename = "ref")]
117        msg_ref: Option<String>,
118    },
119    /// Presence state
120    #[serde(rename = "presence_state")]
121    PresenceState {
122        /// Channel topic.
123        topic: String,
124        /// Presence state data.
125        payload: serde_json::Value,
126        /// Optional message reference ID.
127        #[serde(rename = "ref")]
128        msg_ref: Option<String>,
129    },
130    /// Presence diff
131    #[serde(rename = "presence_diff")]
132    PresenceDiff {
133        /// Channel topic.
134        topic: String,
135        /// Presence diff data.
136        payload: serde_json::Value,
137        /// Optional message reference ID.
138        #[serde(rename = "ref")]
139        msg_ref: Option<String>,
140    },
141}
142
143/// Payload for Phoenix reply messages.
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct PhxReplyPayload {
146    /// Reply status (e.g., "ok" or "error").
147    pub status: String,
148    /// Response data.
149    #[serde(default)]
150    pub response: serde_json::Value,
151}
152
153/// Payload for broadcast messages on a Realtime channel.
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct BroadcastPayload {
156    /// Broadcast type identifier.
157    #[serde(rename = "type")]
158    pub broadcast_type: String,
159    /// Event name.
160    pub event: String,
161    /// Event payload data.
162    pub payload: serde_json::Value,
163}
164
165/// Remote Realtime message types (matching TypeScript protocol)
166#[derive(Debug, Clone, Serialize, Deserialize)]
167#[serde(tag = "type", rename_all = "snake_case")]
168pub enum RemoteRealtimeMessage {
169    /// CLI to Backend: Initial registration
170    #[serde(rename = "remote.register")]
171    Register {
172        /// Registration payload.
173        payload: RegisterPayload,
174    },
175    /// CLI to Backend: Status update with agents
176    #[serde(rename = "remote.heartbeat")]
177    Heartbeat {
178        /// Heartbeat payload with agent info.
179        payload: HeartbeatPayload,
180    },
181    /// CLI to Backend: Agent output stream
182    #[serde(rename = "remote.stream")]
183    Stream {
184        /// Stream chunk payload.
185        payload: StreamPayload,
186    },
187    /// CLI to Backend: Result of a command
188    #[serde(rename = "remote.command_result")]
189    CommandResult {
190        /// Command result payload.
191        payload: CommandResultPayload,
192    },
193    /// CLI to Backend: Agent event
194    #[serde(rename = "remote.event")]
195    Event {
196        /// Agent event payload.
197        payload: EventPayload,
198    },
199    /// Backend to CLI: Command to execute
200    #[serde(rename = "remote.command")]
201    Command {
202        /// Command payload from backend.
203        payload: CommandPayload,
204    },
205    /// Backend to CLI: Ping
206    #[serde(rename = "remote.ping")]
207    Ping {
208        /// Ping payload with timestamp.
209        payload: PingPongPayload,
210    },
211    /// CLI to Backend: Pong
212    #[serde(rename = "remote.pong")]
213    Pong {
214        /// Pong payload with timestamp.
215        payload: PingPongPayload,
216    },
217    /// CLI to Backend: Graceful disconnect notification
218    #[serde(rename = "remote.disconnect")]
219    Disconnect {
220        /// Disconnect payload with reason.
221        payload: DisconnectPayload,
222    },
223}
224
225/// Payload for CLI registration with the backend.
226#[derive(Debug, Clone, Serialize, Deserialize)]
227#[serde(rename_all = "camelCase")]
228pub struct RegisterPayload {
229    /// Client hostname.
230    pub hostname: String,
231    /// Client operating system.
232    pub os: String,
233    /// CLI version string.
234    pub version: String,
235    /// Session token for authentication.
236    pub session_token: String,
237    /// Include agents in register so frontend gets them immediately
238    #[serde(default, skip_serializing_if = "Vec::is_empty")]
239    pub agents: Vec<RemoteAgentInfo>,
240    /// System load
241    #[serde(default, skip_serializing_if = "Option::is_none")]
242    pub system_load: Option<f32>,
243}
244
245/// Payload for periodic heartbeat messages with agent status.
246#[derive(Debug, Clone, Serialize, Deserialize)]
247#[serde(rename_all = "camelCase")]
248pub struct HeartbeatPayload {
249    /// List of active agents.
250    pub agents: Vec<RemoteAgentInfo>,
251    /// Current system load (0.0-1.0).
252    pub system_load: f32,
253    /// Client hostname.
254    #[serde(skip_serializing_if = "Option::is_none")]
255    pub hostname: Option<String>,
256    /// Client operating system.
257    #[serde(skip_serializing_if = "Option::is_none")]
258    pub os: Option<String>,
259    /// CLI version string.
260    #[serde(skip_serializing_if = "Option::is_none")]
261    pub version: Option<String>,
262}
263
264/// Payload for agent output stream chunks.
265#[derive(Debug, Clone, Serialize, Deserialize)]
266#[serde(rename_all = "camelCase")]
267pub struct StreamPayload {
268    /// ID of the agent producing the stream.
269    pub agent_id: String,
270    /// Type of stream chunk.
271    pub chunk_type: StreamChunkType,
272    /// Chunk content text.
273    pub content: String,
274}
275
276/// Payload for command execution results.
277#[derive(Debug, Clone, Serialize, Deserialize)]
278#[serde(rename_all = "camelCase")]
279pub struct CommandResultPayload {
280    /// ID of the command being responded to.
281    pub command_id: String,
282    /// Whether the command succeeded.
283    pub success: bool,
284    /// Result data if successful.
285    #[serde(skip_serializing_if = "Option::is_none")]
286    pub result: Option<serde_json::Value>,
287    /// Error message if failed.
288    #[serde(skip_serializing_if = "Option::is_none")]
289    pub error: Option<String>,
290}
291
292/// Payload for agent event notifications.
293#[derive(Debug, Clone, Serialize, Deserialize)]
294#[serde(rename_all = "camelCase")]
295pub struct EventPayload {
296    /// Type of agent event.
297    pub event_type: String,
298    /// ID of the agent this event relates to.
299    pub agent_id: String,
300    /// Event-specific data.
301    #[serde(skip_serializing_if = "Option::is_none")]
302    pub data: Option<serde_json::Value>,
303}
304
305/// Payload for commands received from the backend.
306#[derive(Debug, Clone, Serialize, Deserialize)]
307#[serde(rename_all = "camelCase")]
308pub struct CommandPayload {
309    /// Unique command identifier.
310    pub command_id: String,
311    /// Type of command (e.g., "send_input", "slash_command").
312    pub command_type: String,
313    /// Target agent ID.
314    #[serde(skip_serializing_if = "Option::is_none")]
315    pub agent_id: Option<String>,
316    /// Input content for send_input commands.
317    #[serde(skip_serializing_if = "Option::is_none")]
318    pub content: Option<String>,
319    /// Slash command name.
320    #[serde(skip_serializing_if = "Option::is_none")]
321    pub command: Option<String>,
322    /// Slash command arguments.
323    #[serde(skip_serializing_if = "Option::is_none")]
324    pub args: Option<Vec<String>>,
325    /// Model for spawn_agent commands.
326    #[serde(skip_serializing_if = "Option::is_none")]
327    pub model: Option<String>,
328    /// Working directory for spawn_agent commands.
329    #[serde(skip_serializing_if = "Option::is_none")]
330    pub working_directory: Option<String>,
331    /// Reason for certain commands (e.g., disconnect).
332    #[serde(skip_serializing_if = "Option::is_none")]
333    pub reason: Option<String>,
334}
335
336/// Payload for ping/pong messages.
337#[derive(Debug, Clone, Serialize, Deserialize)]
338#[serde(rename_all = "camelCase")]
339pub struct PingPongPayload {
340    /// Server timestamp for round-trip measurement.
341    pub server_timestamp: i64,
342}
343
344/// Payload for graceful disconnect notifications.
345#[derive(Debug, Clone, Serialize, Deserialize)]
346#[serde(rename_all = "camelCase")]
347pub struct DisconnectPayload {
348    /// Reason for disconnection.
349    pub reason: String,
350    /// Hostname of disconnecting bridge (for multi-bridge support)
351    pub hostname: String,
352}
353
354/// Supabase Realtime WebSocket client
355pub struct RealtimeClient {
356    config: RealtimeConfig,
357    state: Arc<RwLock<RealtimeState>>,
358    /// Channel for outgoing messages
359    outgoing_tx: Option<mpsc::Sender<RemoteRealtimeMessage>>,
360    /// Message reference counter
361    msg_ref: Arc<RwLock<u64>>,
362}
363
364impl RealtimeClient {
365    /// Create a new Realtime client
366    pub fn new(config: RealtimeConfig) -> Self {
367        Self {
368            config,
369            state: Arc::new(RwLock::new(RealtimeState::Disconnected)),
370            outgoing_tx: None,
371            msg_ref: Arc::new(RwLock::new(0)),
372        }
373    }
374
375    /// Get the next message reference
376    async fn next_ref(&self) -> String {
377        let mut ref_num = self.msg_ref.write().await;
378        *ref_num += 1;
379        ref_num.to_string()
380    }
381
382    /// Get current connection state
383    pub async fn state(&self) -> RealtimeState {
384        *self.state.read().await
385    }
386
387    /// Check if connected and subscribed
388    pub async fn is_ready(&self) -> bool {
389        *self.state.read().await == RealtimeState::Subscribed
390    }
391
392    /// Connect to Supabase Realtime and run the message loop
393    ///
394    /// - `shutdown_rx`: Signal to gracefully shut down
395    /// - `heartbeat_rx`: Receives heartbeat data (with hostname, os, version) to broadcast to frontend
396    /// - `stream_rx`: Receives stream messages (agent_id, chunk_type, content) to broadcast
397    /// - `command_tx`: Channel to send commands received from frontend for processing
398    pub async fn connect(
399        &mut self,
400        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
401        mut heartbeat_rx: mpsc::Receiver<super::heartbeat::HeartbeatData>,
402        mut stream_rx: mpsc::Receiver<(String, StreamChunkType, String)>,
403        command_tx: mpsc::Sender<BackendCommand>,
404    ) -> Result<()> {
405        *self.state.write().await = RealtimeState::Connecting;
406
407        // Build WebSocket URL with anon key (required by Kong) and protocol version
408        let mut url = Url::parse(&self.config.realtime_url)?;
409        url.query_pairs_mut()
410            .append_pair("apikey", &self.config.supabase_anon_key)
411            .append_pair("vsn", "1.0.0");
412
413        tracing::info!(
414            "Connecting to Supabase Realtime: {}",
415            url.host_str().unwrap_or("unknown")
416        );
417
418        // Build a WebSocket request with auth header
419        let request = tokio_tungstenite::tungstenite::http::Request::builder()
420            .uri(url.as_str())
421            .header(
422                "Authorization",
423                format!("Bearer {}", self.config.realtime_token),
424            )
425            .body(())
426            .context("Failed to build WebSocket request")?;
427
428        // Connect WebSocket
429        let (ws_stream, _response): (
430            tokio_tungstenite::WebSocketStream<
431                tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
432            >,
433            _,
434        ) = match connect_async(request).await {
435            Ok(result) => result,
436            Err(e) => {
437                tracing::error!("WebSocket connection error: {:?}", e);
438                return Err(anyhow::anyhow!(
439                    "Failed to connect to Supabase Realtime: {}",
440                    e
441                ));
442            }
443        };
444
445        *self.state.write().await = RealtimeState::Connected;
446        tracing::info!("Connected to Supabase Realtime");
447
448        let (mut write, mut read) = ws_stream.split();
449
450        // Create channel for outgoing messages
451        let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<RemoteRealtimeMessage>(100);
452
453        self.outgoing_tx = Some(outgoing_tx);
454
455        // Join the channel
456        let join_ref = self.next_ref().await;
457        let channel_topic = format!("realtime:{}", self.config.channel_name);
458        let join_msg = serde_json::json!({
459            "topic": channel_topic,
460            "event": "phx_join",
461            "payload": {
462                "config": {
463                    "broadcast": {
464                        "self": false
465                    },
466                    "presence": {
467                        "key": ""
468                    }
469                }
470            },
471            "ref": join_ref
472        });
473
474        write
475            .send(Message::Text(serde_json::to_string(&join_msg)?.into()))
476            .await
477            .context("Failed to send join message")?;
478
479        tracing::info!("Sent join request for channel: {}", channel_topic);
480
481        // Main message loop with Phoenix heartbeat
482        let state = Arc::clone(&self.state);
483        let channel_topic_clone = channel_topic.clone();
484        let user_id = self.config.user_id.clone();
485        let session_token = self.config.session_token.clone();
486        let sessions_dir = self.config.sessions_dir.clone();
487        let version = self.config.version.clone();
488
489        // Phoenix heartbeat interval (must be < 60s to keep connection alive)
490        let mut phoenix_heartbeat =
491            tokio::time::interval(Duration::from_secs(PHOENIX_HEARTBEAT_INTERVAL_SECS));
492
493        // Track if we've sent the initial register message
494        let mut register_sent = false;
495
496        loop {
497            // Check if we just became subscribed and need to send register
498            if !register_sent && *state.read().await == RealtimeState::Subscribed {
499                register_sent = true;
500                tracing::info!("Channel subscribed, sending register message to frontend");
501
502                // Get current agents to include in register message
503                // Uses bridge-internal discovery with injected sessions_dir
504                let agents =
505                    crate::ipc::discovery::list_agent_sessions_with_metadata(&sessions_dir)
506                        .unwrap_or_default()
507                        .into_iter()
508                        .map(RemoteAgentInfo::from)
509                        .collect::<Vec<_>>();
510
511                let register_msg = RemoteRealtimeMessage::Register {
512                    payload: RegisterPayload {
513                        hostname: gethostname::gethostname().to_string_lossy().to_string(),
514                        os: std::env::consts::OS.to_string(),
515                        version: version.clone(),
516                        session_token: session_token.clone(),
517                        agents,
518                        system_load: None,
519                    },
520                };
521
522                if let Err(e) = self
523                    .send_broadcast(&mut write, &channel_topic_clone, &user_id, register_msg)
524                    .await
525                {
526                    tracing::warn!("Failed to send register message: {}", e);
527                } else {
528                    tracing::info!("Register message sent to frontend");
529                }
530            }
531
532            tokio::select! {
533                // Shutdown signal
534                _ = shutdown_rx.recv() => {
535                    tracing::info!("Received shutdown signal, sending disconnect message");
536                    *state.write().await = RealtimeState::ShuttingDown;
537
538                    // Send goodbye message to frontend before disconnecting
539                    let disconnect_msg = RemoteRealtimeMessage::Disconnect {
540                        payload: DisconnectPayload {
541                            reason: "Bridge shutting down".to_string(),
542                            hostname: gethostname::gethostname().to_string_lossy().to_string(),
543                        },
544                    };
545                    if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, disconnect_msg).await {
546                        tracing::warn!("Failed to send disconnect message: {}", e);
547                    } else {
548                        tracing::info!("Disconnect message sent to frontend");
549                    }
550
551                    break;
552                }
553
554                // Phoenix protocol heartbeat (keeps connection alive)
555                _ = phoenix_heartbeat.tick() => {
556                    let hb_ref = self.next_ref().await;
557                    let heartbeat_msg = serde_json::json!({
558                        "topic": "phoenix",
559                        "event": "heartbeat",
560                        "payload": {},
561                        "ref": hb_ref
562                    });
563                    let heartbeat_str = match serde_json::to_string(&heartbeat_msg) {
564                        Ok(s) => s,
565                        Err(e) => {
566                            tracing::error!("Failed to serialize Phoenix heartbeat: {}", e);
567                            break;
568                        }
569                    };
570                    if let Err(e) = write.send(Message::Text(heartbeat_str.into())).await {
571                        tracing::error!("Failed to send Phoenix heartbeat: {}", e);
572                        break;
573                    }
574                    tracing::debug!("Sent Phoenix heartbeat");
575                }
576
577                // Incoming WebSocket message
578                msg = read.next() => {
579                    match msg {
580                        Some(Ok(Message::Text(text))) => {
581                            if let Err(e) = self.handle_incoming_message(
582                                &text,
583                                &channel_topic_clone,
584                                &command_tx,
585                                &state,
586                            ).await {
587                                tracing::error!("Error handling message: {}", e);
588                            }
589                        }
590                        Some(Ok(Message::Ping(data))) => {
591                            if let Err(e) = write.send(Message::Pong(data)).await {
592                                tracing::error!("Failed to send pong: {}", e);
593                            }
594                        }
595                        Some(Ok(Message::Close(_))) => {
596                            tracing::info!("WebSocket closed by server");
597                            break;
598                        }
599                        Some(Err(e)) => {
600                            tracing::error!("WebSocket error: {}", e);
601                            break;
602                        }
603                        None => {
604                            tracing::info!("WebSocket stream ended");
605                            break;
606                        }
607                        _ => {}
608                    }
609                }
610
611                // Outgoing message from bridge
612                Some(msg) = outgoing_rx.recv() => {
613                    if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, msg).await {
614                        tracing::error!("Failed to send broadcast: {}", e);
615                    }
616                }
617
618                // Heartbeat data from bridge (to broadcast agent status to frontend)
619                Some(heartbeat_data) = heartbeat_rx.recv() => {
620                    tracing::info!("Broadcasting heartbeat with {} agents to frontend (host: {})",
621                        heartbeat_data.agents.len(), heartbeat_data.hostname);
622                    let msg = RemoteRealtimeMessage::Heartbeat {
623                        payload: HeartbeatPayload {
624                            agents: heartbeat_data.agents,
625                            system_load: heartbeat_data.system_load,
626                            hostname: Some(heartbeat_data.hostname),
627                            os: Some(heartbeat_data.os),
628                            version: Some(heartbeat_data.version),
629                        },
630                    };
631                    if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, msg).await {
632                        tracing::error!("Failed to send heartbeat broadcast: {}", e);
633                    } else {
634                        tracing::info!("Heartbeat broadcast sent to channel {}", channel_topic_clone);
635                    }
636                }
637
638                // Stream data from agent readers (to broadcast to frontend)
639                Some((agent_id, chunk_type, content)) = stream_rx.recv() => {
640                    tracing::debug!("Broadcasting stream for agent {}: {:?}", agent_id, chunk_type);
641                    let msg = RemoteRealtimeMessage::Stream {
642                        payload: StreamPayload {
643                            agent_id,
644                            chunk_type,
645                            content,
646                        },
647                    };
648                    if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, msg).await {
649                        tracing::error!("Failed to send stream broadcast: {}", e);
650                    }
651                }
652            }
653        }
654        *self.state.write().await = RealtimeState::Disconnected;
655        Ok(())
656    }
657
658    /// Handle incoming WebSocket message
659    async fn handle_incoming_message(
660        &self,
661        text: &str,
662        channel_topic: &str,
663        command_tx: &mpsc::Sender<BackendCommand>,
664        state: &Arc<RwLock<RealtimeState>>,
665    ) -> Result<()> {
666        let msg: serde_json::Value = serde_json::from_str(text)?;
667
668        let event = msg.get("event").and_then(|e| e.as_str()).unwrap_or("");
669        let topic = msg.get("topic").and_then(|t| t.as_str()).unwrap_or("");
670
671        match event {
672            "phx_reply" => {
673                let status = msg
674                    .get("payload")
675                    .and_then(|p| p.get("status"))
676                    .and_then(|s| s.as_str())
677                    .unwrap_or("");
678
679                if status == "ok" && topic == channel_topic {
680                    *state.write().await = RealtimeState::Subscribed;
681                    tracing::info!("Successfully joined channel: {}", channel_topic);
682                } else if status != "ok" {
683                    tracing::error!("Join failed: {:?}", msg);
684                }
685            }
686
687            "broadcast" => {
688                tracing::info!("Received broadcast event, full msg: {:?}", msg);
689
690                if let Some(wrapper) = msg.get("payload") {
691                    tracing::info!("Broadcast wrapper payload: {:?}", wrapper);
692
693                    if let Some(inner_payload) = wrapper.get("payload") {
694                        tracing::info!(
695                            "Inner payload (RemoteRealtimeMessage): {:?}",
696                            inner_payload
697                        );
698                        self.handle_remote_message(inner_payload, command_tx)
699                            .await?;
700                    } else {
701                        tracing::warn!("Broadcast has no inner payload: {:?}", wrapper);
702                    }
703                }
704            }
705
706            "presence_state" | "presence_diff" => {
707                tracing::debug!("Presence update: {}", event);
708            }
709
710            _ => {
711                tracing::debug!("Unhandled event: {}", event);
712            }
713        }
714
715        Ok(())
716    }
717
718    /// Handle remote message from server
719    async fn handle_remote_message(
720        &self,
721        msg: &serde_json::Value,
722        command_tx: &mpsc::Sender<BackendCommand>,
723    ) -> Result<()> {
724        let msg_type = msg.get("type").and_then(|t| t.as_str()).unwrap_or("");
725        tracing::debug!("handle_remote_message: type={}, msg={:?}", msg_type, msg);
726
727        match msg_type {
728            "remote.command" => {
729                tracing::info!("Received remote.command from frontend");
730                if let Some(payload) = msg.get("payload") {
731                    tracing::debug!("Command payload: {:?}", payload);
732                    match serde_json::from_value::<CommandPayload>(payload.clone()) {
733                        Ok(cmd_payload) => {
734                            tracing::info!(
735                                "Parsed command: type={}, agent_id={:?}",
736                                cmd_payload.command_type,
737                                cmd_payload.agent_id
738                            );
739                            let backend_cmd = self.convert_to_backend_command(&cmd_payload)?;
740                            command_tx.send(backend_cmd).await?;
741                        }
742                        Err(e) => {
743                            tracing::error!(
744                                "Failed to parse CommandPayload: {}, payload was: {:?}",
745                                e,
746                                payload
747                            );
748                        }
749                    }
750                } else {
751                    tracing::warn!("remote.command has no payload");
752                }
753            }
754
755            "remote.ping" => {
756                if let Some(tx) = &self.outgoing_tx {
757                    let timestamp = msg
758                        .get("payload")
759                        .and_then(|p| p.get("serverTimestamp"))
760                        .and_then(|t| t.as_i64())
761                        .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
762
763                    tx.send(RemoteRealtimeMessage::Pong {
764                        payload: PingPongPayload {
765                            server_timestamp: timestamp,
766                        },
767                    })
768                    .await?;
769                }
770            }
771
772            _ => {
773                tracing::debug!("Unhandled remote message type: {}", msg_type);
774            }
775        }
776
777        Ok(())
778    }
779
780    /// Convert CommandPayload to BackendCommand
781    fn convert_to_backend_command(&self, payload: &CommandPayload) -> Result<BackendCommand> {
782        let cmd = match payload.command_type.as_str() {
783            "subscribe" => BackendCommand::Subscribe {
784                agent_id: payload.agent_id.clone().unwrap_or_default(),
785            },
786            "unsubscribe" => BackendCommand::Unsubscribe {
787                agent_id: payload.agent_id.clone().unwrap_or_default(),
788            },
789            "send_input" => BackendCommand::SendInput {
790                command_id: payload.command_id.clone(),
791                agent_id: payload.agent_id.clone().unwrap_or_default(),
792                content: payload.content.clone().unwrap_or_default(),
793            },
794            "slash_command" => BackendCommand::SlashCommand {
795                command_id: payload.command_id.clone(),
796                agent_id: payload.agent_id.clone().unwrap_or_default(),
797                command: payload.command.clone().unwrap_or_default(),
798                args: payload.args.clone().unwrap_or_default(),
799            },
800            "cancel_operation" => BackendCommand::CancelOperation {
801                command_id: payload.command_id.clone(),
802                agent_id: payload.agent_id.clone().unwrap_or_default(),
803            },
804            "spawn_agent" => BackendCommand::SpawnAgent {
805                command_id: payload.command_id.clone(),
806                model: payload.model.clone(),
807                working_directory: payload.working_directory.clone(),
808            },
809            "request_sync" => BackendCommand::RequestSync,
810            "ping" => BackendCommand::Ping {
811                timestamp: chrono::Utc::now().timestamp_millis(),
812            },
813            "disconnect" => BackendCommand::Disconnect {
814                reason: payload
815                    .reason
816                    .clone()
817                    .unwrap_or_else(|| "Server requested".to_string()),
818            },
819            _ => bail!("Unknown command type: {}", payload.command_type),
820        };
821
822        Ok(cmd)
823    }
824
825    /// Send a broadcast message on the channel
826    async fn send_broadcast<W>(
827        &self,
828        write: &mut W,
829        channel_topic: &str,
830        user_id: &str,
831        msg: RemoteRealtimeMessage,
832    ) -> Result<()>
833    where
834        W: SinkExt<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin,
835    {
836        let msg_ref = self.next_ref().await;
837
838        // Wrap in Realtime broadcast format
839        let broadcast = serde_json::json!({
840            "topic": channel_topic,
841            "event": "broadcast",
842            "payload": {
843                "type": "broadcast",
844                "event": "remote",
845                "payload": {
846                    "type": get_message_type(&msg),
847                    "id": uuid::Uuid::new_v4().to_string(),
848                    "payload": get_message_payload(&msg),
849                    "timestamp": chrono::Utc::now().timestamp_millis(),
850                    "userId": user_id
851                }
852            },
853            "ref": msg_ref
854        });
855
856        write
857            .send(Message::Text(serde_json::to_string(&broadcast)?.into()))
858            .await
859            .context("Failed to send broadcast")?;
860
861        Ok(())
862    }
863
864    /// Send a message via the outgoing channel (for bridge use)
865    pub async fn send(&self, msg: RemoteRealtimeMessage) -> Result<()> {
866        if let Some(tx) = &self.outgoing_tx {
867            tx.send(msg).await?;
868        } else {
869            bail!("Not connected");
870        }
871        Ok(())
872    }
873
874    /// Send heartbeat with agent status
875    pub async fn send_heartbeat(
876        &self,
877        heartbeat_data: super::heartbeat::HeartbeatData,
878    ) -> Result<()> {
879        self.send(RemoteRealtimeMessage::Heartbeat {
880            payload: HeartbeatPayload {
881                agents: heartbeat_data.agents,
882                system_load: heartbeat_data.system_load,
883                hostname: Some(heartbeat_data.hostname),
884                os: Some(heartbeat_data.os),
885                version: Some(heartbeat_data.version),
886            },
887        })
888        .await
889    }
890
891    /// Send stream chunk
892    pub async fn send_stream(
893        &self,
894        agent_id: String,
895        chunk_type: StreamChunkType,
896        content: String,
897    ) -> Result<()> {
898        self.send(RemoteRealtimeMessage::Stream {
899            payload: StreamPayload {
900                agent_id,
901                chunk_type,
902                content,
903            },
904        })
905        .await
906    }
907
908    /// Send command result
909    pub async fn send_command_result(
910        &self,
911        command_id: String,
912        success: bool,
913        result: Option<serde_json::Value>,
914        error: Option<String>,
915    ) -> Result<()> {
916        self.send(RemoteRealtimeMessage::CommandResult {
917            payload: CommandResultPayload {
918                command_id,
919                success,
920                result,
921                error,
922            },
923        })
924        .await
925    }
926}
927
928/// Get the message type string for a RemoteRealtimeMessage
929fn get_message_type(msg: &RemoteRealtimeMessage) -> &'static str {
930    match msg {
931        RemoteRealtimeMessage::Register { .. } => "remote.register",
932        RemoteRealtimeMessage::Heartbeat { .. } => "remote.heartbeat",
933        RemoteRealtimeMessage::Stream { .. } => "remote.stream",
934        RemoteRealtimeMessage::CommandResult { .. } => "remote.command_result",
935        RemoteRealtimeMessage::Event { .. } => "remote.event",
936        RemoteRealtimeMessage::Command { .. } => "remote.command",
937        RemoteRealtimeMessage::Ping { .. } => "remote.ping",
938        RemoteRealtimeMessage::Pong { .. } => "remote.pong",
939        RemoteRealtimeMessage::Disconnect { .. } => "remote.disconnect",
940    }
941}
942
943/// Get the payload from a RemoteRealtimeMessage
944fn get_message_payload(msg: &RemoteRealtimeMessage) -> serde_json::Value {
945    match msg {
946        RemoteRealtimeMessage::Register { payload } => {
947            serde_json::to_value(payload).unwrap_or_default()
948        }
949        RemoteRealtimeMessage::Heartbeat { payload } => {
950            serde_json::to_value(payload).unwrap_or_default()
951        }
952        RemoteRealtimeMessage::Stream { payload } => {
953            serde_json::to_value(payload).unwrap_or_default()
954        }
955        RemoteRealtimeMessage::CommandResult { payload } => {
956            serde_json::to_value(payload).unwrap_or_default()
957        }
958        RemoteRealtimeMessage::Event { payload } => {
959            serde_json::to_value(payload).unwrap_or_default()
960        }
961        RemoteRealtimeMessage::Command { payload } => {
962            serde_json::to_value(payload).unwrap_or_default()
963        }
964        RemoteRealtimeMessage::Ping { payload } => {
965            serde_json::to_value(payload).unwrap_or_default()
966        }
967        RemoteRealtimeMessage::Pong { payload } => {
968            serde_json::to_value(payload).unwrap_or_default()
969        }
970        RemoteRealtimeMessage::Disconnect { payload } => {
971            serde_json::to_value(payload).unwrap_or_default()
972        }
973    }
974}
975
976#[cfg(test)]
977mod tests {
978    use super::*;
979
980    #[test]
981    fn test_realtime_config() {
982        let config = RealtimeConfig {
983            realtime_url: "wss://example.supabase.co/realtime/v1/websocket".to_string(),
984            realtime_token: "test_token".to_string(),
985            channel_name: "cli:user123".to_string(),
986            user_id: "user123".to_string(),
987            session_token: "session123".to_string(),
988            supabase_anon_key: "test_anon_key".to_string(),
989            heartbeat_interval_secs: 30,
990            sessions_dir: PathBuf::from("/tmp/test-sessions"),
991            version: "0.7.0".to_string(),
992        };
993
994        assert_eq!(config.channel_name, "cli:user123");
995        assert_eq!(config.version, "0.7.0");
996    }
997
998    #[test]
999    fn test_message_type() {
1000        let msg = RemoteRealtimeMessage::Heartbeat {
1001            payload: HeartbeatPayload {
1002                agents: vec![],
1003                system_load: 0.5,
1004                hostname: Some("test-host".to_string()),
1005                os: Some("linux".to_string()),
1006                version: Some("0.1.0".to_string()),
1007            },
1008        };
1009
1010        assert_eq!(get_message_type(&msg), "remote.heartbeat");
1011    }
1012}