Skip to main content

plc_comm_hostlink/
client.rs

1use crate::address::{
2    force_device_types, mbs_device_types, model_name_for_code, mws_device_types, parse_device,
3    rdc_device_types, resolve_effective_format, validate_device_count, validate_device_span,
4    validate_device_type, validate_expansion_buffer_count, validate_expansion_buffer_span,
5    wr_device_types, ws_device_types,
6};
7use crate::device_ranges::{KvDeviceRangeCatalog, device_range_catalog_for_query_model};
8use crate::error::HostLinkError;
9use crate::helpers;
10use crate::model::{
11    HostLinkClock, HostLinkConnectionOptions, HostLinkTraceDirection, HostLinkTraceFrame,
12    HostLinkTransportMode, KvModelInfo, KvPlcMode, TraceHook,
13};
14use crate::protocol::{
15    build_frame, decode_comment_response, decode_response, ensure_success, split_data_tokens,
16};
17use std::fmt::Write as _;
18use std::future::Future;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21use tokio::io::{AsyncReadExt, AsyncWriteExt};
22use tokio::net::{TcpStream, UdpSocket};
23use tokio::sync::Mutex;
24use tokio::time::timeout;
25
/// Length, in bytes, of the buffer each client allocates for receiving UDP replies.
const UDP_RECEIVE_BUFFER_SIZE: usize = 65_535;
27
/// A value that can be rendered as text and placed in the data part of a command.
pub trait HostLinkPayloadValue {
    /// Returns the textual form of this value for the given data-format suffix.
    fn format_for_suffix(&self, data_format: &str) -> String;

