Skip to main content

ember_plus/client/
connection.rs

1//! Client connection handling.
2
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::net::TcpStream;
7use tokio::sync::{mpsc, Mutex, RwLock};
8use tokio::time::timeout;
9use tracing::{debug, error, info, warn};
10
11use crate::error::{Error, Result};
12use crate::codec;
13use crate::glow::{
14    GlowRoot, GlowElement, GlowNode, GlowParameter, EmberPath,
15    InvocationResult, parse_path, EmberValue, CommandBuilder,
16};
17use crate::s101::{S101Codec, S101Encoder, S101Message};
18use crate::tree::{EmberTree, TreeNodeRef};
19
20/// Configuration for the Ember+ client.
21#[derive(Debug, Clone)]
22pub struct ClientConfig {
23    /// Connection timeout
24    pub connect_timeout: Duration,
25    /// Request timeout
26    pub request_timeout: Duration,
27    /// Keep-alive interval
28    pub keepalive_interval: Duration,
29    /// Auto-reconnect on disconnect
30    pub auto_reconnect: bool,
31    /// Maximum reconnect attempts
32    pub max_reconnect_attempts: u32,
33}
34
35impl Default for ClientConfig {
36    fn default() -> Self {
37        ClientConfig {
38            connect_timeout: Duration::from_secs(10),
39            request_timeout: Duration::from_secs(30),
40            keepalive_interval: Duration::from_secs(5),
41            auto_reconnect: true,
42            max_reconnect_attempts: 3,
43        }
44    }
45}
46
47/// Callback type for value updates.
48pub type ValueCallback = Box<dyn Fn(&EmberPath, &EmberValue) + Send + Sync>;
49
50/// Callback type for stream updates.
51pub type StreamCallback = Box<dyn Fn(i32, &EmberValue) + Send + Sync>;
52
53/// An Ember+ client for connecting to providers.
54pub struct EmberClient {
55    /// Remote address
56    address: String,
57    /// Configuration
58    config: ClientConfig,
59    /// TCP stream
60    stream: Arc<Mutex<Option<TcpStream>>>,
61    /// S101 codec
62    codec: Arc<Mutex<S101Codec>>,
63    /// Local tree cache
64    tree: Arc<RwLock<EmberTree>>,
65    /// Connected flag
66    connected: Arc<RwLock<bool>>,
67    /// Request counter for invocations
68    request_counter: Arc<Mutex<i32>>,
69    /// Value update callbacks
70    value_callbacks: Arc<RwLock<Vec<ValueCallback>>>,
71    /// Stream update callbacks  
72    stream_callbacks: Arc<RwLock<Vec<StreamCallback>>>,
73}
74
75impl EmberClient {
76    /// Connect to an Ember+ provider.
77    pub async fn connect(address: &str) -> Result<Self> {
78        Self::connect_with_config(address, ClientConfig::default()).await
79    }
80
81    /// Connect with custom configuration.
82    pub async fn connect_with_config(address: &str, config: ClientConfig) -> Result<Self> {
83        info!("Connecting to Ember+ provider at {}", address);
84
85        let stream = timeout(config.connect_timeout, TcpStream::connect(address))
86            .await
87            .map_err(|_| Error::Timeout)?
88            .map_err(Error::Io)?;
89
90        stream.set_nodelay(true).ok();
91
92        info!("Connected to {}", address);
93
94        let client = EmberClient {
95            address: address.to_string(),
96            config,
97            stream: Arc::new(Mutex::new(Some(stream))),
98            codec: Arc::new(Mutex::new(S101Codec::new())),
99            tree: Arc::new(RwLock::new(EmberTree::new())),
100            connected: Arc::new(RwLock::new(true)),
101            request_counter: Arc::new(Mutex::new(0)),
102            value_callbacks: Arc::new(RwLock::new(Vec::new())),
103            stream_callbacks: Arc::new(RwLock::new(Vec::new())),
104        };
105
106        Ok(client)
107    }
108
109    /// Check if connected.
110    pub async fn is_connected(&self) -> bool {
111        *self.connected.read().await
112    }
113
114    /// Disconnect from the provider.
115    pub async fn disconnect(&self) -> Result<()> {
116        info!("Disconnecting from {}", self.address);
117        
118        *self.connected.write().await = false;
119        
120        if let Some(stream) = self.stream.lock().await.take() {
121            drop(stream);
122        }
123
124        Ok(())
125    }
126
127    /// Get the root directory.
128    pub async fn get_directory(&self) -> Result<GlowRoot> {
129        let root = GlowRoot::get_directory();
130        self.send_and_receive(root).await
131    }
132
133    /// Send a raw Glow request and get response.
134    pub async fn send_request(&self, request: GlowRoot) -> Result<GlowRoot> {
135        self.send_and_receive(request).await
136    }
137
138    /// Get an element by path (numeric path like "0.1.2").
139    /// This navigates progressively through the tree, requesting children at each level.
140    pub async fn get_element_by_path(&self, path: &str) -> Result<Option<TreeNodeRef>> {
141        let path_vec = parse_path(path)?;
142        
143        // Check if already in cache
144        if let Some(node) = self.tree.read().await.get_by_path(&path_vec) {
145            return Ok(Some(node));
146        }
147
148        // Navigate progressively: for each level, ensure we have the children
149        for i in 0..path_vec.len() {
150            let current_path = &path_vec[0..=i];
151            
152            // Check if this level is in cache
153            if self.tree.read().await.get_by_path(current_path).is_some() {
154                continue;
155            }
156
157            // Get parent path and identifier
158            let parent_path = if i == 0 { vec![] } else { path_vec[0..i].to_vec() };
159            
160            // Get identifier from parent's children info (if available)
161            let identifier = if !parent_path.is_empty() {
162                let tree = self.tree.read().await;
163                tree.get_by_path(&parent_path)
164                    .and_then(|parent| {
165                        parent.read().children()
166                            .find(|c| c.read().number() == current_path[i])
167                            .and_then(|c| c.read().identifier().map(|s| s.to_string()))
168                    })
169            } else {
170                // For root children, check root nodes
171                let tree = self.tree.read().await;
172                tree.get_by_path(current_path)
173                    .and_then(|n| n.read().identifier().map(|s| s.to_string()))
174            };
175
176            // Request this level with identifier
177            let elements = CommandBuilder::get_directory_at_path_with_info(
178                &current_path.to_vec(),
179                identifier.as_deref()
180            );
181            let root = GlowRoot::with_elements(elements);
182            let response = self.send_and_receive(root).await?;
183            self.tree.write().await.update_from_glow(&response);
184        }
185
186        // Return from cache
187        Ok(self.tree.read().await.get_by_path(&path_vec))
188    }
189
190    /// Expand the tree under a node (get all children recursively).
191    pub async fn expand(&self, path: &str) -> Result<()> {
192        let path_vec = parse_path(path)?;
193        self.expand_path(&path_vec).await
194    }
195
196    /// Expand starting from root.
197    pub async fn expand_root(&self) -> Result<()> {
198        // Get root directory first
199        let root_response = self.get_directory().await?;
200        self.tree.write().await.update_from_glow(&root_response);
201
202        // Expand each root child
203        for element in &root_response.elements {
204            if let Some(number) = element.number() {
205                self.expand_path(&[number]).await?;
206            }
207        }
208
209        Ok(())
210    }
211
212    /// Expand a specific path.
213    async fn expand_path(&self, path: &[i32]) -> Result<()> {
214        let path_vec = path.to_vec();
215        let elements = CommandBuilder::get_directory_at_path(&path_vec);
216        let root = GlowRoot::with_elements(elements);
217        let response = self.send_and_receive(root).await?;
218
219        self.tree.write().await.update_from_glow(&response);
220
221        // Recursively expand children
222        for element in &response.elements {
223            self.expand_element(element, path.to_vec()).await?;
224        }
225
226        Ok(())
227    }
228
229    /// Recursively expand an element's children.
230    async fn expand_element(&self, element: &GlowElement, parent_path: Vec<i32>) -> Result<()> {
231        match element {
232            GlowElement::Node(node) => {
233                let mut path = parent_path;
234                path.push(node.number);
235
236                // Request children
237                let elements = CommandBuilder::get_directory_at_path(&path);
238                let root = GlowRoot::with_elements(elements);
239                if let Ok(response) = self.send_and_receive(root).await {
240                    self.tree.write().await.update_from_glow(&response);
241
242                    for child in &node.children {
243                        Box::pin(self.expand_element(child, path.clone())).await?;
244                    }
245                }
246            }
247            GlowElement::QualifiedNode(path, node) => {
248                let elements = CommandBuilder::get_directory_at_path(path);
249                let root = GlowRoot::with_elements(elements);
250                if let Ok(response) = self.send_and_receive(root).await {
251                    self.tree.write().await.update_from_glow(&response);
252
253                    for child in &node.children {
254                        Box::pin(self.expand_element(child, path.clone())).await?;
255                    }
256                }
257            }
258            _ => {}
259        }
260
261        Ok(())
262    }
263
264    /// Set a parameter value.
265    pub async fn set_value(&self, path: &str, value: EmberValue) -> Result<()> {
266        let path_vec = parse_path(path)?;
267        let elements = CommandBuilder::set_value_at_path(&path_vec, value);
268        let root = GlowRoot::with_elements(elements);
269        
270        self.send_and_receive(root).await?;
271        Ok(())
272    }
273
274    /// Subscribe to changes at a path.
275    pub async fn subscribe(&self, path: &str) -> Result<()> {
276        let path_vec = parse_path(path)?;
277        let elements = CommandBuilder::subscribe_at_path(&path_vec);
278        let root = GlowRoot::with_elements(elements);
279        
280        self.send_and_receive(root).await?;
281        Ok(())
282    }
283
284    /// Unsubscribe from changes at a path.
285    pub async fn unsubscribe(&self, path: &str) -> Result<()> {
286        let path_vec = parse_path(path)?;
287        let elements = CommandBuilder::unsubscribe_at_path(&path_vec);
288        let root = GlowRoot::with_elements(elements);
289        
290        self.send_and_receive(root).await?;
291        Ok(())
292    }
293
294    /// Invoke a function.
295    pub async fn invoke(&self, path: &str, arguments: Vec<EmberValue>) -> Result<InvocationResult> {
296        let path_vec = parse_path(path)?;
297        
298        let invocation_id = {
299            let mut counter = self.request_counter.lock().await;
300            *counter += 1;
301            *counter
302        };
303
304        let elements = CommandBuilder::invoke_at_path(&path_vec, invocation_id, arguments);
305        let root = GlowRoot::with_elements(elements);
306        
307        let response = self.send_and_receive(root).await?;
308
309        // Find the invocation result
310        for result in response.invocation_results {
311            if result.invocation_id == invocation_id {
312                return Ok(result);
313            }
314        }
315
316        Err(Error::Timeout)
317    }
318
319    /// Get the local tree cache.
320    pub async fn tree(&self) -> tokio::sync::RwLockReadGuard<'_, EmberTree> {
321        self.tree.read().await
322    }
323
324    /// Register a callback for value updates.
325    pub async fn on_value_change(&self, callback: ValueCallback) {
326        self.value_callbacks.write().await.push(callback);
327    }
328
329    /// Register a callback for stream updates.
330    pub async fn on_stream_update(&self, callback: StreamCallback) {
331        self.stream_callbacks.write().await.push(callback);
332    }
333
334    /// Send a request and wait for response.
335    async fn send_and_receive(&self, request: GlowRoot) -> Result<GlowRoot> {
336        // Encode to Glow/BER
337        let glow_bytes = codec::encode(&request)?;
338        
339        // Frame with S101
340        let frame_bytes = S101Encoder::encode_ember_packet(&glow_bytes);
341
342        // Send
343        {
344            let mut stream_guard = self.stream.lock().await;
345            let stream = stream_guard.as_mut().ok_or_else(|| Error::connection("Not connected"))?;
346            
347            stream.write_all(&frame_bytes).await.map_err(Error::Io)?;
348            stream.flush().await.map_err(Error::Io)?;
349        }
350
351        debug!("Sent {} bytes", frame_bytes.len());
352
353        // Receive response
354        let response = timeout(self.config.request_timeout, self.receive_response()).await
355            .map_err(|_| Error::Timeout)??;
356
357        Ok(response)
358    }
359
360    /// Receive a response from the provider.
361    async fn receive_response(&self) -> Result<GlowRoot> {
362        let mut buffer = vec![0u8; 65536];
363        let mut accumulated = GlowRoot::new();
364
365        loop {
366            let n = {
367                let mut stream_guard = self.stream.lock().await;
368                let stream = stream_guard.as_mut().ok_or_else(|| Error::connection("Not connected"))?;
369                stream.read(&mut buffer).await.map_err(Error::Io)?
370            };
371
372            if n == 0 {
373                return Err(Error::connection("Connection closed"));
374            }
375
376            debug!("Received {} bytes", n);
377
378            // Feed to S101 decoder
379            let mut codec = self.codec.lock().await;
380            codec.feed(&buffer[..n]);
381
382            // Try to decode frames
383            while let Ok(Some(frame)) = codec.decode_frame() {
384                let message = frame.to_message();
385
386                match message {
387                    S101Message::EmberData { payload, flags, .. } => {
388                        // Decode Glow payload
389                        if let Ok(root) = codec::decode(&payload) {
390                            // Update tree first (before moving data)
391                            self.tree.write().await.update_from_glow(&root);
392
393                            // Notify callbacks
394                            self.notify_updates(&root).await;
395
396                            // Merge into accumulated response
397                            accumulated.elements.extend(root.elements);
398                            accumulated.invocation_results.extend(root.invocation_results);
399                            accumulated.streams.extend(root.streams);
400
401                            if flags.last_packet {
402                                return Ok(accumulated);
403                            }
404                        }
405                    }
406                    S101Message::KeepAliveRequest => {
407                        // Send keep-alive response
408                        let response = S101Encoder::encode_keepalive_response();
409                        let mut stream_guard = self.stream.lock().await;
410                        if let Some(stream) = stream_guard.as_mut() {
411                            let _ = stream.write_all(&response).await;
412                        }
413                    }
414                    S101Message::KeepAliveResponse => {
415                        debug!("Received keep-alive response");
416                    }
417                }
418            }
419
420            // If we got data and it seems complete, return
421            if !accumulated.elements.is_empty() || !accumulated.invocation_results.is_empty() {
422                return Ok(accumulated);
423            }
424        }
425    }
426
427    /// Notify callbacks of updates.
428    async fn notify_updates(&self, root: &GlowRoot) {
429        // Notify stream callbacks
430        let stream_callbacks = self.stream_callbacks.read().await;
431        for entry in &root.streams {
432            for callback in stream_callbacks.iter() {
433                callback(entry.stream_identifier, &entry.value);
434            }
435        }
436
437        // Notify value callbacks for parameter updates
438        let value_callbacks = self.value_callbacks.read().await;
439        for element in &root.elements {
440            self.notify_element_updates(element, vec![], &value_callbacks).await;
441        }
442    }
443
444    /// Recursively notify callbacks for element updates.
445    async fn notify_element_updates(
446        &self,
447        element: &GlowElement,
448        parent_path: Vec<i32>,
449        callbacks: &[ValueCallback],
450    ) {
451        match element {
452            GlowElement::Parameter(param) => {
453                if let Some(value) = &param.value {
454                    let mut path = parent_path;
455                    path.push(param.number);
456                    for callback in callbacks {
457                        callback(&path, value);
458                    }
459                }
460            }
461            GlowElement::QualifiedParameter(path, param) => {
462                if let Some(value) = &param.value {
463                    for callback in callbacks {
464                        callback(path, value);
465                    }
466                }
467            }
468            GlowElement::Node(node) => {
469                let mut path = parent_path;
470                path.push(node.number);
471                for child in &node.children {
472                    Box::pin(self.notify_element_updates(child, path.clone(), callbacks)).await;
473                }
474            }
475            GlowElement::QualifiedNode(path, node) => {
476                for child in &node.children {
477                    Box::pin(self.notify_element_updates(child, path.clone(), callbacks)).await;
478                }
479            }
480            _ => {}
481        }
482    }
483}
484
485impl Drop for EmberClient {
486    fn drop(&mut self) {
487        // Connection cleanup happens automatically when stream is dropped
488    }
489}