clasp_client/
client.rs

1//! Main Clasp client implementation
2
3use bytes::Bytes;
4use clasp_core::{
5    codec, time::ClockSync, BundleMessage, GetMessage, HelloMessage, Message, PublishMessage,
6    SetMessage, SignalType, SubscribeMessage, SubscribeOptions, UnsubscribeMessage, Value,
7    PROTOCOL_VERSION,
8};
9use clasp_transport::{
10    Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport,
11};
12use dashmap::DashMap;
13use parking_lot::{Mutex, RwLock};
14use std::sync::atomic::{AtomicU32, Ordering};
15use std::sync::Arc;
16use tokio::sync::{mpsc, oneshot};
17use tracing::{debug, error, info, warn};
18
19use crate::builder::ClaspBuilder;
20use crate::error::{ClientError, Result};
21
22/// Subscription callback type
23pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
24
25/// A Clasp client
26pub struct Clasp {
27    url: String,
28    name: String,
29    features: Vec<String>,
30    token: Option<String>,
31    reconnect: bool,
32    reconnect_interval_ms: u64,
33
34    /// Session ID (set after connect)
35    session_id: RwLock<Option<String>>,
36
37    /// Connection state
38    connected: Arc<RwLock<bool>>,
39
40    /// Sender for outgoing messages
41    sender: RwLock<Option<mpsc::Sender<Bytes>>>,
42
43    /// Local param cache
44    params: Arc<DashMap<String, Value>>,
45
46    /// Subscriptions
47    subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
48
49    /// Subscription ID counter
50    next_sub_id: AtomicU32,
51
52    /// Clock synchronization
53    clock: RwLock<ClockSync>,
54
55    /// Pending get requests
56    pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
57}
58
59impl Clasp {
60    /// Create a new client (use builder for more options)
61    pub fn new(
62        url: &str,
63        name: String,
64        features: Vec<String>,
65        token: Option<String>,
66        reconnect: bool,
67        reconnect_interval_ms: u64,
68    ) -> Self {
69        Self {
70            url: url.to_string(),
71            name,
72            features,
73            token,
74            reconnect,
75            reconnect_interval_ms,
76            session_id: RwLock::new(None),
77            connected: Arc::new(RwLock::new(false)),
78            sender: RwLock::new(None),
79            params: Arc::new(DashMap::new()),
80            subscriptions: Arc::new(DashMap::new()),
81            next_sub_id: AtomicU32::new(1),
82            clock: RwLock::new(ClockSync::new()),
83            pending_gets: Arc::new(DashMap::new()),
84        }
85    }
86
87    /// Create a builder
88    pub fn builder(url: &str) -> ClaspBuilder {
89        ClaspBuilder::new(url)
90    }
91
92    /// Connect to server (convenience method)
93    pub async fn connect_to(url: &str) -> Result<Self> {
94        ClaspBuilder::new(url).connect().await
95    }
96
97    /// Internal connect
98    pub(crate) async fn do_connect(&mut self) -> Result<()> {
99        if *self.connected.read() {
100            return Err(ClientError::AlreadyConnected);
101        }
102
103        info!("Connecting to {}", self.url);
104
105        // Connect WebSocket
106        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
107
108        // Create send channel
109        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
110        *self.sender.write() = Some(tx);
111
112        let connected = self.connected.clone();
113
114        // Spawn sender task
115        let sender = Arc::new(sender);
116        let sender_clone = sender.clone();
117        tokio::spawn(async move {
118            while let Some(data) = rx.recv().await {
119                if let Err(e) = sender_clone.send(data).await {
120                    error!("Send error: {}", e);
121                    break;
122                }
123            }
124        });
125
126        // Send HELLO
127        let hello = Message::Hello(HelloMessage {
128            version: PROTOCOL_VERSION,
129            name: self.name.clone(),
130            features: self.features.clone(),
131            capabilities: None,
132            token: self.token.clone(),
133        });
134
135        self.send_message(&hello).await?;
136
137        // Wait for WELCOME
138        loop {
139            match receiver.recv().await {
140                Some(TransportEvent::Data(data)) => {
141                    match codec::decode(&data) {
142                        Ok((Message::Welcome(welcome), _)) => {
143                            *self.session_id.write() = Some(welcome.session.clone());
144                            *connected.write() = true;
145
146                            // Sync clock
147                            self.clock.write().process_sync(
148                                clasp_core::time::now(),
149                                welcome.time,
150                                welcome.time,
151                                clasp_core::time::now(),
152                            );
153
154                            info!("Connected, session: {}", welcome.session);
155                            break;
156                        }
157                        Ok((msg, _)) => {
158                            debug!("Received during handshake: {:?}", msg);
159                        }
160                        Err(e) => {
161                            warn!("Decode error: {}", e);
162                        }
163                    }
164                }
165                Some(TransportEvent::Error(e)) => {
166                    return Err(ClientError::ConnectionFailed(e));
167                }
168                Some(TransportEvent::Disconnected { reason }) => {
169                    return Err(ClientError::ConnectionFailed(
170                        reason.unwrap_or_else(|| "Disconnected".to_string()),
171                    ));
172                }
173                None => {
174                    return Err(ClientError::ConnectionFailed(
175                        "Connection closed".to_string(),
176                    ));
177                }
178                _ => {}
179            }
180        }
181
182        // Spawn receiver task
183        let params = Arc::clone(&self.params);
184        let subscriptions = Arc::clone(&self.subscriptions);
185        let pending_gets = Arc::clone(&self.pending_gets);
186        let connected_clone = Arc::clone(&self.connected);
187
188        tokio::spawn(async move {
189            while let Some(event) = receiver.recv().await {
190                match event {
191                    TransportEvent::Data(data) => {
192                        if let Ok((msg, _)) = codec::decode(&data) {
193                            handle_message(&msg, &params, &subscriptions, &pending_gets);
194                        }
195                    }
196                    TransportEvent::Disconnected { reason } => {
197                        info!("Disconnected: {:?}", reason);
198                        *connected_clone.write() = false;
199                        break;
200                    }
201                    TransportEvent::Error(e) => {
202                        error!("Error: {}", e);
203                    }
204                    _ => {}
205                }
206            }
207        });
208
209        Ok(())
210    }
211
212    /// Check if connected
213    pub fn is_connected(&self) -> bool {
214        *self.connected.read()
215    }
216
217    /// Get session ID
218    pub fn session_id(&self) -> Option<String> {
219        self.session_id.read().clone()
220    }
221
222    /// Get current server time (microseconds)
223    pub fn time(&self) -> u64 {
224        self.clock.read().server_time()
225    }
226
227    /// Send a raw message
228    async fn send_message(&self, message: &Message) -> Result<()> {
229        let data = codec::encode(message)?;
230        self.send_raw(data).await
231    }
232
233    /// Send raw bytes
234    async fn send_raw(&self, data: Bytes) -> Result<()> {
235        let sender = self.sender.read();
236        if let Some(tx) = sender.as_ref() {
237            tx.send(data)
238                .await
239                .map_err(|e| ClientError::SendFailed(e.to_string()))?;
240            Ok(())
241        } else {
242            Err(ClientError::NotConnected)
243        }
244    }
245
246    /// Subscribe to an address pattern
247    pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
248    where
249        F: Fn(Value, &str) + Send + Sync + 'static,
250    {
251        let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
252
253        // Store callback
254        self.subscriptions
255            .insert(id, (pattern.to_string(), Box::new(callback)));
256
257        // Send subscribe message
258        let msg = Message::Subscribe(SubscribeMessage {
259            id,
260            pattern: pattern.to_string(),
261            types: vec![],
262            options: Some(SubscribeOptions::default()),
263        });
264
265        self.send_message(&msg).await?;
266
267        debug!("Subscribed to {} (id: {})", pattern, id);
268        Ok(id)
269    }
270
271    /// Shorthand for subscribe
272    pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
273    where
274        F: Fn(Value, &str) + Send + Sync + 'static,
275    {
276        self.subscribe(pattern, callback).await
277    }
278
279    /// Unsubscribe
280    pub async fn unsubscribe(&self, id: u32) -> Result<()> {
281        self.subscriptions.remove(&id);
282
283        let msg = Message::Unsubscribe(UnsubscribeMessage { id });
284        self.send_message(&msg).await?;
285
286        Ok(())
287    }
288
289    /// Set a parameter value
290    pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
291        let msg = Message::Set(SetMessage {
292            address: address.to_string(),
293            value: value.into(),
294            revision: None,
295            lock: false,
296            unlock: false,
297        });
298
299        self.send_message(&msg).await
300    }
301
302    /// Set with lock
303    pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
304        let msg = Message::Set(SetMessage {
305            address: address.to_string(),
306            value: value.into(),
307            revision: None,
308            lock: true,
309            unlock: false,
310        });
311
312        self.send_message(&msg).await
313    }
314
315    /// Get current value (cached or request)
316    pub async fn get(&self, address: &str) -> Result<Value> {
317        // Check cache first
318        if let Some(value) = self.params.get(address) {
319            return Ok(value.clone());
320        }
321
322        // Request from server
323        let (tx, rx) = oneshot::channel();
324        self.pending_gets.insert(address.to_string(), tx);
325
326        let msg = Message::Get(GetMessage {
327            address: address.to_string(),
328        });
329        self.send_message(&msg).await?;
330
331        // Wait for response (with timeout)
332        match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
333            Ok(Ok(value)) => Ok(value),
334            Ok(Err(_)) => Err(ClientError::Other("Get cancelled".to_string())),
335            Err(_) => Err(ClientError::Timeout),
336        }
337    }
338
339    /// Emit an event
340    pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
341        let msg = Message::Publish(PublishMessage {
342            address: address.to_string(),
343            signal: Some(SignalType::Event),
344            value: None,
345            payload: Some(payload.into()),
346            samples: None,
347            rate: None,
348            id: None,
349            phase: None,
350            timestamp: Some(self.time()),
351        });
352
353        self.send_message(&msg).await
354    }
355
356    /// Send stream sample
357    pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
358        let msg = Message::Publish(PublishMessage {
359            address: address.to_string(),
360            signal: Some(SignalType::Stream),
361            value: Some(value.into()),
362            payload: None,
363            samples: None,
364            rate: None,
365            id: None,
366            phase: None,
367            timestamp: Some(self.time()),
368        });
369
370        self.send_message(&msg).await
371    }
372
373    /// Send atomic bundle
374    pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
375        let msg = Message::Bundle(BundleMessage {
376            timestamp: None,
377            messages,
378        });
379
380        self.send_message(&msg).await
381    }
382
383    /// Send scheduled bundle
384    pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
385        let msg = Message::Bundle(BundleMessage {
386            timestamp: Some(time),
387            messages,
388        });
389
390        self.send_message(&msg).await
391    }
392
393    /// Get cached param value
394    pub fn cached(&self, address: &str) -> Option<Value> {
395        self.params.get(address).map(|v| v.clone())
396    }
397
398    /// Close connection
399    pub async fn close(&self) {
400        *self.connected.write() = false;
401        *self.sender.write() = None;
402    }
403}
404
405/// Handle incoming message
406fn handle_message(
407    msg: &Message,
408    params: &Arc<DashMap<String, Value>>,
409    subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
410    pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
411) {
412    match msg {
413        Message::Set(set) => {
414            // Update cache
415            params.insert(set.address.clone(), set.value.clone());
416
417            // Notify subscribers
418            for entry in subscriptions.iter() {
419                let (pattern, callback) = entry.value();
420                if clasp_core::address::glob_match(pattern, &set.address) {
421                    callback(set.value.clone(), &set.address);
422                }
423            }
424        }
425
426        Message::Snapshot(snapshot) => {
427            for param in &snapshot.params {
428                params.insert(param.address.clone(), param.value.clone());
429
430                // Complete pending gets
431                if let Some((_, tx)) = pending_gets.remove(&param.address) {
432                    let _ = tx.send(param.value.clone());
433                }
434
435                // Notify subscribers
436                for entry in subscriptions.iter() {
437                    let (pattern, callback) = entry.value();
438                    if clasp_core::address::glob_match(pattern, &param.address) {
439                        callback(param.value.clone(), &param.address);
440                    }
441                }
442            }
443        }
444
445        Message::Publish(pub_msg) => {
446            // Notify subscribers
447            let value = pub_msg
448                .value
449                .clone()
450                .or_else(|| pub_msg.payload.clone())
451                .unwrap_or(Value::Null);
452
453            for entry in subscriptions.iter() {
454                let (pattern, callback) = entry.value();
455                if clasp_core::address::glob_match(pattern, &pub_msg.address) {
456                    callback(value.clone(), &pub_msg.address);
457                }
458            }
459        }
460
461        _ => {}
462    }
463}