Skip to main content

rs_modbus/
master.rs

1use crate::error::ModbusError;
2use crate::layers::application::{ApplicationLayer, ApplicationProtocol, ApplicationRole, Framing};
3use crate::layers::physical::PhysicalLayer;
4use crate::master_session::{MasterSession, PreCheck, PreCheckOutcome, WaiterKey};
5use crate::types::{
6    ApplicationDataUnit, CustomFunctionCode, DeviceIdentification, DeviceObject, MasterResponse,
7    ServerId,
8};
9use crate::utils::{parse_coils, parse_registers};
10use std::sync::atomic::{AtomicU16, AtomicU8, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13use tokio::sync::broadcast::error::RecvError;
14use tokio::task::JoinHandle;
15
/// Construction-time settings consumed by [`ModbusMaster::new`]. Mirrors
/// njs-modbus `ModbusMasterOptions`.
#[derive(Clone, Copy, Debug)]
pub struct ModbusMasterOptions {
    /// Timeout, in milliseconds, applied to a request when the caller does
    /// not supply one explicitly. Defaults to 1000.
    pub timeout_ms: u64,
    /// Enable pipelined concurrent requests on a single connection. Only
    /// valid for Modbus TCP application layers — constructing a master with
    /// `concurrent: true` on RTU or ASCII layers panics. Defaults to
    /// `false` (FIFO queue, requests are serialized).
    pub concurrent: bool,
}

impl Default for ModbusMasterOptions {
    /// Returns the defaults: a 1000 ms timeout and serialized (FIFO)
    /// request handling.
    fn default() -> Self {
        const DEFAULT_TIMEOUT_MS: u64 = 1000;

        ModbusMasterOptions {
            timeout_ms: DEFAULT_TIMEOUT_MS,
            concurrent: false,
        }
    }
}
38
/// Modbus master bound to one application layer `A` and one physical
/// layer `P`.
pub struct ModbusMaster<A: ApplicationLayer, P: PhysicalLayer> {
    /// Application layer: encodes outgoing requests and publishes decoded
    /// frames / framing errors that the master subscribes to.
    application: Arc<A>,
    /// Physical layer the encoded requests are written to; also opened,
    /// closed and destroyed through the master.
    physical: Arc<P>,
    /// Tracks the waiters for in-flight requests and routes incoming frames
    /// and errors to them.
    session: Arc<MasterSession>,
    /// Default per-request timeout in milliseconds, used when a call passes
    /// `None` for its own timeout.
    pub timeout_ms: u64,
    /// `true` when requests are dispatched without the FIFO queue lock.
    pub concurrent: bool,
    /// Next transaction ID to hand out; reset to `1` on `open`.
    next_tid: AtomicU16,
    /// Lifecycle marker: `0` = usable, `1` = closed, `2` = destroyed.
    clean_level: AtomicU8,
    /// Serializes requests in non-concurrent (FIFO) mode.
    queue_lock: tokio::sync::Mutex<()>,
    /// Handles of the background listener tasks; aborted on `destroy`.
    tasks: Mutex<Vec<JoinHandle<()>>>,
}
50
51impl<A: ApplicationLayer + 'static, P: PhysicalLayer + 'static> ModbusMaster<A, P> {
52    pub fn new(application: Arc<A>, physical: Arc<P>, options: ModbusMasterOptions) -> Self {
53        if options.concurrent && application.protocol() != ApplicationProtocol::Tcp {
54            panic!("concurrent mode requires a Modbus TCP application layer");
55        }
56        application
57            .set_role(ApplicationRole::Master)
58            .expect("application layer is already bound to a different role");
59        let session = Arc::new(MasterSession::new());
60
61        let session_for_framing = Arc::clone(&session);
62        let mut framing_rx = application.subscribe_framing();
63        let framing_task = tokio::spawn(async move {
64            loop {
65                match framing_rx.recv().await {
66                    Ok(frame) => session_for_framing.handle_frame(frame),
67                    Err(RecvError::Lagged(_)) => continue,
68                    Err(RecvError::Closed) => break,
69                }
70            }
71        });
72
73        let session_for_error = Arc::clone(&session);
74        let mut error_rx = application.subscribe_framing_error();
75        let error_task = tokio::spawn(async move {
76            loop {
77                match error_rx.recv().await {
78                    Ok(err) => session_for_error.handle_error(err),
79                    Err(RecvError::Lagged(_)) => continue,
80                    Err(RecvError::Closed) => break,
81                }
82            }
83        });
84
85        Self {
86            application,
87            physical,
88            session,
89            timeout_ms: options.timeout_ms,
90            concurrent: options.concurrent,
91            next_tid: AtomicU16::new(1),
92            clean_level: AtomicU8::new(0),
93            queue_lock: tokio::sync::Mutex::new(()),
94            tasks: Mutex::new(vec![framing_task, error_task]),
95        }
96    }
97
98    /// Allocate the next transaction ID. Cycles through `1..=65535`,
99    /// skipping `0` on wrap. Matches njs-modbus `_nextTid` semantics.
100    fn allocate_tid(&self) -> u16 {
101        self.next_tid
102            .fetch_update(Ordering::Release, Ordering::Acquire, |t| {
103                let next = if t == 65535 { 1 } else { t + 1 };
104                Some(next)
105            })
106            .unwrap()
107    }
108
109    fn clean(&self, level: u8) {
110        let current = self.clean_level.load(Ordering::Acquire);
111        if current == 2 {
112            return;
113        }
114        if current == 1 && level == 1 {
115            return;
116        }
117        let err = if level == 2 {
118            ModbusError::InvalidState("Master destroyed".into())
119        } else {
120            ModbusError::InvalidState("Master closed".into())
121        };
122        self.session.stop_all(err);
123        self.clean_level.store(level, Ordering::Release);
124    }
125
126    pub fn is_open(&self) -> bool {
127        self.physical.is_open()
128    }
129
130    pub fn is_destroyed(&self) -> bool {
131        self.clean_level.load(Ordering::Acquire) == 2 || self.physical.is_destroyed()
132    }
133
134    pub async fn open(&self, options: P::OpenOptions) -> Result<(), ModbusError> {
135        if self.clean_level.load(Ordering::Acquire) == 2 {
136            return Err(ModbusError::PortDestroyed);
137        }
138        self.clean_level.store(0, Ordering::Release);
139        self.next_tid.store(1, Ordering::Release);
140        self.physical.open(options).await?;
141        Ok(())
142    }
143
144    pub async fn close(&self) -> Result<(), ModbusError> {
145        if self.clean_level.load(Ordering::Acquire) == 2 {
146            return Ok(());
147        }
148        self.clean(1);
149        self.physical.close().await
150    }
151
152    pub async fn destroy(&self) {
153        if self.clean_level.load(Ordering::Acquire) == 2 {
154            return;
155        }
156        self.clean(2);
157        {
158            let mut tasks = self.tasks.lock().unwrap();
159            for task in tasks.drain(..) {
160                task.abort();
161            }
162        }
163        self.application.destroy().await;
164        let _ = self.physical.destroy().await;
165    }
166
167    fn check_unit_fc(unit: u8, fc: u8) -> PreCheck {
168        Arc::new(move |f: &Framing| {
169            if f.adu.unit == unit && f.adu.fc == fc {
170                PreCheckOutcome::Pass
171            } else {
172                PreCheckOutcome::Fail(ModbusError::InvalidResponse)
173            }
174        })
175    }
176
177    fn check_length(expected: usize) -> PreCheck {
178        Arc::new(move |_: &Framing| PreCheckOutcome::NeedLength(expected))
179    }
180
181    fn check_byte_count(expected: usize) -> PreCheck {
182        Arc::new(move |f: &Framing| {
183            if !f.adu.data.is_empty() && f.adu.data[0] as usize == expected {
184                PreCheckOutcome::Pass
185            } else {
186                PreCheckOutcome::Fail(ModbusError::InvalidResponse)
187            }
188        })
189    }
190
191    fn check_echo(expected: Vec<u8>) -> PreCheck {
192        Arc::new(move |f: &Framing| {
193            if f.adu.data == expected {
194                PreCheckOutcome::Pass
195            } else {
196                PreCheckOutcome::Fail(ModbusError::InvalidResponse)
197            }
198        })
199    }
200
    /// Sends `request` and waits for the matching response frame.
    ///
    /// In order: rejects the call if the master is closed or destroyed;
    /// in non-concurrent mode takes the queue lock and flushes the
    /// application layer; encodes the request (non-broadcast TCP requests
    /// get a freshly allocated transaction ID, and a TID pre-check is put in
    /// front of `checks`); registers a waiter with the session; writes the
    /// encoded bytes; then awaits the reply for at most `timeout_ms`
    /// milliseconds.
    ///
    /// Returns `Ok(None)` for broadcast requests (`request.unit == 0`),
    /// which do not wait for a reply, and `Ok(Some(frame))` otherwise.
    ///
    /// # Errors
    /// - [`ModbusError::InvalidState`] when the clean level is `>= 1`, or
    ///   when the waiter channel is dropped while waiting;
    /// - the physical layer's error when the write fails;
    /// - whatever error the session delivers through the waiter;
    /// - [`ModbusError::Timeout`] when no reply arrives in time.
    async fn wait_response(
        &self,
        request: &ApplicationDataUnit,
        checks: Vec<PreCheck>,
        timeout_ms: u64,
    ) -> Result<Option<Framing>, ModbusError> {
        // Reject up-front so a newly issued call after `close()` doesn't
        // hit the socket. Necessary in concurrent mode (no queue lock) and
        // also covers the rare case where a FIFO caller starts mid-close.
        if self.clean_level.load(Ordering::Acquire) >= 1 {
            return Err(ModbusError::InvalidState("Master closed".into()));
        }

        // FIFO mode: serialize call-sites via the queue lock so two callers
        // can't trample each other's MasterSession waiter slot. Concurrent
        // mode dispatches without holding the lock — each TCP request gets
        // its own TID-keyed waiter slot.
        let _queue_guard = if self.concurrent {
            None
        } else {
            Some(self.queue_lock.lock().await)
        };

        // A close() may have landed while we were waiting for the queue
        // lock. Re-check before allocating a TID / writing.
        if self.clean_level.load(Ordering::Acquire) >= 1 {
            return Err(ModbusError::InvalidState("Master closed".into()));
        }

        // FIFO mode: drop stale buffer state from the previous request
        // before sending. Concurrent mode must NOT flush because other
        // in-flight requests share the application-layer buffer.
        if !self.concurrent {
            self.application.flush();
        }

        // Unit 0 is treated as broadcast: no TID, no reply awaited.
        let broadcast = request.unit == 0;
        let uses_tid = self.application.protocol() == ApplicationProtocol::Tcp && !broadcast;

        // Build the actual request frame. For TCP non-broadcast requests
        // we allocate a fresh TID and encode it into the MBAP header; the
        // slave echoes that TID back on its response so we can demux
        // pipelined replies.
        let (encoded, key) = if uses_tid {
            let tid = self.allocate_tid();
            let adu = ApplicationDataUnit {
                transaction: Some(tid),
                unit: request.unit,
                fc: request.fc,
                data: request.data.clone(),
            };
            (self.application.encode(&adu), WaiterKey::Tid(tid))
        } else {
            (self.application.encode(request), WaiterKey::Fifo)
        };

        // Pre-check chain. When TID is used, prepend a TID match so any
        // stale response from a previous request (or a different in-flight
        // one) fails fast.
        let final_checks: Vec<PreCheck> = if let WaiterKey::Tid(tid) = key {
            let mut v: Vec<PreCheck> = Vec::with_capacity(checks.len() + 1);
            v.push(Arc::new(move |f: &Framing| {
                if f.adu.transaction == Some(tid) {
                    PreCheckOutcome::Pass
                } else {
                    PreCheckOutcome::Fail(ModbusError::InvalidResponse)
                }
            }));
            v.extend(checks);
            v
        } else {
            checks
        };

        // Arm the waiter BEFORE the write — otherwise a fast slave's reply
        // can arrive between write completion and `start(...)` and be
        // dropped on the floor.
        let rx = self.session.start(key, final_checks);
        if let Err(err) = self.physical.write(&encoded).await {
            // The request never went out, so remove the waiter again.
            self.session.stop(key);
            return Err(err);
        }

        if broadcast {
            // No response expected. Tear down the (unused) waiter we just
            // armed for the FIFO key.
            self.session.stop(key);
            return Ok(None);
        }

        let timeout = Duration::from_millis(timeout_ms);
        // Nesting: outer = timeout elapsed?, middle = channel dropped?,
        // inner = the session's verdict on the frame.
        match tokio::time::timeout(timeout, rx).await {
            Ok(Ok(Ok(frame))) => Ok(Some(frame)),
            Ok(Ok(Err(err))) => Err(err),
            Ok(Err(_)) => {
                // Receiver dropped (e.g. session.stop_all elsewhere).
                Err(ModbusError::InvalidState(
                    "master session was cleared while waiting".into(),
                ))
            }
            Err(_) => {
                // Timed out: remove the waiter so a late reply is not
                // matched to it.
                self.session.stop(key);
                Err(ModbusError::Timeout)
            }
        }
    }
307
308    // FC1 - Read Coils
309    pub async fn read_coils(
310        &self,
311        unit: u8,
312        address: u16,
313        length: u16,
314        timeout_ms: Option<u64>,
315    ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
316        let fc = 0x01;
317        let byte_count = ((length + 7) / 8) as usize;
318
319        let mut buf = vec![0u8; 4];
320        buf[0..2].copy_from_slice(&address.to_be_bytes());
321        buf[2..4].copy_from_slice(&length.to_be_bytes());
322
323        let request = ApplicationDataUnit::new(unit, fc, buf);
324
325        let frame = self
326            .wait_response(
327                &request,
328                vec![
329                    Self::check_unit_fc(unit, fc),
330                    Self::check_length(1 + byte_count),
331                    Self::check_byte_count(byte_count),
332                ],
333                timeout_ms.unwrap_or(self.timeout_ms),
334            )
335            .await?;
336
337        match frame {
338            Some(f) => Ok(Some(MasterResponse {
339                transaction: f.adu.transaction,
340                unit: f.adu.unit,
341                fc: f.adu.fc,
342                data: parse_coils(&f.adu.data, length),
343                raw: f.raw,
344            })),
345            None => Ok(None),
346        }
347    }
348
349    /// Alias for [`Self::read_coils`].
350    pub async fn write_fc1(
351        &self,
352        unit: u8,
353        address: u16,
354        length: u16,
355        timeout_ms: Option<u64>,
356    ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
357        self.read_coils(unit, address, length, timeout_ms).await
358    }
359
360    // FC2 - Read Discrete Inputs
361    pub async fn read_discrete_inputs(
362        &self,
363        unit: u8,
364        address: u16,
365        length: u16,
366        timeout_ms: Option<u64>,
367    ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
368        let fc = 0x02;
369        let byte_count = ((length + 7) / 8) as usize;
370
371        let mut buf = vec![0u8; 4];
372        buf[0..2].copy_from_slice(&address.to_be_bytes());
373        buf[2..4].copy_from_slice(&length.to_be_bytes());
374
375        let request = ApplicationDataUnit::new(unit, fc, buf);
376
377        let frame = self
378            .wait_response(
379                &request,
380                vec![
381                    Self::check_unit_fc(unit, fc),
382                    Self::check_length(1 + byte_count),
383                    Self::check_byte_count(byte_count),
384                ],
385                timeout_ms.unwrap_or(self.timeout_ms),
386            )
387            .await?;
388
389        match frame {
390            Some(f) => Ok(Some(MasterResponse {
391                transaction: f.adu.transaction,
392                unit: f.adu.unit,
393                fc: f.adu.fc,
394                data: parse_coils(&f.adu.data, length),
395                raw: f.raw,
396            })),
397            None => Ok(None),
398        }
399    }
400
401    /// Alias for [`Self::read_discrete_inputs`].
402    pub async fn write_fc2(
403        &self,
404        unit: u8,
405        address: u16,
406        length: u16,
407        timeout_ms: Option<u64>,
408    ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
409        self.read_discrete_inputs(unit, address, length, timeout_ms)
410            .await
411    }
412
413    // FC3 - Read Holding Registers
414    pub async fn read_holding_registers(
415        &self,
416        unit: u8,
417        address: u16,
418        length: u16,
419        timeout_ms: Option<u64>,
420    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
421        let fc = 0x03;
422        let byte_count = (length * 2) as usize;
423
424        let mut buf = vec![0u8; 4];
425        buf[0..2].copy_from_slice(&address.to_be_bytes());
426        buf[2..4].copy_from_slice(&length.to_be_bytes());
427
428        let request = ApplicationDataUnit::new(unit, fc, buf);
429
430        let frame = self
431            .wait_response(
432                &request,
433                vec![
434                    Self::check_unit_fc(unit, fc),
435                    Self::check_length(1 + byte_count),
436                    Self::check_byte_count(byte_count),
437                ],
438                timeout_ms.unwrap_or(self.timeout_ms),
439            )
440            .await?;
441
442        match frame {
443            Some(f) => Ok(Some(MasterResponse {
444                transaction: f.adu.transaction,
445                unit: f.adu.unit,
446                fc: f.adu.fc,
447                data: parse_registers(&f.adu.data, length),
448                raw: f.raw,
449            })),
450            None => Ok(None),
451        }
452    }
453
454    /// Alias for [`Self::read_holding_registers`].
455    pub async fn write_fc3(
456        &self,
457        unit: u8,
458        address: u16,
459        length: u16,
460        timeout_ms: Option<u64>,
461    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
462        self.read_holding_registers(unit, address, length, timeout_ms)
463            .await
464    }
465
466    // FC4 - Read Input Registers
467    pub async fn read_input_registers(
468        &self,
469        unit: u8,
470        address: u16,
471        length: u16,
472        timeout_ms: Option<u64>,
473    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
474        let fc = 0x04;
475        let byte_count = (length * 2) as usize;
476
477        let mut buf = vec![0u8; 4];
478        buf[0..2].copy_from_slice(&address.to_be_bytes());
479        buf[2..4].copy_from_slice(&length.to_be_bytes());
480
481        let request = ApplicationDataUnit::new(unit, fc, buf);
482
483        let frame = self
484            .wait_response(
485                &request,
486                vec![
487                    Self::check_unit_fc(unit, fc),
488                    Self::check_length(1 + byte_count),
489                    Self::check_byte_count(byte_count),
490                ],
491                timeout_ms.unwrap_or(self.timeout_ms),
492            )
493            .await?;
494
495        match frame {
496            Some(f) => Ok(Some(MasterResponse {
497                transaction: f.adu.transaction,
498                unit: f.adu.unit,
499                fc: f.adu.fc,
500                data: parse_registers(&f.adu.data, length),
501                raw: f.raw,
502            })),
503            None => Ok(None),
504        }
505    }
506
507    /// Alias for [`Self::read_input_registers`].
508    pub async fn write_fc4(
509        &self,
510        unit: u8,
511        address: u16,
512        length: u16,
513        timeout_ms: Option<u64>,
514    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
515        self.read_input_registers(unit, address, length, timeout_ms)
516            .await
517    }
518
519    // FC5 - Write Single Coil
520    pub async fn write_single_coil(
521        &self,
522        unit: u8,
523        address: u16,
524        value: bool,
525        timeout_ms: Option<u64>,
526    ) -> Result<Option<MasterResponse<bool>>, ModbusError> {
527        let fc = 0x05;
528
529        let mut buf = vec![0u8; 4];
530        buf[0..2].copy_from_slice(&address.to_be_bytes());
531        let value_u16: u16 = if value { 0xff00 } else { 0x0000 };
532        buf[2..4].copy_from_slice(&value_u16.to_be_bytes());
533
534        let request = ApplicationDataUnit::new(unit, fc, buf.clone());
535
536        let frame = self
537            .wait_response(
538                &request,
539                vec![
540                    Self::check_unit_fc(unit, fc),
541                    Self::check_length(4),
542                    Self::check_echo(buf),
543                ],
544                timeout_ms.unwrap_or(self.timeout_ms),
545            )
546            .await?;
547
548        match frame {
549            Some(f) => Ok(Some(MasterResponse {
550                transaction: f.adu.transaction,
551                unit: f.adu.unit,
552                fc: f.adu.fc,
553                data: value,
554                raw: f.raw,
555            })),
556            None => Ok(None),
557        }
558    }
559
560    /// Alias for [`Self::write_single_coil`].
561    pub async fn write_fc5(
562        &self,
563        unit: u8,
564        address: u16,
565        value: bool,
566        timeout_ms: Option<u64>,
567    ) -> Result<Option<MasterResponse<bool>>, ModbusError> {
568        self.write_single_coil(unit, address, value, timeout_ms)
569            .await
570    }
571
572    // FC6 - Write Single Register
573    pub async fn write_single_register(
574        &self,
575        unit: u8,
576        address: u16,
577        value: u16,
578        timeout_ms: Option<u64>,
579    ) -> Result<Option<MasterResponse<u16>>, ModbusError> {
580        let fc = 0x06;
581
582        let mut buf = vec![0u8; 4];
583        buf[0..2].copy_from_slice(&address.to_be_bytes());
584        buf[2..4].copy_from_slice(&value.to_be_bytes());
585
586        let request = ApplicationDataUnit::new(unit, fc, buf.clone());
587
588        let frame = self
589            .wait_response(
590                &request,
591                vec![
592                    Self::check_unit_fc(unit, fc),
593                    Self::check_length(4),
594                    Self::check_echo(buf),
595                ],
596                timeout_ms.unwrap_or(self.timeout_ms),
597            )
598            .await?;
599
600        match frame {
601            Some(f) => Ok(Some(MasterResponse {
602                transaction: f.adu.transaction,
603                unit: f.adu.unit,
604                fc: f.adu.fc,
605                data: value,
606                raw: f.raw,
607            })),
608            None => Ok(None),
609        }
610    }
611
612    /// Alias for [`Self::write_single_register`].
613    pub async fn write_fc6(
614        &self,
615        unit: u8,
616        address: u16,
617        value: u16,
618        timeout_ms: Option<u64>,
619    ) -> Result<Option<MasterResponse<u16>>, ModbusError> {
620        self.write_single_register(unit, address, value, timeout_ms)
621            .await
622    }
623
624    // FC15 - Write Multiple Coils
625    pub async fn write_multiple_coils(
626        &self,
627        unit: u8,
628        address: u16,
629        values: &[bool],
630        timeout_ms: Option<u64>,
631    ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
632        let fc = 0x0f;
633        let byte_count = ((values.len() + 7) / 8) as u8;
634
635        let mut buf = vec![0u8; 5 + byte_count as usize];
636        buf[0..2].copy_from_slice(&address.to_be_bytes());
637        buf[2..4].copy_from_slice(&(values.len() as u16).to_be_bytes());
638        buf[4] = byte_count;
639        for (byte_idx, chunk) in values.chunks(8).enumerate() {
640            let mut byte = 0u8;
641            for (bit_idx, &v) in chunk.iter().enumerate() {
642                if v {
643                    byte |= 1 << bit_idx;
644                }
645            }
646            buf[5 + byte_idx] = byte;
647        }
648
649        let tx_buf = buf.clone();
650        let request = ApplicationDataUnit::new(unit, fc, buf);
651
652        let frame = self
653            .wait_response(
654                &request,
655                vec![
656                    Self::check_unit_fc(unit, fc),
657                    Self::check_length(4),
658                    Self::check_echo(tx_buf[..4].to_vec()),
659                ],
660                timeout_ms.unwrap_or(self.timeout_ms),
661            )
662            .await?;
663
664        match frame {
665            Some(f) => Ok(Some(MasterResponse {
666                transaction: f.adu.transaction,
667                unit: f.adu.unit,
668                fc: f.adu.fc,
669                data: values.to_vec(),
670                raw: f.raw,
671            })),
672            None => Ok(None),
673        }
674    }
675
676    /// Alias for [`Self::write_multiple_coils`].
677    pub async fn write_fc15(
678        &self,
679        unit: u8,
680        address: u16,
681        values: &[bool],
682        timeout_ms: Option<u64>,
683    ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
684        self.write_multiple_coils(unit, address, values, timeout_ms)
685            .await
686    }
687
688    // FC16 - Write Multiple Registers
689    pub async fn write_multiple_registers(
690        &self,
691        unit: u8,
692        address: u16,
693        values: &[u16],
694        timeout_ms: Option<u64>,
695    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
696        let fc = 0x10;
697        let byte_count = (values.len() * 2) as u8;
698
699        let mut buf = vec![0u8; 5 + byte_count as usize];
700        buf[0..2].copy_from_slice(&address.to_be_bytes());
701        buf[2..4].copy_from_slice(&(values.len() as u16).to_be_bytes());
702        buf[4] = byte_count;
703        for (i, &v) in values.iter().enumerate() {
704            buf[5 + i * 2..7 + i * 2].copy_from_slice(&v.to_be_bytes());
705        }
706
707        let tx_buf = buf.clone();
708        let request = ApplicationDataUnit::new(unit, fc, buf);
709
710        let frame = self
711            .wait_response(
712                &request,
713                vec![
714                    Self::check_unit_fc(unit, fc),
715                    Self::check_length(4),
716                    Self::check_echo(tx_buf[..4].to_vec()),
717                ],
718                timeout_ms.unwrap_or(self.timeout_ms),
719            )
720            .await?;
721
722        match frame {
723            Some(f) => Ok(Some(MasterResponse {
724                transaction: f.adu.transaction,
725                unit: f.adu.unit,
726                fc: f.adu.fc,
727                data: values.to_vec(),
728                raw: f.raw,
729            })),
730            None => Ok(None),
731        }
732    }
733
734    /// Alias for [`Self::write_multiple_registers`].
735    pub async fn write_fc16(
736        &self,
737        unit: u8,
738        address: u16,
739        values: &[u16],
740        timeout_ms: Option<u64>,
741    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
742        self.write_multiple_registers(unit, address, values, timeout_ms)
743            .await
744    }
745
746    // FC17 - Report Server ID
747    pub async fn report_server_id(
748        &self,
749        unit: u8,
750        server_id_length: usize,
751        timeout_ms: Option<u64>,
752    ) -> Result<Option<MasterResponse<ServerId>>, ModbusError> {
753        let fc = 0x11;
754        let request = ApplicationDataUnit::new(unit, fc, vec![]);
755
756        let frame = self
757            .wait_response(
758                &request,
759                vec![
760                    Self::check_unit_fc(unit, fc),
761                    Arc::new(move |f: &Framing| {
762                        if !f.adu.data.is_empty() {
763                            let len = 1 + f.adu.data[0] as usize;
764                            PreCheckOutcome::NeedLength(len)
765                        } else {
766                            PreCheckOutcome::InsufficientData
767                        }
768                    }),
769                ],
770                timeout_ms.unwrap_or(self.timeout_ms),
771            )
772            .await?;
773
774        match frame {
775            Some(f) => {
776                let run_status_index = 1 + server_id_length;
777                if f.adu.data.len() < run_status_index + 1 {
778                    return Err(ModbusError::InvalidResponse);
779                }
780                Ok(Some(MasterResponse {
781                    transaction: f.adu.transaction,
782                    unit: f.adu.unit,
783                    fc: f.adu.fc,
784                    data: ServerId {
785                        server_id: f.adu.data[1..run_status_index].to_vec(),
786                        run_indicator_status: f.adu.data[run_status_index] == 0xff,
787                        additional_data: f.adu.data[run_status_index + 1..].to_vec(),
788                    },
789                    raw: f.raw,
790                }))
791            }
792            None => Ok(None),
793        }
794    }
795
796    /// Alias for [`Self::report_server_id`].
797    pub async fn handle_fc17(
798        &self,
799        unit: u8,
800        server_id_length: usize,
801        timeout_ms: Option<u64>,
802    ) -> Result<Option<MasterResponse<ServerId>>, ModbusError> {
803        self.report_server_id(unit, server_id_length, timeout_ms)
804            .await
805    }
806
807    // FC22 - Mask Write Register
808    pub async fn mask_write_register(
809        &self,
810        unit: u8,
811        address: u16,
812        and_mask: u16,
813        or_mask: u16,
814        timeout_ms: Option<u64>,
815    ) -> Result<Option<MasterResponse<(u16, u16)>>, ModbusError> {
816        let fc = 0x16;
817
818        let mut buf = vec![0u8; 6];
819        buf[0..2].copy_from_slice(&address.to_be_bytes());
820        buf[2..4].copy_from_slice(&and_mask.to_be_bytes());
821        buf[4..6].copy_from_slice(&or_mask.to_be_bytes());
822
823        let request = ApplicationDataUnit::new(unit, fc, buf.clone());
824
825        let frame = self
826            .wait_response(
827                &request,
828                vec![
829                    Self::check_unit_fc(unit, fc),
830                    Self::check_length(6),
831                    Self::check_echo(buf),
832                ],
833                timeout_ms.unwrap_or(self.timeout_ms),
834            )
835            .await?;
836
837        match frame {
838            Some(f) => Ok(Some(MasterResponse {
839                transaction: f.adu.transaction,
840                unit: f.adu.unit,
841                fc: f.adu.fc,
842                data: (and_mask, or_mask),
843                raw: f.raw,
844            })),
845            None => Ok(None),
846        }
847    }
848
849    /// Alias for [`Self::mask_write_register`].
850    pub async fn handle_fc22(
851        &self,
852        unit: u8,
853        address: u16,
854        and_mask: u16,
855        or_mask: u16,
856        timeout_ms: Option<u64>,
857    ) -> Result<Option<MasterResponse<(u16, u16)>>, ModbusError> {
858        self.mask_write_register(unit, address, and_mask, or_mask, timeout_ms)
859            .await
860    }
861
862    // FC23 - Read/Write Multiple Registers
863    pub async fn read_and_write_multiple_registers(
864        &self,
865        unit: u8,
866        read_address: u16,
867        read_length: u16,
868        write_address: u16,
869        write_values: &[u16],
870        timeout_ms: Option<u64>,
871    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
872        let fc = 0x17;
873        let byte_count = (write_values.len() * 2) as u8;
874
875        let mut buf = vec![0u8; 9 + byte_count as usize];
876        buf[0..2].copy_from_slice(&read_address.to_be_bytes());
877        buf[2..4].copy_from_slice(&read_length.to_be_bytes());
878        buf[4..6].copy_from_slice(&write_address.to_be_bytes());
879        buf[6..8].copy_from_slice(&(write_values.len() as u16).to_be_bytes());
880        buf[8] = byte_count;
881        for (i, &v) in write_values.iter().enumerate() {
882            buf[9 + i * 2..11 + i * 2].copy_from_slice(&v.to_be_bytes());
883        }
884
885        let request = ApplicationDataUnit::new(unit, fc, buf);
886        let read_byte_count = (read_length * 2) as usize;
887
888        let frame = self
889            .wait_response(
890                &request,
891                vec![
892                    Self::check_unit_fc(unit, fc),
893                    Self::check_length(1 + read_byte_count),
894                    Self::check_byte_count(read_byte_count),
895                ],
896                timeout_ms.unwrap_or(self.timeout_ms),
897            )
898            .await?;
899
900        match frame {
901            Some(f) => Ok(Some(MasterResponse {
902                transaction: f.adu.transaction,
903                unit: f.adu.unit,
904                fc: f.adu.fc,
905                data: parse_registers(&f.adu.data, read_length),
906                raw: f.raw,
907            })),
908            None => Ok(None),
909        }
910    }
911
912    /// Alias for [`Self::read_and_write_multiple_registers`].
913    pub async fn handle_fc23(
914        &self,
915        unit: u8,
916        read_address: u16,
917        read_length: u16,
918        write_address: u16,
919        write_values: &[u16],
920        timeout_ms: Option<u64>,
921    ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
922        self.read_and_write_multiple_registers(
923            unit,
924            read_address,
925            read_length,
926            write_address,
927            write_values,
928            timeout_ms,
929        )
930        .await
931    }
932
933    // FC43/14 - Read Device Identification
934    pub async fn read_device_identification(
935        &self,
936        unit: u8,
937        read_device_id_code: u8,
938        object_id: u8,
939        timeout_ms: Option<u64>,
940    ) -> Result<Option<MasterResponse<DeviceIdentification>>, ModbusError> {
941        let fc = 0x2b;
942        let request =
943            ApplicationDataUnit::new(unit, fc, vec![0x0e, read_device_id_code, object_id]);
944
945        let frame = self
946            .wait_response(
947                &request,
948                vec![
949                    Self::check_unit_fc(unit, fc),
950                    Arc::new(move |f: &Framing| {
951                        if f.adu.data.len() >= 6
952                            && f.adu.data[0] == 0x0e
953                            && f.adu.data[1] == read_device_id_code
954                        {
955                            let num_objects = f.adu.data[5] as usize;
956                            let mut total = 6usize;
957                            let mut idx = 6;
958                            for _ in 0..num_objects {
959                                if idx + 2 > f.adu.data.len() {
960                                    return PreCheckOutcome::InsufficientData;
961                                }
962                                let obj_len = f.adu.data[idx + 1] as usize;
963                                total += 2 + obj_len;
964                                idx += 2 + obj_len;
965                            }
966                            PreCheckOutcome::NeedLength(total)
967                        } else {
968                            PreCheckOutcome::Fail(ModbusError::InvalidResponse)
969                        }
970                    }),
971                ],
972                timeout_ms.unwrap_or(self.timeout_ms),
973            )
974            .await?;
975
976        match frame {
977            Some(f) => {
978                let mut objects = Vec::new();
979                let num_objects = f.adu.data[5] as usize;
980                let mut idx = 6;
981                for _ in 0..num_objects {
982                    let obj_id = f.adu.data[idx];
983                    let obj_len = f.adu.data[idx + 1] as usize;
984                    let obj_value =
985                        String::from_utf8_lossy(&f.adu.data[idx + 2..idx + 2 + obj_len])
986                            .to_string();
987                    objects.push(DeviceObject {
988                        id: obj_id,
989                        value: obj_value,
990                    });
991                    idx += 2 + obj_len;
992                }
993                Ok(Some(MasterResponse {
994                    transaction: f.adu.transaction,
995                    unit: f.adu.unit,
996                    fc: f.adu.fc,
997                    data: DeviceIdentification {
998                        read_device_id_code: f.adu.data[1],
999                        conformity_level: f.adu.data[2],
1000                        more_follows: f.adu.data[3] == 0xff,
1001                        next_object_id: f.adu.data[4],
1002                        objects,
1003                    },
1004                    raw: f.raw,
1005                }))
1006            }
1007            None => Ok(None),
1008        }
1009    }
1010
1011    /// Alias for [`Self::read_device_identification`].
1012    pub async fn handle_fc43_14(
1013        &self,
1014        unit: u8,
1015        read_device_id_code: u8,
1016        object_id: u8,
1017        timeout_ms: Option<u64>,
1018    ) -> Result<Option<MasterResponse<DeviceIdentification>>, ModbusError> {
1019        self.read_device_identification(unit, read_device_id_code, object_id, timeout_ms)
1020            .await
1021    }
1022
1023    pub fn add_custom_function_code(&self, cfc: CustomFunctionCode) {
1024        self.application.add_custom_function_code(cfc);
1025    }
1026
1027    pub fn remove_custom_function_code(&self, fc: u8) {
1028        self.application.remove_custom_function_code(fc);
1029    }
1030
1031    /// Send a non-standard / custom function code request. The master only
1032    /// validates that the response has matching `unit` and `fc`; any payload
1033    /// is returned as raw bytes. The caller must have registered a
1034    /// [`CustomFunctionCode`] with `predict_response_length` on the
1035    /// application layer (or on this master) so RTU framing can advance.
1036    ///
1037    /// `unit == 0` is broadcast: returns `Ok(None)` after the write.
1038    pub async fn send_custom_fc(
1039        &self,
1040        unit: u8,
1041        fc: u8,
1042        data: Vec<u8>,
1043        timeout_ms: Option<u64>,
1044    ) -> Result<Option<MasterResponse<Vec<u8>>>, ModbusError> {
1045        let request = ApplicationDataUnit::new(unit, fc, data);
1046        let frame = self
1047            .wait_response(
1048                &request,
1049                vec![Self::check_unit_fc(unit, fc)],
1050                timeout_ms.unwrap_or(self.timeout_ms),
1051            )
1052            .await?;
1053        match frame {
1054            Some(f) => Ok(Some(MasterResponse {
1055                transaction: f.adu.transaction,
1056                unit: f.adu.unit,
1057                fc: f.adu.fc,
1058                data: f.adu.data,
1059                raw: f.raw,
1060            })),
1061            None => Ok(None),
1062        }
1063    }
1064}