1use std::sync::Arc;
4use std::time::Duration;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::net::TcpStream;
7use tokio::sync::{mpsc, Mutex, RwLock};
8use tokio::time::timeout;
9use tracing::{debug, error, info, warn};
10
11use crate::error::{Error, Result};
12use crate::codec;
13use crate::glow::{
14 GlowRoot, GlowElement, GlowNode, GlowParameter, EmberPath,
15 InvocationResult, parse_path, EmberValue, CommandBuilder,
16};
17use crate::s101::{S101Codec, S101Encoder, S101Message};
18use crate::tree::{EmberTree, TreeNodeRef};
19
/// Tunable settings for an [`EmberClient`] connection.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Maximum time allowed for the initial TCP connect to complete.
    pub connect_timeout: Duration,
    /// Maximum time to wait for the response to a single request.
    pub request_timeout: Duration,
    /// Keep-alive interval.
    /// NOTE(review): not read anywhere in this section — confirm where it is consumed.
    pub keepalive_interval: Duration,
    /// Presumably enables automatic reconnection.
    /// NOTE(review): not read anywhere in this section — confirm.
    pub auto_reconnect: bool,
    /// Presumably caps the number of reconnection attempts.
    /// NOTE(review): not read anywhere in this section — confirm.
    pub max_reconnect_attempts: u32,
}
34
35impl Default for ClientConfig {
36 fn default() -> Self {
37 ClientConfig {
38 connect_timeout: Duration::from_secs(10),
39 request_timeout: Duration::from_secs(30),
40 keepalive_interval: Duration::from_secs(5),
41 auto_reconnect: true,
42 max_reconnect_attempts: 3,
43 }
44 }
45}
46
/// Callback invoked with the path and value of a parameter whenever a
/// received Glow message carries a value for it.
pub type ValueCallback = Box<dyn Fn(&EmberPath, &EmberValue) + Send + Sync>;
49
/// Callback invoked with a stream identifier and its value for every stream
/// entry found in a received Glow message.
pub type StreamCallback = Box<dyn Fn(i32, &EmberValue) + Send + Sync>;
52
/// Client for an Ember+ provider reached over TCP.
///
/// Mutable state lives behind `Arc` + async locks, so every method takes `&self`.
pub struct EmberClient {
    /// Address given at connect time; used in log messages.
    address: String,
    /// Settings supplied at connect time (timeouts etc.).
    config: ClientConfig,
    /// The TCP connection; becomes `None` once `disconnect` has taken it.
    stream: Arc<Mutex<Option<TcpStream>>>,
    /// S101 decoder that is fed the raw bytes read from `stream`.
    codec: Arc<Mutex<S101Codec>>,
    /// Local element tree, updated from decoded Glow messages.
    tree: Arc<RwLock<EmberTree>>,
    /// Connection flag: `true` after a successful connect, `false` after `disconnect`.
    connected: Arc<RwLock<bool>>,
    /// Counter used to generate invocation ids in `invoke`.
    request_counter: Arc<Mutex<i32>>,
    /// Callbacks registered through `on_value_change`.
    value_callbacks: Arc<RwLock<Vec<ValueCallback>>>,
    /// Callbacks registered through `on_stream_update`.
    stream_callbacks: Arc<RwLock<Vec<StreamCallback>>>,
}
74
75impl EmberClient {
76 pub async fn connect(address: &str) -> Result<Self> {
78 Self::connect_with_config(address, ClientConfig::default()).await
79 }
80
81 pub async fn connect_with_config(address: &str, config: ClientConfig) -> Result<Self> {
83 info!("Connecting to Ember+ provider at {}", address);
84
85 let stream = timeout(config.connect_timeout, TcpStream::connect(address))
86 .await
87 .map_err(|_| Error::Timeout)?
88 .map_err(Error::Io)?;
89
90 stream.set_nodelay(true).ok();
91
92 info!("Connected to {}", address);
93
94 let client = EmberClient {
95 address: address.to_string(),
96 config,
97 stream: Arc::new(Mutex::new(Some(stream))),
98 codec: Arc::new(Mutex::new(S101Codec::new())),
99 tree: Arc::new(RwLock::new(EmberTree::new())),
100 connected: Arc::new(RwLock::new(true)),
101 request_counter: Arc::new(Mutex::new(0)),
102 value_callbacks: Arc::new(RwLock::new(Vec::new())),
103 stream_callbacks: Arc::new(RwLock::new(Vec::new())),
104 };
105
106 Ok(client)
107 }
108
109 pub async fn is_connected(&self) -> bool {
111 *self.connected.read().await
112 }
113
114 pub async fn disconnect(&self) -> Result<()> {
116 info!("Disconnecting from {}", self.address);
117
118 *self.connected.write().await = false;
119
120 if let Some(stream) = self.stream.lock().await.take() {
121 drop(stream);
122 }
123
124 Ok(())
125 }
126
127 pub async fn get_directory(&self) -> Result<GlowRoot> {
129 let root = GlowRoot::get_directory();
130 self.send_and_receive(root).await
131 }
132
133 pub async fn send_request(&self, request: GlowRoot) -> Result<GlowRoot> {
135 self.send_and_receive(request).await
136 }
137
138 pub async fn get_element_by_path(&self, path: &str) -> Result<Option<TreeNodeRef>> {
141 let path_vec = parse_path(path)?;
142
143 if let Some(node) = self.tree.read().await.get_by_path(&path_vec) {
145 return Ok(Some(node));
146 }
147
148 for i in 0..path_vec.len() {
150 let current_path = &path_vec[0..=i];
151
152 if self.tree.read().await.get_by_path(current_path).is_some() {
154 continue;
155 }
156
157 let parent_path = if i == 0 { vec![] } else { path_vec[0..i].to_vec() };
159
160 let identifier = if !parent_path.is_empty() {
162 let tree = self.tree.read().await;
163 tree.get_by_path(&parent_path)
164 .and_then(|parent| {
165 parent.read().children()
166 .find(|c| c.read().number() == current_path[i])
167 .and_then(|c| c.read().identifier().map(|s| s.to_string()))
168 })
169 } else {
170 let tree = self.tree.read().await;
172 tree.get_by_path(current_path)
173 .and_then(|n| n.read().identifier().map(|s| s.to_string()))
174 };
175
176 let elements = CommandBuilder::get_directory_at_path_with_info(
178 ¤t_path.to_vec(),
179 identifier.as_deref()
180 );
181 let root = GlowRoot::with_elements(elements);
182 let response = self.send_and_receive(root).await?;
183 self.tree.write().await.update_from_glow(&response);
184 }
185
186 Ok(self.tree.read().await.get_by_path(&path_vec))
188 }
189
190 pub async fn expand(&self, path: &str) -> Result<()> {
192 let path_vec = parse_path(path)?;
193 self.expand_path(&path_vec).await
194 }
195
196 pub async fn expand_root(&self) -> Result<()> {
198 let root_response = self.get_directory().await?;
200 self.tree.write().await.update_from_glow(&root_response);
201
202 for element in &root_response.elements {
204 if let Some(number) = element.number() {
205 self.expand_path(&[number]).await?;
206 }
207 }
208
209 Ok(())
210 }
211
212 async fn expand_path(&self, path: &[i32]) -> Result<()> {
214 let path_vec = path.to_vec();
215 let elements = CommandBuilder::get_directory_at_path(&path_vec);
216 let root = GlowRoot::with_elements(elements);
217 let response = self.send_and_receive(root).await?;
218
219 self.tree.write().await.update_from_glow(&response);
220
221 for element in &response.elements {
223 self.expand_element(element, path.to_vec()).await?;
224 }
225
226 Ok(())
227 }
228
229 async fn expand_element(&self, element: &GlowElement, parent_path: Vec<i32>) -> Result<()> {
231 match element {
232 GlowElement::Node(node) => {
233 let mut path = parent_path;
234 path.push(node.number);
235
236 let elements = CommandBuilder::get_directory_at_path(&path);
238 let root = GlowRoot::with_elements(elements);
239 if let Ok(response) = self.send_and_receive(root).await {
240 self.tree.write().await.update_from_glow(&response);
241
242 for child in &node.children {
243 Box::pin(self.expand_element(child, path.clone())).await?;
244 }
245 }
246 }
247 GlowElement::QualifiedNode(path, node) => {
248 let elements = CommandBuilder::get_directory_at_path(path);
249 let root = GlowRoot::with_elements(elements);
250 if let Ok(response) = self.send_and_receive(root).await {
251 self.tree.write().await.update_from_glow(&response);
252
253 for child in &node.children {
254 Box::pin(self.expand_element(child, path.clone())).await?;
255 }
256 }
257 }
258 _ => {}
259 }
260
261 Ok(())
262 }
263
264 pub async fn set_value(&self, path: &str, value: EmberValue) -> Result<()> {
266 let path_vec = parse_path(path)?;
267 let elements = CommandBuilder::set_value_at_path(&path_vec, value);
268 let root = GlowRoot::with_elements(elements);
269
270 self.send_and_receive(root).await?;
271 Ok(())
272 }
273
274 pub async fn subscribe(&self, path: &str) -> Result<()> {
276 let path_vec = parse_path(path)?;
277 let elements = CommandBuilder::subscribe_at_path(&path_vec);
278 let root = GlowRoot::with_elements(elements);
279
280 self.send_and_receive(root).await?;
281 Ok(())
282 }
283
284 pub async fn unsubscribe(&self, path: &str) -> Result<()> {
286 let path_vec = parse_path(path)?;
287 let elements = CommandBuilder::unsubscribe_at_path(&path_vec);
288 let root = GlowRoot::with_elements(elements);
289
290 self.send_and_receive(root).await?;
291 Ok(())
292 }
293
294 pub async fn invoke(&self, path: &str, arguments: Vec<EmberValue>) -> Result<InvocationResult> {
296 let path_vec = parse_path(path)?;
297
298 let invocation_id = {
299 let mut counter = self.request_counter.lock().await;
300 *counter += 1;
301 *counter
302 };
303
304 let elements = CommandBuilder::invoke_at_path(&path_vec, invocation_id, arguments);
305 let root = GlowRoot::with_elements(elements);
306
307 let response = self.send_and_receive(root).await?;
308
309 for result in response.invocation_results {
311 if result.invocation_id == invocation_id {
312 return Ok(result);
313 }
314 }
315
316 Err(Error::Timeout)
317 }
318
319 pub async fn tree(&self) -> tokio::sync::RwLockReadGuard<'_, EmberTree> {
321 self.tree.read().await
322 }
323
324 pub async fn on_value_change(&self, callback: ValueCallback) {
326 self.value_callbacks.write().await.push(callback);
327 }
328
329 pub async fn on_stream_update(&self, callback: StreamCallback) {
331 self.stream_callbacks.write().await.push(callback);
332 }
333
334 async fn send_and_receive(&self, request: GlowRoot) -> Result<GlowRoot> {
336 let glow_bytes = codec::encode(&request)?;
338
339 let frame_bytes = S101Encoder::encode_ember_packet(&glow_bytes);
341
342 {
344 let mut stream_guard = self.stream.lock().await;
345 let stream = stream_guard.as_mut().ok_or_else(|| Error::connection("Not connected"))?;
346
347 stream.write_all(&frame_bytes).await.map_err(Error::Io)?;
348 stream.flush().await.map_err(Error::Io)?;
349 }
350
351 debug!("Sent {} bytes", frame_bytes.len());
352
353 let response = timeout(self.config.request_timeout, self.receive_response()).await
355 .map_err(|_| Error::Timeout)??;
356
357 Ok(response)
358 }
359
360 async fn receive_response(&self) -> Result<GlowRoot> {
362 let mut buffer = vec![0u8; 65536];
363 let mut accumulated = GlowRoot::new();
364
365 loop {
366 let n = {
367 let mut stream_guard = self.stream.lock().await;
368 let stream = stream_guard.as_mut().ok_or_else(|| Error::connection("Not connected"))?;
369 stream.read(&mut buffer).await.map_err(Error::Io)?
370 };
371
372 if n == 0 {
373 return Err(Error::connection("Connection closed"));
374 }
375
376 debug!("Received {} bytes", n);
377
378 let mut codec = self.codec.lock().await;
380 codec.feed(&buffer[..n]);
381
382 while let Ok(Some(frame)) = codec.decode_frame() {
384 let message = frame.to_message();
385
386 match message {
387 S101Message::EmberData { payload, flags, .. } => {
388 if let Ok(root) = codec::decode(&payload) {
390 self.tree.write().await.update_from_glow(&root);
392
393 self.notify_updates(&root).await;
395
396 accumulated.elements.extend(root.elements);
398 accumulated.invocation_results.extend(root.invocation_results);
399 accumulated.streams.extend(root.streams);
400
401 if flags.last_packet {
402 return Ok(accumulated);
403 }
404 }
405 }
406 S101Message::KeepAliveRequest => {
407 let response = S101Encoder::encode_keepalive_response();
409 let mut stream_guard = self.stream.lock().await;
410 if let Some(stream) = stream_guard.as_mut() {
411 let _ = stream.write_all(&response).await;
412 }
413 }
414 S101Message::KeepAliveResponse => {
415 debug!("Received keep-alive response");
416 }
417 }
418 }
419
420 if !accumulated.elements.is_empty() || !accumulated.invocation_results.is_empty() {
422 return Ok(accumulated);
423 }
424 }
425 }
426
427 async fn notify_updates(&self, root: &GlowRoot) {
429 let stream_callbacks = self.stream_callbacks.read().await;
431 for entry in &root.streams {
432 for callback in stream_callbacks.iter() {
433 callback(entry.stream_identifier, &entry.value);
434 }
435 }
436
437 let value_callbacks = self.value_callbacks.read().await;
439 for element in &root.elements {
440 self.notify_element_updates(element, vec![], &value_callbacks).await;
441 }
442 }
443
444 async fn notify_element_updates(
446 &self,
447 element: &GlowElement,
448 parent_path: Vec<i32>,
449 callbacks: &[ValueCallback],
450 ) {
451 match element {
452 GlowElement::Parameter(param) => {
453 if let Some(value) = ¶m.value {
454 let mut path = parent_path;
455 path.push(param.number);
456 for callback in callbacks {
457 callback(&path, value);
458 }
459 }
460 }
461 GlowElement::QualifiedParameter(path, param) => {
462 if let Some(value) = ¶m.value {
463 for callback in callbacks {
464 callback(path, value);
465 }
466 }
467 }
468 GlowElement::Node(node) => {
469 let mut path = parent_path;
470 path.push(node.number);
471 for child in &node.children {
472 Box::pin(self.notify_element_updates(child, path.clone(), callbacks)).await;
473 }
474 }
475 GlowElement::QualifiedNode(path, node) => {
476 for child in &node.children {
477 Box::pin(self.notify_element_updates(child, path.clone(), callbacks)).await;
478 }
479 }
480 _ => {}
481 }
482 }
483}
484
impl Drop for EmberClient {
    /// Performs no explicit teardown; the client's fields are released by
    /// their own destructors once this returns.
    fn drop(&mut self) {
    }
}