clasp_client/
client.rs

1//! Main Clasp client implementation
2
3use bytes::Bytes;
4use clasp_core::{
5    codec, time::ClockSync, BundleMessage, ErrorMessage, GetMessage, HelloMessage, Message,
6    PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage, SubscribeOptions,
7    UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10    Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22
23/// Subscription callback type
24pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
25
26/// A Clasp client
27pub struct Clasp {
28    url: String,
29    name: String,
30    features: Vec<String>,
31    token: Option<String>,
32    reconnect: bool,
33    reconnect_interval_ms: u64,
34
35    /// Session ID (set after connect)
36    session_id: RwLock<Option<String>>,
37
38    /// Connection state
39    connected: Arc<RwLock<bool>>,
40
41    /// Sender for outgoing messages
42    sender: RwLock<Option<mpsc::Sender<Bytes>>>,
43
44    /// Local param cache
45    params: Arc<DashMap<String, Value>>,
46
47    /// Subscriptions
48    subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
49
50    /// Subscription ID counter
51    next_sub_id: AtomicU32,
52
53    /// Clock synchronization
54    clock: RwLock<ClockSync>,
55
56    /// Pending get requests
57    pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
58
59    /// Announced signals (from server)
60    signals: Arc<DashMap<String, SignalDefinition>>,
61
62    /// Last error received from server
63    last_error: Arc<RwLock<Option<ErrorMessage>>>,
64
65    /// Reconnect attempt counter
66    reconnect_attempts: Arc<AtomicU32>,
67
68    /// Max reconnect attempts (0 = unlimited)
69    max_reconnect_attempts: u32,
70
71    /// Flag to indicate intentional close (don't reconnect)
72    intentionally_closed: Arc<AtomicBool>,
73
74    /// Notify for triggering reconnect
75    reconnect_notify: Arc<Notify>,
76}
77
78impl Clasp {
79    /// Create a new client (use builder for more options)
80    pub fn new(
81        url: &str,
82        name: String,
83        features: Vec<String>,
84        token: Option<String>,
85        reconnect: bool,
86        reconnect_interval_ms: u64,
87    ) -> Self {
88        Self {
89            url: url.to_string(),
90            name,
91            features,
92            token,
93            reconnect,
94            reconnect_interval_ms,
95            session_id: RwLock::new(None),
96            connected: Arc::new(RwLock::new(false)),
97            sender: RwLock::new(None),
98            params: Arc::new(DashMap::new()),
99            subscriptions: Arc::new(DashMap::new()),
100            next_sub_id: AtomicU32::new(1),
101            clock: RwLock::new(ClockSync::new()),
102            pending_gets: Arc::new(DashMap::new()),
103            signals: Arc::new(DashMap::new()),
104            last_error: Arc::new(RwLock::new(None)),
105            reconnect_attempts: Arc::new(AtomicU32::new(0)),
106            max_reconnect_attempts: 10,
107            intentionally_closed: Arc::new(AtomicBool::new(false)),
108            reconnect_notify: Arc::new(Notify::new()),
109        }
110    }
111
112    /// Create a builder
113    pub fn builder(url: &str) -> ClaspBuilder {
114        ClaspBuilder::new(url)
115    }
116
117    /// Connect to server (convenience method)
118    pub async fn connect_to(url: &str) -> Result<Self> {
119        ClaspBuilder::new(url).connect().await
120    }
121
122    /// Internal connect
123    pub(crate) async fn do_connect(&mut self) -> Result<()> {
124        if *self.connected.read() {
125            return Err(ClientError::AlreadyConnected);
126        }
127
128        info!("Connecting to {}", self.url);
129
130        // Connect WebSocket
131        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
132
133        // Create send channel
134        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
135        *self.sender.write() = Some(tx);
136
137        let connected = self.connected.clone();
138
139        // Spawn sender task
140        let sender = Arc::new(sender);
141        let sender_clone = sender.clone();
142        tokio::spawn(async move {
143            while let Some(data) = rx.recv().await {
144                if let Err(e) = sender_clone.send(data).await {
145                    error!("Send error: {}", e);
146                    break;
147                }
148            }
149        });
150
151        // Send HELLO
152        let hello = Message::Hello(HelloMessage {
153            version: PROTOCOL_VERSION,
154            name: self.name.clone(),
155            features: self.features.clone(),
156            capabilities: None,
157            token: self.token.clone(),
158        });
159
160        self.send_message(&hello).await?;
161
162        // Wait for WELCOME
163        loop {
164            match receiver.recv().await {
165                Some(TransportEvent::Data(data)) => {
166                    match codec::decode(&data) {
167                        Ok((Message::Welcome(welcome), _)) => {
168                            *self.session_id.write() = Some(welcome.session.clone());
169                            *connected.write() = true;
170
171                            // Sync clock
172                            self.clock.write().process_sync(
173                                clasp_core::time::now(),
174                                welcome.time,
175                                welcome.time,
176                                clasp_core::time::now(),
177                            );
178
179                            info!("Connected, session: {}", welcome.session);
180                            break;
181                        }
182                        Ok((msg, _)) => {
183                            debug!("Received during handshake: {:?}", msg);
184                        }
185                        Err(e) => {
186                            warn!("Decode error: {}", e);
187                        }
188                    }
189                }
190                Some(TransportEvent::Error(e)) => {
191                    return Err(ClientError::ConnectionFailed(e));
192                }
193                Some(TransportEvent::Disconnected { reason }) => {
194                    return Err(ClientError::ConnectionFailed(
195                        reason.unwrap_or_else(|| "Disconnected".to_string()),
196                    ));
197                }
198                None => {
199                    return Err(ClientError::ConnectionFailed(
200                        "Connection closed".to_string(),
201                    ));
202                }
203                _ => {}
204            }
205        }
206
207        // Reset reconnect state on successful connect
208        self.reconnect_attempts.store(0, Ordering::SeqCst);
209        self.intentionally_closed.store(false, Ordering::SeqCst);
210
211        // Spawn receiver task
212        let params = Arc::clone(&self.params);
213        let subscriptions = Arc::clone(&self.subscriptions);
214        let pending_gets = Arc::clone(&self.pending_gets);
215        let signals = Arc::clone(&self.signals);
216        let last_error = Arc::clone(&self.last_error);
217        let connected_clone = Arc::clone(&self.connected);
218        let reconnect_notify = Arc::clone(&self.reconnect_notify);
219        let intentionally_closed = Arc::clone(&self.intentionally_closed);
220        let reconnect_enabled = self.reconnect;
221
222        tokio::spawn(async move {
223            while let Some(event) = receiver.recv().await {
224                match event {
225                    TransportEvent::Data(data) => {
226                        if let Ok((msg, _)) = codec::decode(&data) {
227                            handle_message(&msg, &params, &subscriptions, &pending_gets, &signals, &last_error);
228                        }
229                    }
230                    TransportEvent::Disconnected { reason } => {
231                        info!("Disconnected: {:?}", reason);
232                        *connected_clone.write() = false;
233
234                        // Trigger reconnect if enabled and not intentionally closed
235                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
236                            reconnect_notify.notify_one();
237                        }
238                        break;
239                    }
240                    TransportEvent::Error(e) => {
241                        error!("Error: {}", e);
242                    }
243                    _ => {}
244                }
245            }
246        });
247
248        Ok(())
249    }
250
251    /// Start the reconnect loop (call after initial connect)
252    pub fn start_reconnect_loop(self: &Arc<Self>) {
253        if !self.reconnect {
254            return;
255        }
256
257        let client = Arc::clone(self);
258        tokio::spawn(async move {
259            loop {
260                // Wait for disconnect notification
261                client.reconnect_notify.notified().await;
262
263                if client.intentionally_closed.load(Ordering::SeqCst) {
264                    break;
265                }
266
267                // Attempt reconnection with exponential backoff
268                loop {
269                    let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
270
271                    if client.max_reconnect_attempts > 0 && attempts >= client.max_reconnect_attempts {
272                        error!("Max reconnect attempts ({}) reached", client.max_reconnect_attempts);
273                        break;
274                    }
275
276                    // Exponential backoff: base * 1.5^attempts, max 30 seconds
277                    let base_ms = client.reconnect_interval_ms;
278                    let delay_ms = (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;
279
280                    info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
281                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
282
283                    if client.intentionally_closed.load(Ordering::SeqCst) {
284                        break;
285                    }
286
287                    // Clone the Arc and get a mutable reference for reconnection
288                    // We need to use unsafe or restructure - let's use a different approach
289                    match client.try_reconnect().await {
290                        Ok(()) => {
291                            info!("Reconnected successfully");
292                            client.reconnect_attempts.store(0, Ordering::SeqCst);
293
294                            // Resubscribe to all patterns
295                            if let Err(e) = client.resubscribe_all().await {
296                                warn!("Failed to resubscribe: {}", e);
297                            }
298                            break;
299                        }
300                        Err(e) => {
301                            warn!("Reconnect failed: {}", e);
302                        }
303                    }
304                }
305            }
306        });
307    }
308
309    /// Internal reconnect attempt
310    async fn try_reconnect(&self) -> Result<()> {
311        info!("Attempting to reconnect to {}", self.url);
312
313        // Connect WebSocket
314        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
315
316        // Create send channel
317        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
318        *self.sender.write() = Some(tx);
319
320        // Spawn sender task
321        let sender = Arc::new(sender);
322        let sender_clone = sender.clone();
323        tokio::spawn(async move {
324            while let Some(data) = rx.recv().await {
325                if let Err(e) = sender_clone.send(data).await {
326                    error!("Send error: {}", e);
327                    break;
328                }
329            }
330        });
331
332        // Send HELLO
333        let hello = Message::Hello(HelloMessage {
334            version: PROTOCOL_VERSION,
335            name: self.name.clone(),
336            features: self.features.clone(),
337            capabilities: None,
338            token: self.token.clone(),
339        });
340
341        self.send_message(&hello).await?;
342
343        // Wait for WELCOME with timeout
344        let welcome_timeout = Duration::from_secs(10);
345        let deadline = tokio::time::Instant::now() + welcome_timeout;
346
347        loop {
348            match tokio::time::timeout_at(deadline, receiver.recv()).await {
349                Ok(Some(TransportEvent::Data(data))) => {
350                    match codec::decode(&data) {
351                        Ok((Message::Welcome(welcome), _)) => {
352                            *self.session_id.write() = Some(welcome.session.clone());
353                            *self.connected.write() = true;
354
355                            self.clock.write().process_sync(
356                                clasp_core::time::now(),
357                                welcome.time,
358                                welcome.time,
359                                clasp_core::time::now(),
360                            );
361
362                            info!("Reconnected, session: {}", welcome.session);
363                            break;
364                        }
365                        Ok((msg, _)) => {
366                            debug!("Received during reconnect handshake: {:?}", msg);
367                        }
368                        Err(e) => {
369                            warn!("Decode error during reconnect: {}", e);
370                        }
371                    }
372                }
373                Ok(Some(TransportEvent::Error(e))) => {
374                    return Err(ClientError::ConnectionFailed(e));
375                }
376                Ok(Some(TransportEvent::Disconnected { reason })) => {
377                    return Err(ClientError::ConnectionFailed(
378                        reason.unwrap_or_else(|| "Disconnected".to_string()),
379                    ));
380                }
381                Ok(None) => {
382                    return Err(ClientError::ConnectionFailed("Connection closed".to_string()));
383                }
384                Err(_) => {
385                    return Err(ClientError::Timeout);
386                }
387                _ => {}
388            }
389        }
390
391        // Spawn new receiver task
392        let params = Arc::clone(&self.params);
393        let subscriptions = Arc::clone(&self.subscriptions);
394        let pending_gets = Arc::clone(&self.pending_gets);
395        let signals = Arc::clone(&self.signals);
396        let last_error = Arc::clone(&self.last_error);
397        let connected_clone = Arc::clone(&self.connected);
398        let reconnect_notify = Arc::clone(&self.reconnect_notify);
399        let intentionally_closed = Arc::clone(&self.intentionally_closed);
400        let reconnect_enabled = self.reconnect;
401
402        tokio::spawn(async move {
403            while let Some(event) = receiver.recv().await {
404                match event {
405                    TransportEvent::Data(data) => {
406                        if let Ok((msg, _)) = codec::decode(&data) {
407                            handle_message(&msg, &params, &subscriptions, &pending_gets, &signals, &last_error);
408                        }
409                    }
410                    TransportEvent::Disconnected { reason } => {
411                        info!("Disconnected: {:?}", reason);
412                        *connected_clone.write() = false;
413
414                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
415                            reconnect_notify.notify_one();
416                        }
417                        break;
418                    }
419                    TransportEvent::Error(e) => {
420                        error!("Error: {}", e);
421                    }
422                    _ => {}
423                }
424            }
425        });
426
427        Ok(())
428    }
429
430    /// Resubscribe to all existing subscriptions after reconnect
431    async fn resubscribe_all(&self) -> Result<()> {
432        // Collect subscription info first to avoid lifetime issues with DashMap
433        let subs: Vec<(u32, String)> = self
434            .subscriptions
435            .iter()
436            .map(|entry| (*entry.key(), entry.value().0.clone()))
437            .collect();
438
439        for (id, pattern) in subs {
440            let msg = Message::Subscribe(SubscribeMessage {
441                id,
442                pattern: pattern.clone(),
443                types: vec![],
444                options: Some(SubscribeOptions::default()),
445            });
446
447            self.send_message(&msg).await?;
448            debug!("Resubscribed to {} (id: {})", pattern, id);
449        }
450
451        Ok(())
452    }
453
454    /// Check if connected
455    pub fn is_connected(&self) -> bool {
456        *self.connected.read()
457    }
458
459    /// Get session ID
460    pub fn session_id(&self) -> Option<String> {
461        self.session_id.read().clone()
462    }
463
464    /// Get current server time (microseconds)
465    pub fn time(&self) -> u64 {
466        self.clock.read().server_time()
467    }
468
469    /// Send a raw message
470    async fn send_message(&self, message: &Message) -> Result<()> {
471        let data = codec::encode(message)?;
472        self.send_raw(data).await
473    }
474
475    /// Send raw bytes
476    async fn send_raw(&self, data: Bytes) -> Result<()> {
477        // Clone the sender to avoid holding the lock across await
478        let tx = {
479            let sender = self.sender.read();
480            sender.as_ref().cloned()
481        };
482
483        if let Some(tx) = tx {
484            tx.send(data)
485                .await
486                .map_err(|e| ClientError::SendFailed(e.to_string()))?;
487            Ok(())
488        } else {
489            Err(ClientError::NotConnected)
490        }
491    }
492
493    /// Subscribe to an address pattern
494    pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
495    where
496        F: Fn(Value, &str) + Send + Sync + 'static,
497    {
498        let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
499
500        // Store callback
501        self.subscriptions
502            .insert(id, (pattern.to_string(), Box::new(callback)));
503
504        // Send subscribe message
505        let msg = Message::Subscribe(SubscribeMessage {
506            id,
507            pattern: pattern.to_string(),
508            types: vec![],
509            options: Some(SubscribeOptions::default()),
510        });
511
512        self.send_message(&msg).await?;
513
514        debug!("Subscribed to {} (id: {})", pattern, id);
515        Ok(id)
516    }
517
518    /// Shorthand for subscribe
519    pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
520    where
521        F: Fn(Value, &str) + Send + Sync + 'static,
522    {
523        self.subscribe(pattern, callback).await
524    }
525
526    /// Unsubscribe
527    pub async fn unsubscribe(&self, id: u32) -> Result<()> {
528        self.subscriptions.remove(&id);
529
530        let msg = Message::Unsubscribe(UnsubscribeMessage { id });
531        self.send_message(&msg).await?;
532
533        Ok(())
534    }
535
536    /// Set a parameter value
537    pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
538        let msg = Message::Set(SetMessage {
539            address: address.to_string(),
540            value: value.into(),
541            revision: None,
542            lock: false,
543            unlock: false,
544        });
545
546        self.send_message(&msg).await
547    }
548
549    /// Set with lock
550    pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
551        let msg = Message::Set(SetMessage {
552            address: address.to_string(),
553            value: value.into(),
554            revision: None,
555            lock: true,
556            unlock: false,
557        });
558
559        self.send_message(&msg).await
560    }
561
562    /// Get current value (cached or request)
563    pub async fn get(&self, address: &str) -> Result<Value> {
564        // Check cache first
565        if let Some(value) = self.params.get(address) {
566            return Ok(value.clone());
567        }
568
569        // Request from server
570        let (tx, rx) = oneshot::channel();
571        let address_key = address.to_string();
572        self.pending_gets.insert(address_key.clone(), tx);
573
574        let msg = Message::Get(GetMessage {
575            address: address.to_string(),
576        });
577        self.send_message(&msg).await?;
578
579        // Wait for response (with timeout)
580        match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
581            Ok(Ok(value)) => Ok(value),
582            Ok(Err(_)) => {
583                // Cancelled - remove from pending
584                self.pending_gets.remove(&address_key);
585                Err(ClientError::Other("Get cancelled".to_string()))
586            }
587            Err(_) => {
588                // Timeout - remove from pending to prevent memory leak
589                self.pending_gets.remove(&address_key);
590                Err(ClientError::Timeout)
591            }
592        }
593    }
594
595    /// Emit an event
596    pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
597        let msg = Message::Publish(PublishMessage {
598            address: address.to_string(),
599            signal: Some(SignalType::Event),
600            value: None,
601            payload: Some(payload.into()),
602            samples: None,
603            rate: None,
604            id: None,
605            phase: None,
606            timestamp: Some(self.time()),
607        });
608
609        self.send_message(&msg).await
610    }
611
612    /// Send stream sample
613    pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
614        let msg = Message::Publish(PublishMessage {
615            address: address.to_string(),
616            signal: Some(SignalType::Stream),
617            value: Some(value.into()),
618            payload: None,
619            samples: None,
620            rate: None,
621            id: None,
622            phase: None,
623            timestamp: Some(self.time()),
624        });
625
626        self.send_message(&msg).await
627    }
628
629    /// Send atomic bundle
630    pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
631        let msg = Message::Bundle(BundleMessage {
632            timestamp: None,
633            messages,
634        });
635
636        self.send_message(&msg).await
637    }
638
639    /// Send scheduled bundle
640    pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
641        let msg = Message::Bundle(BundleMessage {
642            timestamp: Some(time),
643            messages,
644        });
645
646        self.send_message(&msg).await
647    }
648
649    /// Get cached param value
650    pub fn cached(&self, address: &str) -> Option<Value> {
651        self.params.get(address).map(|v| v.clone())
652    }
653
654    /// Close connection.
655    /// Disables auto-reconnect and closes the connection.
656    pub async fn close(&self) {
657        self.intentionally_closed.store(true, Ordering::SeqCst);
658        *self.connected.write() = false;
659        *self.sender.write() = None;
660    }
661
662    /// Get all announced signals
663    pub fn signals(&self) -> Vec<SignalDefinition> {
664        self.signals.iter().map(|e| e.value().clone()).collect()
665    }
666
667    /// Query signals matching a pattern
668    pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
669        self.signals
670            .iter()
671            .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
672            .map(|e| e.value().clone())
673            .collect()
674    }
675
676    /// Get the last error received from server
677    pub fn last_error(&self) -> Option<ErrorMessage> {
678        self.last_error.read().clone()
679    }
680
681    /// Clear the last error
682    pub fn clear_error(&self) {
683        *self.last_error.write() = None;
684    }
685}
686
687/// Handle incoming message
688fn handle_message(
689    msg: &Message,
690    params: &Arc<DashMap<String, Value>>,
691    subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
692    pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
693    signals: &Arc<DashMap<String, SignalDefinition>>,
694    last_error: &Arc<RwLock<Option<ErrorMessage>>>,
695) {
696    match msg {
697        Message::Set(set) => {
698            // Update cache
699            params.insert(set.address.clone(), set.value.clone());
700
701            // Notify subscribers
702            for entry in subscriptions.iter() {
703                let (pattern, callback) = entry.value();
704                if clasp_core::address::glob_match(pattern, &set.address) {
705                    callback(set.value.clone(), &set.address);
706                }
707            }
708        }
709
710        Message::Snapshot(snapshot) => {
711            for param in &snapshot.params {
712                params.insert(param.address.clone(), param.value.clone());
713
714                // Complete pending gets
715                if let Some((_, tx)) = pending_gets.remove(&param.address) {
716                    let _ = tx.send(param.value.clone());
717                }
718
719                // Notify subscribers
720                for entry in subscriptions.iter() {
721                    let (pattern, callback) = entry.value();
722                    if clasp_core::address::glob_match(pattern, &param.address) {
723                        callback(param.value.clone(), &param.address);
724                    }
725                }
726            }
727        }
728
729        Message::Publish(pub_msg) => {
730            // Notify subscribers
731            let value = pub_msg
732                .value
733                .clone()
734                .or_else(|| pub_msg.payload.clone())
735                .unwrap_or(Value::Null);
736
737            for entry in subscriptions.iter() {
738                let (pattern, callback) = entry.value();
739                if clasp_core::address::glob_match(pattern, &pub_msg.address) {
740                    callback(value.clone(), &pub_msg.address);
741                }
742            }
743        }
744
745        Message::Error(error) => {
746            // Log the error and store it for retrieval
747            warn!(
748                "Server error {}: {} (address: {:?})",
749                error.code, error.message, error.address
750            );
751            *last_error.write() = Some(error.clone());
752        }
753
754        Message::Ack(ack) => {
755            // Log acknowledgment (could be extended to track pending requests)
756            debug!(
757                "Received ACK for {:?} (revision: {:?})",
758                ack.address, ack.revision
759            );
760        }
761
762        Message::Announce(announce) => {
763            // Store announced signals
764            for signal in &announce.signals {
765                debug!("Received signal announcement: {}", signal.address);
766                signals.insert(signal.address.clone(), signal.clone());
767            }
768        }
769
770        Message::Sync(sync) => {
771            // Process clock sync response
772            // Note: This handles sync messages from server with t2/t3 filled in
773            if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
774                debug!(
775                    "Clock sync: t1={}, t2={}, t3={}",
776                    sync.t1, t2, t3
777                );
778                // Clock sync is processed through ClockSync::process_sync
779                // but we don't have access to the clock here.
780                // For now, log it. A more complete implementation would
781                // use a channel to send sync data back to the main client.
782            }
783        }
784
785        Message::Result(result) => {
786            // Handle query results
787            debug!(
788                "Received result with {} signals",
789                result.signals.len()
790            );
791            // Store any returned signals
792            for signal in &result.signals {
793                signals.insert(signal.address.clone(), signal.clone());
794            }
795        }
796
797        // Messages that are typically client-initiated, not expected from server
798        Message::Hello(_) | Message::Welcome(_) | Message::Subscribe(_)
799        | Message::Unsubscribe(_) | Message::Get(_) | Message::Query(_) => {
800            debug!("Received unexpected client-type message: {:?}", msg);
801        }
802
803        // Bundle: process contained messages recursively
804        Message::Bundle(bundle) => {
805            for inner_msg in &bundle.messages {
806                handle_message(inner_msg, params, subscriptions, pending_gets, signals, last_error);
807            }
808        }
809
810        // Ping/Pong for keep-alive
811        Message::Ping => {
812            debug!("Received PING from server");
813            // Note: Pong response should be sent, but we don't have sender access here.
814            // A more complete implementation would use a channel to request pong be sent.
815        }
816
817        Message::Pong => {
818            debug!("Received PONG from server");
819        }
820    }
821}