1use bytes::Bytes;
4use clasp_core::{
5 codec, time::ClockSync, BundleMessage, ErrorMessage, GesturePhase, GetMessage, HelloMessage,
6 Message, PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage,
7 SubscribeOptions, TimelineData, UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10 Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22#[cfg(feature = "p2p")]
23use crate::p2p;
24#[cfg(feature = "p2p")]
25use clasp_core::{P2PConfig, P2P_SIGNAL_PREFIX};
26
/// Callback invoked for every value delivered to a subscription.
///
/// It is called with the received [`Value`] and the address the value was
/// delivered on.
pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
29
/// Client for a CLASP server reached over a WebSocket transport.
pub struct Clasp {
    /// Server URL handed to the transport when connecting.
    url: String,
    /// Client name sent in the HELLO message.
    name: String,
    /// Feature list sent in the HELLO message.
    features: Vec<String>,
    /// Optional token sent in the HELLO message.
    token: Option<String>,
    /// Whether the reconnect loop is enabled.
    reconnect: bool,
    /// Base delay in milliseconds used by the reconnect backoff.
    reconnect_interval_ms: u64,

    /// Session id taken from the most recent WELCOME message.
    session_id: RwLock<Option<String>>,

    /// Set to `true` after a WELCOME, `false` on disconnect or close.
    connected: Arc<RwLock<bool>>,

    /// Queue feeding the background writer task; `None` when closed.
    sender: RwLock<Option<mpsc::Sender<Bytes>>>,

    /// Local cache of values received via SET and SNAPSHOT messages.
    params: Arc<DashMap<String, Value>>,

    /// Registered subscriptions: id -> (pattern, callback).
    subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,

    /// Source of subscription ids; starts at 1.
    next_sub_id: AtomicU32,

    /// Clock synchronisation state, fed from the WELCOME timestamp.
    clock: RwLock<ClockSync>,

    /// In-flight `get` requests keyed by address, completed by SNAPSHOT messages.
    pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,

    /// Signal definitions received via ANNOUNCE and RESULT messages.
    signals: Arc<DashMap<String, SignalDefinition>>,

    /// Most recent ERROR message received from the server.
    last_error: Arc<RwLock<Option<ErrorMessage>>>,

    /// Number of reconnect attempts since the last successful connection.
    reconnect_attempts: Arc<AtomicU32>,

    /// Upper bound on reconnect attempts; `0` disables the limit.
    max_reconnect_attempts: u32,

    /// Set by `close` so that a disconnect does not trigger a reconnect.
    intentionally_closed: Arc<AtomicBool>,

    /// Wakes the reconnect loop when the connection drops.
    reconnect_notify: Arc<Notify>,

    /// P2P configuration; consumed on the first successful connect.
    #[cfg(feature = "p2p")]
    p2p_config: Option<P2PConfig>,

    /// P2P manager created during connect when a configuration was supplied.
    #[cfg(feature = "p2p")]
    p2p_manager: Option<Arc<p2p::P2PManager>>,
}
89
90impl Clasp {
91 pub fn new(
93 url: &str,
94 name: String,
95 features: Vec<String>,
96 token: Option<String>,
97 reconnect: bool,
98 reconnect_interval_ms: u64,
99 ) -> Self {
100 Self {
101 url: url.to_string(),
102 name,
103 features,
104 token,
105 reconnect,
106 reconnect_interval_ms,
107 session_id: RwLock::new(None),
108 connected: Arc::new(RwLock::new(false)),
109 sender: RwLock::new(None),
110 params: Arc::new(DashMap::new()),
111 subscriptions: Arc::new(DashMap::new()),
112 next_sub_id: AtomicU32::new(1),
113 clock: RwLock::new(ClockSync::new()),
114 pending_gets: Arc::new(DashMap::new()),
115 signals: Arc::new(DashMap::new()),
116 last_error: Arc::new(RwLock::new(None)),
117 reconnect_attempts: Arc::new(AtomicU32::new(0)),
118 max_reconnect_attempts: 10,
119 intentionally_closed: Arc::new(AtomicBool::new(false)),
120 reconnect_notify: Arc::new(Notify::new()),
121 #[cfg(feature = "p2p")]
122 p2p_config: None,
123 #[cfg(feature = "p2p")]
124 p2p_manager: None,
125 }
126 }
127
    /// Stores the P2P configuration that `do_connect` consumes after the
    /// handshake succeeds.
    #[cfg(feature = "p2p")]
    pub(crate) fn set_p2p_config(&mut self, config: P2PConfig) {
        self.p2p_config = Some(config);
    }
133
    /// Returns a [`ClaspBuilder`] targeting `url`.
    pub fn builder(url: &str) -> ClaspBuilder {
        ClaspBuilder::new(url)
    }
138
    /// Connects to `url` using a [`ClaspBuilder`] with its default settings.
    ///
    /// # Errors
    /// Returns whatever error the builder's `connect` reports.
    pub async fn connect_to(url: &str) -> Result<Self> {
        ClaspBuilder::new(url).connect().await
    }
143
144 pub(crate) async fn do_connect(&mut self) -> Result<()> {
146 if *self.connected.read() {
147 return Err(ClientError::AlreadyConnected);
148 }
149
150 info!("Connecting to {}", self.url);
151
152 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
154
155 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
157 *self.sender.write() = Some(tx);
158
159 let connected = self.connected.clone();
160
161 let sender = Arc::new(sender);
163 let sender_clone = sender.clone();
164 tokio::spawn(async move {
165 while let Some(data) = rx.recv().await {
166 if let Err(e) = sender_clone.send(data).await {
167 error!("Send error: {}", e);
168 break;
169 }
170 }
171 });
172
173 let hello = Message::Hello(HelloMessage {
175 version: PROTOCOL_VERSION,
176 name: self.name.clone(),
177 features: self.features.clone(),
178 capabilities: None,
179 token: self.token.clone(),
180 });
181
182 self.send_message(&hello).await?;
183
184 loop {
186 match receiver.recv().await {
187 Some(TransportEvent::Data(data)) => {
188 match codec::decode(&data) {
189 Ok((Message::Welcome(welcome), _)) => {
190 *self.session_id.write() = Some(welcome.session.clone());
191 *connected.write() = true;
192
193 self.clock.write().process_sync(
195 clasp_core::time::now(),
196 welcome.time,
197 welcome.time,
198 clasp_core::time::now(),
199 );
200
201 #[cfg(feature = "p2p")]
203 {
204 if let Some(p2p_config) = self.p2p_config.take() {
205 let session_id = welcome.session.clone();
206 let (signal_tx, mut signal_rx) = mpsc::channel(100);
208 let p2p_manager =
209 Arc::new(p2p::P2PManager::new(p2p_config, signal_tx));
210 p2p_manager.set_session_id(session_id.clone());
211
212 let sender = self.sender.read().clone();
214 let p2p_manager_for_task = Arc::clone(&p2p_manager);
215 if let Some(sender_tx) = sender {
216 tokio::spawn(async move {
217 while let Some(msg) = signal_rx.recv().await {
218 if let Some(encoded) = codec::encode(&msg).ok() {
219 if let Err(e) =
220 sender_tx.send(encoded.into()).await
221 {
222 tracing::warn!(
223 "Failed to send P2P signal: {}",
224 e
225 );
226 break;
227 }
228 }
229 }
230 });
231 }
232
233 self.p2p_manager = Some(Arc::clone(&p2p_manager));
235
236 let _ = p2p_manager.announce().await;
238
239 let _ = self.setup_p2p_subscriptions(&session_id).await;
241 }
242 }
243
244 info!("Connected, session: {}", welcome.session);
245 break;
246 }
247 Ok((msg, _)) => {
248 debug!("Received during handshake: {:?}", msg);
249 }
250 Err(e) => {
251 warn!("Decode error: {}", e);
252 }
253 }
254 }
255 Some(TransportEvent::Error(e)) => {
256 return Err(ClientError::ConnectionFailed(e));
257 }
258 Some(TransportEvent::Disconnected { reason }) => {
259 return Err(ClientError::ConnectionFailed(
260 reason.unwrap_or_else(|| "Disconnected".to_string()),
261 ));
262 }
263 None => {
264 return Err(ClientError::ConnectionFailed(
265 "Connection closed".to_string(),
266 ));
267 }
268 _ => {}
269 }
270 }
271
272 self.reconnect_attempts.store(0, Ordering::SeqCst);
274 self.intentionally_closed.store(false, Ordering::SeqCst);
275
276 let params = Arc::clone(&self.params);
278 let subscriptions = Arc::clone(&self.subscriptions);
279 let pending_gets = Arc::clone(&self.pending_gets);
280 let signals = Arc::clone(&self.signals);
281 let last_error = Arc::clone(&self.last_error);
282 let connected_clone = Arc::clone(&self.connected);
283 let reconnect_notify = Arc::clone(&self.reconnect_notify);
284 let intentionally_closed = Arc::clone(&self.intentionally_closed);
285 let reconnect_enabled = self.reconnect;
286 #[cfg(feature = "p2p")]
287 let p2p_manager = self.p2p_manager.clone();
288
289 tokio::spawn(async move {
290 while let Some(event) = receiver.recv().await {
291 match event {
292 TransportEvent::Data(data) => {
293 if let Ok((msg, _)) = codec::decode(&data) {
294 #[cfg(feature = "p2p")]
295 {
296 }
299 handle_message(
300 &msg,
301 ¶ms,
302 &subscriptions,
303 &pending_gets,
304 &signals,
305 &last_error,
306 );
307 }
308 }
309 TransportEvent::Disconnected { reason } => {
310 info!("Disconnected: {:?}", reason);
311 *connected_clone.write() = false;
312
313 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
315 reconnect_notify.notify_one();
316 }
317 break;
318 }
319 TransportEvent::Error(e) => {
320 error!("Error: {}", e);
321 }
322 _ => {}
323 }
324 }
325 });
326
327 Ok(())
328 }
329
    /// Spawns the background task that re-establishes the connection after
    /// an unexpected disconnect.
    ///
    /// Does nothing when reconnecting is disabled. The task waits on
    /// `reconnect_notify`, then retries `try_reconnect` with a growing delay
    /// until it succeeds, the attempt limit is reached, or the client has
    /// been closed intentionally. After a successful reconnect all
    /// subscriptions are re-sent.
    pub fn start_reconnect_loop(self: &Arc<Self>) {
        if !self.reconnect {
            return;
        }

        let client = Arc::clone(self);
        tokio::spawn(async move {
            loop {
                // Park until a reader task reports a dropped connection.
                client.reconnect_notify.notified().await;

                if client.intentionally_closed.load(Ordering::SeqCst) {
                    break;
                }

                loop {
                    // `attempts` is the counter value before this attempt.
                    let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);

                    // A limit of 0 means "retry without bound".
                    if client.max_reconnect_attempts > 0
                        && attempts >= client.max_reconnect_attempts
                    {
                        error!(
                            "Max reconnect attempts ({}) reached",
                            client.max_reconnect_attempts
                        );
                        break;
                    }

                    // Delay = base * 1.5^attempts, capped at 30000 ms.
                    let base_ms = client.reconnect_interval_ms;
                    let delay_ms =
                        (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;

                    info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;

                    // The client may have been closed while sleeping.
                    if client.intentionally_closed.load(Ordering::SeqCst) {
                        break;
                    }

                    match client.try_reconnect().await {
                        Ok(()) => {
                            info!("Reconnected successfully");
                            client.reconnect_attempts.store(0, Ordering::SeqCst);

                            // Resubscribe failures are logged, not retried.
                            if let Err(e) = client.resubscribe_all().await {
                                warn!("Failed to resubscribe: {}", e);
                            }
                            break;
                        }
                        Err(e) => {
                            warn!("Reconnect failed: {}", e);
                        }
                    }
                }
            }
        });
    }
393
394 async fn try_reconnect(&self) -> Result<()> {
396 info!("Attempting to reconnect to {}", self.url);
397
398 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
400
401 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
403 *self.sender.write() = Some(tx);
404
405 let sender = Arc::new(sender);
407 let sender_clone = sender.clone();
408 tokio::spawn(async move {
409 while let Some(data) = rx.recv().await {
410 if let Err(e) = sender_clone.send(data).await {
411 error!("Send error: {}", e);
412 break;
413 }
414 }
415 });
416
417 let hello = Message::Hello(HelloMessage {
419 version: PROTOCOL_VERSION,
420 name: self.name.clone(),
421 features: self.features.clone(),
422 capabilities: None,
423 token: self.token.clone(),
424 });
425
426 self.send_message(&hello).await?;
427
428 let welcome_timeout = Duration::from_secs(10);
430 let deadline = tokio::time::Instant::now() + welcome_timeout;
431
432 loop {
433 match tokio::time::timeout_at(deadline, receiver.recv()).await {
434 Ok(Some(TransportEvent::Data(data))) => match codec::decode(&data) {
435 Ok((Message::Welcome(welcome), _)) => {
436 *self.session_id.write() = Some(welcome.session.clone());
437 *self.connected.write() = true;
438
439 self.clock.write().process_sync(
440 clasp_core::time::now(),
441 welcome.time,
442 welcome.time,
443 clasp_core::time::now(),
444 );
445
446 info!("Reconnected, session: {}", welcome.session);
447 break;
448 }
449 Ok((msg, _)) => {
450 debug!("Received during reconnect handshake: {:?}", msg);
451 }
452 Err(e) => {
453 warn!("Decode error during reconnect: {}", e);
454 }
455 },
456 Ok(Some(TransportEvent::Error(e))) => {
457 return Err(ClientError::ConnectionFailed(e));
458 }
459 Ok(Some(TransportEvent::Disconnected { reason })) => {
460 return Err(ClientError::ConnectionFailed(
461 reason.unwrap_or_else(|| "Disconnected".to_string()),
462 ));
463 }
464 Ok(None) => {
465 return Err(ClientError::ConnectionFailed(
466 "Connection closed".to_string(),
467 ));
468 }
469 Err(_) => {
470 return Err(ClientError::Timeout);
471 }
472 _ => {}
473 }
474 }
475
476 let params = Arc::clone(&self.params);
478 let subscriptions = Arc::clone(&self.subscriptions);
479 let pending_gets = Arc::clone(&self.pending_gets);
480 let signals = Arc::clone(&self.signals);
481 let last_error = Arc::clone(&self.last_error);
482 let connected_clone = Arc::clone(&self.connected);
483 let reconnect_notify = Arc::clone(&self.reconnect_notify);
484 let intentionally_closed = Arc::clone(&self.intentionally_closed);
485 let reconnect_enabled = self.reconnect;
486
487 tokio::spawn(async move {
488 while let Some(event) = receiver.recv().await {
489 match event {
490 TransportEvent::Data(data) => {
491 if let Ok((msg, _)) = codec::decode(&data) {
492 handle_message(
493 &msg,
494 ¶ms,
495 &subscriptions,
496 &pending_gets,
497 &signals,
498 &last_error,
499 );
500 }
501 }
502 TransportEvent::Disconnected { reason } => {
503 info!("Disconnected: {:?}", reason);
504 *connected_clone.write() = false;
505
506 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
507 reconnect_notify.notify_one();
508 }
509 break;
510 }
511 TransportEvent::Error(e) => {
512 error!("Error: {}", e);
513 }
514 _ => {}
515 }
516 }
517 });
518
519 Ok(())
520 }
521
522 async fn resubscribe_all(&self) -> Result<()> {
524 let subs: Vec<(u32, String)> = self
526 .subscriptions
527 .iter()
528 .map(|entry| (*entry.key(), entry.value().0.clone()))
529 .collect();
530
531 for (id, pattern) in subs {
532 let msg = Message::Subscribe(SubscribeMessage {
533 id,
534 pattern: pattern.clone(),
535 types: vec![],
536 options: Some(SubscribeOptions::default()),
537 });
538
539 self.send_message(&msg).await?;
540 debug!("Resubscribed to {} (id: {})", pattern, id);
541 }
542
543 Ok(())
544 }
545
    /// Returns the current value of the connection flag.
    pub fn is_connected(&self) -> bool {
        *self.connected.read()
    }
