1use bytes::Bytes;
4use clasp_core::{
5 codec, time::ClockSync, BundleMessage, ErrorMessage, GesturePhase, GetMessage, HelloMessage,
6 Message, PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage,
7 SubscribeOptions, TimelineData, UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10 Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22#[cfg(feature = "p2p")]
23use crate::p2p;
24#[cfg(feature = "p2p")]
25use clasp_core::{P2PConfig, P2P_SIGNAL_PREFIX};
26
27pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
29
30pub struct Clasp {
32 url: String,
33 name: String,
34 features: Vec<String>,
35 token: Option<String>,
36 reconnect: bool,
37 reconnect_interval_ms: u64,
38
39 session_id: RwLock<Option<String>>,
41
42 connected: Arc<RwLock<bool>>,
44
45 sender: RwLock<Option<mpsc::Sender<Bytes>>>,
47
48 params: Arc<DashMap<String, Value>>,
50
51 subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
53
54 next_sub_id: AtomicU32,
56
57 clock: RwLock<ClockSync>,
59
60 pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
62
63 signals: Arc<DashMap<String, SignalDefinition>>,
65
66 last_error: Arc<RwLock<Option<ErrorMessage>>>,
68
69 reconnect_attempts: Arc<AtomicU32>,
71
72 max_reconnect_attempts: u32,
74
75 intentionally_closed: Arc<AtomicBool>,
77
78 reconnect_notify: Arc<Notify>,
80
81 #[cfg(feature = "p2p")]
83 p2p_config: Option<P2PConfig>,
84
85 #[cfg(feature = "p2p")]
87 p2p_manager: Option<Arc<p2p::P2PManager>>,
88}
89
90impl Clasp {
91 pub fn new(
93 url: &str,
94 name: String,
95 features: Vec<String>,
96 token: Option<String>,
97 reconnect: bool,
98 reconnect_interval_ms: u64,
99 ) -> Self {
100 Self {
101 url: url.to_string(),
102 name,
103 features,
104 token,
105 reconnect,
106 reconnect_interval_ms,
107 session_id: RwLock::new(None),
108 connected: Arc::new(RwLock::new(false)),
109 sender: RwLock::new(None),
110 params: Arc::new(DashMap::new()),
111 subscriptions: Arc::new(DashMap::new()),
112 next_sub_id: AtomicU32::new(1),
113 clock: RwLock::new(ClockSync::new()),
114 pending_gets: Arc::new(DashMap::new()),
115 signals: Arc::new(DashMap::new()),
116 last_error: Arc::new(RwLock::new(None)),
117 reconnect_attempts: Arc::new(AtomicU32::new(0)),
118 max_reconnect_attempts: 10,
119 intentionally_closed: Arc::new(AtomicBool::new(false)),
120 reconnect_notify: Arc::new(Notify::new()),
121 #[cfg(feature = "p2p")]
122 p2p_config: None,
123 #[cfg(feature = "p2p")]
124 p2p_manager: None,
125 }
126 }
127
128 #[cfg(feature = "p2p")]
130 pub(crate) fn set_p2p_config(&mut self, config: P2PConfig) {
131 self.p2p_config = Some(config);
132 }
133
134 pub fn builder(url: &str) -> ClaspBuilder {
136 ClaspBuilder::new(url)
137 }
138
139 pub async fn connect_to(url: &str) -> Result<Self> {
141 ClaspBuilder::new(url).connect().await
142 }
143
144 pub(crate) async fn do_connect(&mut self) -> Result<()> {
146 if *self.connected.read() {
147 return Err(ClientError::AlreadyConnected);
148 }
149
150 info!("Connecting to {}", self.url);
151
152 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
154
155 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
157 *self.sender.write() = Some(tx);
158
159 let connected = self.connected.clone();
160
161 let sender = Arc::new(sender);
163 let sender_clone = sender.clone();
164 tokio::spawn(async move {
165 while let Some(data) = rx.recv().await {
166 if let Err(e) = sender_clone.send(data).await {
167 error!("Send error: {}", e);
168 break;
169 }
170 }
171 });
172
173 let hello = Message::Hello(HelloMessage {
175 version: PROTOCOL_VERSION,
176 name: self.name.clone(),
177 features: self.features.clone(),
178 capabilities: None,
179 token: self.token.clone(),
180 });
181
182 self.send_message(&hello).await?;
183
184 loop {
186 match receiver.recv().await {
187 Some(TransportEvent::Data(data)) => {
188 match codec::decode(&data) {
189 Ok((Message::Welcome(welcome), _)) => {
190 *self.session_id.write() = Some(welcome.session.clone());
191 *connected.write() = true;
192
193 self.clock.write().process_sync(
195 clasp_core::time::now(),
196 welcome.time,
197 welcome.time,
198 clasp_core::time::now(),
199 );
200
201 #[cfg(feature = "p2p")]
203 {
204 if let Some(p2p_config) = self.p2p_config.take() {
205 let session_id = welcome.session.clone();
206 let (signal_tx, mut signal_rx) = mpsc::channel(100);
208 let p2p_manager =
209 Arc::new(p2p::P2PManager::new(p2p_config, signal_tx));
210 p2p_manager.set_session_id(session_id.clone());
211
212 let sender = self.sender.read().clone();
214 let p2p_manager_for_task = Arc::clone(&p2p_manager);
215 if let Some(sender_tx) = sender {
216 tokio::spawn(async move {
217 while let Some(msg) = signal_rx.recv().await {
218 if let Some(encoded) = codec::encode(&msg).ok() {
219 if let Err(e) =
220 sender_tx.send(encoded.into()).await
221 {
222 tracing::warn!(
223 "Failed to send P2P signal: {}",
224 e
225 );
226 break;
227 }
228 }
229 }
230 });
231 }
232
233 self.p2p_manager = Some(Arc::clone(&p2p_manager));
235
236 let _ = p2p_manager.announce().await;
238
239 let _ = self.setup_p2p_subscriptions(&session_id).await;
241 }
242 }
243
244 info!("Connected, session: {}", welcome.session);
245 break;
246 }
247 Ok((msg, _)) => {
248 debug!("Received during handshake: {:?}", msg);
249 }
250 Err(e) => {
251 warn!("Decode error: {}", e);
252 }
253 }
254 }
255 Some(TransportEvent::Error(e)) => {
256 return Err(ClientError::ConnectionFailed(e));
257 }
258 Some(TransportEvent::Disconnected { reason }) => {
259 return Err(ClientError::ConnectionFailed(
260 reason.unwrap_or_else(|| "Disconnected".to_string()),
261 ));
262 }
263 None => {
264 return Err(ClientError::ConnectionFailed(
265 "Connection closed".to_string(),
266 ));
267 }
268 _ => {}
269 }
270 }
271
272 self.reconnect_attempts.store(0, Ordering::SeqCst);
274 self.intentionally_closed.store(false, Ordering::SeqCst);
275
276 let params = Arc::clone(&self.params);
278 let subscriptions = Arc::clone(&self.subscriptions);
279 let pending_gets = Arc::clone(&self.pending_gets);
280 let signals = Arc::clone(&self.signals);
281 let last_error = Arc::clone(&self.last_error);
282 let connected_clone = Arc::clone(&self.connected);
283 let reconnect_notify = Arc::clone(&self.reconnect_notify);
284 let intentionally_closed = Arc::clone(&self.intentionally_closed);
285 let reconnect_enabled = self.reconnect;
286 #[cfg(feature = "p2p")]
287 let p2p_manager = self.p2p_manager.clone();
288
289 tokio::spawn(async move {
290 while let Some(event) = receiver.recv().await {
291 match event {
292 TransportEvent::Data(data) => {
293 if let Ok((msg, _)) = codec::decode(&data) {
294 #[cfg(feature = "p2p")]
295 {
296 }
299 handle_message(
300 &msg,
301 ¶ms,
302 &subscriptions,
303 &pending_gets,
304 &signals,
305 &last_error,
306 );
307 }
308 }
309 TransportEvent::Disconnected { reason } => {
310 info!("Disconnected: {:?}", reason);
311 *connected_clone.write() = false;
312
313 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
315 reconnect_notify.notify_one();
316 }
317 break;
318 }
319 TransportEvent::Error(e) => {
320 error!("Error: {}", e);
321 }
322 _ => {}
323 }
324 }
325 });
326
327 Ok(())
328 }
329
330 pub fn start_reconnect_loop(self: &Arc<Self>) {
332 if !self.reconnect {
333 return;
334 }
335
336 let client = Arc::clone(self);
337 tokio::spawn(async move {
338 loop {
339 client.reconnect_notify.notified().await;
341
342 if client.intentionally_closed.load(Ordering::SeqCst) {
343 break;
344 }
345
346 loop {
348 let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
349
350 if client.max_reconnect_attempts > 0
351 && attempts >= client.max_reconnect_attempts
352 {
353 error!(
354 "Max reconnect attempts ({}) reached",
355 client.max_reconnect_attempts
356 );
357 break;
358 }
359
360 let base_ms = client.reconnect_interval_ms;
362 let delay_ms =
363 (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;
364
365 info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
366 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
367
368 if client.intentionally_closed.load(Ordering::SeqCst) {
369 break;
370 }
371
372 match client.try_reconnect().await {
375 Ok(()) => {
376 info!("Reconnected successfully");
377 client.reconnect_attempts.store(0, Ordering::SeqCst);
378
379 if let Err(e) = client.resubscribe_all().await {
381 warn!("Failed to resubscribe: {}", e);
382 }
383 break;
384 }
385 Err(e) => {
386 warn!("Reconnect failed: {}", e);
387 }
388 }
389 }
390 }
391 });
392 }
393
394 async fn try_reconnect(&self) -> Result<()> {
396 info!("Attempting to reconnect to {}", self.url);
397
398 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
400
401 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
403 *self.sender.write() = Some(tx);
404
405 let sender = Arc::new(sender);
407 let sender_clone = sender.clone();
408 tokio::spawn(async move {
409 while let Some(data) = rx.recv().await {
410 if let Err(e) = sender_clone.send(data).await {
411 error!("Send error: {}", e);
412 break;
413 }
414 }
415 });
416
417 let hello = Message::Hello(HelloMessage {
419 version: PROTOCOL_VERSION,
420 name: self.name.clone(),
421 features: self.features.clone(),
422 capabilities: None,
423 token: self.token.clone(),
424 });
425
426 self.send_message(&hello).await?;
427
428 let welcome_timeout = Duration::from_secs(10);
430 let deadline = tokio::time::Instant::now() + welcome_timeout;
431
432 loop {
433 match tokio::time::timeout_at(deadline, receiver.recv()).await {
434 Ok(Some(TransportEvent::Data(data))) => match codec::decode(&data) {
435 Ok((Message::Welcome(welcome), _)) => {
436 *self.session_id.write() = Some(welcome.session.clone());
437 *self.connected.write() = true;
438
439 self.clock.write().process_sync(
440 clasp_core::time::now(),
441 welcome.time,
442 welcome.time,
443 clasp_core::time::now(),
444 );
445
446 info!("Reconnected, session: {}", welcome.session);
447 break;
448 }
449 Ok((msg, _)) => {
450 debug!("Received during reconnect handshake: {:?}", msg);
451 }
452 Err(e) => {
453 warn!("Decode error during reconnect: {}", e);
454 }
455 },
456 Ok(Some(TransportEvent::Error(e))) => {
457 return Err(ClientError::ConnectionFailed(e));
458 }
459 Ok(Some(TransportEvent::Disconnected { reason })) => {
460 return Err(ClientError::ConnectionFailed(
461 reason.unwrap_or_else(|| "Disconnected".to_string()),
462 ));
463 }
464 Ok(None) => {
465 return Err(ClientError::ConnectionFailed(
466 "Connection closed".to_string(),
467 ));
468 }
469 Err(_) => {
470 return Err(ClientError::Timeout);
471 }
472 _ => {}
473 }
474 }
475
476 let params = Arc::clone(&self.params);
478 let subscriptions = Arc::clone(&self.subscriptions);
479 let pending_gets = Arc::clone(&self.pending_gets);
480 let signals = Arc::clone(&self.signals);
481 let last_error = Arc::clone(&self.last_error);
482 let connected_clone = Arc::clone(&self.connected);
483 let reconnect_notify = Arc::clone(&self.reconnect_notify);
484 let intentionally_closed = Arc::clone(&self.intentionally_closed);
485 let reconnect_enabled = self.reconnect;
486
487 tokio::spawn(async move {
488 while let Some(event) = receiver.recv().await {
489 match event {
490 TransportEvent::Data(data) => {
491 if let Ok((msg, _)) = codec::decode(&data) {
492 handle_message(
493 &msg,
494 ¶ms,
495 &subscriptions,
496 &pending_gets,
497 &signals,
498 &last_error,
499 );
500 }
501 }
502 TransportEvent::Disconnected { reason } => {
503 info!("Disconnected: {:?}", reason);
504 *connected_clone.write() = false;
505
506 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
507 reconnect_notify.notify_one();
508 }
509 break;
510 }
511 TransportEvent::Error(e) => {
512 error!("Error: {}", e);
513 }
514 _ => {}
515 }
516 }
517 });
518
519 Ok(())
520 }
521
522 async fn resubscribe_all(&self) -> Result<()> {
524 let subs: Vec<(u32, String)> = self
526 .subscriptions
527 .iter()
528 .map(|entry| (*entry.key(), entry.value().0.clone()))
529 .collect();
530
531 for (id, pattern) in subs {
532 let msg = Message::Subscribe(SubscribeMessage {
533 id,
534 pattern: pattern.clone(),
535 types: vec![],
536 options: Some(SubscribeOptions::default()),
537 });
538
539 self.send_message(&msg).await?;
540 debug!("Resubscribed to {} (id: {})", pattern, id);
541 }
542
543 Ok(())
544 }
545
546 pub fn is_connected(&self) -> bool {
548 *self.connected.read()
549 }
550
551 pub fn session_id(&self) -> Option<String> {
553 self.session_id.read().clone()
554 }
555
556 pub fn time(&self) -> u64 {
558 self.clock.read().server_time()
559 }
560
561 async fn send_message(&self, message: &Message) -> Result<()> {
563 let data = codec::encode(message)?;
564 self.send_raw(data).await
565 }
566
567 async fn send_raw(&self, data: Bytes) -> Result<()> {
569 let tx = {
571 let sender = self.sender.read();
572 sender.as_ref().cloned()
573 };
574
575 if let Some(tx) = tx {
576 tx.send(data)
577 .await
578 .map_err(|e| ClientError::SendFailed(e.to_string()))?;
579 Ok(())
580 } else {
581 Err(ClientError::NotConnected)
582 }
583 }
584
585 pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
587 where
588 F: Fn(Value, &str) + Send + Sync + 'static,
589 {
590 let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
591
592 self.subscriptions
594 .insert(id, (pattern.to_string(), Box::new(callback)));
595
596 let msg = Message::Subscribe(SubscribeMessage {
598 id,
599 pattern: pattern.to_string(),
600 types: vec![],
601 options: Some(SubscribeOptions::default()),
602 });
603
604 self.send_message(&msg).await?;
605
606 debug!("Subscribed to {} (id: {})", pattern, id);
607 Ok(id)
608 }
609
610 pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
612 where
613 F: Fn(Value, &str) + Send + Sync + 'static,
614 {
615 self.subscribe(pattern, callback).await
616 }
617
618 pub async fn unsubscribe(&self, id: u32) -> Result<()> {
620 self.subscriptions.remove(&id);
621
622 let msg = Message::Unsubscribe(UnsubscribeMessage { id });
623 self.send_message(&msg).await?;
624
625 Ok(())
626 }
627
628 pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
630 let msg = Message::Set(SetMessage {
631 address: address.to_string(),
632 value: value.into(),
633 revision: None,
634 lock: false,
635 unlock: false,
636 ttl: None,
637 });
638
639 self.send_message(&msg).await
640 }
641
642 pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
644 let msg = Message::Set(SetMessage {
645 address: address.to_string(),
646 value: value.into(),
647 revision: None,
648 lock: true,
649 unlock: false,
650 ttl: None,
651 });
652
653 self.send_message(&msg).await
654 }
655
656 pub async fn set_unlocked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
658 let msg = Message::Set(SetMessage {
659 address: address.to_string(),
660 value: value.into(),
661 revision: None,
662 lock: false,
663 unlock: true,
664 ttl: None,
665 });
666
667 self.send_message(&msg).await
668 }
669
670 pub async fn set_with_ttl(
672 &self,
673 address: &str,
674 value: impl Into<Value>,
675 ttl: clasp_core::Ttl,
676 ) -> Result<()> {
677 let msg = Message::Set(SetMessage {
678 address: address.to_string(),
679 value: value.into(),
680 revision: None,
681 lock: false,
682 unlock: false,
683 ttl: Some(ttl),
684 });
685
686 self.send_message(&msg).await
687 }
688
689 pub async fn get(&self, address: &str) -> Result<Value> {
691 if let Some(value) = self.params.get(address) {
693 return Ok(value.clone());
694 }
695
696 let (tx, rx) = oneshot::channel();
698 let address_key = address.to_string();
699 self.pending_gets.insert(address_key.clone(), tx);
700
701 let msg = Message::Get(GetMessage {
702 address: address.to_string(),
703 });
704 self.send_message(&msg).await?;
705
706 match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
708 Ok(Ok(value)) => Ok(value),
709 Ok(Err(_)) => {
710 self.pending_gets.remove(&address_key);
712 Err(ClientError::Other("Get cancelled".to_string()))
713 }
714 Err(_) => {
715 self.pending_gets.remove(&address_key);
717 Err(ClientError::Timeout)
718 }
719 }
720 }
721
722 pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
724 let msg = Message::Publish(PublishMessage {
725 address: address.to_string(),
726 signal: Some(SignalType::Event),
727 value: None,
728 payload: Some(payload.into()),
729 samples: None,
730 rate: None,
731 id: None,
732 phase: None,
733 timestamp: Some(self.time()),
734 timeline: None,
735 });
736
737 self.send_message(&msg).await
738 }
739
740 pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
742 let msg = Message::Publish(PublishMessage {
743 address: address.to_string(),
744 signal: Some(SignalType::Stream),
745 value: Some(value.into()),
746 payload: None,
747 samples: None,
748 rate: None,
749 id: None,
750 phase: None,
751 timestamp: Some(self.time()),
752 timeline: None,
753 });
754
755 self.send_message(&msg).await
756 }
757
758 pub async fn gesture(
779 &self,
780 address: &str,
781 id: u32,
782 phase: GesturePhase,
783 payload: impl Into<Value>,
784 ) -> Result<()> {
785 let msg = Message::Publish(PublishMessage {
786 address: address.to_string(),
787 signal: Some(SignalType::Gesture),
788 value: None,
789 payload: Some(payload.into()),
790 samples: None,
791 rate: None,
792 id: Some(id),
793 phase: Some(phase),
794 timestamp: Some(self.time()),
795 timeline: None,
796 });
797
798 self.send_message(&msg).await
799 }
800
801 pub async fn timeline(&self, address: &str, timeline_data: TimelineData) -> Result<()> {
822 let msg = Message::Publish(PublishMessage {
823 address: address.to_string(),
824 signal: Some(SignalType::Timeline),
825 value: None,
826 payload: None,
827 samples: None,
828 rate: None,
829 id: None,
830 phase: None,
831 timestamp: Some(self.time()),
832 timeline: Some(timeline_data),
833 });
834
835 self.send_message(&msg).await
836 }
837
838 pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
840 let msg = Message::Bundle(BundleMessage {
841 timestamp: None,
842 messages,
843 });
844
845 self.send_message(&msg).await
846 }
847
848 pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
850 let msg = Message::Bundle(BundleMessage {
851 timestamp: Some(time),
852 messages,
853 });
854
855 self.send_message(&msg).await
856 }
857
858 pub fn cached(&self, address: &str) -> Option<Value> {
860 self.params.get(address).map(|v| v.clone())
861 }
862
863 pub async fn close(&self) {
866 self.intentionally_closed.store(true, Ordering::SeqCst);
867 *self.connected.write() = false;
868 *self.sender.write() = None;
869 }
870
871 pub fn signals(&self) -> Vec<SignalDefinition> {
873 self.signals.iter().map(|e| e.value().clone()).collect()
874 }
875
876 pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
878 self.signals
879 .iter()
880 .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
881 .map(|e| e.value().clone())
882 .collect()
883 }
884
885 pub fn last_error(&self) -> Option<ErrorMessage> {
887 self.last_error.read().clone()
888 }
889
890 pub fn clear_error(&self) {
892 *self.last_error.write() = None;
893 }
894
895 #[cfg(feature = "p2p")]
897 async fn setup_p2p_subscriptions(&self, session_id: &str) -> Result<()> {
898 if let Some(ref p2p_manager) = self.p2p_manager {
899 let signal_address = format!("{}{}", P2P_SIGNAL_PREFIX, session_id);
900 let p2p_manager_signal = Arc::clone(p2p_manager);
901
902 let _ = self
904 .subscribe(&signal_address, move |value, address| {
905 let p2p = Arc::clone(&p2p_manager_signal);
906 let address = address.to_string(); tokio::spawn(async move {
908 if let Err(e) = p2p.handle_signal(&address, &value).await {
909 tracing::debug!("P2P signal handling error: {}", e);
910 }
911 });
912 })
913 .await?;
914
915 let p2p_manager_announce = Arc::clone(p2p_manager);
917 let _ = self
918 .subscribe(clasp_core::P2P_ANNOUNCE, move |value, _| {
919 p2p_manager_announce.handle_announce(&value);
920 })
921 .await?;
922 }
923 Ok(())
924 }
925
926 #[cfg(feature = "p2p")]
928 pub async fn connect_to_peer(&self, peer_session_id: &str) -> Result<()> {
929 if let Some(ref p2p_manager) = self.p2p_manager {
930 p2p_manager.connect_to_peer(peer_session_id).await
931 } else {
932 Err(ClientError::Other(
933 "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
934 ))
935 }
936 }
937
938 #[cfg(feature = "p2p")]
940 pub fn on_p2p_event<F>(&self, callback: F)
941 where
942 F: Fn(p2p::P2PEvent) + Send + Sync + 'static,
943 {
944 if let Some(ref p2p_manager) = self.p2p_manager {
945 p2p_manager.on_event(callback);
946 }
947 }
948
949 #[cfg(feature = "p2p")]
951 pub fn is_peer_connected(&self, peer_session_id: &str) -> bool {
952 self.p2p_manager
953 .as_ref()
954 .map(|p2p| p2p.is_peer_connected(peer_session_id))
955 .unwrap_or(false)
956 }
957
958 #[cfg(feature = "p2p")]
963 pub async fn send_p2p(
964 &self,
965 peer_session_id: &str,
966 data: bytes::Bytes,
967 reliable: bool,
968 ) -> Result<p2p::SendResult> {
969 if let Some(ref p2p_manager) = self.p2p_manager {
970 p2p_manager
971 .send_to_peer(peer_session_id, data, reliable)
972 .await
973 } else {
974 Err(ClientError::Other(
975 "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
976 ))
977 }
978 }
979
980 #[cfg(feature = "p2p")]
986 pub fn set_p2p_routing_mode(&self, mode: clasp_core::p2p::RoutingMode) {
987 if let Some(ref p2p_manager) = self.p2p_manager {
988 p2p_manager.set_routing_mode(mode);
989 }
990 }
991
992 #[cfg(feature = "p2p")]
994 pub fn p2p_routing_mode(&self) -> clasp_core::p2p::RoutingMode {
995 self.p2p_manager
996 .as_ref()
997 .map(|p2p| p2p.routing_mode())
998 .unwrap_or_default()
999 }
1000}
1001
1002fn handle_message(
1004 msg: &Message,
1005 params: &Arc<DashMap<String, Value>>,
1006 subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
1007 pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
1008 signals: &Arc<DashMap<String, SignalDefinition>>,
1009 last_error: &Arc<RwLock<Option<ErrorMessage>>>,
1010) {
1011 match msg {
1012 Message::Set(set) => {
1013 params.insert(set.address.clone(), set.value.clone());
1015
1016 for entry in subscriptions.iter() {
1018 let (pattern, callback) = entry.value();
1019 if clasp_core::address::glob_match(pattern, &set.address) {
1020 callback(set.value.clone(), &set.address);
1021 }
1022 }
1023 }
1024
1025 Message::Snapshot(snapshot) => {
1026 for param in &snapshot.params {
1027 params.insert(param.address.clone(), param.value.clone());
1028
1029 if let Some((_, tx)) = pending_gets.remove(¶m.address) {
1031 let _ = tx.send(param.value.clone());
1032 }
1033
1034 for entry in subscriptions.iter() {
1036 let (pattern, callback) = entry.value();
1037 if clasp_core::address::glob_match(pattern, ¶m.address) {
1038 callback(param.value.clone(), ¶m.address);
1039 }
1040 }
1041 }
1042 }
1043
1044 Message::Publish(pub_msg) => {
1045 #[cfg(feature = "p2p")]
1046 {
1047 if pub_msg.address.starts_with(clasp_core::P2P_SIGNAL_PREFIX) {
1049 } else if pub_msg.address == clasp_core::P2P_ANNOUNCE {
1052 }
1054 }
1055
1056 let value = pub_msg
1058 .value
1059 .clone()
1060 .or_else(|| pub_msg.payload.clone())
1061 .unwrap_or(Value::Null);
1062
1063 for entry in subscriptions.iter() {
1064 let (pattern, callback) = entry.value();
1065 if clasp_core::address::glob_match(pattern, &pub_msg.address) {
1066 callback(value.clone(), &pub_msg.address);
1067 }
1068 }
1069 }
1070
1071 Message::Error(error) => {
1072 warn!(
1074 "Server error {}: {} (address: {:?})",
1075 error.code, error.message, error.address
1076 );
1077 *last_error.write() = Some(error.clone());
1078 }
1079
1080 Message::Ack(ack) => {
1081 debug!(
1083 "Received ACK for {:?} (revision: {:?})",
1084 ack.address, ack.revision
1085 );
1086 }
1087
1088 Message::Announce(announce) => {
1089 for signal in &announce.signals {
1091 debug!("Received signal announcement: {}", signal.address);
1092 signals.insert(signal.address.clone(), signal.clone());
1093 }
1094 }
1095
1096 Message::Sync(sync) => {
1097 if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
1100 debug!("Clock sync: t1={}, t2={}, t3={}", sync.t1, t2, t3);
1101 }
1106 }
1107
1108 Message::Result(result) => {
1109 debug!("Received result with {} signals", result.signals.len());
1111 for signal in &result.signals {
1113 signals.insert(signal.address.clone(), signal.clone());
1114 }
1115 }
1116
1117 Message::Hello(_)
1119 | Message::Welcome(_)
1120 | Message::Subscribe(_)
1121 | Message::Unsubscribe(_)
1122 | Message::Get(_)
1123 | Message::Query(_)
1124 | Message::Replay(_)
1125 | Message::FederationSync(_) => {
1126 debug!("Received unexpected client-type message: {:?}", msg);
1127 }
1128
1129 Message::Bundle(bundle) => {
1131 for inner_msg in &bundle.messages {
1132 handle_message(
1133 inner_msg,
1134 params,
1135 subscriptions,
1136 pending_gets,
1137 signals,
1138 last_error,
1139 );
1140 }
1141 }
1142
1143 Message::Ping => {
1145 debug!("Received PING from server");
1146 }
1149
1150 Message::Pong => {
1151 debug!("Received PONG from server");
1152 }
1153 }
1154}