clasp_client/
client.rs

1//! Main Clasp client implementation
2
3use bytes::Bytes;
4use dashmap::DashMap;
5use parking_lot::{Mutex, RwLock};
6use clasp_core::{
7    codec, time::ClockSync, BundleMessage, GetMessage, HelloMessage, Message, PublishMessage,
8    SetMessage, SignalType, SubscribeMessage, SubscribeOptions, UnsubscribeMessage, Value,
9    PROTOCOL_VERSION,
10};
11use clasp_transport::{Transport, TransportEvent, TransportReceiver, TransportSender, WebSocketTransport};
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::Arc;
14use tokio::sync::{mpsc, oneshot};
15use tracing::{debug, error, info, warn};
16
17use crate::builder::ClaspBuilder;
18use crate::error::{ClientError, Result};
19
20/// Subscription callback type
21pub type SubscriptionCallback = Box<dyn Fn(Value, &str) + Send + Sync>;
22
23/// A Clasp client
24pub struct Clasp {
25    url: String,
26    name: String,
27    features: Vec<String>,
28    token: Option<String>,
29    reconnect: bool,
30    reconnect_interval_ms: u64,
31
32    /// Session ID (set after connect)
33    session_id: RwLock<Option<String>>,
34
35    /// Connection state
36    connected: Arc<RwLock<bool>>,
37
38    /// Sender for outgoing messages
39    sender: RwLock<Option<mpsc::Sender<Bytes>>>,
40
41    /// Local param cache
42    params: Arc<DashMap<String, Value>>,
43
44    /// Subscriptions
45    subscriptions: Arc<DashMap<u32, (String, SubscriptionCallback)>>,
46
47    /// Subscription ID counter
48    next_sub_id: AtomicU32,
49
50    /// Clock synchronization
51    clock: RwLock<ClockSync>,
52
53    /// Pending get requests
54    pending_gets: Arc<DashMap<String, oneshot::Sender<Value>>>,
55}
56
57impl Clasp {
58    /// Create a new client (use builder for more options)
59    pub fn new(
60        url: &str,
61        name: String,
62        features: Vec<String>,
63        token: Option<String>,
64        reconnect: bool,
65        reconnect_interval_ms: u64,
66    ) -> Self {
67        Self {
68            url: url.to_string(),
69            name,
70            features,
71            token,
72            reconnect,
73            reconnect_interval_ms,
74            session_id: RwLock::new(None),
75            connected: Arc::new(RwLock::new(false)),
76            sender: RwLock::new(None),
77            params: Arc::new(DashMap::new()),
78            subscriptions: Arc::new(DashMap::new()),
79            next_sub_id: AtomicU32::new(1),
80            clock: RwLock::new(ClockSync::new()),
81            pending_gets: Arc::new(DashMap::new()),
82        }
83    }
84
85    /// Create a builder
86    pub fn builder(url: &str) -> ClaspBuilder {
87        ClaspBuilder::new(url)
88    }
89
90    /// Connect to server (convenience method)
91    pub async fn connect_to(url: &str) -> Result<Self> {
92        ClaspBuilder::new(url).connect().await
93    }
94
95    /// Internal connect
96    pub(crate) async fn do_connect(&mut self) -> Result<()> {
97        if *self.connected.read() {
98            return Err(ClientError::AlreadyConnected);
99        }
100
101        info!("Connecting to {}", self.url);
102
103        // Connect WebSocket
104        let (sender, mut receiver) = <WebSocketTransport as Transport>::connect(&self.url).await?;
105
106        // Create send channel
107        let (tx, mut rx) = mpsc::channel::<Bytes>(100);
108        *self.sender.write() = Some(tx);
109
110        let connected = self.connected.clone();
111
112        // Spawn sender task
113        let sender = Arc::new(sender);
114        let sender_clone = sender.clone();
115        tokio::spawn(async move {
116            while let Some(data) = rx.recv().await {
117                if let Err(e) = sender_clone.send(data).await {
118                    error!("Send error: {}", e);
119                    break;
120                }
121            }
122        });
123
124        // Send HELLO
125        let hello = Message::Hello(HelloMessage {
126            version: PROTOCOL_VERSION,
127            name: self.name.clone(),
128            features: self.features.clone(),
129            capabilities: None,
130            token: self.token.clone(),
131        });
132
133        self.send_message(&hello).await?;
134
135        // Wait for WELCOME
136        loop {
137            match receiver.recv().await {
138                Some(TransportEvent::Data(data)) => {
139                    match codec::decode(&data) {
140                        Ok((Message::Welcome(welcome), _)) => {
141                            *self.session_id.write() = Some(welcome.session.clone());
142                            *connected.write() = true;
143
144                            // Sync clock
145                            self.clock.write().process_sync(
146                                clasp_core::time::now(),
147                                welcome.time,
148                                welcome.time,
149                                clasp_core::time::now(),
150                            );
151
152                            info!("Connected, session: {}", welcome.session);
153                            break;
154                        }
155                        Ok((msg, _)) => {
156                            debug!("Received during handshake: {:?}", msg);
157                        }
158                        Err(e) => {
159                            warn!("Decode error: {}", e);
160                        }
161                    }
162                }
163                Some(TransportEvent::Error(e)) => {
164                    return Err(ClientError::ConnectionFailed(e));
165                }
166                Some(TransportEvent::Disconnected { reason }) => {
167                    return Err(ClientError::ConnectionFailed(
168                        reason.unwrap_or_else(|| "Disconnected".to_string()),
169                    ));
170                }
171                None => {
172                    return Err(ClientError::ConnectionFailed("Connection closed".to_string()));
173                }
174                _ => {}
175            }
176        }
177
178        // Spawn receiver task
179        let params = Arc::clone(&self.params);
180        let subscriptions = Arc::clone(&self.subscriptions);
181        let pending_gets = Arc::clone(&self.pending_gets);
182        let connected_clone = Arc::clone(&self.connected);
183
184        tokio::spawn(async move {
185            while let Some(event) = receiver.recv().await {
186                match event {
187                    TransportEvent::Data(data) => {
188                        if let Ok((msg, _)) = codec::decode(&data) {
189                            handle_message(&msg, &params, &subscriptions, &pending_gets);
190                        }
191                    }
192                    TransportEvent::Disconnected { reason } => {
193                        info!("Disconnected: {:?}", reason);
194                        *connected_clone.write() = false;
195                        break;
196                    }
197                    TransportEvent::Error(e) => {
198                        error!("Error: {}", e);
199                    }
200                    _ => {}
201                }
202            }
203        });
204
205        Ok(())
206    }
207
208    /// Check if connected
209    pub fn is_connected(&self) -> bool {
210        *self.connected.read()
211    }
212
213    /// Get session ID
214    pub fn session_id(&self) -> Option<String> {
215        self.session_id.read().clone()
216    }
217
218    /// Get current server time (microseconds)
219    pub fn time(&self) -> u64 {
220        self.clock.read().server_time()
221    }
222
223    /// Send a raw message
224    async fn send_message(&self, message: &Message) -> Result<()> {
225        let data = codec::encode(message)?;
226        self.send_raw(data).await
227    }
228
229    /// Send raw bytes
230    async fn send_raw(&self, data: Bytes) -> Result<()> {
231        let sender = self.sender.read();
232        if let Some(tx) = sender.as_ref() {
233            tx.send(data)
234                .await
235                .map_err(|e| ClientError::SendFailed(e.to_string()))?;
236            Ok(())
237        } else {
238            Err(ClientError::NotConnected)
239        }
240    }
241
242    /// Subscribe to an address pattern
243    pub async fn subscribe<F>(&self, pattern: &str, callback: F) -> Result<u32>
244    where
245        F: Fn(Value, &str) + Send + Sync + 'static,
246    {
247        let id = self.next_sub_id.fetch_add(1, Ordering::SeqCst);
248
249        // Store callback
250        self.subscriptions
251            .insert(id, (pattern.to_string(), Box::new(callback)));
252
253        // Send subscribe message
254        let msg = Message::Subscribe(SubscribeMessage {
255            id,
256            pattern: pattern.to_string(),
257            types: vec![],
258            options: Some(SubscribeOptions::default()),
259        });
260
261        self.send_message(&msg).await?;
262
263        debug!("Subscribed to {} (id: {})", pattern, id);
264        Ok(id)
265    }
266
267    /// Shorthand for subscribe
268    pub async fn on<F>(&self, pattern: &str, callback: F) -> Result<u32>
269    where
270        F: Fn(Value, &str) + Send + Sync + 'static,
271    {
272        self.subscribe(pattern, callback).await
273    }
274
275    /// Unsubscribe
276    pub async fn unsubscribe(&self, id: u32) -> Result<()> {
277        self.subscriptions.remove(&id);
278
279        let msg = Message::Unsubscribe(UnsubscribeMessage { id });
280        self.send_message(&msg).await?;
281
282        Ok(())
283    }
284
285    /// Set a parameter value
286    pub async fn set(&self, address: &str, value: impl Into<Value>) -> Result<()> {
287        let msg = Message::Set(SetMessage {
288            address: address.to_string(),
289            value: value.into(),
290            revision: None,
291            lock: false,
292            unlock: false,
293        });
294
295        self.send_message(&msg).await
296    }
297
298    /// Set with lock
299    pub async fn set_locked(&self, address: &str, value: impl Into<Value>) -> Result<()> {
300        let msg = Message::Set(SetMessage {
301            address: address.to_string(),
302            value: value.into(),
303            revision: None,
304            lock: true,
305            unlock: false,
306        });
307
308        self.send_message(&msg).await
309    }
310
311    /// Get current value (cached or request)
312    pub async fn get(&self, address: &str) -> Result<Value> {
313        // Check cache first
314        if let Some(value) = self.params.get(address) {
315            return Ok(value.clone());
316        }
317
318        // Request from server
319        let (tx, rx) = oneshot::channel();
320        self.pending_gets.insert(address.to_string(), tx);
321
322        let msg = Message::Get(GetMessage {
323            address: address.to_string(),
324        });
325        self.send_message(&msg).await?;
326
327        // Wait for response (with timeout)
328        match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
329            Ok(Ok(value)) => Ok(value),
330            Ok(Err(_)) => Err(ClientError::Other("Get cancelled".to_string())),
331            Err(_) => Err(ClientError::Timeout),
332        }
333    }
334
335    /// Emit an event
336    pub async fn emit(&self, address: &str, payload: impl Into<Value>) -> Result<()> {
337        let msg = Message::Publish(PublishMessage {
338            address: address.to_string(),
339            signal: Some(SignalType::Event),
340            value: None,
341            payload: Some(payload.into()),
342            samples: None,
343            rate: None,
344            id: None,
345            phase: None,
346            timestamp: Some(self.time()),
347        });
348
349        self.send_message(&msg).await
350    }
351
352    /// Send stream sample
353    pub async fn stream(&self, address: &str, value: impl Into<Value>) -> Result<()> {
354        let msg = Message::Publish(PublishMessage {
355            address: address.to_string(),
356            signal: Some(SignalType::Stream),
357            value: Some(value.into()),
358            payload: None,
359            samples: None,
360            rate: None,
361            id: None,
362            phase: None,
363            timestamp: Some(self.time()),
364        });
365
366        self.send_message(&msg).await
367    }
368
369    /// Send atomic bundle
370    pub async fn bundle(&self, messages: Vec<Message>) -> Result<()> {
371        let msg = Message::Bundle(BundleMessage {
372            timestamp: None,
373            messages,
374        });
375
376        self.send_message(&msg).await
377    }
378
379    /// Send scheduled bundle
380    pub async fn bundle_at(&self, messages: Vec<Message>, time: u64) -> Result<()> {
381        let msg = Message::Bundle(BundleMessage {
382            timestamp: Some(time),
383            messages,
384        });
385
386        self.send_message(&msg).await
387    }
388
389    /// Get cached param value
390    pub fn cached(&self, address: &str) -> Option<Value> {
391        self.params.get(address).map(|v| v.clone())
392    }
393
394    /// Close connection
395    pub async fn close(&self) {
396        *self.connected.write() = false;
397        *self.sender.write() = None;
398    }
399}
400
401/// Handle incoming message
402fn handle_message(
403    msg: &Message,
404    params: &Arc<DashMap<String, Value>>,
405    subscriptions: &Arc<DashMap<u32, (String, SubscriptionCallback)>>,
406    pending_gets: &Arc<DashMap<String, oneshot::Sender<Value>>>,
407) {
408    match msg {
409        Message::Set(set) => {
410            // Update cache
411            params.insert(set.address.clone(), set.value.clone());
412
413            // Notify subscribers
414            for entry in subscriptions.iter() {
415                let (pattern, callback) = entry.value();
416                if clasp_core::address::glob_match(pattern, &set.address) {
417                    callback(set.value.clone(), &set.address);
418                }
419            }
420        }
421
422        Message::Snapshot(snapshot) => {
423            for param in &snapshot.params {
424                params.insert(param.address.clone(), param.value.clone());
425
426                // Complete pending gets
427                if let Some((_, tx)) = pending_gets.remove(&param.address) {
428                    let _ = tx.send(param.value.clone());
429                }
430
431                // Notify subscribers
432                for entry in subscriptions.iter() {
433                    let (pattern, callback) = entry.value();
434                    if clasp_core::address::glob_match(pattern, &param.address) {
435                        callback(param.value.clone(), &param.address);
436                    }
437                }
438            }
439        }
440
441        Message::Publish(pub_msg) => {
442            // Notify subscribers
443            let value = pub_msg
444                .value
445                .clone()
446                .or_else(|| pub_msg.payload.clone())
447                .unwrap_or(Value::Null);
448
449            for entry in subscriptions.iter() {
450                let (pattern, callback) = entry.value();
451                if clasp_core::address::glob_match(pattern, &pub_msg.address) {
452                    callback(value.clone(), &pub_msg.address);
453                }
454            }
455        }
456
457        _ => {}
458    }
459}