550
    /// Returns a copy of the session id from the last WELCOME, or `None`
    /// if no handshake has completed yet.
    pub fn session_id(&self) -> Option<String> {
        self.session_id.read().clone()
    }
555
    /// Returns the clock's `server_time()` reading.
    // NOTE(review): unit and epoch are defined by `ClockSync` — confirm there.
    pub fn time(&self) -> u64 {
        self.clock.read().server_time()
    }
560
561 async fn send_message(&self, message: &Message) -> Result<()> {
563 let data = codec::encode(message)?;
564 self.send_raw(data).await
565 }
566
567 async fn send_raw(&self, data: Bytes) -> Result<()> {
569 let tx = {
571 let sender = self.sender.read();
572 sender.as_ref().cloned()
573 };
574
575 if let Some(tx) = tx {
576 tx.send(data)
577 .await
578 .map_err(|e| ClientError::SendFailed(e.to_string()))?;
579 Ok(())
580 } else {
581 Err(ClientError::NotConnected)
582 }
583 }
584
585 pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
587 where
588 F: Fn(Value, &str) + Send + Sync + 'static,
589 {
590 let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
591
592 self.subscriptions
594 .insert(id, (pattern.to_string(), Box::new(callback)));
595
596 let msg = Message::Subscribe(SubscribeMessage {
598 id,
599 pattern: pattern.to_string(),
600 types: vec![],
601 options: Some(SubscribeOptions::default()),
602 });
603
604 self.send_message(&msg).await?;
605
606 debug!("Subscribed to {} (id: {})", pattern, id);
607 Ok(id)
608 }
609
    /// Alias for [`Clasp::subscribe`]; forwards both arguments unchanged.
    pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
    where
        F: Fn(Value, &str) + Send + Sync + 'static,
    {
        self.subscribe(pattern, callback).await
    }
