Skip to main content

clasp_client/
client.rs

1//! Main Clasp client implementation
2
3use bytes::Bytes;
4use clasp_core::{
5    codec, time::ClockSync, BundleMessage, ErrorMessage, GesturePhase, GetMessage, HelloMessage,
6    Message, PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage,
7    SubscribeOptions, TimelineData, UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10    Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22#[cfg(feature = "p2p")]
23use crate::p2p;
24#[cfg(feature = "p2p")]
25use clasp_core::{P2PConfig, P2P_SIGNAL_PREFIX};
26
/// Subscription callback type.
///
/// A boxed closure taking a [`Value`] and a `&str`. The `Send + Sync` bounds
/// let it be stored in the shared subscription map and invoked from a spawned
/// task.
// NOTE(review): the `&str` argument is presumably the address the value was
// published on — confirm against `handle_message`.
pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
29
/// A Clasp client.
///
/// Bundles the connection settings (URL, name, features, token, reconnect
/// policy) with the runtime state of a session: the outgoing message channel,
/// local caches, registered subscriptions and reconnect bookkeeping. State
/// that spawned tasks need is wrapped in `Arc` so it can be shared with them.
pub struct Clasp {
    /// Server URL handed to the WebSocket transport on every (re)connect
    url: String,
    /// Client name sent in the HELLO message
    name: String,
    /// Feature list sent in the HELLO message
    features: Vec<String>,
    /// Optional token sent in the HELLO message
    token: Option<String>,
    /// Whether automatic reconnection is enabled
    reconnect: bool,
    /// Base delay in milliseconds for the reconnect backoff
    reconnect_interval_ms: u64,

    /// Session ID (set after connect, taken from the WELCOME message)
    session_id: RwLock<Option<String>>,

    /// Connection state (`true` between a successful handshake and a disconnect/close)
    connected: Arc<RwLock<bool>>,

    /// Sender for outgoing messages (`None` while no send channel is installed)
    sender: RwLock<Option<mpsc::Sender<Bytes>>>,

    /// Local param cache, keyed by address
    params: Arc<DashMap<String, Value>>,

    /// Subscriptions: subscription ID -> (pattern, callback)
    subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,

    /// Subscription ID counter (starts at 1)
    next_sub_id: AtomicU32,

    /// Clock synchronization
    clock: RwLock<ClockSync>,

    /// Pending get requests, keyed by address; each holds the channel used to
    /// deliver the value to the waiting `get` call
    pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,

    /// Announced signals (from server)
    signals: Arc<DashMap<String, SignalDefinition>>,

    /// Last error received from server
    last_error: Arc<RwLock<Option<ErrorMessage>>>,

    /// Reconnect attempt counter (reset to 0 on every successful connect)
    reconnect_attempts: Arc<AtomicU32>,

    /// Max reconnect attempts (0 = unlimited)
    max_reconnect_attempts: u32,

    /// Flag to indicate intentional close (don't reconnect)
    intentionally_closed: Arc<AtomicBool>,

    /// Notify for triggering reconnect (signalled by the receiver task on disconnect)
    reconnect_notify: Arc<Notify>,

    /// P2P config (optional, feature-gated); consumed when the P2P manager is created
    #[cfg(feature = "p2p")]
    p2p_config: Option<P2PConfig>,

    /// P2P manager (optional, feature-gated, created after connection)
    #[cfg(feature = "p2p")]
    p2p_manager: Option<Arc<p2p::P2PManager>>,
}
89
90impl Clasp {
91    /// Create a new client (use builder for more options)
92    pub fn new(
93        url: &str,
94        name: String,
95        features: Vec<String>,
96        token: Option<String>,
97        reconnect: bool,
98        reconnect_interval_ms: u64,
99    ) -> Self {
100        Self {
101            url: url.to_string(),
102            name,
103            features,
104            token,
105            reconnect,
106            reconnect_interval_ms,
107            session_id: RwLock::new(None),
108            connected: Arc::new(RwLock::new(false)),
109            sender: RwLock::new(None),
110            params: Arc::new(DashMap::new()),
111            subscriptions: Arc::new(DashMap::new()),
112            next_sub_id: AtomicU32::new(1),
113            clock: RwLock::new(ClockSync::new()),
114            pending_gets: Arc::new(DashMap::new()),
115            signals: Arc::new(DashMap::new()),
116            last_error: Arc::new(RwLock::new(None)),
117            reconnect_attempts: Arc::new(AtomicU32::new(0)),
118            max_reconnect_attempts: 10,
119            intentionally_closed: Arc::new(AtomicBool::new(false)),
120            reconnect_notify: Arc::new(Notify::new()),
121            #[cfg(feature = "p2p")]
122            p2p_config: None,
123            #[cfg(feature = "p2p")]
124            p2p_manager: None,
125        }
126    }
127
128    /// Set P2P configuration (internal, called by builder)
129    #[cfg(feature = "p2p")]
130    pub(crate) fn set_p2p_config(&mut self, config: P2PConfig) {
131        self.p2p_config = Some(config);
132    }
133
    /// Create a [`ClaspBuilder`] for the given server URL.
    pub fn builder(url: &str) -> ClaspBuilder {
        ClaspBuilder::new(url)
    }
138
139    /// Connect to server (convenience method)
140    pub async fn connect_to(url: &str) -> Result<Self> {
141        ClaspBuilder::new(url).connect().await
142    }
143
144    /// Internal connect
145    pub(crate) async fn do_connect(&mut self) -> Result<()> {
146        if *self.connected.read() {
147            return Err(ClientError::AlreadyConnected);
148        }
149
150        info!("Connecting to {}", self.url);
151
152        // Connect WebSocket
153        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
154
155        // Create send channel
156        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
157        *self.sender.write() = Some(tx);
158
159        let connected = self.connected.clone();
160
161        // Spawn sender task
162        let sender = Arc::new(sender);
163        let sender_clone = sender.clone();
164        tokio::spawn(async move {
165            while let Some(data) = rx.recv().await {
166                if let Err(e) = sender_clone.send(data).await {
167                    error!("Send error: {}", e);
168                    break;
169                }
170            }
171        });
172
173        // Send HELLO
174        let hello = Message::Hello(HelloMessage {
175            version: PROTOCOL_VERSION,
176            name: self.name.clone(),
177            features: self.features.clone(),
178            capabilities: None,
179            token: self.token.clone(),
180        });
181
182        self.send_message(&hello).await?;
183
184        // Wait for WELCOME
185        loop {
186            match receiver.recv().await {
187                Some(TransportEvent::Data(data)) => {
188                    match codec::decode(&data) {
189                        Ok((Message::Welcome(welcome), _)) => {
190                            *self.session_id.write() = Some(welcome.session.clone());
191                            *connected.write() = true;
192
193                            // Sync clock
194                            self.clock.write().process_sync(
195                                clasp_core::time::now(),
196                                welcome.time,
197                                welcome.time,
198                                clasp_core::time::now(),
199                            );
200
201                            // Initialize P2P manager if configured
202                            #[cfg(feature = "p2p")]
203                            {
204                                if let Some(p2p_config) = self.p2p_config.take() {
205                                    let session_id = welcome.session.clone();
206                                    // Create channel for P2P signaling
207                                    let (signal_tx, mut signal_rx) = mpsc::channel(100);
208                                    let p2p_manager =
209                                        Arc::new(p2p::P2PManager::new(p2p_config, signal_tx));
210                                    p2p_manager.set_session_id(session_id.clone());
211
212                                    // Spawn task to forward P2P signals through client
213                                    let sender = self.sender.read().clone();
214                                    let p2p_manager_for_task = Arc::clone(&p2p_manager);
215                                    if let Some(sender_tx) = sender {
216                                        tokio::spawn(async move {
217                                            while let Some(msg) = signal_rx.recv().await {
218                                                if let Some(encoded) = codec::encode(&msg).ok() {
219                                                    if let Err(e) =
220                                                        sender_tx.send(encoded.into()).await
221                                                    {
222                                                        tracing::warn!(
223                                                            "Failed to send P2P signal: {}",
224                                                            e
225                                                        );
226                                                        break;
227                                                    }
228                                                }
229                                            }
230                                        });
231                                    }
232
233                                    // Store P2P manager first
234                                    self.p2p_manager = Some(Arc::clone(&p2p_manager));
235
236                                    // Announce P2P capability
237                                    let _ = p2p_manager.announce().await;
238
239                                    // Set up P2P subscriptions (after manager is stored)
240                                    let _ = self.setup_p2p_subscriptions(&session_id).await;
241                                }
242                            }
243
244                            info!("Connected, session: {}", welcome.session);
245                            break;
246                        }
247                        Ok((msg, _)) => {
248                            debug!("Received during handshake: {:?}", msg);
249                        }
250                        Err(e) => {
251                            warn!("Decode error: {}", e);
252                        }
253                    }
254                }
255                Some(TransportEvent::Error(e)) => {
256                    return Err(ClientError::ConnectionFailed(e));
257                }
258                Some(TransportEvent::Disconnected { reason }) => {
259                    return Err(ClientError::ConnectionFailed(
260                        reason.unwrap_or_else(|| "Disconnected".to_string()),
261                    ));
262                }
263                None => {
264                    return Err(ClientError::ConnectionFailed(
265                        "Connection closed".to_string(),
266                    ));
267                }
268                _ => {}
269            }
270        }
271
272        // Reset reconnect state on successful connect
273        self.reconnect_attempts.store(0, Ordering::SeqCst);
274        self.intentionally_closed.store(false, Ordering::SeqCst);
275
276        // Spawn receiver task
277        let params = Arc::clone(&self.params);
278        let subscriptions = Arc::clone(&self.subscriptions);
279        let pending_gets = Arc::clone(&self.pending_gets);
280        let signals = Arc::clone(&self.signals);
281        let last_error = Arc::clone(&self.last_error);
282        let connected_clone = Arc::clone(&self.connected);
283        let reconnect_notify = Arc::clone(&self.reconnect_notify);
284        let intentionally_closed = Arc::clone(&self.intentionally_closed);
285        let reconnect_enabled = self.reconnect;
286        #[cfg(feature = "p2p")]
287        let p2p_manager = self.p2p_manager.clone();
288
289        tokio::spawn(async move {
290            while let Some(event) = receiver.recv().await {
291                match event {
292                    TransportEvent::Data(data) => {
293                        if let Ok((msg, _)) = codec::decode(&data) {
294                            #[cfg(feature = "p2p")]
295                            {
296                                // Forward P2P signals to P2P manager (handled in subscription callback)
297                                // P2P announce is also handled in subscription
298                            }
299                            handle_message(
300                                &msg,
301                                &params,
302                                &subscriptions,
303                                &pending_gets,
304                                &signals,
305                                &last_error,
306                            );
307                        }
308                    }
309                    TransportEvent::Disconnected { reason } => {
310                        info!("Disconnected: {:?}", reason);
311                        *connected_clone.write() = false;
312
313                        // Trigger reconnect if enabled and not intentionally closed
314                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
315                            reconnect_notify.notify_one();
316                        }
317                        break;
318                    }
319                    TransportEvent::Error(e) => {
320                        error!("Error: {}", e);
321                    }
322                    _ => {}
323                }
324            }
325        });
326
327        Ok(())
328    }
329
330    /// Start the reconnect loop (call after initial connect)
331    pub fn start_reconnect_loop(self: &Arc<Self>) {
332        if !self.reconnect {
333            return;
334        }
335
336        let client = Arc::clone(self);
337        tokio::spawn(async move {
338            loop {
339                // Wait for disconnect notification
340                client.reconnect_notify.notified().await;
341
342                if client.intentionally_closed.load(Ordering::SeqCst) {
343                    break;
344                }
345
346                // Attempt reconnection with exponential backoff
347                loop {
348                    let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
349
350                    if client.max_reconnect_attempts > 0
351                        && attempts >= client.max_reconnect_attempts
352                    {
353                        error!(
354                            "Max reconnect attempts ({}) reached",
355                            client.max_reconnect_attempts
356                        );
357                        break;
358                    }
359
360                    // Exponential backoff: base * 1.5^attempts, max 30 seconds
361                    let base_ms = client.reconnect_interval_ms;
362                    let delay_ms =
363                        (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;
364
365                    info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
366                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
367
368                    if client.intentionally_closed.load(Ordering::SeqCst) {
369                        break;
370                    }
371
372                    // Clone the Arc and get a mutable reference for reconnection
373                    // We need to use unsafe or restructure - let's use a different approach
374                    match client.try_reconnect().await {
375                        Ok(()) => {
376                            info!("Reconnected successfully");
377                            client.reconnect_attempts.store(0, Ordering::SeqCst);
378
379                            // Resubscribe to all patterns
380                            if let Err(e) = client.resubscribe_all().await {
381                                warn!("Failed to resubscribe: {}", e);
382                            }
383                            break;
384                        }
385                        Err(e) => {
386                            warn!("Reconnect failed: {}", e);
387                        }
388                    }
389                }
390            }
391        });
392    }
393
394    /// Internal reconnect attempt
395    async fn try_reconnect(&self) -> Result<()> {
396        info!("Attempting to reconnect to {}", self.url);
397
398        // Connect WebSocket
399        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
400
401        // Create send channel
402        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
403        *self.sender.write() = Some(tx);
404
405        // Spawn sender task
406        let sender = Arc::new(sender);
407        let sender_clone = sender.clone();
408        tokio::spawn(async move {
409            while let Some(data) = rx.recv().await {
410                if let Err(e) = sender_clone.send(data).await {
411                    error!("Send error: {}", e);
412                    break;
413                }
414            }
415        });
416
417        // Send HELLO
418        let hello = Message::Hello(HelloMessage {
419            version: PROTOCOL_VERSION,
420            name: self.name.clone(),
421            features: self.features.clone(),
422            capabilities: None,
423            token: self.token.clone(),
424        });
425
426        self.send_message(&hello).await?;
427
428        // Wait for WELCOME with timeout
429        let welcome_timeout = Duration::from_secs(10);
430        let deadline = tokio::time::Instant::now() + welcome_timeout;
431
432        loop {
433            match tokio::time::timeout_at(deadline, receiver.recv()).await {
434                Ok(Some(TransportEvent::Data(data))) => match codec::decode(&data) {
435                    Ok((Message::Welcome(welcome), _)) => {
436                        *self.session_id.write() = Some(welcome.session.clone());
437                        *self.connected.write() = true;
438
439                        self.clock.write().process_sync(
440                            clasp_core::time::now(),
441                            welcome.time,
442                            welcome.time,
443                            clasp_core::time::now(),
444                        );
445
446                        info!("Reconnected, session: {}", welcome.session);
447                        break;
448                    }
449                    Ok((msg, _)) => {
450                        debug!("Received during reconnect handshake: {:?}", msg);
451                    }
452                    Err(e) => {
453                        warn!("Decode error during reconnect: {}", e);
454                    }
455                },
456                Ok(Some(TransportEvent::Error(e))) => {
457                    return Err(ClientError::ConnectionFailed(e));
458                }
459                Ok(Some(TransportEvent::Disconnected { reason })) => {
460                    return Err(ClientError::ConnectionFailed(
461                        reason.unwrap_or_else(|| "Disconnected".to_string()),
462                    ));
463                }
464                Ok(None) => {
465                    return Err(ClientError::ConnectionFailed(
466                        "Connection closed".to_string(),
467                    ));
468                }
469                Err(_) => {
470                    return Err(ClientError::Timeout);
471                }
472                _ => {}
473            }
474        }
475
476        // Spawn new receiver task
477        let params = Arc::clone(&self.params);
478        let subscriptions = Arc::clone(&self.subscriptions);
479        let pending_gets = Arc::clone(&self.pending_gets);
480        let signals = Arc::clone(&self.signals);
481        let last_error = Arc::clone(&self.last_error);
482        let connected_clone = Arc::clone(&self.connected);
483        let reconnect_notify = Arc::clone(&self.reconnect_notify);
484        let intentionally_closed = Arc::clone(&self.intentionally_closed);
485        let reconnect_enabled = self.reconnect;
486
487        tokio::spawn(async move {
488            while let Some(event) = receiver.recv().await {
489                match event {
490                    TransportEvent::Data(data) => {
491                        if let Ok((msg, _)) = codec::decode(&data) {
492                            handle_message(
493                                &msg,
494                                &params,
495                                &subscriptions,
496                                &pending_gets,
497                                &signals,
498                                &last_error,
499                            );
500                        }
501                    }
502                    TransportEvent::Disconnected { reason } => {
503                        info!("Disconnected: {:?}", reason);
504                        *connected_clone.write() = false;
505
506                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
507                            reconnect_notify.notify_one();
508                        }
509                        break;
510                    }
511                    TransportEvent::Error(e) => {
512                        error!("Error: {}", e);
513                    }
514                    _ => {}
515                }
516            }
517        });
518
519        Ok(())
520    }
521
522    /// Resubscribe to all existing subscriptions after reconnect
523    async fn resubscribe_all(&self) -> Result<()> {
524        // Collect subscription info first to avoid lifetime issues with DashMap
525        let subs: Vec<(u32, String)> = self
526            .subscriptions
527            .iter()
528            .map(|entry| (*entry.key(), entry.value().0.clone()))
529            .collect();
530
531        for (id, pattern) in subs {
532            let msg = Message::Subscribe(SubscribeMessage {
533                id,
534                pattern: pattern.clone(),
535                types: vec![],
536                options: Some(SubscribeOptions::default()),
537            });
538
539            self.send_message(&msg).await?;
540            debug!("Resubscribed to {} (id: {})", pattern, id);
541        }
542
543        Ok(())
544    }
545
546    /// Check if connected
547    pub fn is_connected(&self) -> bool {
548        *self.connected.read()
549    }
550
551    /// Get session ID
552    pub fn session_id(&self) -> Option<String> {
553        self.session_id.read().clone()
554    }
555
556    /// Get current server time (microseconds)
557    pub fn time(&self) -> u64 {
558        self.clock.read().server_time()
559    }
560
561    /// Send a raw message
562    async fn send_message(&self, message: &Message) -> Result<()> {
563        let data = codec::encode(message)?;
564        self.send_raw(data).await
565    }
566
567    /// Send raw bytes
568    async fn send_raw(&self, data: Bytes) -> Result<()> {
569        // Clone the sender to avoid holding the lock across await
570        let tx = {
571            let sender = self.sender.read();
572            sender.as_ref().cloned()
573        };
574
575        if let Some(tx) = tx {
576            tx.send(data)
577                .await
578                .map_err(|e| ClientError::SendFailed(e.to_string()))?;
579            Ok(())
580        } else {
581            Err(ClientError::NotConnected)
582        }
583    }
584
585    /// Subscribe to an address pattern
586    pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
587    where
588        F: Fn(Value, &str) + Send + Sync + 'static,
589    {
590        let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
591
592        // Store callback
593        self.subscriptions
594            .insert(id, (pattern.to_string(), Box::new(callback)));
595
596        // Send subscribe message
597        let msg = Message::Subscribe(SubscribeMessage {
598            id,
599            pattern: pattern.to_string(),
600            types: vec![],
601            options: Some(SubscribeOptions::default()),
602        });
603
604        self.send_message(&msg).await?;
605
606        debug!("Subscribed to {} (id: {})", pattern, id);
607        Ok(id)
608    }
609
610    /// Shorthand for subscribe
611    pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
612    where
613        F: Fn(Value, &str) + Send + Sync + 'static,
614    {
615        self.subscribe(pattern, callback).await
616    }
617
618    /// Unsubscribe
619    pub async fn unsubscribe(&self, id: u32) -> Result<()> {
620        self.subscriptions.remove(&id);
621
622        let msg = Message::Unsubscribe(UnsubscribeMessage { id });
623        self.send_message(&msg).await?;
624
625        Ok(())
626    }
627
628    /// Set a parameter value
629    pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
630        let msg = Message::Set(SetMessage {
631            address: address.to_string(),
632            value: value.into(),
633            revision: None,
634            lock: false,
635            unlock: false,
636            ttl: None,
637        });
638
639        self.send_message(&msg).await
640    }
641
642    /// Set with lock
643    pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
644        let msg = Message::Set(SetMessage {
645            address: address.to_string(),
646            value: value.into(),
647            revision: None,
648            lock: true,
649            unlock: false,
650            ttl: None,
651        });
652
653        self.send_message(&msg).await
654    }
655
656    /// Set and unlock (release a previously held lock)
657    pub async fn set_unlocked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
658        let msg = Message::Set(SetMessage {
659            address: address.to_string(),
660            value: value.into(),
661            revision: None,
662            lock: false,
663            unlock: true,
664            ttl: None,
665        });
666
667        self.send_message(&msg).await
668    }
669
670    /// Set a parameter value with a per-message TTL
671    pub async fn set_with_ttl(&self, address: &str, value: impl Into<Value>, ttl: clasp_core::Ttl) -> Result<()> {
672        let msg = Message::Set(SetMessage {
673            address: address.to_string(),
674            value: value.into(),
675            revision: None,
676            lock: false,
677            unlock: false,
678            ttl: Some(ttl),
679        });
680
681        self.send_message(&msg).await
682    }
683
684    /// Get current value (cached or request)
685    pub async fn get(&self, address: &str) -> Result<Value> {
686        // Check cache first
687        if let Some(value) = self.params.get(address) {
688            return Ok(value.clone());
689        }
690
691        // Request from server
692        let (tx, rx) = oneshot::channel();
693        let address_key = address.to_string();
694        self.pending_gets.insert(address_key.clone(), tx);
695
696        let msg = Message::Get(GetMessage {
697            address: address.to_string(),
698        });
699        self.send_message(&msg).await?;
700
701        // Wait for response (with timeout)
702        match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
703            Ok(Ok(value)) => Ok(value),
704            Ok(Err(_)) => {
705                // Cancelled - remove from pending
706                self.pending_gets.remove(&address_key);
707                Err(ClientError::Other("Get cancelled".to_string()))
708            }
709            Err(_) => {
710                // Timeout - remove from pending to prevent memory leak
711                self.pending_gets.remove(&address_key);
712                Err(ClientError::Timeout)
713            }
714        }
715    }
716
717    /// Emit an event
718    pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
719        let msg = Message::Publish(PublishMessage {
720            address: address.to_string(),
721            signal: Some(SignalType::Event),
722            value: None,
723            payload: Some(payload.into()),
724            samples: None,
725            rate: None,
726            id: None,
727            phase: None,
728            timestamp: Some(self.time()),
729            timeline: None,
730        });
731
732        self.send_message(&msg).await
733    }
734
735    /// Send stream sample
736    pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
737        let msg = Message::Publish(PublishMessage {
738            address: address.to_string(),
739            signal: Some(SignalType::Stream),
740            value: Some(value.into()),
741            payload: None,
742            samples: None,
743            rate: None,
744            id: None,
745            phase: None,
746            timestamp: Some(self.time()),
747            timeline: None,
748        });
749
750        self.send_message(&msg).await
751    }
752
753    /// Send gesture input
754    ///
755    /// Gestures are phased input streams for touch/pen/motion input.
756    /// Each gesture has a stable ID and goes through phases:
757    /// - `Start`: Begin a new gesture
758    /// - `Move`: Update position/state (may be coalesced by router)
759    /// - `End`: Complete the gesture normally
760    /// - `Cancel`: Abort the gesture
761    ///
762    /// # Example
763    /// ```ignore
764    /// // Start a touch gesture
765    /// client.gesture("/input/touch", 1, GesturePhase::Start, json!({"x": 0.5, "y": 0.3})).await?;
766    ///
767    /// // Move updates
768    /// client.gesture("/input/touch", 1, GesturePhase::Move, json!({"x": 0.6, "y": 0.4})).await?;
769    ///
770    /// // End the gesture
771    /// client.gesture("/input/touch", 1, GesturePhase::End, json!({"x": 0.7, "y": 0.5})).await?;
772    /// ```
773    pub async fn gesture(
774        &self,
775        address: &str,
776        id: u32,
777        phase: GesturePhase,
778        payload: impl Into<Value>,
779    ) -> Result<()> {
780        let msg = Message::Publish(PublishMessage {
781            address: address.to_string(),
782            signal: Some(SignalType::Gesture),
783            value: None,
784            payload: Some(payload.into()),
785            samples: None,
786            rate: None,
787            id: Some(id),
788            phase: Some(phase),
789            timestamp: Some(self.time()),
790            timeline: None,
791        });
792
793        self.send_message(&msg).await
794    }
795
796    /// Publish timeline automation
797    ///
798    /// Timelines are pre-computed automation curves with keyframes.
799    /// Once published, timelines are immutable - to modify, publish a new one.
800    ///
801    /// # Arguments
802    /// * `address` - The parameter address this timeline controls
803    /// * `timeline` - The timeline data with keyframes
804    ///
805    /// # Example
806    /// ```ignore
807    /// use clasp_core::{TimelineData, TimelineKeyframe, EasingType, Value};
808    ///
809    /// let timeline = TimelineData::new(vec![
810    ///     TimelineKeyframe { time: 0, value: Value::Float(0.0), easing: EasingType::Linear, bezier: None },
811    ///     TimelineKeyframe { time: 1_000_000, value: Value::Float(1.0), easing: EasingType::EaseOut, bezier: None },
812    /// ]);
813    ///
814    /// client.timeline("/lights/master/dimmer", timeline).await?;
815    /// ```
816    pub async fn timeline(&self, address: &str, timeline_data: TimelineData) -> Result<()> {
817        let msg = Message::Publish(PublishMessage {
818            address: address.to_string(),
819            signal: Some(SignalType::Timeline),
820            value: None,
821            payload: None,
822            samples: None,
823            rate: None,
824            id: None,
825            phase: None,
826            timestamp: Some(self.time()),
827            timeline: Some(timeline_data),
828        });
829
830        self.send_message(&msg).await
831    }
832
833    /// Send atomic bundle
834    pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
835        let msg = Message::Bundle(BundleMessage {
836            timestamp: None,
837            messages,
838        });
839
840        self.send_message(&msg).await
841    }
842
843    /// Send scheduled bundle
844    pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
845        let msg = Message::Bundle(BundleMessage {
846            timestamp: Some(time),
847            messages,
848        });
849
850        self.send_message(&msg).await
851    }
852
853    /// Get cached param value
854    pub fn cached(&self, address: &str) -> Option<Value> {
855        self.params.get(address).map(|v| v.clone())
856    }
857
858    /// Close connection.
859    /// Disables auto-reconnect and closes the connection.
860    pub async fn close(&self) {
861        self.intentionally_closed.store(true, Ordering::SeqCst);
862        *self.connected.write() = false;
863        *self.sender.write() = None;
864    }
865
866    /// Get all announced signals
867    pub fn signals(&self) -> Vec<SignalDefinition> {
868        self.signals.iter().map(|e| e.value().clone()).collect()
869    }
870
871    /// Query signals matching a pattern
872    pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
873        self.signals
874            .iter()
875            .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
876            .map(|e| e.value().clone())
877            .collect()
878    }
879
880    /// Get the last error received from server
881    pub fn last_error(&self) -> Option<ErrorMessage> {
882        self.last_error.read().clone()
883    }
884
885    /// Clear the last error
886    pub fn clear_error(&self) {
887        *self.last_error.write() = None;
888    }
889
890    /// Set up P2P subscriptions (internal, called after P2P manager is created)
891    #[cfg(feature = "p2p")]
892    async fn setup_p2p_subscriptions(&self, session_id: &str) -> Result<()> {
893        if let Some(ref p2p_manager) = self.p2p_manager {
894            let signal_address = format!("{}{}", P2P_SIGNAL_PREFIX, session_id);
895            let p2p_manager_signal = Arc::clone(p2p_manager);
896
897            // Subscribe to P2P signals
898            let _ = self
899                .subscribe(&signal_address, move |value, address| {
900                    let p2p = Arc::clone(&p2p_manager_signal);
901                    let address = address.to_string(); // Clone the address string
902                    tokio::spawn(async move {
903                        if let Err(e) = p2p.handle_signal(&address, &value).await {
904                            tracing::debug!("P2P signal handling error: {}", e);
905                        }
906                    });
907                })
908                .await?;
909
910            // Subscribe to P2P announce
911            let p2p_manager_announce = Arc::clone(p2p_manager);
912            let _ = self
913                .subscribe(clasp_core::P2P_ANNOUNCE, move |value, _| {
914                    p2p_manager_announce.handle_announce(&value);
915                })
916                .await?;
917        }
918        Ok(())
919    }
920
921    /// Connect to a peer via P2P (requires p2p feature)
922    #[cfg(feature = "p2p")]
923    pub async fn connect_to_peer(&self, peer_session_id: &str) -> Result<()> {
924        if let Some(ref p2p_manager) = self.p2p_manager {
925            p2p_manager.connect_to_peer(peer_session_id).await
926        } else {
927            Err(ClientError::Other(
928                "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
929            ))
930        }
931    }
932
933    /// Set P2P event callback (requires p2p feature)
934    #[cfg(feature = "p2p")]
935    pub fn on_p2p_event<F>(&self, callback: F)
936    where
937        F: Fn(p2p::P2PEvent) + Send + Sync + 'static,
938    {
939        if let Some(ref p2p_manager) = self.p2p_manager {
940            p2p_manager.on_event(callback);
941        }
942    }
943
944    /// Check if peer is connected via P2P (requires p2p feature)
945    #[cfg(feature = "p2p")]
946    pub fn is_peer_connected(&self, peer_session_id: &str) -> bool {
947        self.p2p_manager
948            .as_ref()
949            .map(|p2p| p2p.is_peer_connected(peer_session_id))
950            .unwrap_or(false)
951    }
952
953    /// Send data to a peer via P2P (requires p2p feature)
954    ///
955    /// Returns `SendResult::P2P` if sent via direct P2P, `SendResult::Relay` if sent via server.
956    /// Behavior depends on the current routing mode and connection state.
957    #[cfg(feature = "p2p")]
958    pub async fn send_p2p(
959        &self,
960        peer_session_id: &str,
961        data: bytes::Bytes,
962        reliable: bool,
963    ) -> Result<p2p::SendResult> {
964        if let Some(ref p2p_manager) = self.p2p_manager {
965            p2p_manager
966                .send_to_peer(peer_session_id, data, reliable)
967                .await
968        } else {
969            Err(ClientError::Other(
970                "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
971            ))
972        }
973    }
974
975    /// Set P2P routing mode (requires p2p feature)
976    ///
977    /// - `RoutingMode::PreferP2P` (default): Try P2P first, fall back to relay
978    /// - `RoutingMode::P2POnly`: Only use P2P, fail if unavailable
979    /// - `RoutingMode::ServerOnly`: Never use P2P, always relay
980    #[cfg(feature = "p2p")]
981    pub fn set_p2p_routing_mode(&self, mode: clasp_core::p2p::RoutingMode) {
982        if let Some(ref p2p_manager) = self.p2p_manager {
983            p2p_manager.set_routing_mode(mode);
984        }
985    }
986
987    /// Get current P2P routing mode (requires p2p feature)
988    #[cfg(feature = "p2p")]
989    pub fn p2p_routing_mode(&self) -> clasp_core::p2p::RoutingMode {
990        self.p2p_manager
991            .as_ref()
992            .map(|p2p| p2p.routing_mode())
993            .unwrap_or_default()
994    }
995}
996
997/// Handle incoming message
998fn handle_message(
999    msg: &Message,
1000    params: &Arc<DashMap<String, Value>>,
1001    subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
1002    pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
1003    signals: &Arc<DashMap<String, SignalDefinition>>,
1004    last_error: &Arc<RwLock<Option<ErrorMessage>>>,
1005) {
1006    match msg {
1007        Message::Set(set) => {
1008            // Update cache
1009            params.insert(set.address.clone(), set.value.clone());
1010
1011            // Notify subscribers
1012            for entry in subscriptions.iter() {
1013                let (pattern, callback) = entry.value();
1014                if clasp_core::address::glob_match(pattern, &set.address) {
1015                    callback(set.value.clone(), &set.address);
1016                }
1017            }
1018        }
1019
1020        Message::Snapshot(snapshot) => {
1021            for param in &snapshot.params {
1022                params.insert(param.address.clone(), param.value.clone());
1023
1024                // Complete pending gets
1025                if let Some((_, tx)) = pending_gets.remove(&param.address) {
1026                    let _ = tx.send(param.value.clone());
1027                }
1028
1029                // Notify subscribers
1030                for entry in subscriptions.iter() {
1031                    let (pattern, callback) = entry.value();
1032                    if clasp_core::address::glob_match(pattern, &param.address) {
1033                        callback(param.value.clone(), &param.address);
1034                    }
1035                }
1036            }
1037        }
1038
1039        Message::Publish(pub_msg) => {
1040            #[cfg(feature = "p2p")]
1041            {
1042                // Check if this is a P2P signal or announce - handle before regular subscriptions
1043                if pub_msg.address.starts_with(clasp_core::P2P_SIGNAL_PREFIX) {
1044                    // P2P signals will be handled by subscription callbacks
1045                    // (they're subscribed to automatically when P2P manager is created)
1046                } else if pub_msg.address == clasp_core::P2P_ANNOUNCE {
1047                    // P2P announce will be handled by subscription callback
1048                }
1049            }
1050
1051            // Notify subscribers
1052            let value = pub_msg
1053                .value
1054                .clone()
1055                .or_else(|| pub_msg.payload.clone())
1056                .unwrap_or(Value::Null);
1057
1058            for entry in subscriptions.iter() {
1059                let (pattern, callback) = entry.value();
1060                if clasp_core::address::glob_match(pattern, &pub_msg.address) {
1061                    callback(value.clone(), &pub_msg.address);
1062                }
1063            }
1064        }
1065
1066        Message::Error(error) => {
1067            // Log the error and store it for retrieval
1068            warn!(
1069                "Server error {}: {} (address: {:?})",
1070                error.code, error.message, error.address
1071            );
1072            *last_error.write() = Some(error.clone());
1073        }
1074
1075        Message::Ack(ack) => {
1076            // Log acknowledgment (could be extended to track pending requests)
1077            debug!(
1078                "Received ACK for {:?} (revision: {:?})",
1079                ack.address, ack.revision
1080            );
1081        }
1082
1083        Message::Announce(announce) => {
1084            // Store announced signals
1085            for signal in &announce.signals {
1086                debug!("Received signal announcement: {}", signal.address);
1087                signals.insert(signal.address.clone(), signal.clone());
1088            }
1089        }
1090
1091        Message::Sync(sync) => {
1092            // Process clock sync response
1093            // Note: This handles sync messages from server with t2/t3 filled in
1094            if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
1095                debug!("Clock sync: t1={}, t2={}, t3={}", sync.t1, t2, t3);
1096                // Clock sync is processed through ClockSync::process_sync
1097                // but we don't have access to the clock here.
1098                // For now, log it. A more complete implementation would
1099                // use a channel to send sync data back to the main client.
1100            }
1101        }
1102
1103        Message::Result(result) => {
1104            // Handle query results
1105            debug!("Received result with {} signals", result.signals.len());
1106            // Store any returned signals
1107            for signal in &result.signals {
1108                signals.insert(signal.address.clone(), signal.clone());
1109            }
1110        }
1111
1112        // Messages that are typically client-initiated, not expected from server
1113        Message::Hello(_)
1114        | Message::Welcome(_)
1115        | Message::Subscribe(_)
1116        | Message::Unsubscribe(_)
1117        | Message::Get(_)
1118        | Message::Query(_)
1119        | Message::Replay(_)
1120        | Message::FederationSync(_) => {
1121            debug!("Received unexpected client-type message: {:?}", msg);
1122        }
1123
1124        // Bundle: process contained messages recursively
1125        Message::Bundle(bundle) => {
1126            for inner_msg in &bundle.messages {
1127                handle_message(
1128                    inner_msg,
1129                    params,
1130                    subscriptions,
1131                    pending_gets,
1132                    signals,
1133                    last_error,
1134                );
1135            }
1136        }
1137
1138        // Ping/Pong for keep-alive
1139        Message::Ping => {
1140            debug!("Received PING from server");
1141            // Note: Pong response should be sent, but we don't have sender access here.
1142            // A more complete implementation would use a channel to request pong be sent.
1143        }
1144
1145        Message::Pong => {
1146            debug!("Received PONG from server");
1147        }
1148    }
1149}