1use bytes::Bytes;
4use clasp_core::{
5 codec, time::ClockSync, BundleMessage, ErrorMessage, GetMessage, HelloMessage, Message,
6 PublishMessage, SetMessage, SignalDefinition, SignalType, SubscribeMessage, SubscribeOptions,
7 UnsubscribeMessage, Value, PROTOCOL_VERSION,
8};
9use clasp_transport::{
10 Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, oneshot, Notify};
18use tracing::{debug, error, info, warn};
19
20use crate::builder::ClaspBuilder;
21use crate::error::{ClientError, Result};
22
23pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
25
26pub struct Clasp {
28 url: String,
29 name: String,
30 features: Vec<String>,
31 token: Option<String>,
32 reconnect: bool,
33 reconnect_interval_ms: u64,
34
35 session_id: RwLock<Option<String>>,
37
38 connected: Arc<RwLock<bool>>,
40
41 sender: RwLock<Option<mpsc::Sender<Bytes>>>,
43
44 params: Arc<DashMap<String, Value>>,
46
47 subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
49
50 next_sub_id: AtomicU32,
52
53 clock: RwLock<ClockSync>,
55
56 pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
58
59 signals: Arc<DashMap<String, SignalDefinition>>,
61
62 last_error: Arc<RwLock<Option<ErrorMessage>>>,
64
65 reconnect_attempts: Arc<AtomicU32>,
67
68 max_reconnect_attempts: u32,
70
71 intentionally_closed: Arc<AtomicBool>,
73
74 reconnect_notify: Arc<Notify>,
76}
77
78impl Clasp {
79 pub fn new(
81 url: &str,
82 name: String,
83 features: Vec<String>,
84 token: Option<String>,
85 reconnect: bool,
86 reconnect_interval_ms: u64,
87 ) -> Self {
88 Self {
89 url: url.to_string(),
90 name,
91 features,
92 token,
93 reconnect,
94 reconnect_interval_ms,
95 session_id: RwLock::new(None),
96 connected: Arc::new(RwLock::new(false)),
97 sender: RwLock::new(None),
98 params: Arc::new(DashMap::new()),
99 subscriptions: Arc::new(DashMap::new()),
100 next_sub_id: AtomicU32::new(1),
101 clock: RwLock::new(ClockSync::new()),
102 pending_gets: Arc::new(DashMap::new()),
103 signals: Arc::new(DashMap::new()),
104 last_error: Arc::new(RwLock::new(None)),
105 reconnect_attempts: Arc::new(AtomicU32::new(0)),
106 max_reconnect_attempts: 10,
107 intentionally_closed: Arc::new(AtomicBool::new(false)),
108 reconnect_notify: Arc::new(Notify::new()),
109 }
110 }
111
112 pub fn builder(url: &str) -> ClaspBuilder {
114 ClaspBuilder::new(url)
115 }
116
117 pub async fn connect_to(url: &str) -> Result<Self> {
119 ClaspBuilder::new(url).connect().await
120 }
121
122 pub(crate) async fn do_connect(&mut self) -> Result<()> {
124 if *self.connected.read() {
125 return Err(ClientError::AlreadyConnected);
126 }
127
128 info!("Connecting to {}", self.url);
129
130 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
132
133 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
135 *self.sender.write() = Some(tx);
136
137 let connected = self.connected.clone();
138
139 let sender = Arc::new(sender);
141 let sender_clone = sender.clone();
142 tokio::spawn(async move {
143 while let Some(data) = rx.recv().await {
144 if let Err(e) = sender_clone.send(data).await {
145 error!("Send error: {}", e);
146 break;
147 }
148 }
149 });
150
151 let hello = Message::Hello(HelloMessage {
153 version: PROTOCOL_VERSION,
154 name: self.name.clone(),
155 features: self.features.clone(),
156 capabilities: None,
157 token: self.token.clone(),
158 });
159
160 self.send_message(&hello).await?;
161
162 loop {
164 match receiver.recv().await {
165 Some(TransportEvent::Data(data)) => {
166 match codec::decode(&data) {
167 Ok((Message::Welcome(welcome), _)) => {
168 *self.session_id.write() = Some(welcome.session.clone());
169 *connected.write() = true;
170
171 self.clock.write().process_sync(
173 clasp_core::time::now(),
174 welcome.time,
175 welcome.time,
176 clasp_core::time::now(),
177 );
178
179 info!("Connected, session: {}", welcome.session);
180 break;
181 }
182 Ok((msg, _)) => {
183 debug!("Received during handshake: {:?}", msg);
184 }
185 Err(e) => {
186 warn!("Decode error: {}", e);
187 }
188 }
189 }
190 Some(TransportEvent::Error(e)) => {
191 return Err(ClientError::ConnectionFailed(e));
192 }
193 Some(TransportEvent::Disconnected { reason }) => {
194 return Err(ClientError::ConnectionFailed(
195 reason.unwrap_or_else(|| "Disconnected".to_string()),
196 ));
197 }
198 None => {
199 return Err(ClientError::ConnectionFailed(
200 "Connection closed".to_string(),
201 ));
202 }
203 _ => {}
204 }
205 }
206
207 self.reconnect_attempts.store(0, Ordering::SeqCst);
209 self.intentionally_closed.store(false, Ordering::SeqCst);
210
211 let params = Arc::clone(&self.params);
213 let subscriptions = Arc::clone(&self.subscriptions);
214 let pending_gets = Arc::clone(&self.pending_gets);
215 let signals = Arc::clone(&self.signals);
216 let last_error = Arc::clone(&self.last_error);
217 let connected_clone = Arc::clone(&self.connected);
218 let reconnect_notify = Arc::clone(&self.reconnect_notify);
219 let intentionally_closed = Arc::clone(&self.intentionally_closed);
220 let reconnect_enabled = self.reconnect;
221
222 tokio::spawn(async move {
223 while let Some(event) = receiver.recv().await {
224 match event {
225 TransportEvent::Data(data) => {
226 if let Ok((msg, _)) = codec::decode(&data) {
227 handle_message(&msg, ¶ms, &subscriptions, &pending_gets, &signals, &last_error);
228 }
229 }
230 TransportEvent::Disconnected { reason } => {
231 info!("Disconnected: {:?}", reason);
232 *connected_clone.write() = false;
233
234 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
236 reconnect_notify.notify_one();
237 }
238 break;
239 }
240 TransportEvent::Error(e) => {
241 error!("Error: {}", e);
242 }
243 _ => {}
244 }
245 }
246 });
247
248 Ok(())
249 }
250
251 pub fn start_reconnect_loop(self: &Arc<Self>) {
253 if !self.reconnect {
254 return;
255 }
256
257 let client = Arc::clone(self);
258 tokio::spawn(async move {
259 loop {
260 client.reconnect_notify.notified().await;
262
263 if client.intentionally_closed.load(Ordering::SeqCst) {
264 break;
265 }
266
267 loop {
269 let attempts = client.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
270
271 if client.max_reconnect_attempts > 0 && attempts >= client.max_reconnect_attempts {
272 error!("Max reconnect attempts ({}) reached", client.max_reconnect_attempts);
273 break;
274 }
275
276 let base_ms = client.reconnect_interval_ms;
278 let delay_ms = (base_ms as f64 * 1.5_f64.powi(attempts as i32)).min(30000.0) as u64;
279
280 info!("Reconnect attempt {} in {}ms", attempts + 1, delay_ms);
281 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
282
283 if client.intentionally_closed.load(Ordering::SeqCst) {
284 break;
285 }
286
287 match client.try_reconnect().await {
290 Ok(()) => {
291 info!("Reconnected successfully");
292 client.reconnect_attempts.store(0, Ordering::SeqCst);
293
294 if let Err(e) = client.resubscribe_all().await {
296 warn!("Failed to resubscribe: {}", e);
297 }
298 break;
299 }
300 Err(e) => {
301 warn!("Reconnect failed: {}", e);
302 }
303 }
304 }
305 }
306 });
307 }
308
309 async fn try_reconnect(&self) -> Result<()> {
311 info!("Attempting to reconnect to {}", self.url);
312
313 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
315
316 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
318 *self.sender.write() = Some(tx);
319
320 let sender = Arc::new(sender);
322 let sender_clone = sender.clone();
323 tokio::spawn(async move {
324 while let Some(data) = rx.recv().await {
325 if let Err(e) = sender_clone.send(data).await {
326 error!("Send error: {}", e);
327 break;
328 }
329 }
330 });
331
332 let hello = Message::Hello(HelloMessage {
334 version: PROTOCOL_VERSION,
335 name: self.name.clone(),
336 features: self.features.clone(),
337 capabilities: None,
338 token: self.token.clone(),
339 });
340
341 self.send_message(&hello).await?;
342
343 let welcome_timeout = Duration::from_secs(10);
345 let deadline = tokio::time::Instant::now() + welcome_timeout;
346
347 loop {
348 match tokio::time::timeout_at(deadline, receiver.recv()).await {
349 Ok(Some(TransportEvent::Data(data))) => {
350 match codec::decode(&data) {
351 Ok((Message::Welcome(welcome), _)) => {
352 *self.session_id.write() = Some(welcome.session.clone());
353 *self.connected.write() = true;
354
355 self.clock.write().process_sync(
356 clasp_core::time::now(),
357 welcome.time,
358 welcome.time,
359 clasp_core::time::now(),
360 );
361
362 info!("Reconnected, session: {}", welcome.session);
363 break;
364 }
365 Ok((msg, _)) => {
366 debug!("Received during reconnect handshake: {:?}", msg);
367 }
368 Err(e) => {
369 warn!("Decode error during reconnect: {}", e);
370 }
371 }
372 }
373 Ok(Some(TransportEvent::Error(e))) => {
374 return Err(ClientError::ConnectionFailed(e));
375 }
376 Ok(Some(TransportEvent::Disconnected { reason })) => {
377 return Err(ClientError::ConnectionFailed(
378 reason.unwrap_or_else(|| "Disconnected".to_string()),
379 ));
380 }
381 Ok(None) => {
382 return Err(ClientError::ConnectionFailed("Connection closed".to_string()));
383 }
384 Err(_) => {
385 return Err(ClientError::Timeout);
386 }
387 _ => {}
388 }
389 }
390
391 let params = Arc::clone(&self.params);
393 let subscriptions = Arc::clone(&self.subscriptions);
394 let pending_gets = Arc::clone(&self.pending_gets);
395 let signals = Arc::clone(&self.signals);
396 let last_error = Arc::clone(&self.last_error);
397 let connected_clone = Arc::clone(&self.connected);
398 let reconnect_notify = Arc::clone(&self.reconnect_notify);
399 let intentionally_closed = Arc::clone(&self.intentionally_closed);
400 let reconnect_enabled = self.reconnect;
401
402 tokio::spawn(async move {
403 while let Some(event) = receiver.recv().await {
404 match event {
405 TransportEvent::Data(data) => {
406 if let Ok((msg, _)) = codec::decode(&data) {
407 handle_message(&msg, ¶ms, &subscriptions, &pending_gets, &signals, &last_error);
408 }
409 }
410 TransportEvent::Disconnected { reason } => {
411 info!("Disconnected: {:?}", reason);
412 *connected_clone.write() = false;
413
414 if reconnect_enabled && !intentionally_closed.load(Ordering::SeqCst) {
415 reconnect_notify.notify_one();
416 }
417 break;
418 }
419 TransportEvent::Error(e) => {
420 error!("Error: {}", e);
421 }
422 _ => {}
423 }
424 }
425 });
426
427 Ok(())
428 }
429
430 async fn resubscribe_all(&self) -> Result<()> {
432 let subs: Vec<(u32, String)> = self
434 .subscriptions
435 .iter()
436 .map(|entry| (*entry.key(), entry.value().0.clone()))
437 .collect();
438
439 for (id, pattern) in subs {
440 let msg = Message::Subscribe(SubscribeMessage {
441 id,
442 pattern: pattern.clone(),
443 types: vec![],
444 options: Some(SubscribeOptions::default()),
445 });
446
447 self.send_message(&msg).await?;
448 debug!("Resubscribed to {} (id: {})", pattern, id);
449 }
450
451 Ok(())
452 }
453
454 pub fn is_connected(&self) -> bool {
456 *self.connected.read()
457 }
458
459 pub fn session_id(&self) -> Option<String> {
461 self.session_id.read().clone()
462 }
463
464 pub fn time(&self) -> u64 {
466 self.clock.read().server_time()
467 }
468
469 async fn send_message(&self, message: &Message) -> Result<()> {
471 let data = codec::encode(message)?;
472 self.send_raw(data).await
473 }
474
475 async fn send_raw(&self, data: Bytes) -> Result<()> {
477 let tx = {
479 let sender = self.sender.read();
480 sender.as_ref().cloned()
481 };
482
483 if let Some(tx) = tx {
484 tx.send(data)
485 .await
486 .map_err(|e| ClientError::SendFailed(e.to_string()))?;
487 Ok(())
488 } else {
489 Err(ClientError::NotConnected)
490 }
491 }
492
493 pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
495 where
496 F: Fn(Value, &str) + Send + Sync + 'static,
497 {
498 let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
499
500 self.subscriptions
502 .insert(id, (pattern.to_string(), Box::new(callback)));
503
504 let msg = Message::Subscribe(SubscribeMessage {
506 id,
507 pattern: pattern.to_string(),
508 types: vec![],
509 options: Some(SubscribeOptions::default()),
510 });
511
512 self.send_message(&msg).await?;
513
514 debug!("Subscribed to {} (id: {})", pattern, id);
515 Ok(id)
516 }
517
518 pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
520 where
521 F: Fn(Value, &str) + Send + Sync + 'static,
522 {
523 self.subscribe(pattern, callback).await
524 }
525
526 pub async fn unsubscribe(&self, id: u32) -> Result<()> {
528 self.subscriptions.remove(&id);
529
530 let msg = Message::Unsubscribe(UnsubscribeMessage { id });
531 self.send_message(&msg).await?;
532
533 Ok(())
534 }
535
536 pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
538 let msg = Message::Set(SetMessage {
539 address: address.to_string(),
540 value: value.into(),
541 revision: None,
542 lock: false,
543 unlock: false,
544 });
545
546 self.send_message(&msg).await
547 }
548
549 pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
551 let msg = Message::Set(SetMessage {
552 address: address.to_string(),
553 value: value.into(),
554 revision: None,
555 lock: true,
556 unlock: false,
557 });
558
559 self.send_message(&msg).await
560 }
561
562 pub async fn get(&self, address: &str) -> Result<Value> {
564 if let Some(value) = self.params.get(address) {
566 return Ok(value.clone());
567 }
568
569 let (tx, rx) = oneshot::channel();
571 let address_key = address.to_string();
572 self.pending_gets.insert(address_key.clone(), tx);
573
574 let msg = Message::Get(GetMessage {
575 address: address.to_string(),
576 });
577 self.send_message(&msg).await?;
578
579 match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
581 Ok(Ok(value)) => Ok(value),
582 Ok(Err(_)) => {
583 self.pending_gets.remove(&address_key);
585 Err(ClientError::Other("Get cancelled".to_string()))
586 }
587 Err(_) => {
588 self.pending_gets.remove(&address_key);
590 Err(ClientError::Timeout)
591 }
592 }
593 }
594
595 pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
597 let msg = Message::Publish(PublishMessage {
598 address: address.to_string(),
599 signal: Some(SignalType::Event),
600 value: None,
601 payload: Some(payload.into()),
602 samples: None,
603 rate: None,
604 id: None,
605 phase: None,
606 timestamp: Some(self.time()),
607 });
608
609 self.send_message(&msg).await
610 }
611
612 pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
614 let msg = Message::Publish(PublishMessage {
615 address: address.to_string(),
616 signal: Some(SignalType::Stream),
617 value: Some(value.into()),
618 payload: None,
619 samples: None,
620 rate: None,
621 id: None,
622 phase: None,
623 timestamp: Some(self.time()),
624 });
625
626 self.send_message(&msg).await
627 }
628
629 pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
631 let msg = Message::Bundle(BundleMessage {
632 timestamp: None,
633 messages,
634 });
635
636 self.send_message(&msg).await
637 }
638
639 pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
641 let msg = Message::Bundle(BundleMessage {
642 timestamp: Some(time),
643 messages,
644 });
645
646 self.send_message(&msg).await
647 }
648
649 pub fn cached(&self, address: &str) -> Option<Value> {
651 self.params.get(address).map(|v| v.clone())
652 }
653
654 pub async fn close(&self) {
657 self.intentionally_closed.store(true, Ordering::SeqCst);
658 *self.connected.write() = false;
659 *self.sender.write() = None;
660 }
661
662 pub fn signals(&self) -> Vec<SignalDefinition> {
664 self.signals.iter().map(|e| e.value().clone()).collect()
665 }
666
667 pub fn query_signals(&self, pattern: &str) -> Vec<SignalDefinition> {
669 self.signals
670 .iter()
671 .filter(|e| clasp_core::address::glob_match(pattern, e.key()))
672 .map(|e| e.value().clone())
673 .collect()
674 }
675
676 pub fn last_error(&self) -> Option<ErrorMessage> {
678 self.last_error.read().clone()
679 }
680
681 pub fn clear_error(&self) {
683 *self.last_error.write() = None;
684 }
685}
686
687fn handle_message(
689 msg: &Message,
690 params: &Arc<DashMap<String, Value>>,
691 subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
692 pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
693 signals: &Arc<DashMap<String, SignalDefinition>>,
694 last_error: &Arc<RwLock<Option<ErrorMessage>>>,
695) {
696 match msg {
697 Message::Set(set) => {
698 params.insert(set.address.clone(), set.value.clone());
700
701 for entry in subscriptions.iter() {
703 let (pattern, callback) = entry.value();
704 if clasp_core::address::glob_match(pattern, &set.address) {
705 callback(set.value.clone(), &set.address);
706 }
707 }
708 }
709
710 Message::Snapshot(snapshot) => {
711 for param in &snapshot.params {
712 params.insert(param.address.clone(), param.value.clone());
713
714 if let Some((_, tx)) = pending_gets.remove(¶m.address) {
716 let _ = tx.send(param.value.clone());
717 }
718
719 for entry in subscriptions.iter() {
721 let (pattern, callback) = entry.value();
722 if clasp_core::address::glob_match(pattern, ¶m.address) {
723 callback(param.value.clone(), ¶m.address);
724 }
725 }
726 }
727 }
728
729 Message::Publish(pub_msg) => {
730 let value = pub_msg
732 .value
733 .clone()
734 .or_else(|| pub_msg.payload.clone())
735 .unwrap_or(Value::Null);
736
737 for entry in subscriptions.iter() {
738 let (pattern, callback) = entry.value();
739 if clasp_core::address::glob_match(pattern, &pub_msg.address) {
740 callback(value.clone(), &pub_msg.address);
741 }
742 }
743 }
744
745 Message::Error(error) => {
746 warn!(
748 "Server error {}: {} (address: {:?})",
749 error.code, error.message, error.address
750 );
751 *last_error.write() = Some(error.clone());
752 }
753
754 Message::Ack(ack) => {
755 debug!(
757 "Received ACK for {:?} (revision: {:?})",
758 ack.address, ack.revision
759 );
760 }
761
762 Message::Announce(announce) => {
763 for signal in &announce.signals {
765 debug!("Received signal announcement: {}", signal.address);
766 signals.insert(signal.address.clone(), signal.clone());
767 }
768 }
769
770 Message::Sync(sync) => {
771 if let (Some(t2), Some(t3)) = (sync.t2, sync.t3) {
774 debug!(
775 "Clock sync: t1={}, t2={}, t3={}",
776 sync.t1, t2, t3
777 );
778 }
783 }
784
785 Message::Result(result) => {
786 debug!(
788 "Received result with {} signals",
789 result.signals.len()
790 );
791 for signal in &result.signals {
793 signals.insert(signal.address.clone(), signal.clone());
794 }
795 }
796
797 Message::Hello(_) | Message::Welcome(_) | Message::Subscribe(_)
799 | Message::Unsubscribe(_) | Message::Get(_) | Message::Query(_) => {
800 debug!("Received unexpected client-type message: {:?}", msg);
801 }
802
803 Message::Bundle(bundle) => {
805 for inner_msg in &bundle.messages {
806 handle_message(inner_msg, params, subscriptions, pending_gets, signals, last_error);
807 }
808 }
809
810 Message::Ping => {
812 debug!("Received PING from server");
813 }
816
817 Message::Pong => {
818 debug!("Received PONG from server");
819 }
820 }
821}