617
618 pub async fn unsubscribe(&self, id: u32) -> Result<()> {
620 self.subscriptions.remove(&id);
621
622 let msg = Message::Unsubscribe(UnsubscribeMessage { id });
623 self.send_message(&msg).await?;
624
625 Ok(())
626 }
627
628 pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
630 let msg = Message::Set(SetMessage {
631 address: address.to_string(),
632 value: value.into(),
633 revision: None,
634 lock: false,
635 unlock: false,
636 ttl: None,
637 });
638
639 self.send_message(&msg).await
640 }
641
642 pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
644 let msg = Message::Set(SetMessage {
645 address: address.to_string(),
646 value: value.into(),
647 revision: None,
648 lock: true,
649 unlock: false,
650 ttl: None,
651 });
652
653 self.send_message(&msg).await
654 }
655
656 pub async fn set_unlocked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
658 let msg = Message::Set(SetMessage {
659 address: address.to_string(),
660 value: value.into(),
661 revision: None,
662 lock: false,
663 unlock: true,
664 ttl: None,
665 });
666
667 self.send_message(&msg).await
668 }
669
670 pub async fn set_with_ttl(&self, address: &str, value: impl Into<Value>, ttl: clasp_core::Ttl) -> Result<()> {
672 let msg = Message::Set(SetMessage {
673 address: address.to_string(),
674 value: value.into(),
675 revision: None,
676 lock: false,
677 unlock: false,
678 ttl: Some(ttl),
679 });
680
681 self.send_message(&msg).await
682 }
683
684 pub async fn get(&self, address: &str) -> Result<Value> {
686 if let Some(value) = self.params.get(address) {
688 return Ok(value.clone());
689 }
690
691 let (tx, rx) = oneshot::channel();
693 let address_key = address.to_string();
694 self.pending_gets.insert(address_key.clone(), tx);
695
696 let msg = Message::Get(GetMessage {
697 address: address.to_string(),
698 });
699 self.send_message(&msg).await?;
700
701 match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
703 Ok(Ok(value)) => Ok(value),
704 Ok(Err(_)) => {
705 self.pending_gets.remove(&address_key);
707 Err(ClientError::Other("Get cancelled".to_string()))
708 }
709 Err(_) => {
710 self.pending_gets.remove(&address_key);
712 Err(ClientError::Timeout)
713 }
714 }
715 }
716
717 pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
719 let msg = Message::Publish(PublishMessage {
720 address: address.to_string(),
721 signal: Some(SignalType::Event),
722 value: None,
723 payload: Some(payload.into()),
724 samples: None,
725 rate: None,
726 id: None,
727 phase: None,
728 timestamp: Some(self.time()),
729 timeline: None,
730 });
731
732 self.send_message(&msg).await
733 }
734
735 pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
737 let msg = Message::Publish(PublishMessage {
738 address: address.to_string(),
739 signal: Some(SignalType::Stream),
740 value: Some(value.into()),
741 payload: None,
742 samples: None,
743 rate: None,
744 id: None,
745 phase: None,
746 timestamp: Some(self.time()),
747 timeline: None,
748 });
749
750 self.send_message(&msg).await
751 }
752
753 pub async fn gesture(
774 &self,
775 address: &str,
776 id: u32,
777 phase: GesturePhase,
778 payload: impl Into<Value>,
779 ) -> Result<()> {
780 let msg = Message::Publish(PublishMessage {
781 address: address.to_string(),
782 signal: Some(SignalType::Gesture),
783 value: None,
784 payload: Some(payload.into()),
785 samples: None,
786 rate: None,
787 id: Some(id),
788 phase: Some(phase),
789 timestamp: Some(self.time()),
790 timeline: None,
791 });
792
793 self.send_message(&msg).await
794 }
795
796 pub async fn timeline(&self, address: &str, timeline_data: TimelineData) -> Result<()> {
817 let msg = Message::Publish(PublishMessage {
818 address: address.to_string(),
819 signal: Some(SignalType::Timeline),
820 value: None,
821 payload: None,
822 samples: None,
823 rate: None,
824 id: None,
825 phase: None,
826 timestamp: Some(self.time()),
827 timeline: Some(timeline_data),
828 });
829
830 self.send_message(&msg).await
831 }
832
833 pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
835 let msg = Message::Bundle(BundleMessage {
836 timestamp: None,
837 messages,
838 });
839
840 self.send_message(&msg).await
841 }
842
843 pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
845 let msg = Message::Bundle(BundleMessage {
846 timestamp: Some(time),
847 messages,
848 });
849
850 self.send_message(&msg).await
851 }
852
853 pub fn cached(&self, address: &str) -> Option<Value> {
855 self.params.get(address).map(|v| v.clone())
856 }
857
858 pub async fn close(&self) {
861 self.intentionally_closed.store(true, Ordering::SeqCst);
862 *self.connected.write() = false;
863 *self.sender.write() = None;
864 }
865
    /// Returns copies of all signal definitions currently stored locally.
    pub fn signals(&self) -> Vec<SignalDefinition> {
        self.signals.iter().map(|e| e.value().clone()).collect()
    }
