1use bytes::Bytes;
4use dashmap::DashMap;
5use parking_lot::{Mutex, RwLock};
6use clasp_core::{
7 codec, time::ClockSync, BundleMessage, GetMessage, HelloMessage, Message, PublishMessage,
8 SetMessage, SignalType, SubscribeMessage, SubscribeOptions, UnsubscribeMessage, Value,
9 PROTOCOL_VERSION,
10};
11use clasp_transport::{Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport};
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::Arc;
14use tokio::sync::{mpsc, oneshot};
15use tracing::{debug, error, info, warn};
16
17use crate::builder::ClaspBuilder;
18use crate::error::{ClientError, Result};
19
20pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
22
23pub struct Clasp {
25 url: String,
26 name: String,
27 features: Vec<String>,
28 token: Option<String>,
29 reconnect: bool,
30 reconnect_interval_ms: u64,
31
32 session_id: RwLock<Option<String>>,
34
35 connected: Arc<RwLock<bool>>,
37
38 sender: RwLock<Option<mpsc::Sender<Bytes>>>,
40
41 params: Arc<DashMap<String, Value>>,
43
44 subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
46
47 next_sub_id: AtomicU32,
49
50 clock: RwLock<ClockSync>,
52
53 pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
55}
56
57impl Clasp {
58 pub fn new(
60 url: &str,
61 name: String,
62 features: Vec<String>,
63 token: Option<String>,
64 reconnect: bool,
65 reconnect_interval_ms: u64,
66 ) -> Self {
67 Self {
68 url: url.to_string(),
69 name,
70 features,
71 token,
72 reconnect,
73 reconnect_interval_ms,
74 session_id: RwLock::new(None),
75 connected: Arc::new(RwLock::new(false)),
76 sender: RwLock::new(None),
77 params: Arc::new(DashMap::new()),
78 subscriptions: Arc::new(DashMap::new()),
79 next_sub_id: AtomicU32::new(1),
80 clock: RwLock::new(ClockSync::new()),
81 pending_gets: Arc::new(DashMap::new()),
82 }
83 }
84
85 pub fn builder(url: &str) -> ClaspBuilder {
87 ClaspBuilder::new(url)
88 }
89
90 pub async fn connect_to(url: &str) -> Result<Self> {
92 ClaspBuilder::new(url).connect().await
93 }
94
95 pub(crate) async fn do_connect(&mut self) -> Result<()> {
97 if *self.connected.read() {
98 return Err(ClientError::AlreadyConnected);
99 }
100
101 info!("Connecting to {}", self.url);
102
103 let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
105
106 let (tx, mut rx) = mpsc::channel::<Bytes>(100);
108 *self.sender.write() = Some(tx);
109
110 let connected = self.connected.clone();
111
112 let sender = Arc::new(sender);
114 let sender_clone = sender.clone();
115 tokio::spawn(async move {
116 while let Some(data) = rx.recv().await {
117 if let Err(e) = sender_clone.send(data).await {
118 error!("Send error: {}", e);
119 break;
120 }
121 }
122 });
123
124 let hello = Message::Hello(HelloMessage {
126 version: PROTOCOL_VERSION,
127 name: self.name.clone(),
128 features: self.features.clone(),
129 capabilities: None,
130 token: self.token.clone(),
131 });
132
133 self.send_message(&hello).await?;
134
135 loop {
137 match receiver.recv().await {
138 Some(TransportEvent::Data(data)) => {
139 match codec::decode(&data) {
140 Ok((Message::Welcome(welcome), _)) => {
141 *self.session_id.write() = Some(welcome.session.clone());
142 *connected.write() = true;
143
144 self.clock.write().process_sync(
146 clasp_core::time::now(),
147 welcome.time,
148 welcome.time,
149 clasp_core::time::now(),
150 );
151
152 info!("Connected, session: {}", welcome.session);
153 break;
154 }
155 Ok((msg, _)) => {
156 debug!("Received during handshake: {:?}", msg);
157 }
158 Err(e) => {
159 warn!("Decode error: {}", e);
160 }
161 }
162 }
163 Some(TransportEvent::Error(e)) => {
164 return Err(ClientError::ConnectionFailed(e));
165 }
166 Some(TransportEvent::Disconnected { reason }) => {
167 return Err(ClientError::ConnectionFailed(
168 reason.unwrap_or_else(|| "Disconnected".to_string()),
169 ));
170 }
171 None => {
172 return Err(ClientError::ConnectionFailed("Connection closed".to_string()));
173 }
174 _ => {}
175 }
176 }
177
178 let params = Arc::clone(&self.params);
180 let subscriptions = Arc::clone(&self.subscriptions);
181 let pending_gets = Arc::clone(&self.pending_gets);
182 let connected_clone = Arc::clone(&self.connected);
183
184 tokio::spawn(async move {
185 while let Some(event) = receiver.recv().await {
186 match event {
187 TransportEvent::Data(data) => {
188 if let Ok((msg, _)) = codec::decode(&data) {
189 handle_message(&msg, ¶ms, &subscriptions, &pending_gets);
190 }
191 }
192 TransportEvent::Disconnected { reason } => {
193 info!("Disconnected: {:?}", reason);
194 *connected_clone.write() = false;
195 break;
196 }
197 TransportEvent::Error(e) => {
198 error!("Error: {}", e);
199 }
200 _ => {}
201 }
202 }
203 });
204
205 Ok(())
206 }
207
208 pub fn is_connected(&self) -> bool {
210 *self.connected.read()
211 }
212
213 pub fn session_id(&self) -> Option<String> {
215 self.session_id.read().clone()
216 }
217
218 pub fn time(&self) -> u64 {
220 self.clock.read().server_time()
221 }
222
223 async fn send_message(&self, message: &Message) -> Result<()> {
225 let data = codec::encode(message)?;
226 self.send_raw(data).await
227 }
228
229 async fn send_raw(&self, data: Bytes) -> Result<()> {
231 let sender = self.sender.read();
232 if let Some(tx) = sender.as_ref() {
233 tx.send(data)
234 .await
235 .map_err(|e| ClientError::SendFailed(e.to_string()))?;
236 Ok(())
237 } else {
238 Err(ClientError::NotConnected)
239 }
240 }
241
242 pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
244 where
245 F: Fn(Value, &str) + Send + Sync + 'static,
246 {
247 let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
248
249 self.subscriptions
251 .insert(id, (pattern.to_string(), Box::new(callback)));
252
253 let msg = Message::Subscribe(SubscribeMessage {
255 id,
256 pattern: pattern.to_string(),
257 types: vec![],
258 options: Some(SubscribeOptions::default()),
259 });
260
261 self.send_message(&msg).await?;
262
263 debug!("Subscribed to {} (id: {})", pattern, id);
264 Ok(id)
265 }
266
267 pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
269 where
270 F: Fn(Value, &str) + Send + Sync + 'static,
271 {
272 self.subscribe(pattern, callback).await
273 }
274
275 pub async fn unsubscribe(&self, id: u32) -> Result<()> {
277 self.subscriptions.remove(&id);
278
279 let msg = Message::Unsubscribe(UnsubscribeMessage { id });
280 self.send_message(&msg).await?;
281
282 Ok(())
283 }
284
285 pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
287 let msg = Message::Set(SetMessage {
288 address: address.to_string(),
289 value: value.into(),
290 revision: None,
291 lock: false,
292 unlock: false,
293 });
294
295 self.send_message(&msg).await
296 }
297
298 pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
300 let msg = Message::Set(SetMessage {
301 address: address.to_string(),
302 value: value.into(),
303 revision: None,
304 lock: true,
305 unlock: false,
306 });
307
308 self.send_message(&msg).await
309 }
310
311 pub async fn get(&self, address: &str) -> Result<Value> {
313 if let Some(value) = self.params.get(address) {
315 return Ok(value.clone());
316 }
317
318 let (tx, rx) = oneshot::channel();
320 self.pending_gets.insert(address.to_string(), tx);
321
322 let msg = Message::Get(GetMessage {
323 address: address.to_string(),
324 });
325 self.send_message(&msg).await?;
326
327 match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
329 Ok(Ok(value)) => Ok(value),
330 Ok(Err(_)) => Err(ClientError::Other("Get cancelled".to_string())),
331 Err(_) => Err(ClientError::Timeout),
332 }
333 }
334
335 pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
337 let msg = Message::Publish(PublishMessage {
338 address: address.to_string(),
339 signal: Some(SignalType::Event),
340 value: None,
341 payload: Some(payload.into()),
342 samples: None,
343 rate: None,
344 id: None,
345 phase: None,
346 timestamp: Some(self.time()),
347 });
348
349 self.send_message(&msg).await
350 }
351
352 pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
354 let msg = Message::Publish(PublishMessage {
355 address: address.to_string(),
356 signal: Some(SignalType::Stream),
357 value: Some(value.into()),
358 payload: None,
359 samples: None,
360 rate: None,
361 id: None,
362 phase: None,
363 timestamp: Some(self.time()),
364 });
365
366 self.send_message(&msg).await
367 }
368
369 pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
371 let msg = Message::Bundle(BundleMessage {
372 timestamp: None,
373 messages,
374 });
375
376 self.send_message(&msg).await
377 }
378
379 pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
381 let msg = Message::Bundle(BundleMessage {
382 timestamp: Some(time),
383 messages,
384 });
385
386 self.send_message(&msg).await
387 }
388
389 pub fn cached(&self, address: &str) -> Option<Value> {
391 self.params.get(address).map(|v| v.clone())
392 }
393
394 pub async fn close(&self) {
396 *self.connected.write() = false;
397 *self.sender.write() = None;
398 }
399}
400
401fn handle_message(
403 msg: &Message,
404 params: &Arc<DashMap<String, Value>>,
405 subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
406 pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
407) {
408 match msg {
409 Message::Set(set) => {
410 params.insert(set.address.clone(), set.value.clone());
412
413 for entry in subscriptions.iter() {
415 let (pattern, callback) = entry.value();
416 if clasp_core::address::glob_match(pattern, &set.address) {
417 callback(set.value.clone(), &set.address);
418 }
419 }
420 }
421
422 Message::Snapshot(snapshot) => {
423 for param in &snapshot.params {
424 params.insert(param.address.clone(), param.value.clone());
425
426 if let Some((_, tx)) = pending_gets.remove(¶m.address) {
428 let _ = tx.send(param.value.clone());
429 }
430
431 for entry in subscriptions.iter() {
433 let (pattern, callback) = entry.value();
434 if clasp_core::address::glob_match(pattern, ¶m.address) {
435 callback(param.value.clone(), ¶m.address);
436 }
437 }
438 }
439 }
440
441 Message::Publish(pub_msg) => {
442 let value = pub_msg
444 .value
445 .clone()
446 .or_else(|| pub_msg.payload.clone())
447 .unwrap_or(Value::Null);
448
449 for entry in subscriptions.iter() {
450 let (pattern, callback) = entry.value();
451 if clasp_core::address::glob_match(pattern, &pub_msg.address) {
452 callback(value.clone(), &pub_msg.address);
453 }
454 }
455 }
456
457 _ => {}
458 }
459}