1use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::{Context, Result, bail};
22use futures::{SinkExt, StreamExt};
23use serde::{Deserialize, Serialize};
24use tokio::sync::{RwLock, mpsc};
25use tokio_tungstenite::{connect_async, tungstenite::Message};
26use url::Url;
27
28use super::protocol::{BackendCommand, RemoteAgentInfo, StreamChunkType};
29
/// Interval, in seconds, between the `heartbeat` events that `connect` sends on
/// the `phoenix` topic while the socket is open.
const PHOENIX_HEARTBEAT_INTERVAL_SECS: u64 = 25;
32
/// Settings needed to open one Supabase Realtime session.
///
/// `Debug` is implemented by hand rather than derived so that the credential
/// fields are redacted if a config ever ends up in a log line.
#[derive(Clone)]
pub struct RealtimeConfig {
    /// WebSocket URL of the Realtime endpoint; parsed with `Url::parse` on connect.
    pub realtime_url: String,
    /// Token sent as `Authorization: Bearer <token>` during the WebSocket handshake.
    pub realtime_token: String,
    /// Channel name; the joined topic is `realtime:{channel_name}`.
    pub channel_name: String,
    /// User id stamped as `userId` on every outgoing broadcast envelope.
    pub user_id: String,
    /// Session token included in the `remote.register` payload.
    pub session_token: String,
    /// Key appended to the connection URL as the `apikey` query parameter.
    pub supabase_anon_key: String,
    /// Heartbeat interval in seconds.
    // NOTE(review): not read by the code visible in this file; presumably consumed
    // by the heartbeat producer — confirm against the caller.
    pub heartbeat_interval_secs: u64,
    /// Directory handed to agent-session discovery when building the register payload.
    pub sessions_dir: PathBuf,
    /// Client version string reported in the register payload.
    pub version: String,
}