870
871 pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
873 self.signals
874 .iter()
875 .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
876 .map(|e| e.value().clone())
877 .collect()
878 }
879
    /// Returns a copy of the most recently stored server error, if any.
    pub fn last_error(&self) -> Option<ErrorMessage> {
        self.last_error.read().clone()
    }
884
    /// Forgets the stored server error.
    pub fn clear_error(&self) {
        *self.last_error.write() = None;
    }
889
890 #[cfg(feature = "p2p")]
892 async fn setup_p2p_subscriptions(&self, session_id: &str) -> Result<()> {
893 if let Some(ref p2p_manager) = self.p2p_manager {
894 let signal_address = format!("{}{}", P2P_SIGNAL_PREFIX, session_id);
895 let p2p_manager_signal = Arc::clone(p2p_manager);
896
897 let _ = self
899 .subscribe(&signal_address, move |value, address| {
900 let p2p = Arc::clone(&p2p_manager_signal);
901 let address = address.to_string(); tokio::spawn(async move {
903 if let Err(e) = p2p.handle_signal(&address, &value).await {
904 tracing::debug!("P2P signal handling error: {}", e);
905 }
906 });
907 })
908 .await?;
909
910 let p2p_manager_announce = Arc::clone(p2p_manager);
912 let _ = self
913 .subscribe(clasp_core::P2P_ANNOUNCE, move |value, _| {
914 p2p_manager_announce.handle_announce(&value);
915 })
916 .await?;
917 }
918 Ok(())
919 }
920
921 #[cfg(feature = "p2p")]
923 pub async fn connect_to_peer(&self, peer_session_id: &str) -> Result<()> {
924 if let Some(ref p2p_manager) = self.p2p_manager {
925 p2p_manager.connect_to_peer(peer_session_id).await
926 } else {
927 Err(ClientError::Other(
928 "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
929 ))
930 }
931 }
932
    /// Registers `callback` with the P2P manager's `on_event`.
    ///
    /// The callback is silently discarded when no P2P manager is present.
    #[cfg(feature = "p2p")]
    pub fn on_p2p_event<F>(&self, callback: F)
    where
        F: Fn(p2p::P2PEvent) + Send + Sync + 'static,
    {
        if let Some(ref p2p_manager) = self.p2p_manager {
            p2p_manager.on_event(callback);
        }
    }
