1use bytes::Bytes;
4use clasp_core::{
5 codec, time::ClockSync, BundleMessage, GetMessage, HelloMessage, Message, PublishMessage,
6 SetMessage, SignalType, SubscribeMessage, SubscribeOptions, UnsubscribeMessage, Value,
7 PROTOCOL_VERSION,
8};
9use clasp_transport::{
10 Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::{Mutex, RwLock};
14use std::sync::atomic::{AtomicU32, Ordering};
15use std::sync::Arc;
16use tokio::sync::{mpsc, oneshot};
17use tracing::{debug, error, info, warn};
18
19use crate::builder::ClaspBuilder;
20use crate::error::{ClientError, Result};
21
22pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
24
25pub struct Clasp {
27 url: String,
28 name: String,
29 features: Vec<String>,
30 token: Option<String>,
31 reconnect: bool,
32 reconnect_interval_ms: u64,
33
34 session_id: RwLock<Option<String>>,
36
37 connected: Arc<RwLock<bool>>,
39
40 sender: RwLock<Option<mpsc::Sender<Bytes>>>,
42
43 params: Arc<DashMap<String, Value>>,
45
46 subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
48
49 next_sub_id: AtomicU32,
51
52 clock: RwLock<ClockSync>,
54
55 pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
57}
58
59impl Clasp {
60 pub fn new(
62 url: &str,
63 name: String,
64 features: Vec<String>,
65 token: Option<String>,
66 reconnect: bool,
67 reconnect_interval_ms: u64,
68 ) -> Self {
69 Self {
70 url: url.to_string(),
71 name,
72 features,
73 token,
74 reconnect,
75 reconnect_interval_ms,
76 session_id: RwLock::new(None),
77 connected: Arc::new(RwLock::new(false)),
78 sender: RwLock::new(None),
79 params: Arc::new(DashMap::new()),
80 subscriptions: Arc::new(DashMap::new()),
81 next_sub_id: AtomicU32::new(1),
82 clock: RwLock::new(ClockSync::new()),
83 pending_gets: Arc::new(DashMap::new()),
84 }
85 }
86
87 pub fn builder(url: &str) -> ClaspBuilder {
89 ClaspBuilder::new(url)
90 }
91
92 pub async fn connect_to(url: &str) -> Result<Self> {
94 ClaspBuilder::new(url).connect().await
95 }
96
97 pub(crate) async fn do_connect(&mut self) -> Result<()> {
99 if *self.connected.read() {
100 return Err(ClientError::AlreadyConnected);
101 }
102
103 info!("Connecting to {}", self.url);
104
105 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
107
108 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
110 *self.sender.write() = Some(tx);
111
112 let connected = self.connected.clone();
113
114 let sender = Arc::new(sender);
116 let sender_clone = sender.clone();
117 tokio::spawn(async move {
118 while let Some(data) = rx.recv().await {
119 if let Err(e) = sender_clone.send(data).await {
120 error!("Send error: {}", e);
121 break;
122 }
123 }
124 });
125
126 let hello = Message::Hello(HelloMessage {
128 version: PROTOCOL_VERSION,
129 name: self.name.clone(),
130 features: self.features.clone(),
131 capabilities: None,
132 token: self.token.clone(),
133 });
134
135 self.send_message(&hello).await?;
136
137 loop {
139 match receiver.recv().await {
140 Some(TransportEvent::Data(data)) => {
141 match codec::decode(&data) {
142 Ok((Message::Welcome(welcome), _)) => {
143 *self.session_id.write() = Some(welcome.session.clone());
144 *connected.write() = true;
145
146 self.clock.write().process_sync(
148 clasp_core::time::now(),
149 welcome.time,
150 welcome.time,
151 clasp_core::time::now(),
152 );
153
154 info!("Connected, session: {}", welcome.session);
155 break;
156 }
157 Ok((msg, _)) => {
158 debug!("Received during handshake: {:?}", msg);
159 }
160 Err(e) => {
161 warn!("Decode error: {}", e);
162 }
163 }
164 }
165 Some(TransportEvent::Error(e)) => {
166 return Err(ClientError::ConnectionFailed(e));
167 }
168 Some(TransportEvent::Disconnected { reason }) => {
169 return Err(ClientError::ConnectionFailed(
170 reason.unwrap_or_else(|| "Disconnected".to_string()),
171 ));
172 }
173 None => {
174 return Err(ClientError::ConnectionFailed(
175 "Connection closed".to_string(),
176 ));
177 }
178 _ => {}
179 }
180 }
181
182 let params = Arc::clone(&self.params);
184 let subscriptions = Arc::clone(&self.subscriptions);
185 let pending_gets = Arc::clone(&self.pending_gets);
186 let connected_clone = Arc::clone(&self.connected);
187
188 tokio::spawn(async move {
189 while let Some(event) = receiver.recv().await {
190 match event {
191 TransportEvent::Data(data) => {
192 if let Ok((msg, _)) = codec::decode(&data) {
193 handle_message(&msg, ¶ms, &subscriptions, &pending_gets);
194 }
195 }
196 TransportEvent::Disconnected { reason } => {
197 info!("Disconnected: {:?}", reason);
198 *connected_clone.write() = false;
199 break;
200 }
201 TransportEvent::Error(e) => {
202 error!("Error: {}", e);
203 }
204 _ => {}
205 }
206 }
207 });
208
209 Ok(())
210 }
211
212 pub fn is_connected(&self) -> bool {
214 *self.connected.read()
215 }
216
217 pub fn session_id(&self) -> Option<String> {
219 self.session_id.read().clone()
220 }
221
222 pub fn time(&self) -> u64 {
224 self.clock.read().server_time()
225 }
226
227 async fn send_message(&self, message: &Message) -> Result<()> {
229 let data = codec::encode(message)?;
230 self.send_raw(data).await
231 }
232
233 async fn send_raw(&self, data: Bytes) -> Result<()> {
235 let sender = self.sender.read();
236 if let Some(tx) = sender.as_ref() {
237 tx.send(data)
238 .await
239 .map_err(|e| ClientError::SendFailed(e.to_string()))?;
240 Ok(())
241 } else {
242 Err(ClientError::NotConnected)
243 }
244 }
245
246 pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
248 where
249 F: Fn(Value, &str) + Send + Sync + 'static,
250 {
251 let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
252
253 self.subscriptions
255 .insert(id, (pattern.to_string(), Box::new(callback)));
256
257 let msg = Message::Subscribe(SubscribeMessage {
259 id,
260 pattern: pattern.to_string(),
261 types: vec![],
262 options: Some(SubscribeOptions::default()),
263 });
264
265 self.send_message(&msg).await?;
266
267 debug!("Subscribed to {} (id: {})", pattern, id);
268 Ok(id)
269 }
270
271 pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
273 where
274 F: Fn(Value, &str) + Send + Sync + 'static,
275 {
276 self.subscribe(pattern, callback).await
277 }
278
279 pub async fn unsubscribe(&self, id: u32) -> Result<()> {
281 self.subscriptions.remove(&id);
282
283 let msg = Message::Unsubscribe(UnsubscribeMessage { id });
284 self.send_message(&msg).await?;
285
286 Ok(())
287 }
288
289 pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
291 let msg = Message::Set(SetMessage {
292 address: address.to_string(),
293 value: value.into(),
294 revision: None,
295 lock: false,
296 unlock: false,
297 });
298
299 self.send_message(&msg).await
300 }
301
302 pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
304 let msg = Message::Set(SetMessage {
305 address: address.to_string(),
306 value: value.into(),
307 revision: None,
308 lock: true,
309 unlock: false,
310 });
311
312 self.send_message(&msg).await
313 }
314
315 pub async fn get(&self, address: &str) -> Result<Value> {
317 if let Some(value) = self.params.get(address) {
319 return Ok(value.clone());
320 }
321
322 let (tx, rx) = oneshot::channel();
324 self.pending_gets.insert(address.to_string(), tx);
325
326 let msg = Message::Get(GetMessage {
327 address: address.to_string(),
328 });
329 self.send_message(&msg).await?;
330
331 match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
333 Ok(Ok(value)) => Ok(value),
334 Ok(Err(_)) => Err(ClientError::Other("Get cancelled".to_string())),
335 Err(_) => Err(ClientError::Timeout),
336 }
337 }
338
339 pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
341 let msg = Message::Publish(PublishMessage {
342 address: address.to_string(),
343 signal: Some(SignalType::Event),
344 value: None,
345 payload: Some(payload.into()),
346 samples: None,
347 rate: None,
348 id: None,
349 phase: None,
350 timestamp: Some(self.time()),
351 });
352
353 self.send_message(&msg).await
354 }
355
356 pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
358 let msg = Message::Publish(PublishMessage {
359 address: address.to_string(),
360 signal: Some(SignalType::Stream),
361 value: Some(value.into()),
362 payload: None,
363 samples: None,
364 rate: None,
365 id: None,
366 phase: None,
367 timestamp: Some(self.time()),
368 });
369
370 self.send_message(&msg).await
371 }
372
373 pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
375 let msg = Message::Bundle(BundleMessage {
376 timestamp: None,
377 messages,
378 });
379
380 self.send_message(&msg).await
381 }
382
383 pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
385 let msg = Message::Bundle(BundleMessage {
386 timestamp: Some(time),
387 messages,
388 });
389
390 self.send_message(&msg).await
391 }
392
393 pub fn cached(&self, address: &str) -> Option<Value> {
395 self.params.get(address).map(|v| v.clone())
396 }
397
398 pub async fn close(&self) {
400 *self.connected.write() = false;
401 *self.sender.write() = None;
402 }
403}
404
405fn handle_message(
407 msg: &Message,
408 params: &Arc<DashMap<String, Value>>,
409 subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
410 pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
411) {
412 match msg {
413 Message::Set(set) => {
414 params.insert(set.address.clone(), set.value.clone());
416
417 for entry in subscriptions.iter() {
419 let (pattern, callback) = entry.value();
420 if clasp_core::address::glob_match(pattern, &set.address) {
421 callback(set.value.clone(), &set.address);
422 }
423 }
424 }
425
426 Message::Snapshot(snapshot) => {
427 for param in &snapshot.params {
428 params.insert(param.address.clone(), param.value.clone());
429
430 if let Some((_, tx)) = pending_gets.remove(¶m.address) {
432 let _ = tx.send(param.value.clone());
433 }
434
435 for entry in subscriptions.iter() {
437 let (pattern, callback) = entry.value();
438 if clasp_core::address::glob_match(pattern, ¶m.address) {
439 callback(param.value.clone(), ¶m.address);
440 }
441 }
442 }
443 }
444
445 Message::Publish(pub_msg) => {
446 let value = pub_msg
448 .value
449 .clone()
450 .or_else(|| pub_msg.payload.clone())
451 .unwrap_or(Value::Null);
452
453 for entry in subscriptions.iter() {
454 let (pattern, callback) = entry.value();
455 if clasp_core::address::glob_match(pattern, &pub_msg.address) {
456 callback(value.clone(), &pub_msg.address);
457 }
458 }
459 }
460
461 _ => {}
462 }
463}