Skip to main content

clasp_client/
client.rs

1//! Main Clasp client implementation
2
3use bytes::Bytes;
4use clasp_core::{
5    codec, time::ClockSync, BundleMessage, ErrorMessage, GesturePhase, GetMessage, HelloMessage,
6    Message, PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage,
7    SubscribeOptions, TimelineData, UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10    Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22#[cfg(feature = "p2p")]
23use crate::p2p;
24#[cfg(feature = "p2p")]
25use clasp_core::{P2PConfig, P2P_SIGNAL_PREFIX};
26
/// Boxed callback stored for each subscription.
///
/// It receives a [`Value`] and a string slice (callers in this file name the second
/// argument `address`). The `Send + Sync` bounds are required because callbacks live in a
/// map that is shared with the background receiver task.
pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
29
/// A Clasp client.
///
/// Holds the connection settings supplied at construction time plus the runtime state
/// (session, caches, subscriptions, reconnect bookkeeping). State that background tasks
/// need is wrapped in `Arc` so it can be cloned into those tasks.
pub struct Clasp {
    /// Server URL handed to the WebSocket transport.
    url: String,
    /// Client name sent in the HELLO message.
    name: String,
    /// Feature list sent in the HELLO message.
    features: Vec<String>,
    /// Optional token sent in the HELLO message.
    token: Option<String>,
    /// Whether automatic reconnection is enabled.
    reconnect: bool,
    /// Base delay for the reconnect backoff, in milliseconds.
    reconnect_interval_ms: u64,

    /// Session ID taken from the server's WELCOME message (`None` until connected).
    session_id: RwLock<Option<String>>,

    /// Connection flag: set after WELCOME, cleared on disconnect or close.
    connected: Arc<RwLock<bool>>,

    /// Sending half of the outgoing byte queue; `None` when there is no connection.
    sender: RwLock<Option<mpsc::Sender<Bytes>>>,

    /// Local param cache, keyed by address.
    params: Arc<DashMap<String, Value>>,

    /// Active subscriptions: subscription id -> (pattern, callback).
    subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,

    /// Next subscription id to hand out (starts at 1).
    next_sub_id: AtomicU32,

    /// Clock synchronization state, updated from the WELCOME timestamp.
    clock: RwLock<ClockSync>,

    /// In-flight `get` requests, keyed by address; the sender completes the waiting call.
    pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,

    /// Signals announced by the server, keyed by address.
    signals: Arc<DashMap<String, SignalDefinition>>,

    /// Last error message received from the server, if any.
    last_error: Arc<RwLock<Option<ErrorMessage>>>,

    /// Number of reconnect attempts made since the last successful connect.
    reconnect_attempts: Arc<AtomicU32>,

    /// Maximum reconnect attempts (0 = unlimited).
    max_reconnect_attempts: u32,

    /// Set by `close` so that a deliberate shutdown does not trigger a reconnect.
    intentionally_closed: Arc<AtomicBool>,

    /// Signalled by the receiver task to wake the reconnect loop after a disconnect.
    reconnect_notify: Arc<Notify>,

    /// P2P config (optional, feature-gated); consumed when the P2P manager is created.
    #[cfg(feature = "p2p")]
    p2p_config: Option<P2PConfig>,

    /// P2P manager (optional, feature-gated); created after the connection is established.
    #[cfg(feature = "p2p")]
    p2p_manager: Option<Arc<p2p::P2PManager>>,
}
89
90impl Clasp {
91    /// Create a new client (use builder for more options)
92    pub fn new(
93        url: &str,
94        name: String,
95        features: Vec<String>,
96        token: Option<String>,
97        reconnect: bool,
98        reconnect_interval_ms: u64,
99    ) -> Self {
100        Self {
101            url: url.to_string(),
102            name,
103            features,
104            token,
105            reconnect,
106            reconnect_interval_ms,
107            session_id: RwLock::new(None),
108            connected: Arc::new(RwLock::new(false)),
109            sender: RwLock::new(None),
110            params: Arc::new(DashMap::new()),
111            subscriptions: Arc::new(DashMap::new()),
112            next_sub_id: AtomicU32::new(1),
113            clock: RwLock::new(ClockSync::new()),
114            pending_gets: Arc::new(DashMap::new()),
115            signals: Arc::new(DashMap::new()),
116            last_error: Arc::new(RwLock::new(None)),
117            reconnect_attempts: Arc::new(AtomicU32::new(0)),
118            max_reconnect_attempts: 10,
119            intentionally_closed: Arc::new(AtomicBool::new(false)),
120            reconnect_notify: Arc::new(Notify::new()),
121            #[cfg(feature = "p2p")]
122            p2p_config: None,
123            #[cfg(feature = "p2p")]
124            p2p_manager: None,
125        }
126    }
127
128    /// Set P2P configuration (internal, called by builder)
129    #[cfg(feature = "p2p")]
130    pub(crate) fn set_p2p_config(&mut self, config: P2PConfig) {
131        self.p2p_config = Some(config);
132    }
133
134    /// Create a builder
135    pub fn builder(url: &str) -> ClaspBuilder {
136        ClaspBuilder::new(url)
137    }
138
139    /// Connect to server (convenience method)
140    pub async fn connect_to(url: &str) -> Result<Self> {
141        ClaspBuilder::new(url).connect().await
142    }
143
144    /// Internal connect
145    pub(crate) async fn do_connect(&mut self) -> Result<()> {
146        if *self.connected.read() {
147            return Err(ClientError::AlreadyConnected);
148        }
149
150        info!("Connecting to {}", self.url);
151
152        // Connect WebSocket
153        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
154
155        // Create send channel
156        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
157        *self.sender.write() = Some(tx);
158
159        let connected = self.connected.clone();
160
161        // Spawn sender task
162        let sender = Arc::new(sender);
163        let sender_clone = sender.clone();
164        tokio::spawn(async move {
165            while let Some(data) = rx.recv().await {
166                if let Err(e) = sender_clone.send(data).await {
167                    error!("Send error: {}", e);
168                    break;
169                }
170            }
171        });
172
173        // Send HELLO
174        let hello = Message::Hello(HelloMessage {
175            version: PROTOCOL_VERSION,
176            name: self.name.clone(),
177            features: self.features.clone(),
178            capabilities: None,
179            token: self.token.clone(),
180        });
181
182        self.send_message(&hello).await?;
183
184        // Wait for WELCOME
185        loop {
186            match receiver.recv().await {
187                Some(TransportEvent::Data(data)) => {
188                    match codec::decode(&data) {
189                        Ok((Message::Welcome(welcome), _)) => {
190                            *self.session_id.write() = Some(welcome.session.clone());
191                            *connected.write() = true;
192
193                            // Sync clock
194                            self.clock.write().process_sync(
195                                clasp_core::time::now(),
196                                welcome.time,
197                                welcome.time,
198                                clasp_core::time::now(),
199                            );
200
201                            // Initialize P2P manager if configured
202                            #[cfg(feature = "p2p")]
203                            {
204                                if let Some(p2p_config) = self.p2p_config.take() {
205                                    let session_id = welcome.session.clone();
206                                    // Create channel for P2P signaling
207                                    let (signal_tx, mut signal_rx) = mpsc::channel(100);
208                                    let p2p_manager =
209                                        Arc::new(p2p::P2PManager::new(p2p_config, signal_tx));
210                                    p2p_manager.set_session_id(session_id.clone());
211
212                                    // Spawn task to forward P2P signals through client
213                                    let sender = self.sender.read().clone();
214                                    let p2p_manager_for_task = Arc::clone(&p2p_manager);
215                                    if let Some(sender_tx) = sender {
216                                        tokio::spawn(async move {
217                                            while let Some(msg) = signal_rx.recv().await {
218                                                if let Some(encoded) = codec::encode(&msg).ok() {
219                                                    if let Err(e) =
220                                                        sender_tx.send(encoded.into()).await
221                                                    {
222                                                        tracing::warn!(
223                                                            "Failed to send P2P signal: {}",
224                                                            e
225                                                        );
226                                                        break;
227                                                    }
228                                                }
229                                            }
230                                        });
231                                    }
232
233                                    // Store P2P manager first
234                                    self.p2p_manager = Some(Arc::clone(&p2p_manager));
235
236                                    // Announce P2P capability
237                                    let _ = p2p_manager.announce().await;
238
239                                    // Set up P2P subscriptions (after manager is stored)
240                                    let _ = self.setup_p2p_subscriptions(&session_id).await;
241                                }
242                            }
243
244                            info!("Connected, session: {}", welcome.session);
245                            break;
246                        }
247                        Ok((msg, _)) => {
248                            debug!("Received during handshake: {:?}", msg);
249                        }
250                        Err(e) => {
251                            warn!("Decode error: {}", e);
252                        }
253                    }
254                }
255                Some(TransportEvent::Error(e)) => {
256                    return Err(ClientError::ConnectionFailed(e));
257                }
258                Some(TransportEvent::Disconnected { reason }) => {
259                    return Err(ClientError::ConnectionFailed(
260                        reason.unwrap_or_else(|| "Disconnected".to_string()),
261                    ));
262                }
263                None => {
264                    return Err(ClientError::ConnectionFailed(
265                        "Connection closed".to_string(),
266                    ));
267                }
268                _ => {}
269            }
270        }
271
272        // Reset reconnect state on successful connect
273        self.reconnect_attempts.store(0, Ordering::SeqCst);
274        self.intentionally_closed.store(false, Ordering::SeqCst);
275
276        // Spawn receiver task
277        let params = Arc::clone(&self.params);
278        let subscriptions = Arc::clone(&self.subscriptions);
279        let pending_gets = Arc::clone(&self.pending_gets);
280        let signals = Arc::clone(&self.signals);
281        let last_error = Arc::clone(&self.last_error);
282        let connected_clone = Arc::clone(&self.connected);
283        let reconnect_notify = Arc::clone(&self.reconnect_notify);
284        let intentionally_closed = Arc::clone(&self.intentionally_closed);
285        let reconnect_enabled = self.reconnect;
286        #[cfg(feature = "p2p")]
287        let p2p_manager = self.p2p_manager.clone();
288
289        tokio::spawn(async move {
290            while let Some(event) = receiver.recv().await {
291                match event {
292                    TransportEvent::Data(data) => {
293                        if let Ok((msg, _)) = codec::decode(&data) {
294                            #[cfg(feature = "p2p")]
295                            {
296                                // Forward P2P signals to P2P manager (handled in subscription callback)
297                                // P2P announce is also handled in subscription
298                            }
299                            handle_message(
300                                &msg,
301                                &params,
302                                &subscriptions,
303                                &pending_gets,
304                                &signals,
305                                &last_error,
306                            );
307                        }
308                    }
309                    TransportEvent::Disconnected { reason } => {
310                        info!("Disconnected: {:?}", reason);
311                        *connected_clone.write() = false;
312
313                        // Trigger reconnect if enabled and not intentionally closed
314                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
315                            reconnect_notify.notify_one();
316                        }
317                        break;
318                    }
319                    TransportEvent::Error(e) => {
320                        error!("Error: {}", e);
321                    }
322                    _ => {}
323                }
324            }
325        });
326
327        Ok(())
328    }
329
330    /// Start the reconnect loop (call after initial connect)
331    pub fn start_reconnect_loop(self: &Arc<Self>) {
332        if !self.reconnect {
333            return;
334        }
335
336        let client = Arc::clone(self);
337        tokio::spawn(async move {
338            loop {
339                // Wait for disconnect notification
340                client.reconnect_notify.notified().await;
341
342                if client.intentionally_closed.load(Ordering::SeqCst) {
343                    break;
344                }
345
346                // Attempt reconnection with exponential backoff
347                loop {
348                    let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
349
350                    if client.max_reconnect_attempts > 0
351                        && attempts >= client.max_reconnect_attempts
352                    {
353                        error!(
354                            "Max reconnect attempts ({}) reached",
355                            client.max_reconnect_attempts
356                        );
357                        break;
358                    }
359
360                    // Exponential backoff: base * 1.5^attempts, max 30 seconds
361                    let base_ms = client.reconnect_interval_ms;
362                    let delay_ms =
363                        (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;
364
365                    info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
366                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
367
368                    if client.intentionally_closed.load(Ordering::SeqCst) {
369                        break;
370                    }
371
372                    // Clone the Arc and get a mutable reference for reconnection
373                    // We need to use unsafe or restructure - let's use a different approach
374                    match client.try_reconnect().await {
375                        Ok(()) => {
376                            info!("Reconnected successfully");
377                            client.reconnect_attempts.store(0, Ordering::SeqCst);
378
379                            // Resubscribe to all patterns
380                            if let Err(e) = client.resubscribe_all().await {
381                                warn!("Failed to resubscribe: {}", e);
382                            }
383                            break;
384                        }
385                        Err(e) => {
386                            warn!("Reconnect failed: {}", e);
387                        }
388                    }
389                }
390            }
391        });
392    }
393
394    /// Internal reconnect attempt
395    async fn try_reconnect(&self) -> Result<()> {
396        info!("Attempting to reconnect to {}", self.url);
397
398        // Connect WebSocket
399        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
400
401        // Create send channel
402        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
403        *self.sender.write() = Some(tx);
404
405        // Spawn sender task
406        let sender = Arc::new(sender);
407        let sender_clone = sender.clone();
408        tokio::spawn(async move {
409            while let Some(data) = rx.recv().await {
410                if let Err(e) = sender_clone.send(data).await {
411                    error!("Send error: {}", e);
412                    break;
413                }
414            }
415        });
416
417        // Send HELLO
418        let hello = Message::Hello(HelloMessage {
419            version: PROTOCOL_VERSION,
420            name: self.name.clone(),
421            features: self.features.clone(),
422            capabilities: None,
423            token: self.token.clone(),
424        });
425
426        self.send_message(&hello).await?;
427
428        // Wait for WELCOME with timeout
429        let welcome_timeout = Duration::from_secs(10);
430        let deadline = tokio::time::Instant::now() + welcome_timeout;
431
432        loop {
433            match tokio::time::timeout_at(deadline, receiver.recv()).await {
434                Ok(Some(TransportEvent::Data(data))) => match codec::decode(&data) {
435                    Ok((Message::Welcome(welcome), _)) => {
436                        *self.session_id.write() = Some(welcome.session.clone());
437                        *self.connected.write() = true;
438
439                        self.clock.write().process_sync(
440                            clasp_core::time::now(),
441                            welcome.time,
442                            welcome.time,
443                            clasp_core::time::now(),
444                        );
445
446                        info!("Reconnected, session: {}", welcome.session);
447                        break;
448                    }
449                    Ok((msg, _)) => {
450                        debug!("Received during reconnect handshake: {:?}", msg);
451                    }
452                    Err(e) => {
453                        warn!("Decode error during reconnect: {}", e);
454                    }
455                },
456                Ok(Some(TransportEvent::Error(e))) => {
457                    return Err(ClientError::ConnectionFailed(e));
458                }
459                Ok(Some(TransportEvent::Disconnected { reason })) => {
460                    return Err(ClientError::ConnectionFailed(
461                        reason.unwrap_or_else(|| "Disconnected".to_string()),
462                    ));
463                }
464                Ok(None) => {
465                    return Err(ClientError::ConnectionFailed(
466                        "Connection closed".to_string(),
467                    ));
468                }
469                Err(_) => {
470                    return Err(ClientError::Timeout);
471                }
472                _ => {}
473            }
474        }
475
476        // Spawn new receiver task
477        let params = Arc::clone(&self.params);
478        let subscriptions = Arc::clone(&self.subscriptions);
479        let pending_gets = Arc::clone(&self.pending_gets);
480        let signals = Arc::clone(&self.signals);
481        let last_error = Arc::clone(&self.last_error);
482        let connected_clone = Arc::clone(&self.connected);
483        let reconnect_notify = Arc::clone(&self.reconnect_notify);
484        let intentionally_closed = Arc::clone(&self.intentionally_closed);
485        let reconnect_enabled = self.reconnect;
486
487        tokio::spawn(async move {
488            while let Some(event) = receiver.recv().await {
489                match event {
490                    TransportEvent::Data(data) => {
491                        if let Ok((msg, _)) = codec::decode(&data) {
492                            handle_message(
493                                &msg,
494                                &params,
495                                &subscriptions,
496                                &pending_gets,
497                                &signals,
498                                &last_error,
499                            );
500                        }
501                    }
502                    TransportEvent::Disconnected { reason } => {
503                        info!("Disconnected: {:?}", reason);
504                        *connected_clone.write() = false;
505
506                        if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
507                            reconnect_notify.notify_one();
508                        }
509                        break;
510                    }
511                    TransportEvent::Error(e) => {
512                        error!("Error: {}", e);
513                    }
514                    _ => {}
515                }
516            }
517        });
518
519        Ok(())
520    }
521
522    /// Resubscribe to all existing subscriptions after reconnect
523    async fn resubscribe_all(&self) -> Result<()> {
524        // Collect subscription info first to avoid lifetime issues with DashMap
525        let subs: Vec<(u32, String)> = self
526            .subscriptions
527            .iter()
528            .map(|entry| (*entry.key(), entry.value().0.clone()))
529            .collect();
530
531        for (id, pattern) in subs {
532            let msg = Message::Subscribe(SubscribeMessage {
533                id,
534                pattern: pattern.clone(),
535                types: vec![],
536                options: Some(SubscribeOptions::default()),
537            });
538
539            self.send_message(&msg).await?;
540            debug!("Resubscribed to {} (id: {})", pattern, id);
541        }
542
543        Ok(())
544    }
545
546    /// Check if connected
547    pub fn is_connected(&self) -> bool {
548        *self.connected.read()
549    }
550
551    /// Get session ID
552    pub fn session_id(&self) -> Option<String> {
553        self.session_id.read().clone()
554    }
555
556    /// Get current server time (microseconds)
557    pub fn time(&self) -> u64 {
558        self.clock.read().server_time()
559    }
560
561    /// Send a raw message
562    async fn send_message(&self, message: &Message) -> Result<()> {
563        let data = codec::encode(message)?;
564        self.send_raw(data).await
565    }
566
567    /// Send raw bytes
568    async fn send_raw(&self, data: Bytes) -> Result<()> {
569        // Clone the sender to avoid holding the lock across await
570        let tx = {
571            let sender = self.sender.read();
572            sender.as_ref().cloned()
573        };
574
575        if let Some(tx) = tx {
576            tx.send(data)
577                .await
578                .map_err(|e| ClientError::SendFailed(e.to_string()))?;
579            Ok(())
580        } else {
581            Err(ClientError::NotConnected)
582        }
583    }
584
585    /// Subscribe to an address pattern
586    pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
587    where
588        F: Fn(Value, &str) + Send + Sync + 'static,
589    {
590        let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
591
592        // Store callback
593        self.subscriptions
594            .insert(id, (pattern.to_string(), Box::new(callback)));
595
596        // Send subscribe message
597        let msg = Message::Subscribe(SubscribeMessage {
598            id,
599            pattern: pattern.to_string(),
600            types: vec![],
601            options: Some(SubscribeOptions::default()),
602        });
603
604        self.send_message(&msg).await?;
605
606        debug!("Subscribed to {} (id: {})", pattern, id);
607        Ok(id)
608    }
609
610    /// Shorthand for subscribe
611    pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
612    where
613        F: Fn(Value, &str) + Send + Sync + 'static,
614    {
615        self.subscribe(pattern, callback).await
616    }
617
618    /// Unsubscribe
619    pub async fn unsubscribe(&self, id: u32) -> Result<()> {
620        self.subscriptions.remove(&id);
621
622        let msg = Message::Unsubscribe(UnsubscribeMessage { id });
623        self.send_message(&msg).await?;
624
625        Ok(())
626    }
627
628    /// Set a parameter value
629    pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
630        let msg = Message::Set(SetMessage {
631            address: address.to_string(),
632            value: value.into(),
633            revision: None,
634            lock: false,
635            unlock: false,
636        });
637
638        self.send_message(&msg).await
639    }
640
641    /// Set with lock
642    pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
643        let msg = Message::Set(SetMessage {
644            address: address.to_string(),
645            value: value.into(),
646            revision: None,
647            lock: true,
648            unlock: false,
649        });
650
651        self.send_message(&msg).await
652    }
653
654    /// Set and unlock (release a previously held lock)
655    pub async fn set_unlocked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
656        let msg = Message::Set(SetMessage {
657            address: address.to_string(),
658            value: value.into(),
659            revision: None,
660            lock: false,
661            unlock: true,
662        });
663
664        self.send_message(&msg).await
665    }
666
667    /// Get current value (cached or request)
668    pub async fn get(&self, address: &str) -> Result<Value> {
669        // Check cache first
670        if let Some(value) = self.params.get(address) {
671            return Ok(value.clone());
672        }
673
674        // Request from server
675        let (tx, rx) = oneshot::channel();
676        let address_key = address.to_string();
677        self.pending_gets.insert(address_key.clone(), tx);
678
679        let msg = Message::Get(GetMessage {
680            address: address.to_string(),
681        });
682        self.send_message(&msg).await?;
683
684        // Wait for response (with timeout)
685        match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
686            Ok(Ok(value)) => Ok(value),
687            Ok(Err(_)) => {
688                // Cancelled - remove from pending
689                self.pending_gets.remove(&address_key);
690                Err(ClientError::Other("Get cancelled".to_string()))
691            }
692            Err(_) => {
693                // Timeout - remove from pending to prevent memory leak
694                self.pending_gets.remove(&address_key);
695                Err(ClientError::Timeout)
696            }
697        }
698    }
699
700    /// Emit an event
701    pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
702        let msg = Message::Publish(PublishMessage {
703            address: address.to_string(),
704            signal: Some(SignalType::Event),
705            value: None,
706            payload: Some(payload.into()),
707            samples: None,
708            rate: None,
709            id: None,
710            phase: None,
711            timestamp: Some(self.time()),
712            timeline: None,
713        });
714
715        self.send_message(&msg).await
716    }
717
718    /// Send stream sample
719    pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
720        let msg = Message::Publish(PublishMessage {
721            address: address.to_string(),
722            signal: Some(SignalType::Stream),
723            value: Some(value.into()),
724            payload: None,
725            samples: None,
726            rate: None,
727            id: None,
728            phase: None,
729            timestamp: Some(self.time()),
730            timeline: None,
731        });
732
733        self.send_message(&msg).await
734    }
735
736    /// Send gesture input
737    ///
738    /// Gestures are phased input streams for touch/pen/motion input.
739    /// Each gesture has a stable ID and goes through phases:
740    /// - `Start`: Begin a new gesture
741    /// - `Move`: Update position/state (may be coalesced by router)
742    /// - `End`: Complete the gesture normally
743    /// - `Cancel`: Abort the gesture
744    ///
745    /// # Example
746    /// ```ignore
747    /// // Start a touch gesture
748    /// client.gesture("/input/touch", 1, GesturePhase::Start, json!({"x": 0.5, "y": 0.3})).await?;
749    ///
750    /// // Move updates
751    /// client.gesture("/input/touch", 1, GesturePhase::Move, json!({"x": 0.6, "y": 0.4})).await?;
752    ///
753    /// // End the gesture
754    /// client.gesture("/input/touch", 1, GesturePhase::End, json!({"x": 0.7, "y": 0.5})).await?;
755    /// ```
756    pub async fn gesture(
757        &self,
758        address: &str,
759        id: u32,
760        phase: GesturePhase,
761        payload: impl Into<Value>,
762    ) -> Result<()> {
763        let msg = Message::Publish(PublishMessage {
764            address: address.to_string(),
765            signal: Some(SignalType::Gesture),
766            value: None,
767            payload: Some(payload.into()),
768            samples: None,
769            rate: None,
770            id: Some(id),
771            phase: Some(phase),
772            timestamp: Some(self.time()),
773            timeline: None,
774        });
775
776        self.send_message(&msg).await
777    }
778
779    /// Publish timeline automation
780    ///
781    /// Timelines are pre-computed automation curves with keyframes.
782    /// Once published, timelines are immutable - to modify, publish a new one.
783    ///
784    /// # Arguments
785    /// * `address` - The parameter address this timeline controls
786    /// * `timeline` - The timeline data with keyframes
787    ///
788    /// # Example
789    /// ```ignore
790    /// use clasp_core::{TimelineData, TimelineKeyframe, EasingType, Value};
791    ///
792    /// let timeline = TimelineData::new(vec![
793    ///     TimelineKeyframe { time: 0, value: Value::Float(0.0), easing: EasingType::Linear, bezier: None },
794    ///     TimelineKeyframe { time: 1_000_000, value: Value::Float(1.0), easing: EasingType::EaseOut, bezier: None },
795    /// ]);
796    ///
797    /// client.timeline("/lights/master/dimmer", timeline).await?;
798    /// ```
799    pub async fn timeline(&self, address: &str, timeline_data: TimelineData) -> Result<()> {
800        let msg = Message::Publish(PublishMessage {
801            address: address.to_string(),
802            signal: Some(SignalType::Timeline),
803            value: None,
804            payload: None,
805            samples: None,
806            rate: None,
807            id: None,
808            phase: None,
809            timestamp: Some(self.time()),
810            timeline: Some(timeline_data),
811        });
812
813        self.send_message(&msg).await
814    }
815
816    /// Send atomic bundle
817    pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
818        let msg = Message::Bundle(BundleMessage {
819            timestamp: None,
820            messages,
821        });
822
823        self.send_message(&msg).await
824    }
825
826    /// Send scheduled bundle
827    pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
828        let msg = Message::Bundle(BundleMessage {
829            timestamp: Some(time),
830            messages,
831        });
832
833        self.send_message(&msg).await
834    }
835
836    /// Get cached param value
837    pub fn cached(&self, address: &str) -> Option<Value> {
838        self.params.get(address).map(|v| v.clone())
839    }
840
841    /// Close connection.
842    /// Disables auto-reconnect and closes the connection.
843    pub async fn close(&self) {
844        self.intentionally_closed.store(true, Ordering::SeqCst);
845        *self.connected.write() = false;
846        *self.sender.write() = None;
847    }
848
849    /// Get all announced signals
850    pub fn signals(&self) -> Vec<SignalDefinition> {
851        self.signals.iter().map(|e| e.value().clone()).collect()
852    }
853
854    /// Query signals matching a pattern
855    pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
856        self.signals
857            .iter()
858            .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
859            .map(|e| e.value().clone())
860            .collect()
861    }
862
863    /// Get the last error received from server
864    pub fn last_error(&self) -> Option<ErrorMessage> {
865        self.last_error.read().clone()
866    }
867
868    /// Clear the last error
869    pub fn clear_error(&self) {
870        *self.last_error.write() = None;
871    }
872
873    /// Set up P2P subscriptions (internal, called after P2P manager is created)
874    #[cfg(feature = "p2p")]
875    async fn setup_p2p_subscriptions(&self, session_id: &str) -> Result<()> {
876        if let Some(ref p2p_manager) = self.p2p_manager {
877            let signal_address = format!("{}{}", P2P_SIGNAL_PREFIX, session_id);
878            let p2p_manager_signal = Arc::clone(p2p_manager);
879
880            // Subscribe to P2P signals
881            let _ = self
882                .subscribe(&signal_address, move |value, address| {
883                    let p2p = Arc::clone(&p2p_manager_signal);
884                    let address = address.to_string(); // Clone the address string
885                    tokio::spawn(async move {
886                        if let Err(e) = p2p.handle_signal(&address, &value).await {
887                            tracing::debug!("P2P signal handling error: {}", e);
888                        }
889                    });
890                })
891                .await?;
892
893            // Subscribe to P2P announce
894            let p2p_manager_announce = Arc::clone(p2p_manager);
895            let _ = self
896                .subscribe(clasp_core::P2P_ANNOUNCE, move |value, _| {
897                    p2p_manager_announce.handle_announce(&value);
898                })
899                .await?;
900        }
901        Ok(())
902    }
903
904    /// Connect to a peer via P2P (requires p2p feature)
905    #[cfg(feature = "p2p")]
906    pub async fn connect_to_peer(&self, peer_session_id: &str) -> Result<()> {
907        if let Some(ref p2p_manager) = self.p2p_manager {
908            p2p_manager.connect_to_peer(peer_session_id).await
909        } else {
910            Err(ClientError::Other(
911                "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
912            ))
913        }
914    }
915
916    /// Set P2P event callback (requires p2p feature)
917    #[cfg(feature = "p2p")]
918    pub fn on_p2p_event<F>(&self, callback: F)
919    where
920        F: Fn(p2p::P2PEvent) + Send + Sync + 'static,
921    {
922        if let Some(ref p2p_manager) = self.p2p_manager {
923            p2p_manager.on_event(callback);
924        }
925    }
926
927    /// Check if peer is connected via P2P (requires p2p feature)
928    #[cfg(feature = "p2p")]
929    pub fn is_peer_connected(&self, peer_session_id: &str) -> bool {
930        self.p2p_manager
931            .as_ref()
932            .map(|p2p| p2p.is_peer_connected(peer_session_id))
933            .unwrap_or(false)
934    }
935
936    /// Send data to a peer via P2P (requires p2p feature)
937    ///
938    /// Returns `SendResult::P2P` if sent via direct P2P, `SendResult::Relay` if sent via server.
939    /// Behavior depends on the current routing mode and connection state.
940    #[cfg(feature = "p2p")]
941    pub async fn send_p2p(
942        &self,
943        peer_session_id: &str,
944        data: bytes::Bytes,
945        reliable: bool,
946    ) -> Result<p2p::SendResult> {
947        if let Some(ref p2p_manager) = self.p2p_manager {
948            p2p_manager
949                .send_to_peer(peer_session_id, data, reliable)
950                .await
951        } else {
952            Err(ClientError::Other(
953                "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
954            ))
955        }
956    }
957
958    /// Set P2P routing mode (requires p2p feature)
959    ///
960    /// - `RoutingMode::PreferP2P` (default): Try P2P first, fall back to relay
961    /// - `RoutingMode::P2POnly`: Only use P2P, fail if unavailable
962    /// - `RoutingMode::ServerOnly`: Never use P2P, always relay
963    #[cfg(feature = "p2p")]
964    pub fn set_p2p_routing_mode(&self, mode: clasp_core::p2p::RoutingMode) {
965        if let Some(ref p2p_manager) = self.p2p_manager {
966            p2p_manager.set_routing_mode(mode);
967        }
968    }
969
970    /// Get current P2P routing mode (requires p2p feature)
971    #[cfg(feature = "p2p")]
972    pub fn p2p_routing_mode(&self) -> clasp_core::p2p::RoutingMode {
973        self.p2p_manager
974            .as_ref()
975            .map(|p2p| p2p.routing_mode())
976            .unwrap_or_default()
977    }
978}
979
980/// Handle incoming message
981fn handle_message(
982    msg: &Message,
983    params: &Arc<DashMap<String, Value>>,
984    subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
985    pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
986    signals: &Arc<DashMap<String, SignalDefinition>>,
987    last_error: &Arc<RwLock<Option<ErrorMessage>>>,
988) {
989    match msg {
990        Message::Set(set) => {
991            // Update cache
992            params.insert(set.address.clone(), set.value.clone());
993
994            // Notify subscribers
995            for entry in subscriptions.iter() {
996                let (pattern, callback) = entry.value();
997                if clasp_core::address::glob_match(pattern, &set.address) {
998                    callback(set.value.clone(), &set.address);
999                }
1000            }
1001        }
1002
1003        Message::Snapshot(snapshot) => {
1004            for param in &snapshot.params {
1005                params.insert(param.address.clone(), param.value.clone());
1006
1007                // Complete pending gets
1008                if let Some((_, tx)) = pending_gets.remove(&param.address) {
1009                    let _ = tx.send(param.value.clone());
1010                }
1011
1012                // Notify subscribers
1013                for entry in subscriptions.iter() {
1014                    let (pattern, callback) = entry.value();
1015                    if clasp_core::address::glob_match(pattern, &param.address) {
1016                        callback(param.value.clone(), &param.address);
1017                    }
1018                }
1019            }
1020        }
1021
1022        Message::Publish(pub_msg) => {
1023            #[cfg(feature = "p2p")]
1024            {
1025                // Check if this is a P2P signal or announce - handle before regular subscriptions
1026                if pub_msg.address.starts_with(clasp_core::P2P_SIGNAL_PREFIX) {
1027                    // P2P signals will be handled by subscription callbacks
1028                    // (they're subscribed to automatically when P2P manager is created)
1029                } else if pub_msg.address == clasp_core::P2P_ANNOUNCE {
1030                    // P2P announce will be handled by subscription callback
1031                }
1032            }
1033
1034            // Notify subscribers
1035            let value = pub_msg
1036                .value
1037                .clone()
1038                .or_else(|| pub_msg.payload.clone())
1039                .unwrap_or(Value::Null);
1040
1041            for entry in subscriptions.iter() {
1042                let (pattern, callback) = entry.value();
1043                if clasp_core::address::glob_match(pattern, &pub_msg.address) {
1044                    callback(value.clone(), &pub_msg.address);
1045                }
1046            }
1047        }
1048
1049        Message::Error(error) => {
1050            // Log the error and store it for retrieval
1051            warn!(
1052                "Server error {}: {} (address: {:?})",
1053                error.code, error.message, error.address
1054            );
1055            *last_error.write() = Some(error.clone());
1056        }
1057
1058        Message::Ack(ack) => {
1059            // Log acknowledgment (could be extended to track pending requests)
1060            debug!(
1061                "Received ACK for {:?} (revision: {:?})",
1062                ack.address, ack.revision
1063            );
1064        }
1065
1066        Message::Announce(announce) => {
1067            // Store announced signals
1068            for signal in &announce.signals {
1069                debug!("Received signal announcement: {}", signal.address);
1070                signals.insert(signal.address.clone(), signal.clone());
1071            }
1072        }
1073
1074        Message::Sync(sync) => {
1075            // Process clock sync response
1076            // Note: This handles sync messages from server with t2/t3 filled in
1077            if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
1078                debug!("Clock sync: t1={}, t2={}, t3={}", sync.t1, t2, t3);
1079                // Clock sync is processed through ClockSync::process_sync
1080                // but we don't have access to the clock here.
1081                // For now, log it. A more complete implementation would
1082                // use a channel to send sync data back to the main client.
1083            }
1084        }
1085
1086        Message::Result(result) => {
1087            // Handle query results
1088            debug!("Received result with {} signals", result.signals.len());
1089            // Store any returned signals
1090            for signal in &result.signals {
1091                signals.insert(signal.address.clone(), signal.clone());
1092            }
1093        }
1094
1095        // Messages that are typically client-initiated, not expected from server
1096        Message::Hello(_)
1097        | Message::Welcome(_)
1098        | Message::Subscribe(_)
1099        | Message::Unsubscribe(_)
1100        | Message::Get(_)
1101        | Message::Query(_)
1102        | Message::Replay(_)
1103        | Message::FederationSync(_) => {
1104            debug!("Received unexpected client-type message: {:?}", msg);
1105        }
1106
1107        // Bundle: process contained messages recursively
1108        Message::Bundle(bundle) => {
1109            for inner_msg in &bundle.messages {
1110                handle_message(
1111                    inner_msg,
1112                    params,
1113                    subscriptions,
1114                    pending_gets,
1115                    signals,
1116                    last_error,
1117                );
1118            }
1119        }
1120
1121        // Ping/Pong for keep-alive
1122        Message::Ping => {
1123            debug!("Received PING from server");
1124            // Note: Pong response should be sent, but we don't have sender access here.
1125            // A more complete implementation would use a channel to request pong be sent.
1126        }
1127
1128        Message::Pong => {
1129            debug!("Received PONG from server");
1130        }
1131    }
1132}