Skip to main content

clasp_client/
client.rs

1//! Main Clasp client implementation
2
3use bytes::Bytes;
4use clasp_core::{
5    codec, time::ClockSync, BundleMessage, ErrorMessage, GesturePhase, GetMessage, HelloMessage,
6    Message, PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage,
7    SubscribeOptions, TimelineData, UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10    Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22#[cfg(feature = "p2p")]
23use crate::p2p;
24#[cfg(feature = "p2p")]
25use clasp_core::{P2PConfig, P2P_SIGNAL_PREFIX};
26
27/// Subscription callback type
28pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
29
30/// A Clasp client
31pub struct Clasp {
32    url: String,
33    name: String,
34    features: Vec<String>,
35    token: Option<String>,
36    reconnect: bool,
37    reconnect_interval_ms: u64,
38
39    /// Session ID (set after connect)
40    session_id: RwLock<Option<String>>,
41
42    /// Connection state
43    connected: Arc<RwLock<bool>>,
44
45    /// Sender for outgoing messages
46    sender: RwLock<Option<mpsc::Sender<Bytes>>>,
47
48    /// Local param cache
49    params: Arc<DashMap<String, Value>>,
50
51    /// Subscriptions
52    subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
53
54    /// Subscription ID counter
55    next_sub_id: AtomicU32,
56
57    /// Clock synchronization
58    clock: RwLock<ClockSync>,
59
60    /// Pending get requests
61    pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
62
63    /// Announced signals (from server)
64    signals: Arc<DashMap<String, SignalDefinition>>,
65
66    /// Last error received from server
67    last_error: Arc<RwLock<Option<ErrorMessage>>>,
68
69    /// Reconnect attempt counter
70    reconnect_attempts: Arc<AtomicU32>,
71
72    /// Max reconnect attempts (0 = unlimited)
73    max_reconnect_attempts: u32,
74
75    /// Flag to indicate intentional close (don't reconnect)
76    intentionally_closed: Arc<AtomicBool>,
77
78    /// Notify for triggering reconnect
79    reconnect_notify: Arc<Notify>,
80
81    /// P2P config (optional, feature-gated)
82    #[cfg(feature = "p2p")]
83    p2p_config: Option<P2PConfig>,
84
85    /// P2P manager (optional, feature-gated, created after connection)
86    #[cfg(feature = "p2p")]
87    p2p_manager: Option<Arc<p2p::P2PManager>>,
88}
89
90impl Clasp {
91    /// Create a new client (use builder for more options)
92    pub fn new(
93        url: &str,
94        name: String,
95        features: Vec<String>,
96        token: Option<String>,
97        reconnect: bool,
98        reconnect_interval_ms: u64,
99    ) -> Self {
100        Self {
101            url: url.to_string(),
102            name,
103            features,
104            token,
105            reconnect,
106            reconnect_interval_ms,
107            session_id: RwLock::new(None),
108            connected: Arc::new(RwLock::new(false)),
109            sender: RwLock::new(None),
110            params: Arc::new(DashMap::new()),
111            subscriptions: Arc::new(DashMap::new()),
112            next_sub_id: AtomicU32::new(1),
113            clock: RwLock::new(ClockSync::new()),
114            pending_gets: Arc::new(DashMap::new()),
115            signals: Arc::new(DashMap::new()),
116            last_error: Arc::new(RwLock::new(None)),
117            reconnect_attempts: Arc::new(AtomicU32::new(0)),
118            max_reconnect_attempts: 10,
119            intentionally_closed: Arc::new(AtomicBool::new(false)),
120            reconnect_notify: Arc::new(Notify::new()),
121            #[cfg(feature = "p2p")]
122            p2p_config: None,
123            #[cfg(feature = "p2p")]
124            p2p_manager: None,
125        }
126    }
127
128    /// Set P2P configuration (internal, called by builder)
129    #[cfg(feature = "p2p")]
130    pub(crate) fn set_p2p_config(&mut self, config: P2PConfig) {
131        self.p2p_config = Some(config);
132    }
133
134    /// Create a builder
135    pub fn builder(url: &str) -> ClaspBuilder {
136        ClaspBuilder::new(url)
137    }
138
139    /// Connect to server (convenience method)
140    pub async fn connect_to(url: &str) -> Result<Self> {
141        ClaspBuilder::new(url).connect().await
142    }
143
144    /// Internal connect
145    pub(crate) async fn do_connect(&mut self) -> Result<()> {
146        if *self.connected.read() {
147            return Err(ClientError::AlreadyConnected);
148        }
149
150        info!("Connecting to {}", self.url);
151
152        // Connect WebSocket
153        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
154
155        // Create send channel
156        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
157        *self.sender.write() = Some(tx);
158
159        let connected = self.connected.clone();
160
161        // Spawn sender task
162        let sender = Arc::new(sender);
163        let sender_clone = sender.clone();
164        tokio::spawn(async move {
165            while let Some(data) = rx.recv().await {
166                if let Err(e) = sender_clone.send(data).await {
167                    error!("Send error: {}", e);
168                    break;
169                }
170            }
171        });
172
173        // Send HELLO
174        let hello = Message::Hello(HelloMessage {
175            version: PROTOCOL_VERSION,
176            name: self.name.clone(),
177            features: self.features.clone(),
178            capabilities: None,
179            token: self.token.clone(),
180        });
181
182        self.send_message(&hello).await?;
183
184        // Wait for WELCOME
185        loop {
186            match receiver.recv().await {
187                Some(TransportEvent::Data(data)) => {
188                    match codec::decode(&data) {
189                        Ok((Message::Welcome(welcome), _)) => {
190                            *self.session_id.write() = Some(welcome.session.clone());
191                            *connected.write() = true;
192
193                            // Sync clock
194                            self.clock.write().process_sync(
195                                clasp_core::time::now(),
196                                welcome.time,
197                                welcome.time,
198                                clasp_core::time::now(),
199                            );
200
201                            // Initialize P2P manager if configured
202                            #[cfg(feature = "p2p")]
203                            {
204                                if let Some(p2p_config) = self.p2p_config.take() {
205                                    let session_id = welcome.session.clone();
206                                    // Create channel for P2P signaling
207                                    let (signal_tx, mut signal_rx) = mpsc::channel(100);
208                                    let p2p_manager =
209                                        Arc::new(p2p::P2PManager::new(p2p_config, signal_tx));
210                                    p2p_manager.set_session_id(session_id.clone());
211
212                                    // Spawn task to forward P2P signals through client
213                                    let sender = self.sender.read().clone();
214                                    let p2p_manager_for_task = Arc::clone(&p2p_manager);
215                                    if let Some(sender_tx) = sender {
216                                        tokio::spawn(async move {
217                                            while let Some(msg) = signal_rx.recv().await {
218                                                if let Some(encoded) = codec::encode(&msg).ok() {
219                                                    if let Err(e) =
220                                                        sender_tx.send(encoded.into()).await
221                                                    {
222                                                        tracing::warn!(
223                                                            "Failed to send P2P signal: {}",
224                                                            e
225                                                        );
226                                                        break;
227                                                    }
228                                                }
229                                            }
230                                        });
231                                    }
232
233                                    // Store P2P manager first
234                                    self.p2p_manager = Some(Arc::clone(&p2p_manager));
235
236                                    // Announce P2P capability
237                                    let _ = p2p_manager.announce().await;
238
239                                    // Set up P2P subscriptions (after manager is stored)
240                                    let _ = self.setup_p2p_subscriptions(&session_id).await;
241                                }
242                            }
243
244                            info!("Connected, session: {}", welcome.session);
245                            break;
246                        }
247                        Ok((msg, _)) => {
248                            debug!("Received during handshake: {:?}", msg);
249                        }
250                        Err(e) => {
251                            warn!("Decode error: {}", e);
252                        }
253                    }
254                }
255                Some(TransportEvent::Error(e)) => {
256                    return Err(ClientError::ConnectionFailed(e));
257                }
258                Some(TransportEvent::Disconnected { reason }) => {
259                    return Err(ClientError::ConnectionFailed(
260                        reason.unwrap_or_else(|| "Disconnected".to_string()),
261                    ));
262                }
263                None => {
264                    return Err(ClientError::ConnectionFailed(
265                        "Connection closed".to_string(),
266                    ));
267                }
268                _ => {}
269            }
270        }
271
272        // Reset reconnect state on successful connect
273        self.reconnect_attempts.store(0, Ordering::SeqCst);
274        self.intentionally_closed.store(false, Ordering::SeqCst);
275
276        // Spawn receiver task
277        let params = Arc::clone(&self.params);
278        let subscriptions = Arc::clone(&self.subscriptions);
279        let pending_gets = Arc::clone(&self.pending_gets);
280        let signals = Arc::clone(&self.signals);
281        let last_error = Arc::clone(&self.last_error);
282        let connected_clone = Arc::clone(&self.connected);
283        let reconnect_notify = Arc::clone(&self.reconnect_notify);
284        let intentionally_closed = Arc::clone(&self.intentionally_closed);
285        let reconnect_enabled = self.reconnect;
286        #[cfg(feature = "p2p")]
287        let p2p_manager = self.p2p_manager.clone();
288
289        tokio::spawn(async move {
290            while let Some(event) = receiver.recv().await {
291                match event {
292                    TransportEvent::Data(data) => {
293                        if let Ok((msg, _)) = codec::decode(&data) {
294                            #[cfg(feature = "p2p")]
295                            {
296                                // Forward P2P signals to P2P manager (handled in subscription callback)
297                                // P2P announce is also handled in subscription
298                            }
299                            handle_message(
300                                &msg,
301                                &params,
302                                &subscriptions,
303                                &pending_gets,
304                                &signals,
305                                &last_error,
306                            );
307                        }
308                    }
309                    TransportEvent::Disconnected { reason } => {
310                        info!("Disconnected: {:?}", reason);
311                        *connected_clone.write() = false;
312
313                        // Trigger reconnect if enabled and not intentionally closed
314                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
315                            reconnect_notify.notify_one();
316                        }
317                        break;
318                    }
319                    TransportEvent::Error(e) => {
320                        error!("Error: {}", e);
321                    }
322                    _ => {}
323                }
324            }
325        });
326
327        Ok(())
328    }
329
330    /// Start the reconnect loop (call after initial connect)
331    pub fn start_reconnect_loop(self: &Arc<Self>) {
332        if !self.reconnect {
333            return;
334        }
335
336        let client = Arc::clone(self);
337        tokio::spawn(async move {
338            loop {
339                // Wait for disconnect notification
340                client.reconnect_notify.notified().await;
341
342                if client.intentionally_closed.load(Ordering::SeqCst) {
343                    break;
344                }
345
346                // Attempt reconnection with exponential backoff
347                loop {
348                    let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
349
350                    if client.max_reconnect_attempts > 0
351                        && attempts >= client.max_reconnect_attempts
352                    {
353                        error!(
354                            "Max reconnect attempts ({}) reached",
355                            client.max_reconnect_attempts
356                        );
357                        break;
358                    }
359
360                    // Exponential backoff: base * 1.5^attempts, max 30 seconds
361                    let base_ms = client.reconnect_interval_ms;
362                    let delay_ms =
363                        (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;
364
365                    info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
366                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
367
368                    if client.intentionally_closed.load(Ordering::SeqCst) {
369                        break;
370                    }
371
372                    // Clone the Arc and get a mutable reference for reconnection
373                    // We need to use unsafe or restructure - let's use a different approach
374                    match client.try_reconnect().await {
375                        Ok(()) => {
376                            info!("Reconnected successfully");
377                            client.reconnect_attempts.store(0, Ordering::SeqCst);
378
379                            // Resubscribe to all patterns
380                            if let Err(e) = client.resubscribe_all().await {
381                                warn!("Failed to resubscribe: {}", e);
382                            }
383                            break;
384                        }
385                        Err(e) => {
386                            warn!("Reconnect failed: {}", e);
387                        }
388                    }
389                }
390            }
391        });
392    }
393
394    /// Internal reconnect attempt
395    async fn try_reconnect(&self) -> Result<()> {
396        info!("Attempting to reconnect to {}", self.url);
397
398        // Connect WebSocket
399        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
400
401        // Create send channel
402        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
403        *self.sender.write() = Some(tx);
404
405        // Spawn sender task
406        let sender = Arc::new(sender);
407        let sender_clone = sender.clone();
408        tokio::spawn(async move {
409            while let Some(data) = rx.recv().await {
410                if let Err(e) = sender_clone.send(data).await {
411                    error!("Send error: {}", e);
412                    break;
413                }
414            }
415        });
416
417        // Send HELLO
418        let hello = Message::Hello(HelloMessage {
419            version: PROTOCOL_VERSION,
420            name: self.name.clone(),
421            features: self.features.clone(),
422            capabilities: None,
423            token: self.token.clone(),
424        });
425
426        self.send_message(&hello).await?;
427
428        // Wait for WELCOME with timeout
429        let welcome_timeout = Duration::from_secs(10);
430        let deadline = tokio::time::Instant::now() + welcome_timeout;
431
432        loop {
433            match tokio::time::timeout_at(deadline, receiver.recv()).await {
434                Ok(Some(TransportEvent::Data(data))) => match codec::decode(&data) {
435                    Ok((Message::Welcome(welcome), _)) => {
436                        *self.session_id.write() = Some(welcome.session.clone());
437                        *self.connected.write() = true;
438
439                        self.clock.write().process_sync(
440                            clasp_core::time::now(),
441                            welcome.time,
442                            welcome.time,
443                            clasp_core::time::now(),
444                        );
445
446                        info!("Reconnected, session: {}", welcome.session);
447                        break;
448                    }
449                    Ok((msg, _)) => {
450                        debug!("Received during reconnect handshake: {:?}", msg);
451                    }
452                    Err(e) => {
453                        warn!("Decode error during reconnect: {}", e);
454                    }
455                },
456                Ok(Some(TransportEvent::Error(e))) => {
457                    return Err(ClientError::ConnectionFailed(e));
458                }
459                Ok(Some(TransportEvent::Disconnected { reason })) => {
460                    return Err(ClientError::ConnectionFailed(
461                        reason.unwrap_or_else(|| "Disconnected".to_string()),
462                    ));
463                }
464                Ok(None) => {
465                    return Err(ClientError::ConnectionFailed(
466                        "Connection closed".to_string(),
467                    ));
468                }
469                Err(_) => {
470                    return Err(ClientError::Timeout);
471                }
472                _ => {}
473            }
474        }
475
476        // Spawn new receiver task
477        let params = Arc::clone(&self.params);
478        let subscriptions = Arc::clone(&self.subscriptions);
479        let pending_gets = Arc::clone(&self.pending_gets);
480        let signals = Arc::clone(&self.signals);
481        let last_error = Arc::clone(&self.last_error);
482        let connected_clone = Arc::clone(&self.connected);
483        let reconnect_notify = Arc::clone(&self.reconnect_notify);
484        let intentionally_closed = Arc::clone(&self.intentionally_closed);
485        let reconnect_enabled = self.reconnect;
486
487        tokio::spawn(async move {
488            while let Some(event) = receiver.recv().await {
489                match event {
490                    TransportEvent::Data(data) => {
491                        if let Ok((msg, _)) = codec::decode(&data) {
492                            handle_message(
493                                &msg,
494                                &params,
495                                &subscriptions,
496                                &pending_gets,
497                                &signals,
498                                &last_error,
499                            );
500                        }
501                    }
502                    TransportEvent::Disconnected { reason } => {
503                        info!("Disconnected: {:?}", reason);
504                        *connected_clone.write() = false;
505
506                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
507                            reconnect_notify.notify_one();
508                        }
509                        break;
510                    }
511                    TransportEvent::Error(e) => {
512                        error!("Error: {}", e);
513                    }
514                    _ => {}
515                }
516            }
517        });
518
519        Ok(())
520    }
521
522    /// Resubscribe to all existing subscriptions after reconnect
523    async fn resubscribe_all(&self) -> Result<()> {
524        // Collect subscription info first to avoid lifetime issues with DashMap
525        let subs: Vec<(u32, String)> = self
526            .subscriptions
527            .iter()
528            .map(|entry| (*entry.key(), entry.value().0.clone()))
529            .collect();
530
531        for (id, pattern) in subs {
532            let msg = Message::Subscribe(SubscribeMessage {
533                id,
534                pattern: pattern.clone(),
535                types: vec![],
536                options: Some(SubscribeOptions::default()),
537            });
538
539            self.send_message(&msg).await?;
540            debug!("Resubscribed to {} (id: {})", pattern, id);
541        }
542
543        Ok(())
544    }
545
546    /// Check if connected
547    pub fn is_connected(&self) -> bool {
548        *self.connected.read()
549    }
550
551    /// Get session ID
552    pub fn session_id(&self) -> Option<String> {
553        self.session_id.read().clone()
554    }
555
556    /// Get current server time (microseconds)
557    pub fn time(&self) -> u64 {
558        self.clock.read().server_time()
559    }
560
561    /// Send a raw message
562    async fn send_message(&self, message: &Message) -> Result<()> {
563        let data = codec::encode(message)?;
564        self.send_raw(data).await
565    }
566
567    /// Send raw bytes
568    async fn send_raw(&self, data: Bytes) -> Result<()> {
569        // Clone the sender to avoid holding the lock across await
570        let tx = {
571            let sender = self.sender.read();
572            sender.as_ref().cloned()
573        };
574
575        if let Some(tx) = tx {
576            tx.send(data)
577                .await
578                .map_err(|e| ClientError::SendFailed(e.to_string()))?;
579            Ok(())
580        } else {
581            Err(ClientError::NotConnected)
582        }
583    }
584
585    /// Subscribe to an address pattern
586    pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
587    where
588        F: Fn(Value, &str) + Send + Sync + 'static,
589    {
590        let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
591
592        // Store callback
593        self.subscriptions
594            .insert(id, (pattern.to_string(), Box::new(callback)));
595
596        // Send subscribe message
597        let msg = Message::Subscribe(SubscribeMessage {
598            id,
599            pattern: pattern.to_string(),
600            types: vec![],
601            options: Some(SubscribeOptions::default()),
602        });
603
604        self.send_message(&msg).await?;
605
606        debug!("Subscribed to {} (id: {})", pattern, id);
607        Ok(id)
608    }
609
610    /// Shorthand for subscribe
611    pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
612    where
613        F: Fn(Value, &str) + Send + Sync + 'static,
614    {
615        self.subscribe(pattern, callback).await
616    }
617
618    /// Unsubscribe
619    pub async fn unsubscribe(&self, id: u32) -> Result<()> {
620        self.subscriptions.remove(&id);
621
622        let msg = Message::Unsubscribe(UnsubscribeMessage { id });
623        self.send_message(&msg).await?;
624
625        Ok(())
626    }
627
628    /// Set a parameter value
629    pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
630        let msg = Message::Set(SetMessage {
631            address: address.to_string(),
632            value: value.into(),
633            revision: None,
634            lock: false,
635            unlock: false,
636            ttl: None,
637        });
638
639        self.send_message(&msg).await
640    }
641
642    /// Set with lock
643    pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
644        let msg = Message::Set(SetMessage {
645            address: address.to_string(),
646            value: value.into(),
647            revision: None,
648            lock: true,
649            unlock: false,
650            ttl: None,
651        });
652
653        self.send_message(&msg).await
654    }
655
656    /// Set and unlock (release a previously held lock)
657    pub async fn set_unlocked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
658        let msg = Message::Set(SetMessage {
659            address: address.to_string(),
660            value: value.into(),
661            revision: None,
662            lock: false,
663            unlock: true,
664            ttl: None,
665        });
666
667        self.send_message(&msg).await
668    }
669
670    /// Set a parameter value with a per-message TTL
671    pub async fn set_with_ttl(
672        &self,
673        address: &str,
674        value: impl Into<Value>,
675        ttl: clasp_core::Ttl,
676    ) -> Result<()> {
677        let msg = Message::Set(SetMessage {
678            address: address.to_string(),
679            value: value.into(),
680            revision: None,
681            lock: false,
682            unlock: false,
683            ttl: Some(ttl),
684        });
685
686        self.send_message(&msg).await
687    }
688
689    /// Get current value (cached or request)
690    pub async fn get(&self, address: &str) -> Result<Value> {
691        // Check cache first
692        if let Some(value) = self.params.get(address) {
693            return Ok(value.clone());
694        }
695
696        // Request from server
697        let (tx, rx) = oneshot::channel();
698        let address_key = address.to_string();
699        self.pending_gets.insert(address_key.clone(), tx);
700
701        let msg = Message::Get(GetMessage {
702            address: address.to_string(),
703        });
704        self.send_message(&msg).await?;
705
706        // Wait for response (with timeout)
707        match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
708            Ok(Ok(value)) => Ok(value),
709            Ok(Err(_)) => {
710                // Cancelled - remove from pending
711                self.pending_gets.remove(&address_key);
712                Err(ClientError::Other("Get cancelled".to_string()))
713            }
714            Err(_) => {
715                // Timeout - remove from pending to prevent memory leak
716                self.pending_gets.remove(&address_key);
717                Err(ClientError::Timeout)
718            }
719        }
720    }
721
722    /// Emit an event
723    pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
724        let msg = Message::Publish(PublishMessage {
725            address: address.to_string(),
726            signal: Some(SignalType::Event),
727            value: None,
728            payload: Some(payload.into()),
729            samples: None,
730            rate: None,
731            id: None,
732            phase: None,
733            timestamp: Some(self.time()),
734            timeline: None,
735        });
736
737        self.send_message(&msg).await
738    }
739
740    /// Send stream sample
741    pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
742        let msg = Message::Publish(PublishMessage {
743            address: address.to_string(),
744            signal: Some(SignalType::Stream),
745            value: Some(value.into()),
746            payload: None,
747            samples: None,
748            rate: None,
749            id: None,
750            phase: None,
751            timestamp: Some(self.time()),
752            timeline: None,
753        });
754
755        self.send_message(&msg).await
756    }
757
758    /// Send gesture input
759    ///
760    /// Gestures are phased input streams for touch/pen/motion input.
761    /// Each gesture has a stable ID and goes through phases:
762    /// - `Start`: Begin a new gesture
763    /// - `Move`: Update position/state (may be coalesced by router)
764    /// - `End`: Complete the gesture normally
765    /// - `Cancel`: Abort the gesture
766    ///
767    /// # Example
768    /// ```ignore
769    /// // Start a touch gesture
770    /// client.gesture("/input/touch", 1, GesturePhase::Start, json!({"x": 0.5, "y": 0.3})).await?;
771    ///
772    /// // Move updates
773    /// client.gesture("/input/touch", 1, GesturePhase::Move, json!({"x": 0.6, "y": 0.4})).await?;
774    ///
775    /// // End the gesture
776    /// client.gesture("/input/touch", 1, GesturePhase::End, json!({"x": 0.7, "y": 0.5})).await?;
777    /// ```
778    pub async fn gesture(
779        &self,
780        address: &str,
781        id: u32,
782        phase: GesturePhase,
783        payload: impl Into<Value>,
784    ) -> Result<()> {
785        let msg = Message::Publish(PublishMessage {
786            address: address.to_string(),
787            signal: Some(SignalType::Gesture),
788            value: None,
789            payload: Some(payload.into()),
790            samples: None,
791            rate: None,
792            id: Some(id),
793            phase: Some(phase),
794            timestamp: Some(self.time()),
795            timeline: None,
796        });
797
798        self.send_message(&msg).await
799    }
800
801    /// Publish timeline automation
802    ///
803    /// Timelines are pre-computed automation curves with keyframes.
804    /// Once published, timelines are immutable - to modify, publish a new one.
805    ///
806    /// # Arguments
807    /// * `address` - The parameter address this timeline controls
808    /// * `timeline` - The timeline data with keyframes
809    ///
810    /// # Example
811    /// ```ignore
812    /// use clasp_core::{TimelineData, TimelineKeyframe, EasingType, Value};
813    ///
814    /// let timeline = TimelineData::new(vec![
815    ///     TimelineKeyframe { time: 0, value: Value::Float(0.0), easing: EasingType::Linear, bezier: None },
816    ///     TimelineKeyframe { time: 1_000_000, value: Value::Float(1.0), easing: EasingType::EaseOut, bezier: None },
817    /// ]);
818    ///
819    /// client.timeline("/lights/master/dimmer", timeline).await?;
820    /// ```
821    pub async fn timeline(&self, address: &str, timeline_data: TimelineData) -> Result<()> {
822        let msg = Message::Publish(PublishMessage {
823            address: address.to_string(),
824            signal: Some(SignalType::Timeline),
825            value: None,
826            payload: None,
827            samples: None,
828            rate: None,
829            id: None,
830            phase: None,
831            timestamp: Some(self.time()),
832            timeline: Some(timeline_data),
833        });
834
835        self.send_message(&msg).await
836    }
837
838    /// Send atomic bundle
839    pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
840        let msg = Message::Bundle(BundleMessage {
841            timestamp: None,
842            messages,
843        });
844
845        self.send_message(&msg).await
846    }
847
848    /// Send scheduled bundle
849    pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
850        let msg = Message::Bundle(BundleMessage {
851            timestamp: Some(time),
852            messages,
853        });
854
855        self.send_message(&msg).await
856    }
857
858    /// Get cached param value
859    pub fn cached(&self, address: &str) -> Option<Value> {
860        self.params.get(address).map(|v| v.clone())
861    }
862
863    /// Close connection.
864    /// Disables auto-reconnect and closes the connection.
865    pub async fn close(&self) {
866        self.intentionally_closed.store(true, Ordering::SeqCst);
867        *self.connected.write() = false;
868        *self.sender.write() = None;
869    }
870
871    /// Get all announced signals
872    pub fn signals(&self) -> Vec<SignalDefinition> {
873        self.signals.iter().map(|e| e.value().clone()).collect()
874    }
875
876    /// Query signals matching a pattern
877    pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
878        self.signals
879            .iter()
880            .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
881            .map(|e| e.value().clone())
882            .collect()
883    }
884
885    /// Get the last error received from server
886    pub fn last_error(&self) -> Option<ErrorMessage> {
887        self.last_error.read().clone()
888    }
889
890    /// Clear the last error
891    pub fn clear_error(&self) {
892        *self.last_error.write() = None;
893    }
894
895    /// Set up P2P subscriptions (internal, called after P2P manager is created)
896    #[cfg(feature = "p2p")]
897    async fn setup_p2p_subscriptions(&self, session_id: &str) -> Result<()> {
898        if let Some(ref p2p_manager) = self.p2p_manager {
899            let signal_address = format!("{}{}", P2P_SIGNAL_PREFIX, session_id);
900            let p2p_manager_signal = Arc::clone(p2p_manager);
901
902            // Subscribe to P2P signals
903            let _ = self
904                .subscribe(&signal_address, move |value, address| {
905                    let p2p = Arc::clone(&p2p_manager_signal);
906                    let address = address.to_string(); // Clone the address string
907                    tokio::spawn(async move {
908                        if let Err(e) = p2p.handle_signal(&address, &value).await {
909                            tracing::debug!("P2P signal handling error: {}", e);
910                        }
911                    });
912                })
913                .await?;
914
915            // Subscribe to P2P announce
916            let p2p_manager_announce = Arc::clone(p2p_manager);
917            let _ = self
918                .subscribe(clasp_core::P2P_ANNOUNCE, move |value, _| {
919                    p2p_manager_announce.handle_announce(&value);
920                })
921                .await?;
922        }
923        Ok(())
924    }
925
926    /// Connect to a peer via P2P (requires p2p feature)
927    #[cfg(feature = "p2p")]
928    pub async fn connect_to_peer(&self, peer_session_id: &str) -> Result<()> {
929        if let Some(ref p2p_manager) = self.p2p_manager {
930            p2p_manager.connect_to_peer(peer_session_id).await
931        } else {
932            Err(ClientError::Other(
933                "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
934            ))
935        }
936    }
937
938    /// Set P2P event callback (requires p2p feature)
939    #[cfg(feature = "p2p")]
940    pub fn on_p2p_event<F>(&self, callback: F)
941    where
942        F: Fn(p2p::P2PEvent) + Send + Sync + 'static,
943    {
944        if let Some(ref p2p_manager) = self.p2p_manager {
945            p2p_manager.on_event(callback);
946        }
947    }
948
949    /// Check if peer is connected via P2P (requires p2p feature)
950    #[cfg(feature = "p2p")]
951    pub fn is_peer_connected(&self, peer_session_id: &str) -> bool {
952        self.p2p_manager
953            .as_ref()
954            .map(|p2p| p2p.is_peer_connected(peer_session_id))
955            .unwrap_or(false)
956    }
957
958    /// Send data to a peer via P2P (requires p2p feature)
959    ///
960    /// Returns `SendResult::P2P` if sent via direct P2P, `SendResult::Relay` if sent via server.
961    /// Behavior depends on the current routing mode and connection state.
962    #[cfg(feature = "p2p")]
963    pub async fn send_p2p(
964        &self,
965        peer_session_id: &str,
966        data: bytes::Bytes,
967        reliable: bool,
968    ) -> Result<p2p::SendResult> {
969        if let Some(ref p2p_manager) = self.p2p_manager {
970            p2p_manager
971                .send_to_peer(peer_session_id, data, reliable)
972                .await
973        } else {
974            Err(ClientError::Other(
975                "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
976            ))
977        }
978    }
979
980    /// Set P2P routing mode (requires p2p feature)
981    ///
982    /// - `RoutingMode::PreferP2P` (default): Try P2P first, fall back to relay
983    /// - `RoutingMode::P2POnly`: Only use P2P, fail if unavailable
984    /// - `RoutingMode::ServerOnly`: Never use P2P, always relay
985    #[cfg(feature = "p2p")]
986    pub fn set_p2p_routing_mode(&self, mode: clasp_core::p2p::RoutingMode) {
987        if let Some(ref p2p_manager) = self.p2p_manager {
988            p2p_manager.set_routing_mode(mode);
989        }
990    }
991
992    /// Get current P2P routing mode (requires p2p feature)
993    #[cfg(feature = "p2p")]
994    pub fn p2p_routing_mode(&self) -> clasp_core::p2p::RoutingMode {
995        self.p2p_manager
996            .as_ref()
997            .map(|p2p| p2p.routing_mode())
998            .unwrap_or_default()
999    }
1000}
1001
1002/// Handle incoming message
1003fn handle_message(
1004    msg: &Message,
1005    params: &Arc<DashMap<String, Value>>,
1006    subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
1007    pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
1008    signals: &Arc<DashMap<String, SignalDefinition>>,
1009    last_error: &Arc<RwLock<Option<ErrorMessage>>>,
1010) {
1011    match msg {
1012        Message::Set(set) => {
1013            // Update cache
1014            params.insert(set.address.clone(), set.value.clone());
1015
1016            // Notify subscribers
1017            for entry in subscriptions.iter() {
1018                let (pattern, callback) = entry.value();
1019                if clasp_core::address::glob_match(pattern, &set.address) {
1020                    callback(set.value.clone(), &set.address);
1021                }
1022            }
1023        }
1024
1025        Message::Snapshot(snapshot) => {
1026            for param in &snapshot.params {
1027                params.insert(param.address.clone(), param.value.clone());
1028
1029                // Complete pending gets
1030                if let Some((_, tx)) = pending_gets.remove(&param.address) {
1031                    let _ = tx.send(param.value.clone());
1032                }
1033
1034                // Notify subscribers
1035                for entry in subscriptions.iter() {
1036                    let (pattern, callback) = entry.value();
1037                    if clasp_core::address::glob_match(pattern, &param.address) {
1038                        callback(param.value.clone(), &param.address);
1039                    }
1040                }
1041            }
1042        }
1043
1044        Message::Publish(pub_msg) => {
1045            #[cfg(feature = "p2p")]
1046            {
1047                // Check if this is a P2P signal or announce - handle before regular subscriptions
1048                if pub_msg.address.starts_with(clasp_core::P2P_SIGNAL_PREFIX) {
1049                    // P2P signals will be handled by subscription callbacks
1050                    // (they're subscribed to automatically when P2P manager is created)
1051                } else if pub_msg.address == clasp_core::P2P_ANNOUNCE {
1052                    // P2P announce will be handled by subscription callback
1053                }
1054            }
1055
1056            // Notify subscribers
1057            let value = pub_msg
1058                .value
1059                .clone()
1060                .or_else(|| pub_msg.payload.clone())
1061                .unwrap_or(Value::Null);
1062
1063            for entry in subscriptions.iter() {
1064                let (pattern, callback) = entry.value();
1065                if clasp_core::address::glob_match(pattern, &pub_msg.address) {
1066                    callback(value.clone(), &pub_msg.address);
1067                }
1068            }
1069        }
1070
1071        Message::Error(error) => {
1072            // Log the error and store it for retrieval
1073            warn!(
1074                "Server error {}: {} (address: {:?})",
1075                error.code, error.message, error.address
1076            );
1077            *last_error.write() = Some(error.clone());
1078        }
1079
1080        Message::Ack(ack) => {
1081            // Log acknowledgment (could be extended to track pending requests)
1082            debug!(
1083                "Received ACK for {:?} (revision: {:?})",
1084                ack.address, ack.revision
1085            );
1086        }
1087
1088        Message::Announce(announce) => {
1089            // Store announced signals
1090            for signal in &announce.signals {
1091                debug!("Received signal announcement: {}", signal.address);
1092                signals.insert(signal.address.clone(), signal.clone());
1093            }
1094        }
1095
1096        Message::Sync(sync) => {
1097            // Process clock sync response
1098            // Note: This handles sync messages from server with t2/t3 filled in
1099            if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
1100                debug!("Clock sync: t1={}, t2={}, t3={}", sync.t1, t2, t3);
1101                // Clock sync is processed through ClockSync::process_sync
1102                // but we don't have access to the clock here.
1103                // For now, log it. A more complete implementation would
1104                // use a channel to send sync data back to the main client.
1105            }
1106        }
1107
1108        Message::Result(result) => {
1109            // Handle query results
1110            debug!("Received result with {} signals", result.signals.len());
1111            // Store any returned signals
1112            for signal in &result.signals {
1113                signals.insert(signal.address.clone(), signal.clone());
1114            }
1115        }
1116
1117        // Messages that are typically client-initiated, not expected from server
1118        Message::Hello(_)
1119        | Message::Welcome(_)
1120        | Message::Subscribe(_)
1121        | Message::Unsubscribe(_)
1122        | Message::Get(_)
1123        | Message::Query(_)
1124        | Message::Replay(_)
1125        | Message::FederationSync(_) => {
1126            debug!("Received unexpected client-type message: {:?}", msg);
1127        }
1128
1129        // Bundle: process contained messages recursively
1130        Message::Bundle(bundle) => {
1131            for inner_msg in &bundle.messages {
1132                handle_message(
1133                    inner_msg,
1134                    params,
1135                    subscriptions,
1136                    pending_gets,
1137                    signals,
1138                    last_error,
1139                );
1140            }
1141        }
1142
1143        // Ping/Pong for keep-alive
1144        Message::Ping => {
1145            debug!("Received PING from server");
1146            // Note: Pong response should be sent, but we don't have sender access here.
1147            // A more complete implementation would use a channel to request pong be sent.
1148        }
1149
1150        Message::Pong => {
1151            debug!("Received PONG from server");
1152        }
1153    }
1154}