impl std::fmt::Debug for RealtimeConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Credentials must never be rendered; everything else is safe to show.
        const REDACTED: &str = "<redacted>";
        f.debug_struct("RealtimeConfig")
            .field("realtime_url", &self.realtime_url)
            .field("realtime_token", &REDACTED)
            .field("channel_name", &self.channel_name)
            .field("user_id", &self.user_id)
            .field("session_token", &REDACTED)
            .field("supabase_anon_key", &REDACTED)
            .field("heartbeat_interval_secs", &self.heartbeat_interval_secs)
            .field("sessions_dir", &self.sessions_dir)
            .field("version", &self.version)
            .finish()
    }
}
55
/// Lifecycle of a `RealtimeClient` connection.
///
/// The default value is [`RealtimeState::Disconnected`], the state a freshly
/// constructed client starts in.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum RealtimeState {
    /// No socket is open; set by `new` and again when `connect` finishes.
    #[default]
    Disconnected,
    /// `connect` has started and the WebSocket handshake is in progress.
    Connecting,
    /// The WebSocket is open but the channel join has not been acknowledged yet.
    Connected,
    /// An `ok` `phx_reply` for the channel topic was received.
    Subscribed,
    /// A shutdown signal was received and the client is saying goodbye.
    ShuttingDown,
}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(tag = "event", rename_all = "snake_case")]
74pub enum PhoenixMessage {
75 #[serde(rename = "phx_join")]
77 PhxJoin {
78 topic: String,
80 payload: serde_json::Value,
82 #[serde(rename = "ref")]
84 msg_ref: String,
85 },
86 #[serde(rename = "phx_reply")]
88 PhxReply {
89 topic: String,
91 payload: PhxReplyPayload,
93 #[serde(rename = "ref")]
95 msg_ref: String,
96 },
97 #[serde(rename = "heartbeat")]
99 Heartbeat {
100 topic: String,
102 payload: serde_json::Value,
104 #[serde(rename = "ref")]
106 msg_ref: String,
107 },
108 #[serde(rename = "broadcast")]
110 Broadcast {
111 topic: String,
113 payload: BroadcastPayload,
115 #[serde(rename = "ref")]
117 msg_ref: Option<String>,
118 },
119 #[serde(rename = "presence_state")]
121 PresenceState {
122 topic: String,
124 payload: serde_json::Value,
126 #[serde(rename = "ref")]
128 msg_ref: Option<String>,
129 },
130 #[serde(rename = "presence_diff")]
132 PresenceDiff {
133 topic: String,
135 payload: serde_json::Value,
137 #[serde(rename = "ref")]
139 msg_ref: Option<String>,
140 },
141}
142
/// Payload of a `phx_reply` frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhxReplyPayload {
    /// Reply status; the message handler in this file treats `"ok"` as success.
    pub status: String,
    /// Response body; falls back to the default JSON value when the field is absent.
    #[serde(default)]
    pub response: serde_json::Value,
}
152
/// Wrapper object carried in the `payload` of a `broadcast` frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BroadcastPayload {
    /// Serialized as `type`; outgoing broadcasts in this file set it to `"broadcast"`.
    #[serde(rename = "type")]
    pub broadcast_type: String,
    /// Event name; outgoing broadcasts in this file use `"remote"`.
    pub event: String,
    /// Inner application message, kept as untyped JSON.
    pub payload: serde_json::Value,
}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
167#[serde(tag = "type", rename_all = "snake_case")]
168pub enum RemoteRealtimeMessage {
169 #[serde(rename = "remote.register")]
171 Register {
172 payload: RegisterPayload,
174 },
175 #[serde(rename = "remote.heartbeat")]
177 Heartbeat {
178 payload: HeartbeatPayload,
180 },
181 #[serde(rename = "remote.stream")]
183 Stream {
184 payload: StreamPayload,
186 },
187 #[serde(rename = "remote.command_result")]
189 CommandResult {
190 payload: CommandResultPayload,
192 },
193 #[serde(rename = "remote.event")]
195 Event {
196 payload: EventPayload,
198 },
199 #[serde(rename = "remote.command")]
201 Command {
202 payload: CommandPayload,
204 },
205 #[serde(rename = "remote.ping")]
207 Ping {
208 payload: PingPongPayload,
210 },
211 #[serde(rename = "remote.pong")]
213 Pong {
214 payload: PingPongPayload,
216 },
217 #[serde(rename = "remote.disconnect")]
219 Disconnect {
220 payload: DisconnectPayload,
222 },
223}
224
/// Body of a `remote.register` message; keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RegisterPayload {
    /// Host name of the machine running the client.
    pub hostname: String,
    /// Operating system identifier (`connect` fills it from `std::env::consts::OS`).
    pub os: String,
    /// Client version string.
    pub version: String,
    /// Session token taken from the client configuration.
    pub session_token: String,
    /// Known agents; omitted from the JSON when empty, defaults to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub agents: Vec<RemoteAgentInfo>,
    /// Optional system load; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub system_load: Option<f32>,
}
244
/// Body of a `remote.heartbeat` message; keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HeartbeatPayload {
    /// Agents reported with this heartbeat.
    pub agents: Vec<RemoteAgentInfo>,
    /// System load value reported with this heartbeat.
    pub system_load: f32,
    /// Host name; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hostname: Option<String>,
    /// Operating system identifier; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub os: Option<String>,
    /// Client version; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub version: Option<String>,
}
263
/// Body of a `remote.stream` message; keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamPayload {
    /// Agent the chunk belongs to.
    pub agent_id: String,
    /// Kind of chunk being streamed.
    pub chunk_type: StreamChunkType,
    /// Chunk content.
    pub content: String,
}
275
/// Body of a `remote.command_result` message; keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommandResultPayload {
    /// Id of the command this result answers.
    pub command_id: String,
    /// Whether the command succeeded.
    pub success: bool,
    /// Optional result value; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Optional error text; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
291
/// Body of a `remote.event` message; keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EventPayload {
    /// Name of the event.
    pub event_type: String,
    /// Agent the event refers to.
    pub agent_id: String,
    /// Optional event data; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<serde_json::Value>,
}
304
/// Body of a `remote.command` message; keys are serialized in camelCase.
///
/// Only `command_id` and `command_type` are mandatory. Which optional fields
/// are read depends on `command_type` (see `convert_to_backend_command`).
/// Every optional field is omitted from the JSON when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommandPayload {
    /// Identifier of this command.
    pub command_id: String,
    /// Command kind: `subscribe`, `unsubscribe`, `send_input`, `slash_command`,
    /// `cancel_operation`, `spawn_agent`, `request_sync`, `ping` or `disconnect`.
    pub command_type: String,
    /// Target agent (read for subscribe, unsubscribe, send_input, slash_command,
    /// cancel_operation).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    /// Input text (read for send_input).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
    /// Slash command name (read for slash_command).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<String>,
    /// Slash command arguments (read for slash_command).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub args: Option<Vec<String>>,
    /// Model name (read for spawn_agent).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Working directory (read for spawn_agent).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub working_directory: Option<String>,
    /// Disconnect reason (read for disconnect).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