943
944 #[cfg(feature = "p2p")]
946 pub fn is_peer_connected(&self, peer_session_id: &str) -> bool {
947 self.p2p_manager
948 .as_ref()
949 .map(|p2p| p2p.is_peer_connected(peer_session_id))
950 .unwrap_or(false)
951 }
952
953 #[cfg(feature = "p2p")]
958 pub async fn send_p2p(
959 &self,
960 peer_session_id: &str,
961 data: bytes::Bytes,
962 reliable: bool,
963 ) -> Result<p2p::SendResult> {
964 if let Some(ref p2p_manager) = self.p2p_manager {
965 p2p_manager
966 .send_to_peer(peer_session_id, data, reliable)
967 .await
968 } else {
969 Err(ClientError::Other(
970 "P2P not configured. Use builder.p2p_config() to enable.".to_string(),
971 ))
972 }
973 }
974
    /// Forwards `mode` to the P2P manager; a no-op when no P2P manager is
    /// present.
    #[cfg(feature = "p2p")]
    pub fn set_p2p_routing_mode(&self, mode: clasp_core::p2p::RoutingMode) {
        if let Some(ref p2p_manager) = self.p2p_manager {
            p2p_manager.set_routing_mode(mode);
        }
    }
986
    /// Returns the P2P manager's routing mode, or the default `RoutingMode`
    /// when no P2P manager is present.
    #[cfg(feature = "p2p")]
    pub fn p2p_routing_mode(&self) -> clasp_core::p2p::RoutingMode {
        self.p2p_manager
            .as_ref()
            .map(|p2p| p2p.routing_mode())
            .unwrap_or_default()
    }