    /// Appends the textual form of this value to `output`.
    ///
    /// The default implementation renders through [`Self::format_for_suffix`];
    /// implementors may override it to write into `output` directly.
    fn append_to_payload(&self, data_format: &str, output: &mut String) {
        let rendered = self.format_for_suffix(data_format);
        output.push_str(rendered.as_str());
    }
}
35
36macro_rules! impl_payload_for_ints {
37    ($($ty:ty),* $(,)?) => {
38        $(
39            impl HostLinkPayloadValue for $ty {
40                fn format_for_suffix(&self, data_format: &str) -> String {
41                    let mut value = String::new();
42                    self.append_to_payload(data_format, &mut value);
43                    value
44                }
45
46                fn append_to_payload(&self, data_format: &str, output: &mut String) {
47                    if data_format == ".H" {
48                        let _ = write!(output, "{:X}", ((*self as i128) & 0xFFFF));
49                    } else {
50                        let _ = write!(output, "{}", self);
51                    }
52                }
53            }
54        )*
55    };
56}
57
58impl_payload_for_ints!(u8, u16, u32, u64, usize, i8, i16, i32, i64, isize);
59
60impl HostLinkPayloadValue for f32 {
61    fn format_for_suffix(&self, _data_format: &str) -> String {
62        let mut value = String::new();
63        self.append_to_payload("", &mut value);
64        value
65    }
66
67    fn append_to_payload(&self, _data_format: &str, output: &mut String) {
68        let _ = write!(output, "{}", self);
69    }
70}
71
72impl HostLinkPayloadValue for f64 {
73    fn format_for_suffix(&self, _data_format: &str) -> String {
74        let mut value = String::new();
75        self.append_to_payload("", &mut value);
76        value
77    }
78
79    fn append_to_payload(&self, _data_format: &str, output: &mut String) {
80        let _ = write!(output, "{}", self);
81    }
82}
83
84impl HostLinkPayloadValue for bool {
85    fn format_for_suffix(&self, _data_format: &str) -> String {
86        let mut value = String::new();
87        self.append_to_payload("", &mut value);
88        value
89    }
90
91    fn append_to_payload(&self, _data_format: &str, output: &mut String) {
92        output.push(if *self { '1' } else { '0' });
93    }
94}
95
96impl HostLinkPayloadValue for String {
97    fn format_for_suffix(&self, _data_format: &str) -> String {
98        self.trim().to_owned()
99    }
100
101    fn append_to_payload(&self, _data_format: &str, output: &mut String) {
102        output.push_str(self.trim());
103    }
104}
105
106impl HostLinkPayloadValue for &str {
107    fn format_for_suffix(&self, _data_format: &str) -> String {
108        self.trim().to_owned()
109    }
110
111    fn append_to_payload(&self, _data_format: &str, output: &mut String) {
112        output.push_str(self.trim());
113    }
114}
115
116impl<T: HostLinkPayloadValue + ?Sized> HostLinkPayloadValue for &T {
117    fn format_for_suffix(&self, data_format: &str) -> String {
118        (*self).format_for_suffix(data_format)
119    }
120
121    fn append_to_payload(&self, data_format: &str, output: &mut String) {
122        (*self).append_to_payload(data_format, output);
123    }
124}
125
/// Handle to a Host Link connection.
///
/// The connection state lives in a `ClientInner` behind `Arc<Mutex<_>>`, so
/// cloning the handle yields another handle to the same state.
#[derive(Clone)]
pub struct HostLinkClient {
    // Shared connection state; locked for each operation.
    inner: Arc<Mutex<ClientInner>>,
}
130
/// Namespace type for constructing connected [`QueuedHostLinkClient`] values.
pub struct HostLinkClientFactory;
132
/// A [`HostLinkClient`] paired with a shared gate mutex.
///
/// Cloning shares both the underlying client and the gate.
#[derive(Clone)]
pub struct QueuedHostLinkClient {
    // The wrapped client that actually performs the I/O.
    client: HostLinkClient,
    // Lock acquired by the wrapper's operations before they touch `client`.
    gate: Arc<Mutex<()>>,
}
138
/// The open network endpoint used by a client.
enum Transport {
    /// A connected TCP stream.
    Tcp(TcpStream),
    /// A UDP socket connected to the configured peer.
    Udp(UdpSocket),
}
143
/// Mutable connection state shared by all clones of a [`HostLinkClient`].
struct ClientInner {
    // Connection settings (host, port, transport mode, timeout, LF handling).
    options: HostLinkConnectionOptions,
    // `None` while closed; set by `open`.
    transport: Option<Transport>,
    // Optional callback invoked with every frame sent or received.
    trace_hook: Option<TraceHook>,
    // TCP receive state handed to `recv_tcp_line`; `rx_start` and `rx_count`
    // are reset to zero on open and close.
    // NOTE(review): presumably a window (offset + length) into `rx_buf` of
    // bytes received but not yet consumed — confirm against `recv_tcp_line`.
    rx_buf: Vec<u8>,
    rx_start: usize,
    rx_count: usize,
    // Scratch buffer handed to `recv_tcp_line` for TCP reads.
    tcp_read_buf: Vec<u8>,
    // Buffer handed to `recv_udp_with_timeout` for UDP replies.
    udp_read_buf: Vec<u8>,
}
154
155impl HostLinkClient {
156    pub fn new(options: HostLinkConnectionOptions) -> Self {
157        Self {
158            inner: Arc::new(Mutex::new(ClientInner {
159                options,
160                transport: None,
161                trace_hook: None,
162                rx_buf: vec![0u8; 4096],
163                rx_start: 0,
164                rx_count: 0,
165                tcp_read_buf: vec![0u8; 8192],
166                udp_read_buf: vec![0u8; UDP_RECEIVE_BUFFER_SIZE],
167            })),
168        }
169    }
170
171    pub async fn connect(options: HostLinkConnectionOptions) -> Result<Self, HostLinkError> {
172        let client = Self::new(options);
173        client.open().await?;
174        Ok(client)
175    }
176
177    pub async fn open(&self) -> Result<(), HostLinkError> {
178        self.inner.lock().await.open().await
179    }
180
181    pub async fn close(&self) -> Result<(), HostLinkError> {
182        self.inner.lock().await.close();
183        Ok(())
184    }
185
186    pub async fn is_open(&self) -> bool {
187        self.inner.lock().await.transport.is_some()
188    }
189
190    pub async fn timeout(&self) -> Duration {
191        self.inner.lock().await.options.timeout
192    }
193
194    pub async fn set_timeout(&self, timeout: Duration) {
195        self.inner.lock().await.options.timeout = timeout;
196    }
197
198    pub async fn append_lf_on_send(&self) -> bool {
199        self.inner.lock().await.options.append_lf_on_send
200    }
201
202    pub async fn set_append_lf_on_send(&self, value: bool) {
203        self.inner.lock().await.options.append_lf_on_send = value;
204    }
205
206    pub async fn set_trace_hook(&self, trace_hook: Option<TraceHook>) {
207        self.inner.lock().await.trace_hook = trace_hook;
208    }
209
210    pub async fn send_raw(&self, body: &str) -> Result<String, HostLinkError> {
211        self.inner.lock().await.send_raw(body).await
212    }
213
214    pub async fn change_mode(&self, mode: KvPlcMode) -> Result<(), HostLinkError> {
215        self.expect_ok(&format!("M{}", mode as u8)).await
216    }
217
218    pub async fn clear_error(&self) -> Result<(), HostLinkError> {
219        self.expect_ok("ER").await
220    }
221
222    pub async fn check_error_no(&self) -> Result<String, HostLinkError> {
223        self.send_raw("?E").await
224    }
225
226    pub async fn query_model(&self) -> Result<KvModelInfo, HostLinkError> {
227        let code = self.send_raw("?K").await?;
228        Ok(KvModelInfo {
229            model: model_name_for_code(&code).to_owned(),
230            code,
231        })
232    }
233
234    pub async fn read_device_range_catalog(&self) -> Result<KvDeviceRangeCatalog, HostLinkError> {
235        let model = self.query_model().await?;
236        device_range_catalog_for_query_model(&model)
237    }
238
239    pub async fn confirm_operating_mode(&self) -> Result<KvPlcMode, HostLinkError> {
240        match self.send_raw("?M").await?.parse::<u8>() {
241            Ok(0) => Ok(KvPlcMode::Program),
242            Ok(1) => Ok(KvPlcMode::Run),
243            _ => Err(HostLinkError::protocol("Unsupported PLC mode response")),
244        }
245    }
246
247    pub async fn set_time(&self, value: Option<HostLinkClock>) -> Result<(), HostLinkError> {
248        let value = value.unwrap_or_else(HostLinkClock::now_local);
249        if value.month == 0
250            || value.month > 12
251            || value.day == 0
252            || value.day > 31
253            || value.hour > 23
254            || value.minute > 59
255            || value.second > 59
256            || value.week > 6
257        {
258            return Err(HostLinkError::protocol(
259                "Invalid time fields for WRT command",
260            ));
261        }
262
263        self.expect_ok(&format!(
264            "WRT {:02} {:02} {:02} {:02} {:02} {:02} {}",
265            value.year, value.month, value.day, value.hour, value.minute, value.second, value.week
266        ))
267        .await
268    }
269
270    pub async fn forced_set(&self, device: &str) -> Result<(), HostLinkError> {
271        let mut address = parse_device(device)?;
272        validate_device_type("ST", &address.device_type, force_device_types())?;
273        address.suffix.clear();
274        self.expect_ok(&format!("ST {}", address.to_text()?)).await
275    }
276
277    pub async fn forced_reset(&self, device: &str) -> Result<(), HostLinkError> {
278        let mut address = parse_device(device)?;
279        validate_device_type("RS", &address.device_type, force_device_types())?;
280        address.suffix.clear();
281        self.expect_ok(&format!("RS {}", address.to_text()?)).await
282    }
283
284    pub async fn read(
285        &self,
286        device: &str,
287        data_format: Option<&str>,
288    ) -> Result<Vec<String>, HostLinkError> {
289        let mut address = parse_device(device)?;
290        let suffix = if let Some(data_format) = data_format {
291            crate::address::normalize_suffix(data_format)?
292        } else {
293            address.suffix.clone()
294        };
295        let suffix = resolve_effective_format(&address.device_type, &suffix);
296        validate_device_span(&address.device_type, address.number, &suffix, 1)?;
297        address.suffix = suffix;
298        let response = self.send_raw(&format!("RD {}", address.to_text()?)).await?;
299        Ok(split_data_tokens(&response))
300    }
301
302    pub async fn read_consecutive(
303        &self,
304        device: &str,
305        count: usize,
306        data_format: Option<&str>,
307    ) -> Result<Vec<String>, HostLinkError> {
308        let mut address = parse_device(device)?;
309        let suffix = if let Some(data_format) = data_format {
310            crate::address::normalize_suffix(data_format)?
311        } else {
312            address.suffix.clone()
313        };
314        let suffix = resolve_effective_format(&address.device_type, &suffix);
315        validate_device_count(&address.device_type, &suffix, count)?;
316        validate_device_span(&address.device_type, address.number, &suffix, count)?;
317        address.suffix = suffix;
318        let response = self
319            .send_raw(&format!("RDS {} {}", address.to_text()?, count))
320            .await?;
321        Ok(split_data_tokens(&response))
322    }
323
324    pub async fn write<T: HostLinkPayloadValue>(
325        &self,
326        device: &str,
327        value: T,
328        data_format: Option<&str>,
329    ) -> Result<(), HostLinkError> {
330        let mut address = parse_device(device)?;
331        let suffix = if let Some(data_format) = data_format {
332            crate::address::normalize_suffix(data_format)?
333        } else {
334            address.suffix.clone()
335        };
336        let suffix = resolve_effective_format(&address.device_type, &suffix);
337        validate_device_type("WR", &address.device_type, wr_device_types())?;
338        validate_device_span(&address.device_type, address.number, &suffix, 1)?;
339        address.suffix = suffix.clone();
340        let mut command = String::from("WR ");
341        command.push_str(&address.to_text()?);
342        command.push(' ');
343        value.append_to_payload(&suffix, &mut command);
344        self.expect_ok(&command).await
345    }
346
347    pub async fn write_consecutive<T: HostLinkPayloadValue>(
348        &self,
349        device: &str,
350        values: &[T],
351        data_format: Option<&str>,
352    ) -> Result<(), HostLinkError> {
353        if values.is_empty() {
354            return Err(HostLinkError::protocol("values must not be empty"));
355        }
356
357        let mut address = parse_device(device)?;
358        let suffix = if let Some(data_format) = data_format {
359            crate::address::normalize_suffix(data_format)?
360        } else {
361            address.suffix.clone()
362        };
363        let suffix = resolve_effective_format(&address.device_type, &suffix);
364        validate_device_type("WRS", &address.device_type, wr_device_types())?;
365        validate_device_count(&address.device_type, &suffix, values.len())?;
366        validate_device_span(&address.device_type, address.number, &suffix, values.len())?;
367        address.suffix = suffix.clone();
368        let payload = build_joined_payload(values, &suffix);
369        self.expect_ok(&format!(
370            "WRS {} {} {}",
371            address.to_text()?,
372            values.len(),
373            payload
374        ))
375        .await
376    }
377
378    pub async fn register_monitor_bits<S: AsRef<str>>(
379        &self,
380        devices: &[S],
381    ) -> Result<(), HostLinkError> {
382        if devices.is_empty() {
383            return Err(HostLinkError::protocol("At least one device is required"));
384        }
385        if devices.len() > 120 {
386            return Err(HostLinkError::protocol(
387                "Maximum 120 devices can be registered",
388            ));
389        }
390
391        let mut command = String::from("MBS");
392        for device in devices {
393            let mut address = parse_device(device.as_ref())?;
394            validate_device_type("MBS", &address.device_type, mbs_device_types())?;
395            address.suffix.clear();
396            command.push(' ');
397            command.push_str(&address.to_text()?);
398        }
399        self.expect_ok(&command).await
400    }
401
402    pub async fn register_monitor_words<S: AsRef<str>>(
403        &self,
404        devices: &[S],
405    ) -> Result<(), HostLinkError> {
406        if devices.is_empty() {
407            return Err(HostLinkError::protocol("At least one device is required"));
408        }
409        if devices.len() > 120 {
410            return Err(HostLinkError::protocol(
411                "Maximum 120 devices can be registered",
412            ));
413        }
414
415        let mut command = String::from("MWS");
416        for device in devices {
417            let mut address = parse_device(device.as_ref())?;
418            validate_device_type("MWS", &address.device_type, mws_device_types())?;
419            let suffix = resolve_effective_format(&address.device_type, &address.suffix);
420            validate_device_span(&address.device_type, address.number, &suffix, 1)?;
421            address.suffix = suffix;
422            command.push(' ');
423            command.push_str(&address.to_text()?);
424        }
425        self.expect_ok(&command).await
426    }
427
428    pub async fn read_monitor_bits(&self) -> Result<Vec<String>, HostLinkError> {
429        let response = self.send_raw("MBR").await?;
430        Ok(split_data_tokens(&response))
431    }
432
433    pub async fn read_monitor_words(&self) -> Result<Vec<String>, HostLinkError> {
434        let response = self.send_raw("MWR").await?;
435        Ok(split_data_tokens(&response))
436    }
437
438    pub async fn forced_set_consecutive(
439        &self,
440        device: &str,
441        count: usize,
442    ) -> Result<(), HostLinkError> {
443        if !(1..=16).contains(&count) {
444            return Err(HostLinkError::protocol("count must be 1-16."));
445        }
446        let mut address = parse_device(device)?;
447        validate_device_type("STS", &address.device_type, force_device_types())?;
448        address.suffix.clear();
449        self.expect_ok(&format!("STS {} {}", address.to_text()?, count))
450            .await
451    }
452
453    pub async fn forced_reset_consecutive(
454        &self,
455        device: &str,
456        count: usize,
457    ) -> Result<(), HostLinkError> {
458        if !(1..=16).contains(&count) {
459            return Err(HostLinkError::protocol("count must be 1-16."));
460        }
461        let mut address = parse_device(device)?;
462        validate_device_type("RSS", &address.device_type, force_device_types())?;
463        address.suffix.clear();
464        self.expect_ok(&format!("RSS {} {}", address.to_text()?, count))
465            .await
466    }
467
468    pub async fn read_consecutive_legacy(
469        &self,
470        device: &str,
471        count: usize,
472        data_format: Option<&str>,
473    ) -> Result<Vec<String>, HostLinkError> {
474        let mut address = parse_device(device)?;
475        let suffix = if let Some(data_format) = data_format {
476            crate::address::normalize_suffix(data_format)?
477        } else {
478            address.suffix.clone()
479        };
480        let suffix = resolve_effective_format(&address.device_type, &suffix);
481        validate_device_count(&address.device_type, &suffix, count)?;
482        validate_device_span(&address.device_type, address.number, &suffix, count)?;
483        address.suffix = suffix;
484        let response = self
485            .send_raw(&format!("RDE {} {}", address.to_text()?, count))
486            .await?;
487        Ok(split_data_tokens(&response))
488    }
489
490    pub async fn write_consecutive_legacy<T: HostLinkPayloadValue>(
491        &self,
492        device: &str,
493        values: &[T],
494        data_format: Option<&str>,
495    ) -> Result<(), HostLinkError> {
496        if values.is_empty() {
497            return Err(HostLinkError::protocol("values must not be empty"));
498        }
499        let mut address = parse_device(device)?;
500        let suffix = if let Some(data_format) = data_format {
501            crate::address::normalize_suffix(data_format)?
502        } else {
503            address.suffix.clone()
504        };
505        let suffix = resolve_effective_format(&address.device_type, &suffix);
506        validate_device_type("WRE", &address.device_type, wr_device_types())?;
507        validate_device_count(&address.device_type, &suffix, values.len())?;
508        validate_device_span(&address.device_type, address.number, &suffix, values.len())?;
509        address.suffix = suffix.clone();
510        let payload = build_joined_payload(values, &suffix);
511        self.expect_ok(&format!(
512            "WRE {} {} {}",
513            address.to_text()?,
514            values.len(),
515            payload
516        ))
517        .await
518    }
519
520    pub async fn write_set_value<T: HostLinkPayloadValue>(
521        &self,
522        device: &str,
523        value: T,
524        data_format: Option<&str>,
525    ) -> Result<(), HostLinkError> {
526        let mut address = parse_device(device)?;
527        validate_device_type("WS", &address.device_type, ws_device_types())?;
528        let suffix = if let Some(data_format) = data_format {
529            crate::address::normalize_suffix(data_format)?
530        } else {
531            resolve_effective_format(&address.device_type, &address.suffix)
532        };
533        validate_device_span(&address.device_type, address.number, &suffix, 1)?;
534        address.suffix = suffix.clone();
535        let mut command = String::from("WS ");
536        command.push_str(&address.to_text()?);
537        command.push(' ');
538        value.append_to_payload(&suffix, &mut command);
539        self.expect_ok(&command).await
540    }
541
542    pub async fn write_set_value_consecutive<T: HostLinkPayloadValue>(
543        &self,
544        device: &str,
545        values: &[T],
546        data_format: Option<&str>,
547    ) -> Result<(), HostLinkError> {
548        if values.is_empty() {
549            return Err(HostLinkError::protocol("values must not be empty"));
550        }
551        let mut address = parse_device(device)?;
552        validate_device_type("WSS", &address.device_type, ws_device_types())?;
553        let suffix = if let Some(data_format) = data_format {
554            crate::address::normalize_suffix(data_format)?
555        } else {
556            resolve_effective_format(&address.device_type, &address.suffix)
557        };
558        validate_device_span(&address.device_type, address.number, &suffix, values.len())?;
559        address.suffix = suffix.clone();
560        let payload = build_joined_payload(values, &suffix);
561        self.expect_ok(&format!(
562            "WSS {} {} {}",
563            address.to_text()?,
564            values.len(),
565            payload
566        ))
567        .await
568    }
569
570    pub async fn switch_bank(&self, bank_no: u8) -> Result<(), HostLinkError> {
571        if bank_no > 15 {
572            return Err(HostLinkError::protocol("bankNo must be 0-15."));
573        }
574        self.expect_ok(&format!("BE {bank_no}")).await
575    }
576
577    pub async fn read_expansion_unit_buffer(
578        &self,
579        unit_no: u8,
580        address: u32,
581        count: usize,
582        data_format: Option<&str>,
583    ) -> Result<Vec<String>, HostLinkError> {
584        if unit_no > 48 {
585            return Err(HostLinkError::protocol("unitNo must be 0-48."));
586        }
587        if address > 59_999 {
588            return Err(HostLinkError::protocol("address must be 0-59999."));
589        }
590        let suffix = if let Some(data_format) = data_format {
591            crate::address::normalize_suffix(data_format)?
592        } else {
593            ".U".to_owned()
594        };
595        validate_expansion_buffer_count(&suffix, count)?;
596        validate_expansion_buffer_span(address, &suffix, count)?;
597        let response = self
598            .send_raw(&format!("URD {unit_no:02} {address}{suffix} {count}"))
599            .await?;
600        Ok(split_data_tokens(&response))
601    }
602
603    pub async fn write_expansion_unit_buffer<T: HostLinkPayloadValue>(
604        &self,
605        unit_no: u8,
606        address: u32,
607        values: &[T],
608        data_format: Option<&str>,
609    ) -> Result<(), HostLinkError> {
610        if values.is_empty() {
611            return Err(HostLinkError::protocol("values must not be empty"));
612        }
613        if unit_no > 48 {
614            return Err(HostLinkError::protocol("unitNo must be 0-48."));
615        }
616        if address > 59_999 {
617            return Err(HostLinkError::protocol("address must be 0-59999."));
618        }
619        let suffix = if let Some(data_format) = data_format {
620            crate::address::normalize_suffix(data_format)?
621        } else {
622            ".U".to_owned()
623        };
624        validate_expansion_buffer_count(&suffix, values.len())?;
625        validate_expansion_buffer_span(address, &suffix, values.len())?;
626        let payload = build_joined_payload(values, &suffix);
627        self.expect_ok(&format!(
628            "UWR {unit_no:02} {address}{suffix} {} {payload}",
629            values.len()
630        ))
631        .await
632    }
633
634    pub async fn read_comments(
635        &self,
636        device: &str,
637        strip_padding: bool,
638    ) -> Result<String, HostLinkError> {
639        let mut address = parse_device(device)?;
640        validate_device_type("RDC", &address.device_type, rdc_device_types())?;
641        address.suffix.clear();
642        let response = self
643            .inner
644            .lock()
645            .await
646            .send_raw_decoded(
647                &format!("RDC {}", address.to_text()?),
648                decode_comment_response,
649            )
650            .await?;
651        if strip_padding {
652            Ok(response.trim_end_matches(' ').to_owned())
653        } else {
654            Ok(response)
655        }
656    }
657
658    pub async fn read_typed(
659        &self,
660        device: &str,
661        dtype: &str,
662    ) -> Result<helpers::HostLinkValue, HostLinkError> {
663        helpers::read_typed(self, device, dtype).await
664    }
665
666    pub async fn read_timer_counter(
667        &self,
668        device: &str,
669    ) -> Result<helpers::TimerCounterValue, HostLinkError> {
670        helpers::read_timer_counter(self, device).await
671    }
672
673    pub async fn read_timer(
674        &self,
675        device: &str,
676    ) -> Result<helpers::TimerCounterValue, HostLinkError> {
677        helpers::read_timer(self, device).await
678    }
679
680    pub async fn read_counter(
681        &self,
682        device: &str,
683    ) -> Result<helpers::TimerCounterValue, HostLinkError> {
684        helpers::read_counter(self, device).await
685    }
686
687    pub async fn write_typed<T: HostLinkPayloadValue>(
688        &self,
689        device: &str,
690        dtype: &str,
691        value: T,
692    ) -> Result<(), HostLinkError> {
693        helpers::write_typed(self, device, dtype, &value).await
694    }
695
696    pub async fn read_named<S: AsRef<str>>(
697        &self,
698        addresses: &[S],
699    ) -> Result<helpers::NamedSnapshot, HostLinkError> {
700        helpers::read_named(self, addresses).await
701    }
702
703    pub async fn write_bit_in_word(
704        &self,
705        device: &str,
706        bit_index: u8,
707        value: bool,
708    ) -> Result<(), HostLinkError> {
709        helpers::write_bit_in_word(self, device, bit_index, value).await
710    }
711
712    async fn expect_ok(&self, body: &str) -> Result<(), HostLinkError> {
713        let response = self.send_raw(body).await?;
714        if response == "OK" {
715            Ok(())
716        } else {
717            Err(HostLinkError::protocol(format!(
718                "Expected 'OK' but received '{response}' for command '{body}'"
719            )))
720        }
721    }
722}
723
724impl ClientInner {
725    async fn open(&mut self) -> Result<(), HostLinkError> {
726        if self.transport.is_some() {
727            return Ok(());
728        }
729
730        let transport = match self.options.transport {
731            HostLinkTransportMode::Tcp => {
732                let stream = timeout(
733                    self.options.timeout,
734                    TcpStream::connect((self.options.host.as_str(), self.options.port)),
735                )
736                .await
737                .map_err(|_| HostLinkError::connection("tcp connect timed out"))??;
738                stream.set_nodelay(true)?;
739                Transport::Tcp(stream)
740            }
741            HostLinkTransportMode::Udp => {
742                let socket = UdpSocket::bind("0.0.0.0:0").await?;
743                timeout(
744                    self.options.timeout,
745                    socket.connect((self.options.host.as_str(), self.options.port)),
746                )
747                .await
748                .map_err(|_| HostLinkError::connection("udp connect timed out"))??;
749                Transport::Udp(socket)
750            }
751        };
752
753        self.transport = Some(transport);
754        self.rx_start = 0;
755        self.rx_count = 0;
756        Ok(())
757    }
758
759    fn close(&mut self) {
760        self.transport = None;
761        self.rx_start = 0;
762        self.rx_count = 0;
763    }
764
765    async fn send_raw(&mut self, body: &str) -> Result<String, HostLinkError> {
766        self.send_raw_decoded(body, decode_response).await
767    }
768
769    async fn send_raw_decoded<F>(&mut self, body: &str, decoder: F) -> Result<String, HostLinkError>
770    where
771        F: Fn(&[u8]) -> Result<String, HostLinkError>,
772    {
773        self.open().await?;
774        let frame = build_frame(body, self.options.append_lf_on_send);
775        self.fire_trace(HostLinkTraceDirection::Send, &frame);
776
777        match self.transport.as_mut() {
778            Some(Transport::Tcp(stream)) => {
779                write_all_with_timeout(stream, &frame, self.options.timeout).await?;
780                let raw = recv_tcp_line(
781                    stream,
782                    &mut self.rx_buf,
783                    &mut self.rx_start,
784                    &mut self.rx_count,
785                    &mut self.tcp_read_buf,
786                    self.options.timeout,
787                )
788                .await?;
789                self.fire_trace(HostLinkTraceDirection::Receive, &raw);
790                ensure_success(decoder(&raw)?)
791            }
792            Some(Transport::Udp(socket)) => {
793                send_udp_with_timeout(socket, &frame, self.options.timeout).await?;
794                recv_udp_with_timeout(socket, &mut self.udp_read_buf, self.options.timeout).await?;
795                let raw = &self.udp_read_buf;
796                self.fire_trace(HostLinkTraceDirection::Receive, raw);
797                ensure_success(decoder(raw)?)
798            }
799            None => Err(HostLinkError::connection("transport was not opened")),
800        }
801    }
802
803    fn fire_trace(&self, direction: HostLinkTraceDirection, data: &[u8]) {
804        if let Some(trace_hook) = &self.trace_hook {
805            trace_hook(HostLinkTraceFrame {
806                direction,
807                data: data.to_vec(),
808                timestamp: SystemTime::now(),
809            });
810        }
811    }
812}
813
814impl HostLinkClientFactory {
815    pub async fn open_and_connect(
816        options: HostLinkConnectionOptions,
817    ) -> Result<QueuedHostLinkClient, HostLinkError> {
818        if options.host.trim().is_empty() {
819            return Err(HostLinkError::protocol("Host must not be empty."));
820        }
821
822        let client = HostLinkClient::new(options);
823        let queued = QueuedHostLinkClient::new(client);
824        queued.open().await?;
825        Ok(queued)
826    }
827}
828
829pub async fn open_and_connect(
830    options: HostLinkConnectionOptions,
831) -> Result<QueuedHostLinkClient, HostLinkError> {
832    HostLinkClientFactory::open_and_connect(options).await
833}
834
835impl QueuedHostLinkClient {
836    pub fn new(client: HostLinkClient) -> Self {
837        Self {
838            client,
839            gate: Arc::new(Mutex::new(())),
840        }
841    }
842
843    pub fn inner_client(&self) -> &HostLinkClient {
844        &self.client
845    }
846
847    pub async fn is_open(&self) -> bool {
848        self.client.is_open().await
849    }
850
851    pub async fn open(&self) -> Result<(), HostLinkError> {
852        let _guard = self.gate.lock().await;
853        self.client.open().await
854    }
855
856    pub async fn close(&self) -> Result<(), HostLinkError> {
857        let _guard = self.gate.lock().await;
858        self.client.close().await
859    }
860
861    pub async fn set_trace_hook(&self, trace_hook: Option<TraceHook>) {
862        let _guard = self.gate.lock().await;
863        self.client.set_trace_hook(trace_hook).await;
864    }
865
866    pub async fn execute_async<F, Fut, T>(&self, operation: F) -> Result<T, HostLinkError>
867    where
868        F: FnOnce(&HostLinkClient) -> Fut,
869        Fut: Future<Output = Result<T, HostLinkError>>,
870    {
871        let _guard = self.gate.lock().await;
872        operation(&self.client).await
873    }
874
875    pub async fn send_raw(&self, body: &str) -> Result<String, HostLinkError> {
876        let _guard = self.gate.lock().await;
877        self.client.send_raw(body).await
878    }
879
880    pub async fn read_comments(
881        &self,
882        device: &str,
883        strip_padding: bool,
884    ) -> Result<String, HostLinkError> {
885        let _guard = self.gate.lock().await;
886        self.client.read_comments(device, strip_padding).await
887    }
888
889    pub async fn read_typed(
890        &self,
891        device: &str,
892        dtype: &str,
893    ) -> Result<helpers::HostLinkValue, HostLinkError> {
894        let _guard = self.gate.lock().await;
895        helpers::read_typed(&self.client, device, dtype).await
896    }
897
898    pub async fn read_timer_counter(
899        &self,
900        device: &str,
901    ) -> Result<helpers::TimerCounterValue, HostLinkError> {
902        let _guard = self.gate.lock().await;
903        helpers::read_timer_counter(&self.client, device).await
904    }
905
906    pub async fn read_timer(
907        &self,
908        device: &str,
909    ) -> Result<helpers::TimerCounterValue, HostLinkError> {
910        let _guard = self.gate.lock().await;
911        helpers::read_timer(&self.client, device).await
912    }
913
914    pub async fn read_counter(
915        &self,
916        device: &str,
917    ) -> Result<helpers::TimerCounterValue, HostLinkError> {
918        let _guard = self.gate.lock().await;
919        helpers::read_counter(&self.client, device).await
920    }
921
922    pub async fn write_typed<T: HostLinkPayloadValue>(
923        &self,
924        device: &str,
925        dtype: &str,
926        value: T,
927    ) -> Result<(), HostLinkError> {
928        let _guard = self.gate.lock().await;
929        helpers::write_typed(&self.client, device, dtype, &value).await
930    }
931
932    pub async fn write_bit_in_word(
933        &self,
934        device: &str,
935        bit_index: u8,
936        value: bool,
937    ) -> Result<(), HostLinkError> {
938        let _guard = self.gate.lock().await;
939        helpers::write_bit_in_word(&self.client, device, bit_index, value).await
940    }
941
942    pub async fn read_named<S: AsRef<str>>(
943        &self,
944        addresses: &[S],
945    ) -> Result<helpers::NamedSnapshot, HostLinkError> {
946        let _guard = self.gate.lock().await;
947        helpers::read_named(&self.client, addresses).await
948    }
949
950    pub async fn read_device_range_catalog(&self) -> Result<KvDeviceRangeCatalog, HostLinkError> {
951        let _guard = self.gate.lock().await;
952        self.client.read_device_range_catalog().await
953    }
954
955    pub fn poll<'a, S: AsRef<str> + 'a>(
956        &'a self,
957        addresses: &'a [S],
958        interval: Duration,
959    ) -> impl futures_core::Stream<Item = Result<helpers::NamedSnapshot, HostLinkError>> + 'a {
960        async_stream::try_stream! {
961            let addr_list = addresses.iter().map(|item| item.as_ref().to_owned()).collect::<Vec<_>>();
962            let compiled = helpers::compile_read_named_plan(&addr_list);
963            loop {
964                let snapshot = {
965                    let _guard = self.gate.lock().await;
966                    if let Some(plan) = &compiled {
967                        helpers::execute_read_named_plan(&self.client, plan).await?
968                    } else {
969                        helpers::read_named_sequential(&self.client, &addr_list).await?
970                    }
971                };
972                yield snapshot;
973                tokio::time::sleep(interval).await;
974            }
975        }
976    }
977
978    pub async fn read_words(&self, device: &str, count: usize) -> Result<Vec<u16>, HostLinkError> {
979        let _guard = self.gate.lock().await;
980        helpers::read_words(self.inner_client(), device, count).await
981    }
982
983    pub async fn read_dwords(&self, device: &str, count: usize) -> Result<Vec<u32>, HostLinkError> {
984        let _guard = self.gate.lock().await;
985        helpers::read_dwords(self.inner_client(), device, count).await
986    }
987}
988
989async fn write_all_with_timeout(
990    stream: &mut TcpStream,
991    payload: &[u8],
992    duration: Duration,
993) -> Result<(), HostLinkError> {
994    timeout(duration, stream.write_all(payload))
995        .await
996        .map_err(|_| HostLinkError::connection("write timed out"))??;
997    Ok(())
998}
999
1000async fn send_udp_with_timeout(
1001    socket: &mut UdpSocket,
1002    payload: &[u8],
1003    duration: Duration,
1004) -> Result<(), HostLinkError> {
1005    timeout(duration, socket.send(payload))
1006        .await
1007        .map_err(|_| HostLinkError::connection("write timed out"))??;
1008    Ok(())
1009}
1010
1011async fn recv_udp_with_timeout(
1012    socket: &mut UdpSocket,
1013    buffer: &mut Vec<u8>,
1014    duration: Duration,
1015) -> Result<(), HostLinkError> {
1016    if buffer.len() != UDP_RECEIVE_BUFFER_SIZE {
1017        // UDP datagrams cannot be continued by another recv call.
1018        // Keep the buffer large enough for a full datagram to avoid truncating PLC responses.
1019        buffer.resize(UDP_RECEIVE_BUFFER_SIZE, 0);
1020    }
1021    let read = timeout(duration, socket.recv(buffer.as_mut_slice()))
1022        .await
1023        .map_err(|_| HostLinkError::connection("read timed out"))??;
1024    buffer.truncate(read);
1025    Ok(())
1026}
1027
1028fn build_joined_payload<T: HostLinkPayloadValue>(values: &[T], suffix: &str) -> String {
1029    let mut payload = String::new();
1030    for (index, value) in values.iter().enumerate() {
1031        if index > 0 {
1032            payload.push(' ');
1033        }
1034        value.append_to_payload(suffix, &mut payload);
1035    }
1036    payload
1037}
1038
1039async fn recv_tcp_line(
1040    stream: &mut TcpStream,
1041    rx_buf: &mut Vec<u8>,
1042    rx_start: &mut usize,
1043    rx_count: &mut usize,
1044    tcp_read_buf: &mut [u8],
1045    duration: Duration,
1046) -> Result<Vec<u8>, HostLinkError> {
1047    loop {
1048        let mut found_idx = None;
1049        for index in 0..*rx_count {
1050            let byte = rx_buf[*rx_start + index];
1051            if matches!(byte, b'\r' | b'\n') {
1052                found_idx = Some(index);
1053                break;
1054            }
1055        }
1056
1057        if let Some(found_idx) = found_idx {
1058            let line = rx_buf[*rx_start..*rx_start + found_idx].to_vec();
1059            let mut skip = found_idx;
1060            while skip < *rx_count && matches!(rx_buf[*rx_start + skip], b'\r' | b'\n') {
1061                skip += 1;
1062            }
1063            *rx_start += skip;
1064            *rx_count -= skip;
1065            if *rx_start > rx_buf.len() / 2 {
1066                rx_buf.copy_within(*rx_start..*rx_start + *rx_count, 0);
1067                *rx_start = 0;
1068            }
1069            return Ok(line);
1070        }
1071
1072        let read = timeout(duration, stream.read(tcp_read_buf))
1073            .await
1074            .map_err(|_| HostLinkError::connection("read timed out"))??;
1075        if read == 0 {
1076            if *rx_count > 0 {
1077                let line = rx_buf[*rx_start..*rx_start + *rx_count].to_vec();
1078                *rx_start = 0;
1079                *rx_count = 0;
1080                return Ok(line);
1081            }
1082            return Err(HostLinkError::connection("Connection closed by PLC"));
1083        }
1084
1085        if *rx_start + *rx_count + read > rx_buf.len() {
1086            if *rx_count > 0 {
1087                rx_buf.copy_within(*rx_start..*rx_start + *rx_count, 0);
1088            }
1089            *rx_start = 0;
1090            if *rx_count + read > rx_buf.len() {
1091                rx_buf.resize((rx_buf.len() * 2).max(*rx_count + read), 0);
1092            }
1093        }
1094
1095        let target = *rx_start + *rx_count;
1096        rx_buf[target..target + read].copy_from_slice(&tcp_read_buf[..read]);
1097        *rx_count += read;
1098    }
1099}