335
/// Body shared by `remote.ping` and `remote.pong`; keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PingPongPayload {
    /// Timestamp carried by the ping (wire key `serverTimestamp`) and echoed in the
    /// pong. The ping handler falls back to the current UTC time in milliseconds
    /// when the ping carries none.
    pub server_timestamp: i64,
}
343
/// Body of a `remote.disconnect` message; keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DisconnectPayload {
    /// Human-readable reason for disconnecting.
    pub reason: String,
    /// Host name of the disconnecting machine.
    pub hostname: String,
}
353
/// Client for one Supabase Realtime channel.
pub struct RealtimeClient {
    /// Connection settings supplied at construction.
    config: RealtimeConfig,
    /// Current connection state, behind an async read/write lock.
    state: Arc<RwLock<RealtimeState>>,
    /// Sender side of the outgoing-message queue; `None` until `connect` creates it.
    outgoing_tx: Option<mpsc::Sender<RemoteRealtimeMessage>>,
    /// Counter used to generate the `ref` of each outgoing frame.
    msg_ref: Arc<RwLock<u64>>,
}
363
364impl RealtimeClient {
365 pub fn new(config: RealtimeConfig) -> Self {
367 Self {
368 config,
369 state: Arc::new(RwLock::new(RealtimeState::Disconnected)),
370 outgoing_tx: None,
371 msg_ref: Arc::new(RwLock::new(0)),
372 }
373 }
374
375 async fn next_ref(&self) -> String {
377 let mut ref_num = self.msg_ref.write().await;
378 *ref_num += 1;
379 ref_num.to_string()
380 }
381
382 pub async fn state(&self) -> RealtimeState {
384 *self.state.read().await
385 }
386
387 pub async fn is_ready(&self) -> bool {
389 *self.state.read().await == RealtimeState::Subscribed
390 }
391
392 pub async fn connect(
399 &mut self,
400 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
401 mut heartbeat_rx: mpsc::Receiver<super::heartbeat::HeartbeatData>,
402 mut stream_rx: mpsc::Receiver<(String, StreamChunkType, String)>,
403 command_tx: mpsc::Sender<BackendCommand>,
404 ) -> Result<()> {
405 *self.state.write().await = RealtimeState::Connecting;
406
407 let mut url = Url::parse(&self.config.realtime_url)?;
409 url.query_pairs_mut()
410 .append_pair("apikey", &self.config.supabase_anon_key)
411 .append_pair("vsn", "1.0.0");
412
413 tracing::info!(
414 "Connecting to Supabase Realtime: {}",
415 url.host_str().unwrap_or("unknown")
416 );
417
418 let request = tokio_tungstenite::tungstenite::http::Request::builder()
420 .uri(url.as_str())
421 .header(
422 "Authorization",
423 format!("Bearer {}", self.config.realtime_token),
424 )
425 .body(())
426 .context("Failed to build WebSocket request")?;
427
428 let (ws_stream, _response): (
430 tokio_tungstenite::WebSocketStream<
431 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
432 >,
433 _,
434 ) = match connect_async(request).await {
435 Ok(result) => result,
436 Err(e) => {
437 tracing::error!("WebSocket connection error: {:?}", e);
438 return Err(anyhow::anyhow!(
439 "Failed to connect to Supabase Realtime: {}",
440 e
441 ));
442 }
443 };
444
445 *self.state.write().await = RealtimeState::Connected;
446 tracing::info!("Connected to Supabase Realtime");
447
448 let (mut write, mut read) = ws_stream.split();
449
450 let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<RemoteRealtimeMessage>(100);
452
453 self.outgoing_tx = Some(outgoing_tx);
454
455 let join_ref = self.next_ref().await;
457 let channel_topic = format!("realtime:{}", self.config.channel_name);
458 let join_msg = serde_json::json!({
459 "topic": channel_topic,
460 "event": "phx_join",
461 "payload": {
462 "config": {
463 "broadcast": {
464 "self": false
465 },
466 "presence": {
467 "key": ""
468 }
469 }
470 },
471 "ref": join_ref
472 });
473
474 write
475 .send(Message::Text(serde_json::to_string(&join_msg)?.into()))
476 .await
477 .context("Failed to send join message")?;
478
479 tracing::info!("Sent join request for channel: {}", channel_topic);
480
481 let state = Arc::clone(&self.state);
483 let channel_topic_clone = channel_topic.clone();
484 let user_id = self.config.user_id.clone();
485 let session_token = self.config.session_token.clone();
486 let sessions_dir = self.config.sessions_dir.clone();
487 let version = self.config.version.clone();
488
489 let mut phoenix_heartbeat =
491 tokio::time::interval(Duration::from_secs(PHOENIX_HEARTBEAT_INTERVAL_SECS));
492
493 let mut register_sent = false;
495
496 loop {
497 if !register_sent && *state.read().await == RealtimeState::Subscribed {
499 register_sent = true;
500 tracing::info!("Channel subscribed, sending register message to frontend");
501
502 let agents =
505 crate::ipc::discovery::list_agent_sessions_with_metadata(&sessions_dir)
506 .unwrap_or_default()
507 .into_iter()
508 .map(RemoteAgentInfo::from)
509 .collect::<Vec<_>>();
510
511 let register_msg = RemoteRealtimeMessage::Register {
512 payload: RegisterPayload {
513 hostname: gethostname::gethostname().to_string_lossy().to_string(),
514 os: std::env::consts::OS.to_string(),
515 version: version.clone(),
516 session_token: session_token.clone(),
517 agents,
518 system_load: None,
519 },
520 };
521
522 if let Err(e) = self
523 .send_broadcast(&mut write, &channel_topic_clone, &user_id, register_msg)
524 .await
525 {
526 tracing::warn!("Failed to send register message: {}", e);
527 } else {
528 tracing::info!("Register message sent to frontend");
529 }
530 }
531
532 tokio::select! {
533 _ = shutdown_rx.recv() => {
535 tracing::info!("Received shutdown signal, sending disconnect message");
536 *state.write().await = RealtimeState::ShuttingDown;
537
538 let disconnect_msg = RemoteRealtimeMessage::Disconnect {
540 payload: DisconnectPayload {
541 reason: "Bridge shutting down".to_string(),
542 hostname: gethostname::gethostname().to_string_lossy().to_string(),
543 },
544 };
545 if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, disconnect_msg).await {
546 tracing::warn!("Failed to send disconnect message: {}", e);
547 } else {
548 tracing::info!("Disconnect message sent to frontend");
549 }
550
551 break;
552 }
553
554 _ = phoenix_heartbeat.tick() => {
556 let hb_ref = self.next_ref().await;
557 let heartbeat_msg = serde_json::json!({
558 "topic": "phoenix",
559 "event": "heartbeat",
560 "payload": {},
561 "ref": hb_ref
562 });
563 let heartbeat_str = match serde_json::to_string(&heartbeat_msg) {
564 Ok(s) => s,
565 Err(e) => {
566 tracing::error!("Failed to serialize Phoenix heartbeat: {}", e);
567 break;
568 }
569 };
570 if let Err(e) = write.send(Message::Text(heartbeat_str.into())).await {
571 tracing::error!("Failed to send Phoenix heartbeat: {}", e);
572 break;
573 }
574 tracing::debug!("Sent Phoenix heartbeat");
575 }
576
577 msg = read.next() => {
579 match msg {
580 Some(Ok(Message::Text(text))) => {
581 if let Err(e) = self.handle_incoming_message(
582 &text,
583 &channel_topic_clone,
584 &command_tx,
585 &state,
586 ).await {
587 tracing::error!("Error handling message: {}", e);
588 }
589 }
590 Some(Ok(Message::Ping(data))) => {
591 if let Err(e) = write.send(Message::Pong(data)).await {
592 tracing::error!("Failed to send pong: {}", e);
593 }
594 }
595 Some(Ok(Message::Close(_))) => {
596 tracing::info!("WebSocket closed by server");
597 break;
598 }
599 Some(Err(e)) => {
600 tracing::error!("WebSocket error: {}", e);
601 break;
602 }
603 None => {
604 tracing::info!("WebSocket stream ended");
605 break;
606 }
607 _ => {}
608 }
609 }
610
611 Some(msg) = outgoing_rx.recv() => {
613 if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, msg).await {
614 tracing::error!("Failed to send broadcast: {}", e);
615 }
616 }
617
618 Some(heartbeat_data) = heartbeat_rx.recv() => {
620 tracing::info!("Broadcasting heartbeat with {} agents to frontend (host: {})",
621 heartbeat_data.agents.len(), heartbeat_data.hostname);
622 let msg = RemoteRealtimeMessage::Heartbeat {
623 payload: HeartbeatPayload {
624 agents: heartbeat_data.agents,
625 system_load: heartbeat_data.system_load,
626 hostname: Some(heartbeat_data.hostname),
627 os: Some(heartbeat_data.os),
628 version: Some(heartbeat_data.version),
629 },
630 };
631 if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, msg).await {
632 tracing::error!("Failed to send heartbeat broadcast: {}", e);
633 } else {
634 tracing::info!("Heartbeat broadcast sent to channel {}", channel_topic_clone);
635 }
636 }
637
638 Some((agent_id, chunk_type, content)) = stream_rx.recv() => {
640 tracing::debug!("Broadcasting stream for agent {}: {:?}", agent_id, chunk_type);
641 let msg = RemoteRealtimeMessage::Stream {
642 payload: StreamPayload {
643 agent_id,
644 chunk_type,
645 content,
646 },
647 };
648 if let Err(e) = self.send_broadcast(&mut write, &channel_topic_clone, &user_id, msg).await {
649 tracing::error!("Failed to send stream broadcast: {}", e);
650 }
651 }
652 }
653 }
654 *self.state.write().await = RealtimeState::Disconnected;
655 Ok(())
656 }
657
658 async fn handle_incoming_message(
660 &self,
661 text: &str,
662 channel_topic: &str,
663 command_tx: &mpsc::Sender<BackendCommand>,
664 state: &Arc<RwLock<RealtimeState>>,
665 ) -> Result<()> {
666 let msg: serde_json::Value = serde_json::from_str(text)?;
667
668 let event = msg.get("event").and_then(|e| e.as_str()).unwrap_or("");
669 let topic = msg.get("topic").and_then(|t| t.as_str()).unwrap_or("");
670
671 match event {
672 "phx_reply" => {
673 let status = msg
674 .get("payload")
675 .and_then(|p| p.get("status"))
676 .and_then(|s| s.as_str())
677 .unwrap_or("");
678
679 if status == "ok" && topic == channel_topic {
680 *state.write().await = RealtimeState::Subscribed;
681 tracing::info!("Successfully joined channel: {}", channel_topic);
682 } else if status != "ok" {
683 tracing::error!("Join failed: {:?}", msg);
684 }
685 }
686
687 "broadcast" => {
688 tracing::info!("Received broadcast event, full msg: {:?}", msg);
689
690 if let Some(wrapper) = msg.get("payload") {
691 tracing::info!("Broadcast wrapper payload: {:?}", wrapper);
692
693 if let Some(inner_payload) = wrapper.get("payload") {
694 tracing::info!(
695 "Inner payload (RemoteRealtimeMessage): {:?}",
696 inner_payload
697 );
698 self.handle_remote_message(inner_payload, command_tx)
699 .await?;
700 } else {
701 tracing::warn!("Broadcast has no inner payload: {:?}", wrapper);
702 }
703 }
704 }
705
706 "presence_state" | "presence_diff" => {
707 tracing::debug!("Presence update: {}", event);
708 }
709
710 _ => {
711 tracing::debug!("Unhandled event: {}", event);
712 }
713 }
714
715 Ok(())
716 }
717
718 async fn handle_remote_message(
720 &self,
721 msg: &serde_json::Value,
722 command_tx: &mpsc::Sender<BackendCommand>,
723 ) -> Result<()> {
724 let msg_type = msg.get("type").and_then(|t| t.as_str()).unwrap_or("");
725 tracing::debug!("handle_remote_message: type={}, msg={:?}", msg_type, msg);
726
727 match msg_type {
728 "remote.command" => {
729 tracing::info!("Received remote.command from frontend");
730 if let Some(payload) = msg.get("payload") {
731 tracing::debug!("Command payload: {:?}", payload);
732 match serde_json::from_value::<CommandPayload>(payload.clone()) {
733 Ok(cmd_payload) => {
734 tracing::info!(
735 "Parsed command: type={}, agent_id={:?}",
736 cmd_payload.command_type,
737 cmd_payload.agent_id
738 );
739 let backend_cmd = self.convert_to_backend_command(&cmd_payload)?;
740 command_tx.send(backend_cmd).await?;
741 }
742 Err(e) => {
743 tracing::error!(
744 "Failed to parse CommandPayload: {}, payload was: {:?}",
745 e,
746 payload
747 );
748 }
749 }
750 } else {
751 tracing::warn!("remote.command has no payload");
752 }
753 }
754
755 "remote.ping" => {
756 if let Some(tx) = &self.outgoing_tx {
757 let timestamp = msg
758 .get("payload")
759 .and_then(|p| p.get("serverTimestamp"))
760 .and_then(|t| t.as_i64())
761 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
762
763 tx.send(RemoteRealtimeMessage::Pong {
764 payload: PingPongPayload {
765 server_timestamp: timestamp,
766 },
767 })
768 .await?;
769 }
770 }
771
772 _ => {
773 tracing::debug!("Unhandled remote message type: {}", msg_type);
774 }
775 }
776
777 Ok(())
778 }
779
780 fn convert_to_backend_command(&self, payload: &CommandPayload) -> Result<BackendCommand> {
782 let cmd = match payload.command_type.as_str() {
783 "subscribe" => BackendCommand::Subscribe {
784 agent_id: payload.agent_id.clone().unwrap_or_default(),
785 },
786 "unsubscribe" => BackendCommand::Unsubscribe {
787 agent_id: payload.agent_id.clone().unwrap_or_default(),
788 },
789 "send_input" => BackendCommand::SendInput {
790 command_id: payload.command_id.clone(),
791 agent_id: payload.agent_id.clone().unwrap_or_default(),
792 content: payload.content.clone().unwrap_or_default(),
793 },
794 "slash_command" => BackendCommand::SlashCommand {
795 command_id: payload.command_id.clone(),
796 agent_id: payload.agent_id.clone().unwrap_or_default(),
797 command: payload.command.clone().unwrap_or_default(),
798 args: payload.args.clone().unwrap_or_default(),
799 },
800 "cancel_operation" => BackendCommand::CancelOperation {
801 command_id: payload.command_id.clone(),
802 agent_id: payload.agent_id.clone().unwrap_or_default(),
803 },
804 "spawn_agent" => BackendCommand::SpawnAgent {
805 command_id: payload.command_id.clone(),
806 model: payload.model.clone(),
807 working_directory: payload.working_directory.clone(),
808 },
809 "request_sync" => BackendCommand::RequestSync,
810 "ping" => BackendCommand::Ping {
811 timestamp: chrono::Utc::now().timestamp_millis(),
812 },
813 "disconnect" => BackendCommand::Disconnect {
814 reason: payload
815 .reason
816 .clone()
817 .unwrap_or_else(|| "Server requested".to_string()),
818 },
819 _ => bail!("Unknown command type: {}", payload.command_type),
820 };
821
822 Ok(cmd)
823 }
824
825 async fn send_broadcast<W>(
827 &self,
828 write: &mut W,
829 channel_topic: &str,
830 user_id: &str,
831 msg: RemoteRealtimeMessage,
832 ) -> Result<()>
833 where
834 W: SinkExt<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin,
835 {
836 let msg_ref = self.next_ref().await;
837
838 let broadcast = serde_json::json!({
840 "topic": channel_topic,
841 "event": "broadcast",
842 "payload": {
843 "type": "broadcast",
844 "event": "remote",
845 "payload": {
846 "type": get_message_type(&msg),
847 "id": uuid::Uuid::new_v4().to_string(),
848 "payload": get_message_payload(&msg),
849 "timestamp": chrono::Utc::now().timestamp_millis(),
850 "userId": user_id
851 }
852 },
853 "ref": msg_ref
854 });
855
856 write
857 .send(Message::Text(serde_json::to_string(&broadcast)?.into()))
858 .await
859 .context("Failed to send broadcast")?;
860
861 Ok(())
862 }
863
864 pub async fn send(&self, msg: RemoteRealtimeMessage) -> Result<()> {
866 if let Some(tx) = &self.outgoing_tx {
867 tx.send(msg).await?;
868 } else {
869 bail!("Not connected");
870 }
871 Ok(())
872 }
873
874 pub async fn send_heartbeat(
876 &self,
877 heartbeat_data: super::heartbeat::HeartbeatData,
878 ) -> Result<()> {
879 self.send(RemoteRealtimeMessage::Heartbeat {
880 payload: HeartbeatPayload {
881 agents: heartbeat_data.agents,
882 system_load: heartbeat_data.system_load,
883 hostname: Some(heartbeat_data.hostname),
884 os: Some(heartbeat_data.os),
885 version: Some(heartbeat_data.version),
886 },
887 })
888 .await
889 }
890
891 pub async fn send_stream(
893 &self,
894 agent_id: String,
895 chunk_type: StreamChunkType,
896 content: String,
897 ) -> Result<()> {
898 self.send(RemoteRealtimeMessage::Stream {
899 payload: StreamPayload {
900 agent_id,
901 chunk_type,
902 content,
903 },
904 })
905 .await
906 }
907
908 pub async fn send_command_result(
910 &self,
911 command_id: String,
912 success: bool,
913 result: Option<serde_json::Value>,
914 error: Option<String>,
915 ) -> Result<()> {
916 self.send(RemoteRealtimeMessage::CommandResult {
917 payload: CommandResultPayload {
918 command_id,
919 success,
920 result,
921 error,
922 },
923 })
924 .await
925 }
926}
927
928fn get_message_type(msg: &RemoteRealtimeMessage) -> &'static str {
930 match msg {
931 RemoteRealtimeMessage::Register { .. } => "remote.register",
932 RemoteRealtimeMessage::Heartbeat { .. } => "remote.heartbeat",
933 RemoteRealtimeMessage::Stream { .. } => "remote.stream",
934 RemoteRealtimeMessage::CommandResult { .. } => "remote.command_result",
935 RemoteRealtimeMessage::Event { .. } => "remote.event",
936 RemoteRealtimeMessage::Command { .. } => "remote.command",
937 RemoteRealtimeMessage::Ping { .. } => "remote.ping",
938 RemoteRealtimeMessage::Pong { .. } => "remote.pong",
939 RemoteRealtimeMessage::Disconnect { .. } => "remote.disconnect",
940 }
941}
942
943fn get_message_payload(msg: &RemoteRealtimeMessage) -> serde_json::Value {
945 match msg {
946 RemoteRealtimeMessage::Register { payload } => {
947 serde_json::to_value(payload).unwrap_or_default()
948 }
949 RemoteRealtimeMessage::Heartbeat { payload } => {
950 serde_json::to_value(payload).unwrap_or_default()
951 }
952 RemoteRealtimeMessage::Stream { payload } => {
953 serde_json::to_value(payload).unwrap_or_default()
954 }
955 RemoteRealtimeMessage::CommandResult { payload } => {
956 serde_json::to_value(payload).unwrap_or_default()
957 }
958 RemoteRealtimeMessage::Event { payload } => {
959 serde_json::to_value(payload).unwrap_or_default()
960 }
961 RemoteRealtimeMessage::Command { payload } => {
962 serde_json::to_value(payload).unwrap_or_default()
963 }
964 RemoteRealtimeMessage::Ping { payload } => {
965 serde_json::to_value(payload).unwrap_or_default()
966 }
967 RemoteRealtimeMessage::Pong { payload } => {
968 serde_json::to_value(payload).unwrap_or_default()
969 }
970 RemoteRealtimeMessage::Disconnect { payload } => {
971 serde_json::to_value(payload).unwrap_or_default()
972 }
973 }
974}
975
#[cfg(test)]
mod tests {
    use super::*;

    /// A config keeps the values it was built with.
    #[test]
    fn test_realtime_config() {
        let channel = "cli:user123";
        let version = "0.7.0";

        let config = RealtimeConfig {
            realtime_url: String::from("wss://example.supabase.co/realtime/v1/websocket"),
            realtime_token: String::from("test_token"),
            channel_name: String::from(channel),
            user_id: String::from("user123"),
            session_token: String::from("session123"),
            supabase_anon_key: String::from("test_anon_key"),
            heartbeat_interval_secs: 30,
            sessions_dir: PathBuf::from("/tmp/test-sessions"),
            version: String::from(version),
        };

        assert_eq!(config.channel_name, channel);
        assert_eq!(config.version, version);
    }

    /// A heartbeat message reports the `remote.heartbeat` wire type.
    #[test]
    fn test_message_type() {
        let payload = HeartbeatPayload {
            agents: Vec::new(),
            system_load: 0.5,
            hostname: Some(String::from("test-host")),
            os: Some(String::from("linux")),
            version: Some(String::from("0.1.0")),
        };
        let heartbeat = RemoteRealtimeMessage::Heartbeat { payload };

        assert_eq!(get_message_type(&heartbeat), "remote.heartbeat");
    }
}