datex_native/com_interfaces/serial/
serial_client.rs1use datex_core::network::com_interfaces::default_setup_data::serial::serial_client::SerialClientInterfaceSetupData;
2use datex_core::{derive_setup_data, network::{
3 com_hub::errors::ComInterfaceCreateError,
4 com_interfaces::com_interface::{
5 properties::{InterfaceDirection, ComInterfaceProperties},
6 },
7}};
8use core::{ result::Result};
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11use log::{error};
12use datex_core::network::com_interfaces::com_interface::factory::ComInterfaceConfiguration;
13use datex_core::global::dxb_block::DXBBlock;
14use datex_core::network::com_hub::managers::com_interface_manager::ComInterfaceAsyncFactoryResult;
15use datex_core::network::com_interfaces::com_interface::factory::{SocketConfiguration, SendCallback, SendFailure, SocketProperties, SendSuccess, ComInterfaceAsyncFactory};
16use tokio::task::spawn_blocking;
17
// Declares `SerialClientInterfaceSetupDataNative` from the core
// `SerialClientInterfaceSetupData` type. The impl blocks below read
// `port_name` and `baud_rate` through it.
// NOTE(review): presumably a thin native wrapper around the core setup data;
// the macro is defined in `datex_core`, so confirm what it actually generates.
18derive_setup_data!(SerialClientInterfaceSetupDataNative, SerialClientInterfaceSetupData);
19
20impl SerialClientInterfaceSetupDataNative {
21 const TIMEOUT: Duration = Duration::from_millis(1000);
22 const BUFFER_SIZE: usize = 1024;
23
24 async fn create_interface(self) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
25 let port_name = self.port_name.clone().ok_or(
26 ComInterfaceCreateError::invalid_setup_data("Port name is required"),
27 )?;
28
29 if port_name.is_empty() {
30 return Err(ComInterfaceCreateError::InvalidSetupData(
31 "Port name cannot be empty".to_string(),
32 ));
33 }
34
35 let port_name_clone = port_name.clone();
36 let port = spawn_blocking(move || {
37 serialport::new(port_name_clone, self.baud_rate)
38 .timeout(Self::TIMEOUT)
39 .open()
40 }).await.unwrap().map_err(|err| {
41 ComInterfaceCreateError::connection_error_with_details(err)
42 })?;
43 let port = Arc::new(Mutex::new(port));
44 let port_clone = port.clone();
45
46 Ok(ComInterfaceConfiguration::new_single_socket(
47 ComInterfaceProperties {
48 name: Some(port_name),
49 ..Self::get_default_properties()
50 },
51 SocketConfiguration::new_in_out(
52 SocketProperties::new(InterfaceDirection::InOut, 1),
53 async gen move {
54 loop {
55 let result = spawn_blocking({
56 let port = port_clone.clone();
57 move || {
58 let mut buffer = [0u8; Self::BUFFER_SIZE];
59 match port.try_lock().unwrap().read(&mut buffer) {
60 Ok(n) if n > 0 => Some(buffer[..n].to_vec()),
61 _ => None,
62 }
63 }
64 }).await;
65 match result {
66 Ok(Some(incoming)) => {
67 yield Ok(incoming);
68 }
69 _ => {
70 error!("Serial read error or shutdown");
71 return yield Err(());
72 }
73 }
74 }
75 },
76 SendCallback::new_sync(
77 move |block: DXBBlock|
78 port.lock()
79 .unwrap()
80 .write_all(block.to_bytes().as_slice())
81 .map_err(|e| {
82 error!("Serial write error: {e}");
83 SendFailure(Box::new(block))
84 })
85 .map(|_| SendSuccess::Sent)
86 )
87 )
88 ))
89 }
90}
91
92impl ComInterfaceAsyncFactory for SerialClientInterfaceSetupDataNative {
93 fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
94 Box::pin(self.create_interface())
95 }
96
97 fn get_default_properties() -> ComInterfaceProperties {
98 SerialClientInterfaceSetupData::get_default_properties()
99 }
100}