1use crate::address::{
2 force_device_types, mbs_device_types, model_name_for_code, mws_device_types, parse_device,
3 rdc_device_types, resolve_effective_format, validate_device_count, validate_device_span,
4 validate_device_type, validate_expansion_buffer_count, validate_expansion_buffer_span,
5 wr_device_types, ws_device_types,
6};
7use crate::device_ranges::{KvDeviceRangeCatalog, device_range_catalog_for_query_model};
8use crate::error::HostLinkError;
9use crate::helpers;
10use crate::model::{
11 HostLinkClock, HostLinkConnectionOptions, HostLinkTraceDirection, HostLinkTraceFrame,
12 HostLinkTransportMode, KvModelInfo, KvPlcMode, TraceHook,
13};
14use crate::protocol::{
15 build_frame, decode_comment_response, decode_response, ensure_success, split_data_tokens,
16};
17use std::fmt::Write as _;
18use std::future::Future;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21use tokio::io::{AsyncReadExt, AsyncWriteExt};
22use tokio::net::{TcpStream, UdpSocket};
23use tokio::sync::Mutex;
24use tokio::time::timeout;
25
26const UDP_RECEIVE_BUFFER_SIZE: usize = 65_535;
27
28pub trait HostLinkPayloadValue {
29 fn format_for_suffix(&self, data_format: &str) -> String;
30
31 fn append_to_payload(&self, data_format: &str, output: &mut String) {
32 output.push_str(&self.format_for_suffix(data_format));
33 }
34}
35
36macro_rules! impl_payload_for_ints {
37 ($($ty:ty),* $(,)?) => {
38 $(
39 impl HostLinkPayloadValue for $ty {
40 fn format_for_suffix(&self, data_format: &str) -> String {
41 let mut value = String::new();
42 self.append_to_payload(data_format, &mut value);
43 value
44 }
45
46 fn append_to_payload(&self, data_format: &str, output: &mut String) {
47 if data_format == ".H" {
48 let _ = write!(output, "{:X}", ((*self as i128) & 0xFFFF));
49 } else {
50 let _ = write!(output, "{}", self);
51 }
52 }
53 }
54 )*
55 };
56}
57
58impl_payload_for_ints!(u8, u16, u32, u64, usize, i8, i16, i32, i64, isize);
59
60impl HostLinkPayloadValue for f32 {
61 fn format_for_suffix(&self, _data_format: &str) -> String {
62 let mut value = String::new();
63 self.append_to_payload("", &mut value);
64 value
65 }
66
67 fn append_to_payload(&self, _data_format: &str, output: &mut String) {
68 let _ = write!(output, "{}", self);
69 }
70}
71
72impl HostLinkPayloadValue for f64 {
73 fn format_for_suffix(&self, _data_format: &str) -> String {
74 let mut value = String::new();
75 self.append_to_payload("", &mut value);
76 value
77 }
78
79 fn append_to_payload(&self, _data_format: &str, output: &mut String) {
80 let _ = write!(output, "{}", self);
81 }
82}
83
84impl HostLinkPayloadValue for bool {
85 fn format_for_suffix(&self, _data_format: &str) -> String {
86 let mut value = String::new();
87 self.append_to_payload("", &mut value);
88 value
89 }
90
91 fn append_to_payload(&self, _data_format: &str, output: &mut String) {
92 output.push(if *self { '1' } else { '0' });
93 }
94}
95
96impl HostLinkPayloadValue for String {
97 fn format_for_suffix(&self, _data_format: &str) -> String {
98 self.trim().to_owned()
99 }
100
101 fn append_to_payload(&self, _data_format: &str, output: &mut String) {
102 output.push_str(self.trim());
103 }
104}
105
106impl HostLinkPayloadValue for &str {
107 fn format_for_suffix(&self, _data_format: &str) -> String {
108 self.trim().to_owned()
109 }
110
111 fn append_to_payload(&self, _data_format: &str, output: &mut String) {
112 output.push_str(self.trim());
113 }
114}
115
116impl<T: HostLinkPayloadValue + ?Sized> HostLinkPayloadValue for &T {
117 fn format_for_suffix(&self, data_format: &str) -> String {
118 (*self).format_for_suffix(data_format)
119 }
120
121 fn append_to_payload(&self, data_format: &str, output: &mut String) {
122 (*self).append_to_payload(data_format, output);
123 }
124}
125
126#[derive(Clone)]
127pub struct HostLinkClient {
128 inner: Arc<Mutex<ClientInner>>,
129}
130
131pub struct HostLinkClientFactory;
132
133#[derive(Clone)]
134pub struct QueuedHostLinkClient {
135 client: HostLinkClient,
136 gate: Arc<Mutex<()>>,
137}
138
139enum Transport {
140 Tcp(TcpStream),
141 Udp(UdpSocket),
142}
143
144struct ClientInner {
145 options: HostLinkConnectionOptions,
146 transport: Option<Transport>,
147 trace_hook: Option<TraceHook>,
148 rx_buf: Vec<u8>,
149 rx_start: usize,
150 rx_count: usize,
151 tcp_read_buf: Vec<u8>,
152 udp_read_buf: Vec<u8>,
153}
154
155impl HostLinkClient {
156 pub fn new(options: HostLinkConnectionOptions) -> Self {
157 Self {
158 inner: Arc::new(Mutex::new(ClientInner {
159 options,
160 transport: None,
161 trace_hook: None,
162 rx_buf: vec![0u8; 4096],
163 rx_start: 0,
164 rx_count: 0,
165 tcp_read_buf: vec![0u8; 8192],
166 udp_read_buf: vec![0u8; UDP_RECEIVE_BUFFER_SIZE],
167 })),
168 }
169 }
170
171 pub async fn connect(options: HostLinkConnectionOptions) -> Result<Self, HostLinkError> {
172 let client = Self::new(options);
173 client.open().await?;
174 Ok(client)
175 }
176
177 pub async fn open(&self) -> Result<(), HostLinkError> {
178 self.inner.lock().await.open().await
179 }
180
181 pub async fn close(&self) -> Result<(), HostLinkError> {
182 self.inner.lock().await.close();
183 Ok(())
184 }
185
186 pub async fn is_open(&self) -> bool {
187 self.inner.lock().await.transport.is_some()
188 }
189
190 pub async fn timeout(&self) -> Duration {
191 self.inner.lock().await.options.timeout
192 }
193
194 pub async fn set_timeout(&self, timeout: Duration) {
195 self.inner.lock().await.options.timeout = timeout;
196 }
197
198 pub async fn append_lf_on_send(&self) -> bool {
199 self.inner.lock().await.options.append_lf_on_send
200 }
201
202 pub async fn set_append_lf_on_send(&self, value: bool) {
203 self.inner.lock().await.options.append_lf_on_send = value;
204 }
205
206 pub async fn set_trace_hook(&self, trace_hook: Option<TraceHook>) {
207 self.inner.lock().await.trace_hook = trace_hook;
208 }
209
210 pub async fn send_raw(&self, body: &str) -> Result<String, HostLinkError> {
211 self.inner.lock().await.send_raw(body).await
212 }
213
214 pub async fn change_mode(&self, mode: KvPlcMode) -> Result<(), HostLinkError> {
215 self.expect_ok(&format!("M{}", mode as u8)).await
216 }
217
218 pub async fn clear_error(&self) -> Result<(), HostLinkError> {
219 self.expect_ok("ER").await
220 }
221
222 pub async fn check_error_no(&self) -> Result<String, HostLinkError> {
223 self.send_raw("?E").await
224 }
225
226 pub async fn query_model(&self) -> Result<KvModelInfo, HostLinkError> {
227 let code = self.send_raw("?K").await?;
228 Ok(KvModelInfo {
229 model: model_name_for_code(&code).to_owned(),
230 code,
231 })
232 }
233
234 pub async fn read_device_range_catalog(&self) -> Result<KvDeviceRangeCatalog, HostLinkError> {
235 let model = self.query_model().await?;
236 device_range_catalog_for_query_model(&model)
237 }
238
239 pub async fn confirm_operating_mode(&self) -> Result<KvPlcMode, HostLinkError> {
240 match self.send_raw("?M").await?.parse::<u8>() {
241 Ok(0) => Ok(KvPlcMode::Program),
242 Ok(1) => Ok(KvPlcMode::Run),
243 _ => Err(HostLinkError::protocol("Unsupported PLC mode response")),
244 }
245 }
246
247 pub async fn set_time(&self, value: Option<HostLinkClock>) -> Result<(), HostLinkError> {
248 let value = value.unwrap_or_else(HostLinkClock::now_local);
249 if value.month == 0
250 || value.month > 12
251 || value.day == 0
252 || value.day > 31
253 || value.hour > 23
254 || value.minute > 59
255 || value.second > 59
256 || value.week > 6
257 {
258 return Err(HostLinkError::protocol(
259 "Invalid time fields for WRT command",
260 ));
261 }
262
263 self.expect_ok(&format!(
264 "WRT {:02} {:02} {:02} {:02} {:02} {:02} {}",
265 value.year, value.month, value.day, value.hour, value.minute, value.second, value.week
266 ))
267 .await
268 }
269
270 pub async fn forced_set(&self, device: &str) -> Result<(), HostLinkError> {
271 let mut address = parse_device(device)?;
272 validate_device_type("ST", &address.device_type, force_device_types())?;
273 address.suffix.clear();
274 self.expect_ok(&format!("ST {}", address.to_text()?)).await
275 }
276
277 pub async fn forced_reset(&self, device: &str) -> Result<(), HostLinkError> {
278 let mut address = parse_device(device)?;
279 validate_device_type("RS", &address.device_type, force_device_types())?;
280 address.suffix.clear();
281 self.expect_ok(&format!("RS {}", address.to_text()?)).await
282 }
283
284 pub async fn read(
285 &self,
286 device: &str,
287 data_format: Option<&str>,
288 ) -> Result<Vec<String>, HostLinkError> {
289 let mut address = parse_device(device)?;
290 let suffix = if let Some(data_format) = data_format {
291 crate::address::normalize_suffix(data_format)?
292 } else {
293 address.suffix.clone()
294 };
295 let suffix = resolve_effective_format(&address.device_type, &suffix);
296 validate_device_span(&address.device_type, address.number, &suffix, 1)?;
297 address.suffix = suffix;
298 let response = self.send_raw(&format!("RD {}", address.to_text()?)).await?;
299 Ok(split_data_tokens(&response))
300 }
301
302 pub async fn read_consecutive(
303 &self,
304 device: &str,
305 count: usize,
306 data_format: Option<&str>,
307 ) -> Result<Vec<String>, HostLinkError> {
308 let mut address = parse_device(device)?;
309 let suffix = if let Some(data_format) = data_format {
310 crate::address::normalize_suffix(data_format)?
311 } else {
312 address.suffix.clone()
313 };
314 let suffix = resolve_effective_format(&address.device_type, &suffix);
315 validate_device_count(&address.device_type, &suffix, count)?;
316 validate_device_span(&address.device_type, address.number, &suffix, count)?;
317 address.suffix = suffix;
318 let response = self
319 .send_raw(&format!("RDS {} {}", address.to_text()?, count))
320 .await?;
321 Ok(split_data_tokens(&response))
322 }
323
324 pub async fn write<T: HostLinkPayloadValue>(
325 &self,
326 device: &str,
327 value: T,
328 data_format: Option<&str>,
329 ) -> Result<(), HostLinkError> {
330 let mut address = parse_device(device)?;
331 let suffix = if let Some(data_format) = data_format {
332 crate::address::normalize_suffix(data_format)?
333 } else {
334 address.suffix.clone()
335 };
336 let suffix = resolve_effective_format(&address.device_type, &suffix);
337 validate_device_type("WR", &address.device_type, wr_device_types())?;
338 validate_device_span(&address.device_type, address.number, &suffix, 1)?;
339 address.suffix = suffix.clone();
340 let mut command = String::from("WR ");
341 command.push_str(&address.to_text()?);
342 command.push(' ');
343 value.append_to_payload(&suffix, &mut command);
344 self.expect_ok(&command).await
345 }
346
347 pub async fn write_consecutive<T: HostLinkPayloadValue>(
348 &self,
349 device: &str,
350 values: &[T],
351 data_format: Option<&str>,
352 ) -> Result<(), HostLinkError> {
353 if values.is_empty() {
354 return Err(HostLinkError::protocol("values must not be empty"));
355 }
356
357 let mut address = parse_device(device)?;
358 let suffix = if let Some(data_format) = data_format {
359 crate::address::normalize_suffix(data_format)?
360 } else {
361 address.suffix.clone()
362 };
363 let suffix = resolve_effective_format(&address.device_type, &suffix);
364 validate_device_type("WRS", &address.device_type, wr_device_types())?;
365 validate_device_count(&address.device_type, &suffix, values.len())?;
366 validate_device_span(&address.device_type, address.number, &suffix, values.len())?;
367 address.suffix = suffix.clone();
368 let payload = build_joined_payload(values, &suffix);
369 self.expect_ok(&format!(
370 "WRS {} {} {}",
371 address.to_text()?,
372 values.len(),
373 payload
374 ))
375 .await
376 }
377
378 pub async fn register_monitor_bits<S: AsRef<str>>(
379 &self,
380 devices: &[S],
381 ) -> Result<(), HostLinkError> {
382 if devices.is_empty() {
383 return Err(HostLinkError::protocol("At least one device is required"));
384 }
385 if devices.len() > 120 {
386 return Err(HostLinkError::protocol(
387 "Maximum 120 devices can be registered",
388 ));
389 }
390
391 let mut command = String::from("MBS");
392 for device in devices {
393 let mut address = parse_device(device.as_ref())?;
394 validate_device_type("MBS", &address.device_type, mbs_device_types())?;
395 address.suffix.clear();
396 command.push(' ');
397 command.push_str(&address.to_text()?);
398 }
399 self.expect_ok(&command).await
400 }
401
402 pub async fn register_monitor_words<S: AsRef<str>>(
403 &self,
404 devices: &[S],
405 ) -> Result<(), HostLinkError> {
406 if devices.is_empty() {
407 return Err(HostLinkError::protocol("At least one device is required"));
408 }
409 if devices.len() > 120 {
410 return Err(HostLinkError::protocol(
411 "Maximum 120 devices can be registered",
412 ));
413 }
414
415 let mut command = String::from("MWS");
416 for device in devices {
417 let mut address = parse_device(device.as_ref())?;
418 validate_device_type("MWS", &address.device_type, mws_device_types())?;
419 let suffix = resolve_effective_format(&address.device_type, &address.suffix);
420 validate_device_span(&address.device_type, address.number, &suffix, 1)?;
421 address.suffix = suffix;
422 command.push(' ');
423 command.push_str(&address.to_text()?);
424 }
425 self.expect_ok(&command).await
426 }
427
428 pub async fn read_monitor_bits(&self) -> Result<Vec<String>, HostLinkError> {
429 let response = self.send_raw("MBR").await?;
430 Ok(split_data_tokens(&response))
431 }
432
433 pub async fn read_monitor_words(&self) -> Result<Vec<String>, HostLinkError> {
434 let response = self.send_raw("MWR").await?;
435 Ok(split_data_tokens(&response))
436 }
437
438 pub async fn forced_set_consecutive(
439 &self,
440 device: &str,
441 count: usize,
442 ) -> Result<(), HostLinkError> {
443 if !(1..=16).contains(&count) {
444 return Err(HostLinkError::protocol("count must be 1-16."));
445 }
446 let mut address = parse_device(device)?;
447 validate_device_type("STS", &address.device_type, force_device_types())?;
448 address.suffix.clear();
449 self.expect_ok(&format!("STS {} {}", address.to_text()?, count))
450 .await
451 }
452
453 pub async fn forced_reset_consecutive(
454 &self,
455 device: &str,
456 count: usize,
457 ) -> Result<(), HostLinkError> {
458 if !(1..=16).contains(&count) {
459 return Err(HostLinkError::protocol("count must be 1-16."));
460 }
461 let mut address = parse_device(device)?;
462 validate_device_type("RSS", &address.device_type, force_device_types())?;
463 address.suffix.clear();
464 self.expect_ok(&format!("RSS {} {}", address.to_text()?, count))
465 .await
466 }
467
468 pub async fn read_consecutive_legacy(
469 &self,
470 device: &str,
471 count: usize,
472 data_format: Option<&str>,
473 ) -> Result<Vec<String>, HostLinkError> {
474 let mut address = parse_device(device)?;
475 let suffix = if let Some(data_format) = data_format {
476 crate::address::normalize_suffix(data_format)?
477 } else {
478 address.suffix.clone()
479 };
480 let suffix = resolve_effective_format(&address.device_type, &suffix);
481 validate_device_count(&address.device_type, &suffix, count)?;
482 validate_device_span(&address.device_type, address.number, &suffix, count)?;
483 address.suffix = suffix;
484 let response = self
485 .send_raw(&format!("RDE {} {}", address.to_text()?, count))
486 .await?;
487 Ok(split_data_tokens(&response))
488 }
489
490 pub async fn write_consecutive_legacy<T: HostLinkPayloadValue>(
491 &self,
492 device: &str,
493 values: &[T],
494 data_format: Option<&str>,
495 ) -> Result<(), HostLinkError> {
496 if values.is_empty() {
497 return Err(HostLinkError::protocol("values must not be empty"));
498 }
499 let mut address = parse_device(device)?;
500 let suffix = if let Some(data_format) = data_format {
501 crate::address::normalize_suffix(data_format)?
502 } else {
503 address.suffix.clone()
504 };
505 let suffix = resolve_effective_format(&address.device_type, &suffix);
506 validate_device_type("WRE", &address.device_type, wr_device_types())?;
507 validate_device_count(&address.device_type, &suffix, values.len())?;
508 validate_device_span(&address.device_type, address.number, &suffix, values.len())?;
509 address.suffix = suffix.clone();
510 let payload = build_joined_payload(values, &suffix);
511 self.expect_ok(&format!(
512 "WRE {} {} {}",
513 address.to_text()?,
514 values.len(),
515 payload
516 ))
517 .await
518 }
519
520 pub async fn write_set_value<T: HostLinkPayloadValue>(
521 &self,
522 device: &str,
523 value: T,
524 data_format: Option<&str>,
525 ) -> Result<(), HostLinkError> {
526 let mut address = parse_device(device)?;
527 validate_device_type("WS", &address.device_type, ws_device_types())?;
528 let suffix = if let Some(data_format) = data_format {
529 crate::address::normalize_suffix(data_format)?
530 } else {
531 resolve_effective_format(&address.device_type, &address.suffix)
532 };
533 validate_device_span(&address.device_type, address.number, &suffix, 1)?;
534 address.suffix = suffix.clone();
535 let mut command = String::from("WS ");
536 command.push_str(&address.to_text()?);
537 command.push(' ');
538 value.append_to_payload(&suffix, &mut command);
539 self.expect_ok(&command).await
540 }
541
542 pub async fn write_set_value_consecutive<T: HostLinkPayloadValue>(
543 &self,
544 device: &str,
545 values: &[T],
546 data_format: Option<&str>,
547 ) -> Result<(), HostLinkError> {
548 if values.is_empty() {
549 return Err(HostLinkError::protocol("values must not be empty"));
550 }
551 let mut address = parse_device(device)?;
552 validate_device_type("WSS", &address.device_type, ws_device_types())?;
553 let suffix = if let Some(data_format) = data_format {
554 crate::address::normalize_suffix(data_format)?
555 } else {
556 resolve_effective_format(&address.device_type, &address.suffix)
557 };
558 validate_device_span(&address.device_type, address.number, &suffix, values.len())?;
559 address.suffix = suffix.clone();
560 let payload = build_joined_payload(values, &suffix);
561 self.expect_ok(&format!(
562 "WSS {} {} {}",
563 address.to_text()?,
564 values.len(),
565 payload
566 ))
567 .await
568 }
569
570 pub async fn switch_bank(&self, bank_no: u8) -> Result<(), HostLinkError> {
571 if bank_no > 15 {
572 return Err(HostLinkError::protocol("bankNo must be 0-15."));
573 }
574 self.expect_ok(&format!("BE {bank_no}")).await
575 }
576
577 pub async fn read_expansion_unit_buffer(
578 &self,
579 unit_no: u8,
580 address: u32,
581 count: usize,
582 data_format: Option<&str>,
583 ) -> Result<Vec<String>, HostLinkError> {
584 if unit_no > 48 {
585 return Err(HostLinkError::protocol("unitNo must be 0-48."));
586 }
587 if address > 59_999 {
588 return Err(HostLinkError::protocol("address must be 0-59999."));
589 }
590 let suffix = if let Some(data_format) = data_format {
591 crate::address::normalize_suffix(data_format)?
592 } else {
593 ".U".to_owned()
594 };
595 validate_expansion_buffer_count(&suffix, count)?;
596 validate_expansion_buffer_span(address, &suffix, count)?;
597 let response = self
598 .send_raw(&format!("URD {unit_no:02} {address}{suffix} {count}"))
599 .await?;
600 Ok(split_data_tokens(&response))
601 }
602
603 pub async fn write_expansion_unit_buffer<T: HostLinkPayloadValue>(
604 &self,
605 unit_no: u8,
606 address: u32,
607 values: &[T],
608 data_format: Option<&str>,
609 ) -> Result<(), HostLinkError> {
610 if values.is_empty() {
611 return Err(HostLinkError::protocol("values must not be empty"));
612 }
613 if unit_no > 48 {
614 return Err(HostLinkError::protocol("unitNo must be 0-48."));
615 }
616 if address > 59_999 {
617 return Err(HostLinkError::protocol("address must be 0-59999."));
618 }
619 let suffix = if let Some(data_format) = data_format {
620 crate::address::normalize_suffix(data_format)?
621 } else {
622 ".U".to_owned()
623 };
624 validate_expansion_buffer_count(&suffix, values.len())?;
625 validate_expansion_buffer_span(address, &suffix, values.len())?;
626 let payload = build_joined_payload(values, &suffix);
627 self.expect_ok(&format!(
628 "UWR {unit_no:02} {address}{suffix} {} {payload}",
629 values.len()
630 ))
631 .await
632 }
633
634 pub async fn read_comments(
635 &self,
636 device: &str,
637 strip_padding: bool,
638 ) -> Result<String, HostLinkError> {
639 let mut address = parse_device(device)?;
640 validate_device_type("RDC", &address.device_type, rdc_device_types())?;
641 address.suffix.clear();
642 let response = self
643 .inner
644 .lock()
645 .await
646 .send_raw_decoded(
647 &format!("RDC {}", address.to_text()?),
648 decode_comment_response,
649 )
650 .await?;
651 if strip_padding {
652 Ok(response.trim_end_matches(' ').to_owned())
653 } else {
654 Ok(response)
655 }
656 }
657
658 pub async fn read_typed(
659 &self,
660 device: &str,
661 dtype: &str,
662 ) -> Result<helpers::HostLinkValue, HostLinkError> {
663 helpers::read_typed(self, device, dtype).await
664 }
665
666 pub async fn read_timer_counter(
667 &self,
668 device: &str,
669 ) -> Result<helpers::TimerCounterValue, HostLinkError> {
670 helpers::read_timer_counter(self, device).await
671 }
672
673 pub async fn read_timer(
674 &self,
675 device: &str,
676 ) -> Result<helpers::TimerCounterValue, HostLinkError> {
677 helpers::read_timer(self, device).await
678 }
679
680 pub async fn read_counter(
681 &self,
682 device: &str,
683 ) -> Result<helpers::TimerCounterValue, HostLinkError> {
684 helpers::read_counter(self, device).await
685 }
686
687 pub async fn write_typed<T: HostLinkPayloadValue>(
688 &self,
689 device: &str,
690 dtype: &str,
691 value: T,
692 ) -> Result<(), HostLinkError> {
693 helpers::write_typed(self, device, dtype, &value).await
694 }
695
696 pub async fn read_named<S: AsRef<str>>(
697 &self,
698 addresses: &[S],
699 ) -> Result<helpers::NamedSnapshot, HostLinkError> {
700 helpers::read_named(self, addresses).await
701 }
702
703 pub async fn write_bit_in_word(
704 &self,
705 device: &str,
706 bit_index: u8,
707 value: bool,
708 ) -> Result<(), HostLinkError> {
709 helpers::write_bit_in_word(self, device, bit_index, value).await
710 }
711
712 async fn expect_ok(&self, body: &str) -> Result<(), HostLinkError> {
713 let response = self.send_raw(body).await?;
714 if response == "OK" {
715 Ok(())
716 } else {
717 Err(HostLinkError::protocol(format!(
718 "Expected 'OK' but received '{response}' for command '{body}'"
719 )))
720 }
721 }
722}
723
724impl ClientInner {
725 async fn open(&mut self) -> Result<(), HostLinkError> {
726 if self.transport.is_some() {
727 return Ok(());
728 }
729
730 let transport = match self.options.transport {
731 HostLinkTransportMode::Tcp => {
732 let stream = timeout(
733 self.options.timeout,
734 TcpStream::connect((self.options.host.as_str(), self.options.port)),
735 )
736 .await
737 .map_err(|_| HostLinkError::connection("tcp connect timed out"))??;
738 stream.set_nodelay(true)?;
739 Transport::Tcp(stream)
740 }
741 HostLinkTransportMode::Udp => {
742 let socket = UdpSocket::bind("0.0.0.0:0").await?;
743 timeout(
744 self.options.timeout,
745 socket.connect((self.options.host.as_str(), self.options.port)),
746 )
747 .await
748 .map_err(|_| HostLinkError::connection("udp connect timed out"))??;
749 Transport::Udp(socket)
750 }
751 };
752
753 self.transport = Some(transport);
754 self.rx_start = 0;
755 self.rx_count = 0;
756 Ok(())
757 }
758
759 fn close(&mut self) {
760 self.transport = None;
761 self.rx_start = 0;
762 self.rx_count = 0;
763 }
764
765 async fn send_raw(&mut self, body: &str) -> Result<String, HostLinkError> {
766 self.send_raw_decoded(body, decode_response).await
767 }
768
769 async fn send_raw_decoded<F>(&mut self, body: &str, decoder: F) -> Result<String, HostLinkError>
770 where
771 F: Fn(&[u8]) -> Result<String, HostLinkError>,
772 {
773 self.open().await?;
774 let frame = build_frame(body, self.options.append_lf_on_send);
775 self.fire_trace(HostLinkTraceDirection::Send, &frame);
776
777 match self.transport.as_mut() {
778 Some(Transport::Tcp(stream)) => {
779 write_all_with_timeout(stream, &frame, self.options.timeout).await?;
780 let raw = recv_tcp_line(
781 stream,
782 &mut self.rx_buf,
783 &mut self.rx_start,
784 &mut self.rx_count,
785 &mut self.tcp_read_buf,
786 self.options.timeout,
787 )
788 .await?;
789 self.fire_trace(HostLinkTraceDirection::Receive, &raw);
790 ensure_success(decoder(&raw)?)
791 }
792 Some(Transport::Udp(socket)) => {
793 send_udp_with_timeout(socket, &frame, self.options.timeout).await?;
794 recv_udp_with_timeout(socket, &mut self.udp_read_buf, self.options.timeout).await?;
795 let raw = &self.udp_read_buf;
796 self.fire_trace(HostLinkTraceDirection::Receive, raw);
797 ensure_success(decoder(raw)?)
798 }
799 None => Err(HostLinkError::connection("transport was not opened")),
800 }
801 }
802
803 fn fire_trace(&self, direction: HostLinkTraceDirection, data: &[u8]) {
804 if let Some(trace_hook) = &self.trace_hook {
805 trace_hook(HostLinkTraceFrame {
806 direction,
807 data: data.to_vec(),
808 timestamp: SystemTime::now(),
809 });
810 }
811 }
812}
813
814impl HostLinkClientFactory {
815 pub async fn open_and_connect(
816 options: HostLinkConnectionOptions,
817 ) -> Result<QueuedHostLinkClient, HostLinkError> {
818 if options.host.trim().is_empty() {
819 return Err(HostLinkError::protocol("Host must not be empty."));
820 }
821
822 let client = HostLinkClient::new(options);
823 let queued = QueuedHostLinkClient::new(client);
824 queued.open().await?;
825 Ok(queued)
826 }
827}
828
829pub async fn open_and_connect(
830 options: HostLinkConnectionOptions,
831) -> Result<QueuedHostLinkClient, HostLinkError> {
832 HostLinkClientFactory::open_and_connect(options).await
833}
834
835impl QueuedHostLinkClient {
836 pub fn new(client: HostLinkClient) -> Self {
837 Self {
838 client,
839 gate: Arc::new(Mutex::new(())),
840 }
841 }
842
843 pub fn inner_client(&self) -> &HostLinkClient {
844 &self.client
845 }
846
847 pub async fn is_open(&self) -> bool {
848 self.client.is_open().await
849 }
850
851 pub async fn open(&self) -> Result<(), HostLinkError> {
852 let _guard = self.gate.lock().await;
853 self.client.open().await
854 }
855
856 pub async fn close(&self) -> Result<(), HostLinkError> {
857 let _guard = self.gate.lock().await;
858 self.client.close().await
859 }
860
861 pub async fn set_trace_hook(&self, trace_hook: Option<TraceHook>) {
862 let _guard = self.gate.lock().await;
863 self.client.set_trace_hook(trace_hook).await;
864 }
865
866 pub async fn execute_async<F, Fut, T>(&self, operation: F) -> Result<T, HostLinkError>
867 where
868 F: FnOnce(&HostLinkClient) -> Fut,
869 Fut: Future<Output = Result<T, HostLinkError>>,
870 {
871 let _guard = self.gate.lock().await;
872 operation(&self.client).await
873 }
874
875 pub async fn send_raw(&self, body: &str) -> Result<String, HostLinkError> {
876 let _guard = self.gate.lock().await;
877 self.client.send_raw(body).await
878 }
879
880 pub async fn read_comments(
881 &self,
882 device: &str,
883 strip_padding: bool,
884 ) -> Result<String, HostLinkError> {
885 let _guard = self.gate.lock().await;
886 self.client.read_comments(device, strip_padding).await
887 }
888
889 pub async fn read_typed(
890 &self,
891 device: &str,
892 dtype: &str,
893 ) -> Result<helpers::HostLinkValue, HostLinkError> {
894 let _guard = self.gate.lock().await;
895 helpers::read_typed(&self.client, device, dtype).await
896 }
897
898 pub async fn read_timer_counter(
899 &self,
900 device: &str,
901 ) -> Result<helpers::TimerCounterValue, HostLinkError> {
902 let _guard = self.gate.lock().await;
903 helpers::read_timer_counter(&self.client, device).await
904 }
905
906 pub async fn read_timer(
907 &self,
908 device: &str,
909 ) -> Result<helpers::TimerCounterValue, HostLinkError> {
910 let _guard = self.gate.lock().await;
911 helpers::read_timer(&self.client, device).await
912 }
913
914 pub async fn read_counter(
915 &self,
916 device: &str,
917 ) -> Result<helpers::TimerCounterValue, HostLinkError> {
918 let _guard = self.gate.lock().await;
919 helpers::read_counter(&self.client, device).await
920 }
921
922 pub async fn write_typed<T: HostLinkPayloadValue>(
923 &self,
924 device: &str,
925 dtype: &str,
926 value: T,
927 ) -> Result<(), HostLinkError> {
928 let _guard = self.gate.lock().await;
929 helpers::write_typed(&self.client, device, dtype, &value).await
930 }
931
932 pub async fn write_bit_in_word(
933 &self,
934 device: &str,
935 bit_index: u8,
936 value: bool,
937 ) -> Result<(), HostLinkError> {
938 let _guard = self.gate.lock().await;
939 helpers::write_bit_in_word(&self.client, device, bit_index, value).await
940 }
941
942 pub async fn read_named<S: AsRef<str>>(
943 &self,
944 addresses: &[S],
945 ) -> Result<helpers::NamedSnapshot, HostLinkError> {
946 let _guard = self.gate.lock().await;
947 helpers::read_named(&self.client, addresses).await
948 }
949
950 pub async fn read_device_range_catalog(&self) -> Result<KvDeviceRangeCatalog, HostLinkError> {
951 let _guard = self.gate.lock().await;
952 self.client.read_device_range_catalog().await
953 }
954
955 pub fn poll<'a, S: AsRef<str> + 'a>(
956 &'a self,
957 addresses: &'a [S],
958 interval: Duration,
959 ) -> impl futures_core::Stream<Item = Result<helpers::NamedSnapshot, HostLinkError>> + 'a {
960 async_stream::try_stream! {
961 let addr_list = addresses.iter().map(|item| item.as_ref().to_owned()).collect::<Vec<_>>();
962 let compiled = helpers::compile_read_named_plan(&addr_list);
963 loop {
964 let snapshot = {
965 let _guard = self.gate.lock().await;
966 if let Some(plan) = &compiled {
967 helpers::execute_read_named_plan(&self.client, plan).await?
968 } else {
969 helpers::read_named_sequential(&self.client, &addr_list).await?
970 }
971 };
972 yield snapshot;
973 tokio::time::sleep(interval).await;
974 }
975 }
976 }
977
978 pub async fn read_words(&self, device: &str, count: usize) -> Result<Vec<u16>, HostLinkError> {
979 let _guard = self.gate.lock().await;
980 helpers::read_words(self.inner_client(), device, count).await
981 }
982
983 pub async fn read_dwords(&self, device: &str, count: usize) -> Result<Vec<u32>, HostLinkError> {
984 let _guard = self.gate.lock().await;
985 helpers::read_dwords(self.inner_client(), device, count).await
986 }
987}
988
989async fn write_all_with_timeout(
990 stream: &mut TcpStream,
991 payload: &[u8],
992 duration: Duration,
993) -> Result<(), HostLinkError> {
994 timeout(duration, stream.write_all(payload))
995 .await
996 .map_err(|_| HostLinkError::connection("write timed out"))??;
997 Ok(())
998}
999
1000async fn send_udp_with_timeout(
1001 socket: &mut UdpSocket,
1002 payload: &[u8],
1003 duration: Duration,
1004) -> Result<(), HostLinkError> {
1005 timeout(duration, socket.send(payload))
1006 .await
1007 .map_err(|_| HostLinkError::connection("write timed out"))??;
1008 Ok(())
1009}
1010
1011async fn recv_udp_with_timeout(
1012 socket: &mut UdpSocket,
1013 buffer: &mut Vec<u8>,
1014 duration: Duration,
1015) -> Result<(), HostLinkError> {
1016 if buffer.len() != UDP_RECEIVE_BUFFER_SIZE {
1017 buffer.resize(UDP_RECEIVE_BUFFER_SIZE, 0);
1020 }
1021 let read = timeout(duration, socket.recv(buffer.as_mut_slice()))
1022 .await
1023 .map_err(|_| HostLinkError::connection("read timed out"))??;
1024 buffer.truncate(read);
1025 Ok(())
1026}
1027
1028fn build_joined_payload<T: HostLinkPayloadValue>(values: &[T], suffix: &str) -> String {
1029 let mut payload = String::new();
1030 for (index, value) in values.iter().enumerate() {
1031 if index > 0 {
1032 payload.push(' ');
1033 }
1034 value.append_to_payload(suffix, &mut payload);
1035 }
1036 payload
1037}
1038
1039async fn recv_tcp_line(
1040 stream: &mut TcpStream,
1041 rx_buf: &mut Vec<u8>,
1042 rx_start: &mut usize,
1043 rx_count: &mut usize,
1044 tcp_read_buf: &mut [u8],
1045 duration: Duration,
1046) -> Result<Vec<u8>, HostLinkError> {
1047 loop {
1048 let mut found_idx = None;
1049 for index in 0..*rx_count {
1050 let byte = rx_buf[*rx_start + index];
1051 if matches!(byte, b'\r' | b'\n') {
1052 found_idx = Some(index);
1053 break;
1054 }
1055 }
1056
1057 if let Some(found_idx) = found_idx {
1058 let line = rx_buf[*rx_start..*rx_start + found_idx].to_vec();
1059 let mut skip = found_idx;
1060 while skip < *rx_count && matches!(rx_buf[*rx_start + skip], b'\r' | b'\n') {
1061 skip += 1;
1062 }
1063 *rx_start += skip;
1064 *rx_count -= skip;
1065 if *rx_start > rx_buf.len() / 2 {
1066 rx_buf.copy_within(*rx_start..*rx_start + *rx_count, 0);
1067 *rx_start = 0;
1068 }
1069 return Ok(line);
1070 }
1071
1072 let read = timeout(duration, stream.read(tcp_read_buf))
1073 .await
1074 .map_err(|_| HostLinkError::connection("read timed out"))??;
1075 if read == 0 {
1076 if *rx_count > 0 {
1077 let line = rx_buf[*rx_start..*rx_start + *rx_count].to_vec();
1078 *rx_start = 0;
1079 *rx_count = 0;
1080 return Ok(line);
1081 }
1082 return Err(HostLinkError::connection("Connection closed by PLC"));
1083 }
1084
1085 if *rx_start + *rx_count + read > rx_buf.len() {
1086 if *rx_count > 0 {
1087 rx_buf.copy_within(*rx_start..*rx_start + *rx_count, 0);
1088 }
1089 *rx_start = 0;
1090 if *rx_count + read > rx_buf.len() {
1091 rx_buf.resize((rx_buf.len() * 2).max(*rx_count + read), 0);
1092 }
1093 }
1094
1095 let target = *rx_start + *rx_count;
1096 rx_buf[target..target + read].copy_from_slice(&tcp_read_buf[..read]);
1097 *rx_count += read;
1098 }
1099}