995}
996
997fn handle_message(
999 msg: &Message,
1000 params: &Arc<DashMap<String, Value>>,
1001 subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
1002 pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
1003 signals: &Arc<DashMap<String, SignalDefinition>>,
1004 last_error: &Arc<RwLock<Option<ErrorMessage>>>,
1005) {
1006 match msg {
1007 Message::Set(set) => {
1008 params.insert(set.address.clone(), set.value.clone());
1010
1011 for entry in subscriptions.iter() {
1013 let (pattern, callback) = entry.value();
1014 if clasp_core::address::glob_match(pattern, &set.address) {
1015 callback(set.value.clone(), &set.address);
1016 }
1017 }
1018 }
1019
1020 Message::Snapshot(snapshot) => {
1021 for param in &snapshot.params {
1022 params.insert(param.address.clone(), param.value.clone());
1023
1024 if let Some((_, tx)) = pending_gets.remove(¶m.address) {
1026 let _ = tx.send(param.value.clone());
1027 }
1028
1029 for entry in subscriptions.iter() {
1031 let (pattern, callback) = entry.value();
1032 if clasp_core::address::glob_match(pattern, ¶m.address) {
1033 callback(param.value.clone(), ¶m.address);
1034 }
1035 }
1036 }
1037 }
1038
1039 Message::Publish(pub_msg) => {
1040 #[cfg(feature = "p2p")]
1041 {
1042 if pub_msg.address.starts_with(clasp_core::P2P_SIGNAL_PREFIX) {
1044 } else if pub_msg.address == clasp_core::P2P_ANNOUNCE {
1047 }
1049 }
1050
1051 let value = pub_msg
1053 .value
1054 .clone()
1055 .or_else(|| pub_msg.payload.clone())
1056 .unwrap_or(Value::Null);
1057
1058 for entry in subscriptions.iter() {
1059 let (pattern, callback) = entry.value();
1060 if clasp_core::address::glob_match(pattern, &pub_msg.address) {
1061 callback(value.clone(), &pub_msg.address);
1062 }
1063 }
1064 }
1065
1066 Message::Error(error) => {
1067 warn!(
1069 "Server error {}: {} (address: {:?})",
1070 error.code, error.message, error.address
1071 );
1072 *last_error.write() = Some(error.clone());
1073 }
1074
1075 Message::Ack(ack) => {
1076 debug!(
1078 "Received ACK for {:?} (revision: {:?})",
1079 ack.address, ack.revision
1080 );
1081 }
1082
1083 Message::Announce(announce) => {
1084 for signal in &announce.signals {
1086 debug!("Received signal announcement: {}", signal.address);
1087 signals.insert(signal.address.clone(), signal.clone());
1088 }
1089 }
1090
1091 Message::Sync(sync) => {
1092 if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
1095 debug!("Clock sync: t1={}, t2={}, t3={}", sync.t1, t2, t3);
1096 }
1101 }
1102
1103 Message::Result(result) => {
1104 debug!("Received result with {} signals", result.signals.len());
1106 for signal in &result.signals {
1108 signals.insert(signal.address.clone(), signal.clone());
1109 }
1110 }
1111
1112 Message::Hello(_)
1114 | Message::Welcome(_)
1115 | Message::Subscribe(_)
1116 | Message::Unsubscribe(_)
1117 | Message::Get(_)
1118 | Message::Query(_)
1119 | Message::Replay(_)
1120 | Message::FederationSync(_) => {
1121 debug!("Received unexpected client-type message: {:?}", msg);
1122 }
1123
1124 Message::Bundle(bundle) => {
1126 for inner_msg in &bundle.messages {
1127 handle_message(
1128 inner_msg,
1129 params,
1130 subscriptions,
1131 pending_gets,
1132 signals,
1133 last_error,
1134 );
1135 }
1136 }
1137
1138 Message::Ping => {
1140 debug!("Received PING from server");
1141 }
1144
1145 Message::Pong => {
1146 debug!("Received PONG from server");
1147 }
1148 }
1149}