1use bytes::Bytes;
4use clasp_core::{
5 codec, time::ClockSync, BundleMessage, ErrorMessage, GesturePhase, GetMessage, HelloMessage,
6 Message, PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage,
7 SubscribeOptions, TimelineData, UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10 Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22#[cfg(feature = "p2p")]
23use crate::p2p;
24#[cfg(feature = "p2p")]
25use clasp_core::{P2PConfig, P2P_SIGNAL_PREFIX};
26
27pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
29
30pub struct Clasp {
32 url: String,
33 name: String,
34 features: Vec<String>,
35 token: Option<String>,
36 reconnect: bool,
37 reconnect_interval_ms: u64,
38
39 session_id: RwLock<Option<String>>,
41
42 connected: Arc<RwLock<bool>>,
44
45 sender: RwLock<Option<mpsc::Sender<Bytes>>>,
47
48 params: Arc<DashMap<String, Value>>,
50
51 subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
53
54 next_sub_id: AtomicU32,
56
57 clock: RwLock<ClockSync>,
59
60 pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
62
63 signals: Arc<DashMap<String, SignalDefinition>>,
65
66 last_error: Arc<RwLock<Option<ErrorMessage>>>,
68
69 reconnect_attempts: Arc<AtomicU32>,
71
72 max_reconnect_attempts: u32,
74
75 intentionally_closed: Arc<AtomicBool>,
77
78 reconnect_notify: Arc<Notify>,
80
81 #[cfg(feature = "p2p")]
83 p2p_config: Option<P2PConfig>,
84
85 #[cfg(feature = "p2p")]
87 p2p_manager: Option<Arc<p2p::P2PManager>>,
88}
89
90impl Clasp {
91 pub fn new(
93 url: &str,
94 name: String,
95 features: Vec<String>,
96 token: Option<String>,
97 reconnect: bool,
98 reconnect_interval_ms: u64,
99 ) -> Self {
100 Self {
101 url: url.to_string(),
102 name,
103 features,
104 token,
105 reconnect,
106 reconnect_interval_ms,
107 session_id: RwLock::new(None),
108 connected: Arc::new(RwLock::new(false)),
109 sender: RwLock::new(None),
110 params: Arc::new(DashMap::new()),
111 subscriptions: Arc::new(DashMap::new()),
112 next_sub_id: AtomicU32::new(1),
113 clock: RwLock::new(ClockSync::new()),
114 pending_gets: Arc::new(DashMap::new()),
115 signals: Arc::new(DashMap::new()),
116 last_error: Arc::new(RwLock::new(None)),
117 reconnect_attempts: Arc::new(AtomicU32::new(0)),
118 max_reconnect_attempts: 10,
119 intentionally_closed: Arc::new(AtomicBool::new(false)),
120 reconnect_notify: Arc::new(Notify::new()),
121 #[cfg(feature = "p2p")]
122 p2p_config: None,
123 #[cfg(feature = "p2p")]
124 p2p_manager: None,
125 }
126 }
127
128 #[cfg(feature = "p2p")]
130 pub(crate) fn set_p2p_config(&mut self, config: P2PConfig) {
131 self.p2p_config = Some(config);
132 }
133
134 pub fn builder(url: &str) -> ClaspBuilder {
136 ClaspBuilder::new(url)
137 }
138
139 pub async fn connect_to(url: &str) -> Result<Self> {
141 ClaspBuilder::new(url).connect().await
142 }
143
144 pub(crate) async fn do_connect(&mut self) -> Result<()> {
146 if *self.connected.read() {
147 return Err(ClientError::AlreadyConnected);
148 }
149
150 info!("Connecting to {}", self.url);
151
152 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
154
155 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
157 *self.sender.write() = Some(tx);
158
159 let connected = self.connected.clone();
160
161 let sender = Arc::new(sender);
163 let sender_clone = sender.clone();
164 tokio::spawn(async move {
165 while let Some(data) = rx.recv().await {
166 if let Err(e) = sender_clone.send(data).await {
167 error!("Send error: {}", e);
168 break;
169 }
170 }
171 });
172
173 let hello = Message::Hello(HelloMessage {
175 version: PROTOCOL_VERSION,
176 name: self.name.clone(),
177 features: self.features.clone(),
178 capabilities: None,
179 token: self.token.clone(),
180 });
181
182 self.send_message(&hello).await?;
183
184 loop {
186 match receiver.recv().await {
187 Some(TransportEvent::Data(data)) => {
188 match codec::decode(&data) {
189 Ok((Message::Welcome(welcome), _)) => {
190 *self.session_id.write() = Some(welcome.session.clone());
191 *connected.write() = true;
192
193 self.clock.write().process_sync(
195 clasp_core::time::now(),
196 welcome.time,
197 welcome.time,
198 clasp_core::time::now(),
199 );
200
201 #[cfg(feature = "p2p")]
203 {
204 if let Some(p2p_config) = self.p2p_config.take() {
205 let session_id = welcome.session.clone();
206 let (signal_tx, mut signal_rx) = mpsc::channel(100);
208 let p2p_manager =
209 Arc::new(p2p::P2PManager::new(p2p_config, signal_tx));
210 p2p_manager.set_session_id(session_id.clone());
211
212 let sender = self.sender.read().clone();
214 let p2p_manager_for_task = Arc::clone(&p2p_manager);
215 if let Some(sender_tx) = sender {
216 tokio::spawn(async move {
217 while let Some(msg) = signal_rx.recv().await {
218 if let Some(encoded) = codec::encode(&msg).ok() {
219 if let Err(e) =
220 sender_tx.send(encoded.into()).await
221 {
222 tracing::warn!(
223 "Failed to send P2P signal: {}",
224 e
225 );
226 break;
227 }
228 }
229 }
230 });
231 }
232
233 self.p2p_manager = Some(Arc::clone(&p2p_manager));
235
236 let _ = p2p_manager.announce().await;
238
239 let _ = self.setup_p2p_subscriptions(&session_id).await;
241 }
242 }
243
244 info!("Connected, session: {}", welcome.session);
245 break;
246 }
247 Ok((msg, _)) => {
248 debug!("Received during handshake: {:?}", msg);
249 }
250 Err(e) => {
251 warn!("Decode error: {}", e);
252 }
253 }
254 }
255 Some(TransportEvent::Error(e)) => {
256 return Err(ClientError::ConnectionFailed(e));
257 }
258 Some(TransportEvent::Disconnected { reason }) => {
259 return Err(ClientError::ConnectionFailed(
260 reason.unwrap_or_else(|| "Disconnected".to_string()),
261 ));
262 }
263 None => {
264 return Err(ClientError::ConnectionFailed(
265 "Connection closed".to_string(),
266 ));
267 }
268 _ => {}
269 }
270 }
271
272 self.reconnect_attempts.store(0, Ordering::SeqCst);
274 self.intentionally_closed.store(false, Ordering::SeqCst);
275
276 let params = Arc::clone(&self.params);
278 let subscriptions = Arc::clone(&self.subscriptions);
279 let pending_gets = Arc::clone(&self.pending_gets);
280 let signals = Arc::clone(&self.signals);
281 let last_error = Arc::clone(&self.last_error);
282 let connected_clone = Arc::clone(&self.connected);
283 let reconnect_notify = Arc::clone(&self.reconnect_notify);
284 let intentionally_closed = Arc::clone(&self.intentionally_closed);
285 let reconnect_enabled = self.reconnect;
286 #[cfg(feature = "p2p")]
287 let p2p_manager = self.p2p_manager.clone();
288
289 tokio::spawn(async move {
290 while let Some(event) = receiver.recv().await {
291 match event {
292 TransportEvent::Data(data) => {
293 if let Ok((msg, _)) = codec::decode(&data) {
294 #[cfg(feature = "p2p")]
295 {
296 }
299 handle_message(
300 &msg,
301 ¶ms,
302 &subscriptions,
303 &pending_gets,
304 &signals,
305 &last_error,
306 );
307 }
308 }
309 TransportEvent::Disconnected { reason } => {
310 info!("Disconnected: {:?}", reason);
311 *connected_clone.write() = false;
312
313 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
315 reconnect_notify.notify_one();
316 }
317 break;
318 }
319 TransportEvent::Error(e) => {
320 error!("Error: {}", e);
321 }
322 _ => {}
323 }
324 }
325 });
326
327 Ok(())
328 }
329
330 pub fn start_reconnect_loop(self: &Arc<Self>) {
332 if !self.reconnect {
333 return;
334 }
335
336 let client = Arc::clone(self);
337 tokio::spawn(async move {
338 loop {
339 client.reconnect_notify.notified().await;
341
342 if client.intentionally_closed.load(Ordering::SeqCst) {
343 break;
344 }
345
346 loop {
348 let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
349
350 if client.max_reconnect_attempts > 0
351 && attempts >= client.max_reconnect_attempts
352 {
353 error!(
354 "Max reconnect attempts ({}) reached",
355 client.max_reconnect_attempts
356 );
357 break;
358 }
359
360 let base_ms = client.reconnect_interval_ms;
362 let delay_ms =
363 (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;
364
365 info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
366 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
367
368 if client.intentionally_closed.load(Ordering::SeqCst) {
369 break;
370 }
371
372 match client.try_reconnect().await {
375 Ok(()) => {
376 info!("Reconnected successfully");
377 client.reconnect_attempts.store(0, Ordering::SeqCst);
378
379 if let Err(e) = client.resubscribe_all().await {
381 warn!("Failed to resubscribe: {}", e);
382 }
383 break;
384 }
385 Err(e) => {
386 warn!("Reconnect failed: {}", e);
387 }
388 }
389 }
390 }
391 });
392 }
393
394 async fn try_reconnect(&self) -> Result<()> {
396 info!("Attempting to reconnect to {}", self.url);
397
398 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
400
401 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
403 *self.sender.write() = Some(tx);
404
405 let sender = Arc::new(sender);
407 let sender_clone = sender.clone();
408 tokio::spawn(async move {
409 while let Some(data) = rx.recv().await {
410 if let Err(e) = sender_clone.send(data).await {
411 error!("Send error: {}", e);
412 break;
413 }
414 }
415 });
416
417 let hello = Message::Hello(HelloMessage {
419 version: PROTOCOL_VERSION,
420 name: self.name.clone(),
421 features: self.features.clone(),
422 capabilities: None,
423 token: self.token.clone(),
424 });
425
426 self.send_message(&hello).await?;
427
428 let welcome_timeout = Duration::from_secs(10);
430 let deadline = tokio::time::Instant::now() + welcome_timeout;
431
432 loop {
433 match tokio::time::timeout_at(deadline, receiver.recv()).await {
434 Ok(Some(TransportEvent::Data(data))) => match codec::decode(&data) {
435 Ok((Message::Welcome(welcome), _)) => {
436 *self.session_id.write() = Some(welcome.session.clone());
437 *self.connected.write() = true;
438
439 self.clock.write().process_sync(
440 clasp_core::time::now(),
441 welcome.time,
442 welcome.time,
443 clasp_core::time::now(),
444 );
445
446 info!("Reconnected, session: {}", welcome.session);
447 break;
448 }
449 Ok((msg, _)) => {
450 debug!("Received during reconnect handshake: {:?}", msg);
451 }
452 Err(e) => {
453 warn!("Decode error during reconnect: {}", e);
454 }
455 },
456 Ok(Some(TransportEvent::Error(e))) => {
457 return Err(ClientError::ConnectionFailed(e));
458 }
459 Ok(Some(TransportEvent::Disconnected { reason })) => {
460 return Err(ClientError::ConnectionFailed(
461 reason.unwrap_or_else(|| "Disconnected".to_string()),
462 ));
463 }
464 Ok(None) => {
465 return Err(ClientError::ConnectionFailed(
466 "Connection closed".to_string(),
467 ));
468 }
469 Err(_) => {
470 return Err(ClientError::Timeout);
471 }
472 _ => {}
473 }
474 }
475
476 let params = Arc::clone(&self.params);
478 let subscriptions = Arc::clone(&self.subscriptions);
479 let pending_gets = Arc::clone(&self.pending_gets);
480 let signals = Arc::clone(&self.signals);
481 let last_error = Arc::clone(&self.last_error);
482 let connected_clone = Arc::clone(&self.connected);
483 let reconnect_notify = Arc::clone(&self.reconnect_notify);
484 let intentionally_closed = Arc::clone(&self.intentionally_closed);
485 let reconnect_enabled = self.reconnect;
486
487 tokio::spawn(async move {
488 while let Some(event) = receiver.recv().await {
489 match event {
490 TransportEvent::Data(data) => {
491 if let Ok((msg, _)) = codec::decode(&data) {
492 handle_message(
493 &msg,
494 ¶ms,
495 &subscriptions,
496 &pending_gets,
497 &signals,
498 &last_error,
499 );
500 }
501 }
502 TransportEvent::Disconnected { reason } => {
503 info!("Disconnected: {:?}", reason);
504 *connected_clone.write() = false;
505
506 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
507 reconnect_notify.notify_one();
508 }
509 break;
510 }
511 TransportEvent::Error(e) => {
512 error!("Error: {}", e);
513 }
514 _ => {}
515 }
516 }
517 });
518
519 Ok(())
520 }
521
522 async fn resubscribe_all(&self) -> Result<()> {
524 let subs: Vec<(u32, String)> = self
526 .subscriptions
527 .iter()
528 .map(|entry| (*entry.key(), entry.value().0.clone()))
529 .collect();
530
531 for (id, pattern) in subs {
532 let msg = Message::Subscribe(SubscribeMessage {
533 id,
534 pattern: pattern.clone(),
535 types: vec![],
536 options: Some(SubscribeOptions::default()),
537 });
538
539 self.send_message(&msg).await?;
540 debug!("Resubscribed to {} (id: {})", pattern, id);
541 }
542
543 Ok(())
544 }
545
546 pub fn is_connected(&self) -> bool {
548 *self.connected.read()
549 }
550
551 pub fn session_id(&self) -> Option<String> {
553 self.session_id.read().clone()
554 }
555
556 pub fn time(&self) -> u64 {
558 self.clock.read().server_time()
559 }
560
561 async fn send_message(&self, message: &Message) -> Result<()> {
563 let data = codec::encode(message)?;
564 self.send_raw(data).await
565 }
566
567 async fn send_raw(&self, data: Bytes) -> Result<()> {
569 let tx = {
571 let sender = self.sender.read();
572 sender.as_ref().cloned()
573 };
574
575 if let Some(tx) = tx {
576 tx.send(data)
577 .await
578 .map_err(|e| ClientError::SendFailed(e.to_string()))?;
579 Ok(())
580 } else {
581 Err(ClientError::NotConnected)
582 }
583 }
584
585 pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
587 where
588 F: Fn(Value, &str) + Send + Sync + 'static,
589 {
590 let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
591
592 self.subscriptions
594 .insert(id, (pattern.to_string(), Box::new(callback)));
595
596 let msg = Message::Subscribe(SubscribeMessage {
598 id,
599 pattern: pattern.to_string(),
600 types: vec![],
601 options: Some(SubscribeOptions::default()),
602 });
603
604 self.send_message(&msg).await?;
605
606 debug!("Subscribed to {} (id: {})", pattern, id);
607 Ok(id)
608 }
609
610 pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
612 where
613 F: Fn(Value, &str) + Send + Sync + 'static,
614 {
615 self.subscribe(pattern, callback).await
616 }
617
618 pub async fn unsubscribe(&self, id: u32) -> Result<()> {
620 self.subscriptions.remove(&id);
621
622 let msg = Message::Unsubscribe(UnsubscribeMessage { id });
623 self.send_message(&msg).await?;
624
625 Ok(())
626 }
627
628 pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
630 let msg = Message::Set(SetMessage {
631 address: address.to_string(),
632 value: value.into(),
633 revision: None,
634 lock: false,
635 unlock: false,
636 });
637
638 self.send_message(&msg).await
639 }
640
641 pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
643 let msg = Message::Set(SetMessage {
644 address: address.to_string(),
645 value: value.into(),
646 revision: None,
647 lock: true,
648 unlock: false,
649 });
650
651 self.send_message(&msg).await
652 }
653
654 pub async fn set_unlocked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
656 let msg = Message::Set(SetMessage {
657 address: address.to_string(),
658 value: value.into(),
659 revision: None,
660 lock: false,
661 unlock: true,
662 });
663
664 self.send_message(&msg).await
665 }
666
667 pub async fn get(&self, address: &str) -> Result<Value> {
669 if let Some(value) = self.params.get(address) {
671 return Ok(value.clone());
672 }
673
674 let (tx, rx) = oneshot::channel();
676 let address_key = address.to_string();
677 self.pending_gets.insert(address_key.clone(), tx);
678
679 let msg = Message::Get(GetMessage {
680 address: address.to_string(),
681 });
682 self.send_message(&msg).await?;
683
684 match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
686 Ok(Ok(value)) => Ok(value),
687 Ok(Err(_)) => {
688 self.pending_gets.remove(&address_key);
690 Err(ClientError::Other("Get cancelled".to_string()))
691 }
692 Err(_) => {
693 self.pending_gets.remove(&address_key);
695 Err(ClientError::Timeout)
696 }
697 }
698 }
699
700 pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
702 let msg = Message::Publish(PublishMessage {
703 address: address.to_string(),
704 signal: Some(SignalType::Event),
705 value: None,
706 payload: Some(payload.into()),
707 samples: None,
708 rate: None,
709 id: None,
710 phase: None,
711 timestamp: Some(self.time()),
712 timeline: None,
713 });
714
715 self.send_message(&msg).await
716 }
717
718 pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
720 let msg = Message::Publish(PublishMessage {
721 address: address.to_string(),
722 signal: Some(SignalType::Stream),
723 value: Some(value.into()),
724 payload: None,
725 samples: None,
726 rate: None,
727 id: None,
728 phase: None,
729 timestamp: Some(self.time()),
730 timeline: None,
731 });
732
733 self.send_message(&msg).await
734 }
735
736 pub async fn gesture(
757 &self,
758 address: &str,
759 id: u32,
760 phase: GesturePhase,
761 payload: impl Into<Value>,
762 ) -> Result<()> {
763 let msg = Message::Publish(PublishMessage {
764 address: address.to_string(),
765 signal: Some(SignalType::Gesture),
766 value: None,
767 payload: Some(payload.into()),
768 samples: None,
769 rate: None,
770 id: Some(id),
771 phase: Some(phase),
772 timestamp: Some(self.time()),
773 timeline: None,
774 });
775
776 self.send_message(&msg).await
777 }
778
779 pub async fn timeline(&self, address: &str, timeline_data: TimelineData) -> Result<()> {
800 let msg = Message::Publish(PublishMessage {
801 address: address.to_string(),
802 signal: Some(SignalType::Timeline),
803 value: None,
804 payload: None,
805 samples: None,
806 rate: None,
807 id: None,
808 phase: None,
809 timestamp: Some(self.time()),
810 timeline: Some(timeline_data),
811 });
812
813 self.send_message(&msg).await
814 }
815
816 pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
818 let msg = Message::Bundle(BundleMessage {
819 timestamp: None,
820 messages,
821 });
822
823 self.send_message(&msg).await
824 }
825
826 pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
828 let msg = Message::Bundle(BundleMessage {
829 timestamp: Some(time),
830 messages,
831 });
832
833 self.send_message(&msg).await
834 }
835
836 pub fn cached(&self, address: &str) -> Option<Value> {
838 self.params.get(address).map(|v| v.clone())
839 }
840
841 pub async fn close(&self) {
844 self.intentionally_closed.store(true, Ordering::SeqCst);
845 *self.connected.write() = false;
846 *self.sender.write() = None;
847 }
848
849 pub fn signals(&self) -> Vec<SignalDefinition> {
851 self.signals.iter().map(|e| e.value().clone()).collect()
852 }
853
854 pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
856 self.signals
857 .iter()
858 .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
859 .map(|e| e.value().clone())
860 .collect()
861 }
862
863 pub fn last_error(&self) -> Option<ErrorMessage> {
865 self.last_error.read().clone()
866 }
867
868 pub fn clear_error(&self) {
870 *self.last_error.write() = None;
871 }
872
873 #[cfg(feature = "p2p")]
875 async fn setup_p2p_subscriptions(&self, session_id: &str) -> Result<()> {
876 if let Some(ref p2p_manager) = self.p2p_manager {
877 let signal_address = format!("{}{}", P2P_SIGNAL_PREFIX, session_id);
878 let p2p_manager_signal = Arc::clone(p2p_manager);
879
880 let _ = self
882 .subscribe(&signal_address, move |value, address| {
883 let p2p = Arc::clone(&p2p_manager_signal);
884 let address = address.to_string(); tokio::spawn(async move {
886 if let Err(e) = p2p.handle_signal(&address, &value).await {
887 tracing::debug!("P2P signal handling error: {}", e);
888 }
889 });
890 })
891 .await?;
892
893 let p2p_manager_announce = Arc::clone(p2p_manager);
895 let _ = self
896 .subscribe(clasp_core::P2P_ANNOUNCE, move |value, _| {
897 p2p_manager_announce.handle_announce(&value);
898 })
899 .await?;
900 }
901 Ok(())
902 }
903
904 #[cfg(feature = "p2p")]
906 pub async fn connect_to_peer(&self, peer_session_id: &str) -> Result<()> {
907 if let Some(ref p2p_manager) = self.p2p_manager {
908 p2p_manager.connect_to_peer(peer_session_id).await
909 } else {
910 Err(ClientError::Other(
911 "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
912 ))
913 }
914 }
915
916 #[cfg(feature = "p2p")]
918 pub fn on_p2p_event<F>(&self, callback: F)
919 where
920 F: Fn(p2p::P2PEvent) + Send + Sync + 'static,
921 {
922 if let Some(ref p2p_manager) = self.p2p_manager {
923 p2p_manager.on_event(callback);
924 }
925 }
926
927 #[cfg(feature = "p2p")]
929 pub fn is_peer_connected(&self, peer_session_id: &str) -> bool {
930 self.p2p_manager
931 .as_ref()
932 .map(|p2p| p2p.is_peer_connected(peer_session_id))
933 .unwrap_or(false)
934 }
935
936 #[cfg(feature = "p2p")]
941 pub async fn send_p2p(
942 &self,
943 peer_session_id: &str,
944 data: bytes::Bytes,
945 reliable: bool,
946 ) -> Result<p2p::SendResult> {
947 if let Some(ref p2p_manager) = self.p2p_manager {
948 p2p_manager
949 .send_to_peer(peer_session_id, data, reliable)
950 .await
951 } else {
952 Err(ClientError::Other(
953 "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
954 ))
955 }
956 }
957
958 #[cfg(feature = "p2p")]
964 pub fn set_p2p_routing_mode(&self, mode: clasp_core::p2p::RoutingMode) {
965 if let Some(ref p2p_manager) = self.p2p_manager {
966 p2p_manager.set_routing_mode(mode);
967 }
968 }
969
970 #[cfg(feature = "p2p")]
972 pub fn p2p_routing_mode(&self) -> clasp_core::p2p::RoutingMode {
973 self.p2p_manager
974 .as_ref()
975 .map(|p2p| p2p.routing_mode())
976 .unwrap_or_default()
977 }
978}
979
980fn handle_message(
982 msg: &Message,
983 params: &Arc<DashMap<String, Value>>,
984 subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
985 pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
986 signals: &Arc<DashMap<String, SignalDefinition>>,
987 last_error: &Arc<RwLock<Option<ErrorMessage>>>,
988) {
989 match msg {
990 Message::Set(set) => {
991 params.insert(set.address.clone(), set.value.clone());
993
994 for entry in subscriptions.iter() {
996 let (pattern, callback) = entry.value();
997 if clasp_core::address::glob_match(pattern, &set.address) {
998 callback(set.value.clone(), &set.address);
999 }
1000 }
1001 }
1002
1003 Message::Snapshot(snapshot) => {
1004 for param in &snapshot.params {
1005 params.insert(param.address.clone(), param.value.clone());
1006
1007 if let Some((_, tx)) = pending_gets.remove(¶m.address) {
1009 let _ = tx.send(param.value.clone());
1010 }
1011
1012 for entry in subscriptions.iter() {
1014 let (pattern, callback) = entry.value();
1015 if clasp_core::address::glob_match(pattern, ¶m.address) {
1016 callback(param.value.clone(), ¶m.address);
1017 }
1018 }
1019 }
1020 }
1021
1022 Message::Publish(pub_msg) => {
1023 #[cfg(feature = "p2p")]
1024 {
1025 if pub_msg.address.starts_with(clasp_core::P2P_SIGNAL_PREFIX) {
1027 } else if pub_msg.address == clasp_core::P2P_ANNOUNCE {
1030 }
1032 }
1033
1034 let value = pub_msg
1036 .value
1037 .clone()
1038 .or_else(|| pub_msg.payload.clone())
1039 .unwrap_or(Value::Null);
1040
1041 for entry in subscriptions.iter() {
1042 let (pattern, callback) = entry.value();
1043 if clasp_core::address::glob_match(pattern, &pub_msg.address) {
1044 callback(value.clone(), &pub_msg.address);
1045 }
1046 }
1047 }
1048
1049 Message::Error(error) => {
1050 warn!(
1052 "Server error {}: {} (address: {:?})",
1053 error.code, error.message, error.address
1054 );
1055 *last_error.write() = Some(error.clone());
1056 }
1057
1058 Message::Ack(ack) => {
1059 debug!(
1061 "Received ACK for {:?} (revision: {:?})",
1062 ack.address, ack.revision
1063 );
1064 }
1065
1066 Message::Announce(announce) => {
1067 for signal in &announce.signals {
1069 debug!("Received signal announcement: {}", signal.address);
1070 signals.insert(signal.address.clone(), signal.clone());
1071 }
1072 }
1073
1074 Message::Sync(sync) => {
1075 if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
1078 debug!("Clock sync: t1={}, t2={}, t3={}", sync.t1, t2, t3);
1079 }
1084 }
1085
1086 Message::Result(result) => {
1087 debug!("Received result with {} signals", result.signals.len());
1089 for signal in &result.signals {
1091 signals.insert(signal.address.clone(), signal.clone());
1092 }
1093 }
1094
1095 Message::Hello(_)
1097 | Message::Welcome(_)
1098 | Message::Subscribe(_)
1099 | Message::Unsubscribe(_)
1100 | Message::Get(_)
1101 | Message::Query(_)
1102 | Message::Replay(_)
1103 | Message::FederationSync(_) => {
1104 debug!("Received unexpected client-type message: {:?}", msg);
1105 }
1106
1107 Message::Bundle(bundle) => {
1109 for inner_msg in &bundle.messages {
1110 handle_message(
1111 inner_msg,
1112 params,
1113 subscriptions,
1114 pending_gets,
1115 signals,
1116 last_error,
1117 );
1118 }
1119 }
1120
1121 Message::Ping => {
1123 debug!("Received PING from server");
1124 }
1127
1128 Message::Pong => {
1129 debug!("Received PONG from server");
1130 }
1131 }
1132}