Skip to main content

rs_modbus/
slave.rs

1use crate::error::{get_code_by_error, get_error_by_code, ErrorCode, ModbusError};
2use crate::layers::application::{ApplicationLayer, ApplicationProtocol, ApplicationRole};
3use crate::layers::physical::{ConnectionId, PhysicalLayer, ResponseFn};
4use crate::types::{
5    AddressRange, ApplicationDataUnit, CustomFunctionCode, FramedDataUnit, ServerId,
6};
7use crate::utils::{check_range, pack_coils, pack_registers};
8use async_trait::async_trait;
9use std::collections::{HashMap, VecDeque};
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13use tokio::sync::broadcast::error::RecvError;
14
15type SlaveResponseFn = Arc<
16    dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>> + Send + Sync,
17>;
18
19#[async_trait]
20pub trait ModbusSlaveModel: Send + Sync {
21    fn unit(&self) -> u8;
22    fn address_range(&self) -> AddressRange;
23
24    async fn intercept(&self, _fc: u8, _data: &[u8]) -> Result<Option<Vec<u8>>, ModbusError> {
25        Ok(None)
26    }
27
28    async fn read_coils(&self, _address: u16, _length: u16) -> Result<Vec<bool>, ModbusError> {
29        Err(ModbusError::IllegalFunction)
30    }
31    async fn write_single_coil(&self, _address: u16, _value: bool) -> Result<(), ModbusError> {
32        Err(ModbusError::IllegalFunction)
33    }
34    /// Default: loop `write_single_coil`. Mirrors njs-modbus' behavior where
35    /// a model that only provides `writeSingleCoil` is automatically usable
36    /// for FC15 requests. Override to provide a bulk-write fast path.
37    async fn write_multiple_coils(&self, address: u16, values: &[bool]) -> Result<(), ModbusError> {
38        for (i, &v) in values.iter().enumerate() {
39            self.write_single_coil(address + i as u16, v).await?;
40        }
41        Ok(())
42    }
43
44    async fn read_discrete_inputs(
45        &self,
46        _address: u16,
47        _length: u16,
48    ) -> Result<Vec<bool>, ModbusError> {
49        Err(ModbusError::IllegalFunction)
50    }
51
52    async fn read_holding_registers(
53        &self,
54        _address: u16,
55        _length: u16,
56    ) -> Result<Vec<u16>, ModbusError> {
57        Err(ModbusError::IllegalFunction)
58    }
59    async fn write_single_register(&self, _address: u16, _value: u16) -> Result<(), ModbusError> {
60        Err(ModbusError::IllegalFunction)
61    }
62    /// Default: loop `write_single_register`. See `write_multiple_coils`.
63    async fn write_multiple_registers(
64        &self,
65        address: u16,
66        values: &[u16],
67    ) -> Result<(), ModbusError> {
68        for (i, &v) in values.iter().enumerate() {
69            self.write_single_register(address + i as u16, v).await?;
70        }
71        Ok(())
72    }
73    /// Default: read-modify-write using `read_holding_registers` +
74    /// `write_single_register`. Mirrors njs-modbus' fallback.
75    async fn mask_write_register(
76        &self,
77        address: u16,
78        and_mask: u16,
79        or_mask: u16,
80    ) -> Result<(), ModbusError> {
81        let regs = self.read_holding_registers(address, 1).await?;
82        let current = *regs.first().ok_or(ModbusError::ServerDeviceFailure)?;
83        let new = (current & and_mask) | (or_mask & !and_mask);
84        self.write_single_register(address, new).await
85    }
86
87    async fn read_input_registers(
88        &self,
89        _address: u16,
90        _length: u16,
91    ) -> Result<Vec<u16>, ModbusError> {
92        Err(ModbusError::IllegalFunction)
93    }
94
95    async fn report_server_id(&self) -> Result<ServerId, ModbusError> {
96        Err(ModbusError::IllegalFunction)
97    }
98    async fn read_device_identification(&self) -> Result<HashMap<u8, String>, ModbusError> {
99        Err(ModbusError::IllegalFunction)
100    }
101}
102
103/// Tunables for [`ModbusSlave::with_options`]. Mirrors njs-modbus
104/// `ModbusSlaveOptions`.
105#[derive(Clone, Copy, Debug, Default)]
106pub struct ModbusSlaveOptions {
107    /// Pipelined concurrent processing of requests within a single
108    /// connection. Only valid for Modbus TCP application layers (TID
109    /// disambiguates responses); constructing a slave with `concurrent:
110    /// true` on RTU or ASCII panics. Defaults to `false` (per-connection
111    /// FIFO — same connection serialized, different connections in
112    /// parallel).
113    pub concurrent: bool,
114}
115
116struct QueueEntry {
117    items: VecDeque<(FramedDataUnit, ResponseFn)>,
118    processing: bool,
119}
120
121pub struct ModbusSlave<A: ApplicationLayer, P: PhysicalLayer> {
122    application: Arc<A>,
123    physical: Arc<P>,
124    pub models: Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
125    pub concurrent: bool,
126    queues: Arc<tokio::sync::Mutex<HashMap<ConnectionId, QueueEntry>>>,
127    tasks: tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
128    is_open: Arc<std::sync::atomic::AtomicBool>,
129    custom_function_codes: std::sync::Mutex<HashMap<u8, CustomFunctionCode>>,
130    clean_level: std::sync::atomic::AtomicU8,
131    /// Per-address async locks for FC22/FC23 fallback paths. Maps a register
132    /// address to a tokio mutex; requests that touch overlapping addresses
133    /// are serialized. Mirrors njs-modbus `withAddressLock`.
134    address_locks: Arc<tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>>,
135}
136
137impl<A: ApplicationLayer + 'static, P: PhysicalLayer + 'static> ModbusSlave<A, P> {
138    pub fn new(application: Arc<A>, physical: Arc<P>) -> Self {
139        Self::with_options(application, physical, ModbusSlaveOptions::default())
140    }
141
142    pub fn with_options(
143        application: Arc<A>,
144        physical: Arc<P>,
145        options: ModbusSlaveOptions,
146    ) -> Self {
147        if options.concurrent && application.protocol() != ApplicationProtocol::Tcp {
148            panic!("concurrent mode requires a Modbus TCP application layer");
149        }
150        application
151            .set_role(ApplicationRole::Slave)
152            .expect("application layer is already bound to a different role");
153        Self {
154            application,
155            physical,
156            models: Arc::new(std::sync::Mutex::new(HashMap::new())),
157            concurrent: options.concurrent,
158            queues: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
159            tasks: tokio::sync::Mutex::new(Vec::new()),
160            is_open: Arc::new(std::sync::atomic::AtomicBool::new(false)),
161            custom_function_codes: std::sync::Mutex::new(HashMap::new()),
162            address_locks: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
163            clean_level: std::sync::atomic::AtomicU8::new(0),
164        }
165    }
166
167    pub fn add(&self, model: Box<dyn ModbusSlaveModel>) {
168        let unit = model.unit();
169        let arc: Arc<dyn ModbusSlaveModel> = Arc::from(model);
170        self.models.lock().unwrap().insert(unit, arc);
171    }
172
173    pub fn remove(&self, unit: u8) {
174        self.models.lock().unwrap().remove(&unit);
175    }
176
177    pub fn is_open(&self) -> bool {
178        self.is_open.load(std::sync::atomic::Ordering::Acquire)
179    }
180
181    pub fn is_destroyed(&self) -> bool {
182        self.clean_level.load(std::sync::atomic::Ordering::Acquire) == 2
183            || self.physical.is_destroyed()
184    }
185
186    pub async fn open(&self, options: P::OpenOptions) -> Result<(), ModbusError> {
187        self.is_open
188            .store(true, std::sync::atomic::Ordering::Release);
189        self.clean_level
190            .store(0, std::sync::atomic::Ordering::Release);
191        // Fresh session — drop any state from a prior open/close cycle.
192        self.queues.lock().await.clear();
193
194        let application = Arc::clone(&self.application);
195        let models = Arc::clone(&self.models);
196        let queues = Arc::clone(&self.queues);
197        let address_locks = Arc::clone(&self.address_locks);
198        let custom_fcs: Arc<HashMap<u8, CustomFunctionCode>> =
199            Arc::new(self.custom_function_codes.lock().unwrap().clone());
200        let concurrent = self.concurrent;
201        let is_open = Arc::clone(&self.is_open);
202        let mut framing_rx = self.application.subscribe_framing();
203        let framing_task = tokio::spawn(async move {
204            loop {
205                match framing_rx.recv().await {
206                    Ok(framing) => {
207                        if !is_open.load(std::sync::atomic::Ordering::Acquire) {
208                            continue;
209                        }
210                        let frame = FramedDataUnit {
211                            adu: framing.adu,
212                            raw: framing.raw,
213                        };
214                        if concurrent {
215                            // TCP-only: each frame gets its own task. The
216                            // TID embedded in the response disambiguates.
217                            let app = Arc::clone(&application);
218                            let mdls = Arc::clone(&models);
219                            let cfs = Arc::clone(&custom_fcs);
220                            let locks = Arc::clone(&address_locks);
221                            tokio::spawn(async move {
222                                Self::process_frame(
223                                    &app,
224                                    &mdls,
225                                    &cfs,
226                                    &locks,
227                                    frame,
228                                    framing.response,
229                                )
230                                .await;
231                            });
232                        } else {
233                            // Per-connection FIFO: push onto this
234                            // connection's queue, kick off a drain task
235                            // if not already running.
236                            Self::enqueue_and_drain(
237                                Arc::clone(&queues),
238                                Arc::clone(&application),
239                                Arc::clone(&models),
240                                Arc::clone(&custom_fcs),
241                                Arc::clone(&address_locks),
242                                framing.connection,
243                                frame,
244                                framing.response,
245                            )
246                            .await;
247                        }
248                    }
249                    Err(RecvError::Lagged(_)) => continue,
250                    Err(RecvError::Closed) => break,
251                }
252            }
253        });
254
255        // Drop a connection's queue when its peer disconnects — the
256        // response closure points at a now-dead socket, so there is no
257        // point continuing to process queued frames. Keeps the in-flight
258        // one running; the drain task cleans up the entry when it lands
259        // on an empty queue.
260        let queues_for_close = Arc::clone(&self.queues);
261        let mut conn_close_rx = self.physical.subscribe_connection_close();
262        let conn_close_task = tokio::spawn(async move {
263            loop {
264                match conn_close_rx.recv().await {
265                    Ok(conn_id) => {
266                        let mut g = queues_for_close.lock().await;
267                        if let Some(entry) = g.get_mut(&conn_id) {
268                            entry.items.clear();
269                            if !entry.processing {
270                                g.remove(&conn_id);
271                            }
272                        }
273                    }
274                    Err(RecvError::Lagged(_)) => continue,
275                    Err(RecvError::Closed) => break,
276                }
277            }
278        });
279
280        // Clear everything on a full physical-layer close.
281        let queues_for_full_close = Arc::clone(&self.queues);
282        let mut close_rx = self.physical.subscribe_close();
283        let close_task = tokio::spawn(async move {
284            loop {
285                match close_rx.recv().await {
286                    Ok(()) => {
287                        queues_for_full_close.lock().await.clear();
288                    }
289                    Err(RecvError::Lagged(_)) => continue,
290                    Err(RecvError::Closed) => break,
291                }
292            }
293        });
294
295        self.tasks
296            .lock()
297            .await
298            .extend([framing_task, conn_close_task, close_task]);
299
300        self.physical.open(options).await?;
301        Ok(())
302    }
303
304    async fn clean(&self, level: u8) {
305        let current = self.clean_level.load(std::sync::atomic::Ordering::Acquire);
306        if current == 2 {
307            return;
308        }
309        if current == 1 && level == 1 {
310            return;
311        }
312        self.is_open
313            .store(false, std::sync::atomic::Ordering::Release);
314        self.queues.lock().await.clear();
315        self.address_locks.lock().await.clear();
316        if level == 2 {
317            self.custom_function_codes.lock().unwrap().clear();
318            self.models.lock().unwrap().clear();
319        }
320        self.clean_level
321            .store(level, std::sync::atomic::Ordering::Release);
322    }
323
324    pub async fn close(&self) -> Result<(), ModbusError> {
325        if self.clean_level.load(std::sync::atomic::Ordering::Acquire) == 2 {
326            return Ok(());
327        }
328        self.clean(1).await;
329        return self.physical.close().await;
330    }
331
332    pub async fn destroy(&self) {
333        if self.clean_level.load(std::sync::atomic::Ordering::Acquire) == 2 {
334            return;
335        }
336        self.clean(2).await;
337        {
338            let mut tasks = self.tasks.lock().await;
339            for task in tasks.drain(..) {
340                task.abort();
341            }
342        }
343        let _ = self.physical.destroy().await;
344    }
345
346    /// Acquire an async lock for each address in `addresses`, execute `f`, then
347    /// release all locks. Addresses are deduplicated and sorted before locking
348    /// to avoid deadlocks. Mirrors njs-modbus `withAddressLock`.
349    async fn with_address_lock<F, Fut, T>(
350        address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
351        addresses: &[u16],
352        f: F,
353    ) -> T
354    where
355        F: FnOnce() -> Fut,
356        Fut: std::future::Future<Output = T>,
357    {
358        let mut sorted: Vec<u16> = addresses.to_vec();
359        sorted.sort_unstable();
360        sorted.dedup();
361
362        // Collect the Arc<Mutex<()>> for each address (may create new entries).
363        let lock_arcs: Vec<Arc<tokio::sync::Mutex<()>>> = {
364            let mut locks = address_locks.lock().await;
365            sorted
366                .iter()
367                .map(|&addr| {
368                    locks
369                        .entry(addr)
370                        .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
371                        .clone()
372                })
373                .collect()
374        };
375
376        // Acquire all locks in sorted order. Locking in a consistent sorted
377        // order ensures no two concurrent `with_address_lock` calls can
378        // deadlock with each other, even if their address sets overlap.
379        let mut guards: Vec<tokio::sync::MutexGuard<'_, ()>> = Vec::with_capacity(lock_arcs.len());
380        for arc in &lock_arcs {
381            guards.push(arc.lock().await);
382        }
383
384        let result = f().await;
385        drop(guards);
386        drop(lock_arcs);
387
388        // Clean up entries whose Arc is no longer referenced outside the map.
389        // This prevents unbounded growth when writes touch many distinct
390        // addresses over the lifetime of the slave.
391        {
392            let mut locks = address_locks.lock().await;
393            for &addr in &sorted {
394                if let Some(arc) = locks.get(&addr) {
395                    if Arc::strong_count(arc) == 1 {
396                        locks.remove(&addr);
397                    }
398                }
399            }
400        }
401
402        result
403    }
404
405    /// Push a frame onto the per-connection queue. If no drain task is
406    /// currently running for this connection, spawn one; otherwise the
407    /// already-running drain picks the new item up on its next iteration.
408    #[allow(clippy::too_many_arguments)]
409    async fn enqueue_and_drain(
410        queues: Arc<tokio::sync::Mutex<HashMap<ConnectionId, QueueEntry>>>,
411        application: Arc<A>,
412        models: Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
413        custom_fcs: Arc<HashMap<u8, CustomFunctionCode>>,
414        address_locks: Arc<tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>>,
415        connection: ConnectionId,
416        frame: FramedDataUnit,
417        response: ResponseFn,
418    ) {
419        let should_spawn = {
420            let mut g = queues.lock().await;
421            let entry = g.entry(Arc::clone(&connection)).or_insert(QueueEntry {
422                items: VecDeque::new(),
423                processing: false,
424            });
425            entry.items.push_back((frame, response));
426            if entry.processing {
427                false
428            } else {
429                entry.processing = true;
430                true
431            }
432        };
433        if should_spawn {
434            tokio::spawn(async move {
435                Self::drain_loop(
436                    queues,
437                    application,
438                    models,
439                    custom_fcs,
440                    address_locks,
441                    connection,
442                )
443                .await;
444            });
445        }
446    }
447
448    /// Drain the per-connection queue until empty. The entry is removed
449    /// from the map when the queue settles empty so the map doesn't grow
450    /// unbounded across ephemeral connections (UDP rinfos, brief TCP
451    /// clients). If the entry is gone mid-drain (cleared by a
452    /// `connection_close` handler), we bail early.
453    async fn drain_loop(
454        queues: Arc<tokio::sync::Mutex<HashMap<ConnectionId, QueueEntry>>>,
455        application: Arc<A>,
456        models: Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
457        custom_fcs: Arc<HashMap<u8, CustomFunctionCode>>,
458        address_locks: Arc<tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>>,
459        connection: ConnectionId,
460    ) {
461        loop {
462            let next = {
463                let mut g = queues.lock().await;
464                match g.get_mut(&connection) {
465                    Some(entry) => entry.items.pop_front(),
466                    None => return,
467                }
468            };
469            match next {
470                Some((frame, response)) => {
471                    Self::process_frame(
472                        &application,
473                        &models,
474                        &custom_fcs,
475                        &address_locks,
476                        frame,
477                        response,
478                    )
479                    .await;
480                }
481                None => {
482                    let mut g = queues.lock().await;
483                    if let Some(entry) = g.get_mut(&connection) {
484                        if entry.items.is_empty() {
485                            g.remove(&connection);
486                            return;
487                        }
488                        // else a new item snuck in between pop_front and
489                        // this lock; loop and re-drain.
490                    } else {
491                        return;
492                    }
493                }
494            }
495        }
496    }
497
498    async fn process_frame(
499        application: &Arc<A>,
500        models: &Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
501        custom_fcs: &HashMap<u8, CustomFunctionCode>,
502        address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
503        frame: FramedDataUnit,
504        response_fn: ResponseFn,
505    ) {
506        let unit = frame.adu.unit;
507        // Snapshot the model Arc(s) under a brief lock so the map mutex
508        // isn't held across handler `.await` points — otherwise a slow
509        // handler on one model would block every other connection's
510        // frames slave-wide. Mirrors the per-connection FIFO goal of
511        // Item #4.
512        let models_snapshot: Vec<Arc<dyn ModbusSlaveModel>> = {
513            let g = models.lock().unwrap();
514            if unit == 0 {
515                g.values().map(Arc::clone).collect()
516            } else {
517                match g.get(&unit) {
518                    Some(m) => vec![Arc::clone(m)],
519                    None => return,
520                }
521            }
522        };
523
524        for model_arc in models_snapshot {
525            let model: &dyn ModbusSlaveModel = &*model_arc;
526
527            let response: SlaveResponseFn = if unit == 0 {
528                Arc::new(|_| {
529                    Box::pin(async { Ok(()) })
530                        as Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>>
531                })
532            } else {
533                let response_fn = Arc::clone(&response_fn);
534                Arc::new(move |data| {
535                    let response_fn = Arc::clone(&response_fn);
536                    Box::pin(async move { response_fn(data).await })
537                        as Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>>
538                })
539            };
540
541            match model.intercept(frame.adu.fc, &frame.adu.data).await {
542                Ok(Some(data)) => {
543                    let response_adu = ApplicationDataUnit {
544                        transaction: frame.adu.transaction,
545                        unit: frame.adu.unit,
546                        fc: frame.adu.fc,
547                        data,
548                    };
549                    let encoded = application.encode(&response_adu);
550                    let _ = response(encoded).await;
551                    continue;
552                }
553                Ok(None) => {}
554                Err(err) => {
555                    let _ = Self::response_error(application, &frame.adu, response, &err).await;
556                    continue;
557                }
558            }
559
560            let result = match frame.adu.fc {
561                0x01 => {
562                    Self::handle_fc1(application, model, &frame.adu, Arc::clone(&response)).await
563                }
564                0x02 => {
565                    Self::handle_fc2(application, model, &frame.adu, Arc::clone(&response)).await
566                }
567                0x03 => {
568                    Self::handle_fc3(application, model, &frame.adu, Arc::clone(&response)).await
569                }
570                0x04 => {
571                    Self::handle_fc4(application, model, &frame.adu, Arc::clone(&response)).await
572                }
573                0x05 => {
574                    Self::handle_fc5(application, model, &frame.adu, Arc::clone(&response)).await
575                }
576                0x06 => {
577                    Self::handle_fc6(application, model, &frame.adu, Arc::clone(&response)).await
578                }
579                0x0f => {
580                    Self::handle_fc15(application, model, &frame.adu, Arc::clone(&response)).await
581                }
582                0x10 => {
583                    Self::handle_fc16(application, model, &frame.adu, Arc::clone(&response)).await
584                }
585                0x11 => {
586                    Self::handle_fc17(application, model, &frame.adu, Arc::clone(&response)).await
587                }
588                0x16 => {
589                    Self::handle_fc22(
590                        application,
591                        model,
592                        address_locks,
593                        &frame.adu,
594                        Arc::clone(&response),
595                    )
596                    .await
597                }
598                0x17 => {
599                    Self::handle_fc23(
600                        application,
601                        model,
602                        address_locks,
603                        &frame.adu,
604                        Arc::clone(&response),
605                    )
606                    .await
607                }
608                0x2b => {
609                    Self::handle_fc43_14(application, model, &frame.adu, Arc::clone(&response))
610                        .await
611                }
612                _ => {
613                    if let Some(cfc) = custom_fcs.get(&frame.adu.fc) {
614                        if let Some(ref handler) = cfc.handle {
615                            let handler_clone: std::sync::Arc<
616                                dyn Fn(Vec<u8>, u8) -> crate::types::CustomFcHandleResult
617                                    + Send
618                                    + Sync,
619                            > = Arc::clone(handler);
620                            let pdu = frame.adu.data.clone();
621                            match handler_clone(pdu, frame.adu.unit).await {
622                                Ok(response_data) => {
623                                    let response_adu = ApplicationDataUnit {
624                                        transaction: frame.adu.transaction,
625                                        unit: frame.adu.unit,
626                                        fc: frame.adu.fc,
627                                        data: response_data,
628                                    };
629                                    let _ = response(application.encode(&response_adu)).await;
630                                    continue;
631                                }
632                                Err(e) => {
633                                    let _ = Self::response_error(
634                                        application,
635                                        &frame.adu,
636                                        Arc::clone(&response),
637                                        &e,
638                                    )
639                                    .await;
640                                    continue;
641                                }
642                            }
643                        }
644                    }
645                    Self::response_error(
646                        application,
647                        &frame.adu,
648                        Arc::clone(&response),
649                        &get_error_by_code(ErrorCode::IllegalFunction),
650                    )
651                    .await
652                }
653            };
654
655            if let Err(e) = result {
656                let _ = Self::response_error(application, &frame.adu, response, &e).await;
657            }
658        }
659    }
660
661    async fn response_error(
662        application: &Arc<A>,
663        adu: &ApplicationDataUnit,
664        response: SlaveResponseFn,
665        error: &ModbusError,
666    ) -> Result<(), ModbusError> {
667        let error_code = get_code_by_error(error) as u8;
668        let response_adu = ApplicationDataUnit {
669            transaction: adu.transaction,
670            unit: adu.unit,
671            fc: adu.fc | 0x80,
672            data: vec![error_code],
673        };
674        let encoded = application.encode(&response_adu);
675        response(encoded).await
676    }
677
678    // FC1 - Read Coils
679    async fn handle_fc1(
680        application: &Arc<A>,
681        model: &dyn ModbusSlaveModel,
682        adu: &ApplicationDataUnit,
683        response: SlaveResponseFn,
684    ) -> Result<(), ModbusError> {
685        if adu.data.len() != 4 {
686            return Ok(());
687        }
688        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
689        let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
690
691        if !(1..=0x07d0).contains(&length) {
692            return Self::response_error(
693                application,
694                adu,
695                response,
696                &get_error_by_code(ErrorCode::IllegalDataValue),
697            )
698            .await;
699        }
700
701        if !check_range(&[address, address + length], &model.address_range().coils) {
702            return Self::response_error(
703                application,
704                adu,
705                response,
706                &get_error_by_code(ErrorCode::IllegalDataAddress),
707            )
708            .await;
709        }
710
711        match model.read_coils(address, length).await {
712            Ok(coils) => {
713                let response_adu = ApplicationDataUnit {
714                    transaction: adu.transaction,
715                    unit: adu.unit,
716                    fc: adu.fc,
717                    data: pack_coils(&coils, length),
718                };
719                response(application.encode(&response_adu)).await
720            }
721            Err(e) => Self::response_error(application, adu, response, &e).await,
722        }
723    }
724
725    // FC2 - Read Discrete Inputs
726    async fn handle_fc2(
727        application: &Arc<A>,
728        model: &dyn ModbusSlaveModel,
729        adu: &ApplicationDataUnit,
730        response: SlaveResponseFn,
731    ) -> Result<(), ModbusError> {
732        if adu.data.len() != 4 {
733            return Ok(());
734        }
735        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
736        let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
737
738        if !(1..=0x07d0).contains(&length) {
739            return Self::response_error(
740                application,
741                adu,
742                response,
743                &get_error_by_code(ErrorCode::IllegalDataValue),
744            )
745            .await;
746        }
747
748        if !check_range(
749            &[address, address + length],
750            &model.address_range().discrete_inputs,
751        ) {
752            return Self::response_error(
753                application,
754                adu,
755                response,
756                &get_error_by_code(ErrorCode::IllegalDataAddress),
757            )
758            .await;
759        }
760
761        match model.read_discrete_inputs(address, length).await {
762            Ok(inputs) => {
763                let response_adu = ApplicationDataUnit {
764                    transaction: adu.transaction,
765                    unit: adu.unit,
766                    fc: adu.fc,
767                    data: pack_coils(&inputs, length),
768                };
769                response(application.encode(&response_adu)).await
770            }
771            Err(e) => Self::response_error(application, adu, response, &e).await,
772        }
773    }
774
775    // FC3 - Read Holding Registers
776    async fn handle_fc3(
777        application: &Arc<A>,
778        model: &dyn ModbusSlaveModel,
779        adu: &ApplicationDataUnit,
780        response: SlaveResponseFn,
781    ) -> Result<(), ModbusError> {
782        if adu.data.len() != 4 {
783            return Ok(());
784        }
785        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
786        let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
787
788        if !(1..=0x007d).contains(&length) {
789            return Self::response_error(
790                application,
791                adu,
792                response,
793                &get_error_by_code(ErrorCode::IllegalDataValue),
794            )
795            .await;
796        }
797
798        if !check_range(
799            &[address, address + length],
800            &model.address_range().holding_registers,
801        ) {
802            return Self::response_error(
803                application,
804                adu,
805                response,
806                &get_error_by_code(ErrorCode::IllegalDataAddress),
807            )
808            .await;
809        }
810
811        match model.read_holding_registers(address, length).await {
812            Ok(registers) => {
813                let response_adu = ApplicationDataUnit {
814                    transaction: adu.transaction,
815                    unit: adu.unit,
816                    fc: adu.fc,
817                    data: pack_registers(&registers, length),
818                };
819                response(application.encode(&response_adu)).await
820            }
821            Err(e) => Self::response_error(application, adu, response, &e).await,
822        }
823    }
824
825    // FC4 - Read Input Registers
826    async fn handle_fc4(
827        application: &Arc<A>,
828        model: &dyn ModbusSlaveModel,
829        adu: &ApplicationDataUnit,
830        response: SlaveResponseFn,
831    ) -> Result<(), ModbusError> {
832        if adu.data.len() != 4 {
833            return Ok(());
834        }
835        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
836        let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
837
838        if !(1..=0x007d).contains(&length) {
839            return Self::response_error(
840                application,
841                adu,
842                response,
843                &get_error_by_code(ErrorCode::IllegalDataValue),
844            )
845            .await;
846        }
847
848        if !check_range(
849            &[address, address + length],
850            &model.address_range().input_registers,
851        ) {
852            return Self::response_error(
853                application,
854                adu,
855                response,
856                &get_error_by_code(ErrorCode::IllegalDataAddress),
857            )
858            .await;
859        }
860
861        match model.read_input_registers(address, length).await {
862            Ok(registers) => {
863                let response_adu = ApplicationDataUnit {
864                    transaction: adu.transaction,
865                    unit: adu.unit,
866                    fc: adu.fc,
867                    data: pack_registers(&registers, length),
868                };
869                response(application.encode(&response_adu)).await
870            }
871            Err(e) => Self::response_error(application, adu, response, &e).await,
872        }
873    }
874
875    // FC5 - Write Single Coil
876    async fn handle_fc5(
877        application: &Arc<A>,
878        model: &dyn ModbusSlaveModel,
879        adu: &ApplicationDataUnit,
880        response: SlaveResponseFn,
881    ) -> Result<(), ModbusError> {
882        if adu.data.len() != 4 {
883            return Ok(());
884        }
885        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
886        let value = u16::from_be_bytes([adu.data[2], adu.data[3]]);
887
888        if value != 0x0000 && value != 0xff00 {
889            return Self::response_error(
890                application,
891                adu,
892                response,
893                &get_error_by_code(ErrorCode::IllegalDataValue),
894            )
895            .await;
896        }
897
898        if !check_range(&[address], &model.address_range().coils) {
899            return Self::response_error(
900                application,
901                adu,
902                response,
903                &get_error_by_code(ErrorCode::IllegalDataAddress),
904            )
905            .await;
906        }
907
908        match model.write_single_coil(address, value == 0xff00).await {
909            Ok(()) => response(application.encode(adu)).await,
910            Err(e) => Self::response_error(application, adu, response, &e).await,
911        }
912    }
913
914    // FC6 - Write Single Register
915    async fn handle_fc6(
916        application: &Arc<A>,
917        model: &dyn ModbusSlaveModel,
918        adu: &ApplicationDataUnit,
919        response: SlaveResponseFn,
920    ) -> Result<(), ModbusError> {
921        if adu.data.len() != 4 {
922            return Ok(());
923        }
924        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
925        let value = u16::from_be_bytes([adu.data[2], adu.data[3]]);
926
927        if !check_range(&[address], &model.address_range().holding_registers) {
928            return Self::response_error(
929                application,
930                adu,
931                response,
932                &get_error_by_code(ErrorCode::IllegalDataAddress),
933            )
934            .await;
935        }
936
937        match model.write_single_register(address, value).await {
938            Ok(()) => response(application.encode(adu)).await,
939            Err(e) => Self::response_error(application, adu, response, &e).await,
940        }
941    }
942
943    // FC15 - Write Multiple Coils
944    async fn handle_fc15(
945        application: &Arc<A>,
946        model: &dyn ModbusSlaveModel,
947        adu: &ApplicationDataUnit,
948        response: SlaveResponseFn,
949    ) -> Result<(), ModbusError> {
950        if adu.data.len() < 6 || adu.data.len() != 5 + adu.data[4] as usize {
951            return Ok(());
952        }
953        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
954        let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
955        let byte_count = adu.data[4];
956
957        if !(1..=0x07b0).contains(&length) || byte_count as u16 != (length + 7) / 8 {
958            return Self::response_error(
959                application,
960                adu,
961                response,
962                &get_error_by_code(ErrorCode::IllegalDataValue),
963            )
964            .await;
965        }
966
967        if !check_range(&[address, address + length], &model.address_range().coils) {
968            return Self::response_error(
969                application,
970                adu,
971                response,
972                &get_error_by_code(ErrorCode::IllegalDataAddress),
973            )
974            .await;
975        }
976
977        let values: Vec<bool> = (0..length)
978            .map(|i| (adu.data[5 + i as usize / 8] >> (i % 8)) & 1 == 1)
979            .collect();
980
981        let result = model.write_multiple_coils(address, &values).await;
982
983        match result {
984            Ok(()) => {
985                let mut data = vec![0u8; 4];
986                data[0..2].copy_from_slice(&address.to_be_bytes());
987                data[2..4].copy_from_slice(&length.to_be_bytes());
988                let response_adu = ApplicationDataUnit {
989                    transaction: adu.transaction,
990                    unit: adu.unit,
991                    fc: adu.fc,
992                    data,
993                };
994                response(application.encode(&response_adu)).await
995            }
996            Err(e) => Self::response_error(application, adu, response, &e).await,
997        }
998    }
999
1000    // FC16 - Write Multiple Registers
1001    async fn handle_fc16(
1002        application: &Arc<A>,
1003        model: &dyn ModbusSlaveModel,
1004        adu: &ApplicationDataUnit,
1005        response: SlaveResponseFn,
1006    ) -> Result<(), ModbusError> {
1007        if adu.data.len() < 6 || adu.data.len() != 5 + adu.data[4] as usize {
1008            return Ok(());
1009        }
1010        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
1011        let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
1012        let byte_count = adu.data[4];
1013
1014        if !(1..=0x007b).contains(&length) || byte_count as u16 != length * 2 {
1015            return Self::response_error(
1016                application,
1017                adu,
1018                response,
1019                &get_error_by_code(ErrorCode::IllegalDataValue),
1020            )
1021            .await;
1022        }
1023
1024        if !check_range(
1025            &[address, address + length],
1026            &model.address_range().holding_registers,
1027        ) {
1028            return Self::response_error(
1029                application,
1030                adu,
1031                response,
1032                &get_error_by_code(ErrorCode::IllegalDataAddress),
1033            )
1034            .await;
1035        }
1036
1037        let values: Vec<u16> = (0..length)
1038            .map(|i| {
1039                u16::from_be_bytes([adu.data[5 + i as usize * 2], adu.data[6 + i as usize * 2]])
1040            })
1041            .collect();
1042
1043        let result = model.write_multiple_registers(address, &values).await;
1044
1045        match result {
1046            Ok(()) => {
1047                let mut data = vec![0u8; 4];
1048                data[0..2].copy_from_slice(&address.to_be_bytes());
1049                data[2..4].copy_from_slice(&length.to_be_bytes());
1050                let response_adu = ApplicationDataUnit {
1051                    transaction: adu.transaction,
1052                    unit: adu.unit,
1053                    fc: adu.fc,
1054                    data,
1055                };
1056                response(application.encode(&response_adu)).await
1057            }
1058            Err(e) => Self::response_error(application, adu, response, &e).await,
1059        }
1060    }
1061
1062    // FC17 - Report Server ID
1063    async fn handle_fc17(
1064        application: &Arc<A>,
1065        model: &dyn ModbusSlaveModel,
1066        adu: &ApplicationDataUnit,
1067        response: SlaveResponseFn,
1068    ) -> Result<(), ModbusError> {
1069        if !adu.data.is_empty() {
1070            return Ok(());
1071        }
1072
1073        match model.report_server_id().await {
1074            Ok(server_id) => {
1075                // Modbus V1.1b3 §6.17 leaves Server ID length device-specific
1076                // (N bytes). Validate the assembled payload fits in a single
1077                // byteCount byte before we serialize.
1078                let server_id_bytes = if server_id.server_id.is_empty() {
1079                    vec![model.unit()]
1080                } else {
1081                    server_id.server_id.clone()
1082                };
1083                let byte_count = server_id_bytes.len() + 1 + server_id.additional_data.len();
1084                if byte_count > 255 {
1085                    return Self::response_error(
1086                        application,
1087                        adu,
1088                        response,
1089                        &get_error_by_code(ErrorCode::ServerDeviceFailure),
1090                    )
1091                    .await;
1092                }
1093                let mut data = Vec::with_capacity(1 + byte_count);
1094                data.push(byte_count as u8);
1095                data.extend_from_slice(&server_id_bytes);
1096                data.push(if server_id.run_indicator_status {
1097                    0xff
1098                } else {
1099                    0x00
1100                });
1101                data.extend_from_slice(&server_id.additional_data);
1102                let response_adu = ApplicationDataUnit {
1103                    transaction: adu.transaction,
1104                    unit: adu.unit,
1105                    fc: adu.fc,
1106                    data,
1107                };
1108                response(application.encode(&response_adu)).await
1109            }
1110            Err(e) => Self::response_error(application, adu, response, &e).await,
1111        }
1112    }
1113
1114    // FC22 - Mask Write Register
1115    async fn handle_fc22(
1116        application: &Arc<A>,
1117        model: &dyn ModbusSlaveModel,
1118        address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
1119        adu: &ApplicationDataUnit,
1120        response: SlaveResponseFn,
1121    ) -> Result<(), ModbusError> {
1122        if adu.data.len() != 6 {
1123            return Ok(());
1124        }
1125        let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
1126        let and_mask = u16::from_be_bytes([adu.data[2], adu.data[3]]);
1127        let or_mask = u16::from_be_bytes([adu.data[4], adu.data[5]]);
1128
1129        if !check_range(&[address], &model.address_range().holding_registers) {
1130            return Self::response_error(
1131                application,
1132                adu,
1133                response,
1134                &get_error_by_code(ErrorCode::IllegalDataAddress),
1135            )
1136            .await;
1137        }
1138
1139        let result = Self::with_address_lock(address_locks, &[address], || async {
1140            model.mask_write_register(address, and_mask, or_mask).await
1141        })
1142        .await;
1143
1144        match result {
1145            Ok(()) => response(application.encode(adu)).await,
1146            Err(e) => Self::response_error(application, adu, response, &e).await,
1147        }
1148    }
1149
1150    // FC23 - Read/Write Multiple Registers
1151    async fn handle_fc23(
1152        application: &Arc<A>,
1153        model: &dyn ModbusSlaveModel,
1154        address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
1155        adu: &ApplicationDataUnit,
1156        response: SlaveResponseFn,
1157    ) -> Result<(), ModbusError> {
1158        if adu.data.len() < 10 || adu.data.len() != 9 + adu.data[8] as usize {
1159            return Ok(());
1160        }
1161        let read_address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
1162        let read_length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
1163        let write_address = u16::from_be_bytes([adu.data[4], adu.data[5]]);
1164        let write_length = u16::from_be_bytes([adu.data[6], adu.data[7]]);
1165        let byte_count = adu.data[8];
1166
1167        if !(1..=0x007d).contains(&read_length)
1168            || !(1..=0x0079).contains(&write_length)
1169            || byte_count as u16 != write_length * 2
1170        {
1171            return Self::response_error(
1172                application,
1173                adu,
1174                response,
1175                &get_error_by_code(ErrorCode::IllegalDataValue),
1176            )
1177            .await;
1178        }
1179
1180        if !check_range(
1181            &[
1182                read_address,
1183                read_address + read_length,
1184                write_address,
1185                write_address + write_length,
1186            ],
1187            &model.address_range().holding_registers,
1188        ) {
1189            return Self::response_error(
1190                application,
1191                adu,
1192                response,
1193                &get_error_by_code(ErrorCode::IllegalDataAddress),
1194            )
1195            .await;
1196        }
1197
1198        let write_values: Vec<u16> = (0..write_length)
1199            .map(|i| {
1200                u16::from_be_bytes([adu.data[9 + i as usize * 2], adu.data[10 + i as usize * 2]])
1201            })
1202            .collect();
1203
1204        let write_addresses: Vec<u16> = (0..write_length).map(|i| write_address + i).collect();
1205
1206        let write_result = Self::with_address_lock(address_locks, &write_addresses, || async {
1207            model
1208                .write_multiple_registers(write_address, &write_values)
1209                .await
1210        })
1211        .await;
1212
1213        if let Err(e) = write_result {
1214            return Self::response_error(application, adu, response, &e).await;
1215        }
1216
1217        match model
1218            .read_holding_registers(read_address, read_length)
1219            .await
1220        {
1221            Ok(registers) => {
1222                let response_adu = ApplicationDataUnit {
1223                    transaction: adu.transaction,
1224                    unit: adu.unit,
1225                    fc: adu.fc,
1226                    data: pack_registers(&registers, read_length),
1227                };
1228                response(application.encode(&response_adu)).await
1229            }
1230            Err(e) => Self::response_error(application, adu, response, &e).await,
1231        }
1232    }
1233
1234    // FC43/14 - Read Device Identification
1235    async fn handle_fc43_14(
1236        application: &Arc<A>,
1237        model: &dyn ModbusSlaveModel,
1238        adu: &ApplicationDataUnit,
1239        response: SlaveResponseFn,
1240    ) -> Result<(), ModbusError> {
1241        if adu.data.len() != 3 || adu.data[0] != 0x0e {
1242            return Self::response_error(
1243                application,
1244                adu,
1245                response,
1246                &get_error_by_code(ErrorCode::IllegalFunction),
1247            )
1248            .await;
1249        }
1250
1251        let read_device_id_code = adu.data[1];
1252        let mut object_id = adu.data[2];
1253
1254        match read_device_id_code {
1255            0x01 => {
1256                if object_id > 0x02 || (object_id > 0x06 && object_id < 0x80) {
1257                    object_id = 0x00;
1258                }
1259            }
1260            0x02 => {
1261                if object_id > 0x06 {
1262                    object_id = 0x00;
1263                }
1264            }
1265            0x03 => {
1266                if object_id > 0x06 && object_id < 0x80 {
1267                    object_id = 0x00;
1268                }
1269            }
1270            0x04 => {
1271                if object_id > 0x06 && object_id < 0x80 {
1272                    return Self::response_error(
1273                        application,
1274                        adu,
1275                        response,
1276                        &get_error_by_code(ErrorCode::IllegalDataAddress),
1277                    )
1278                    .await;
1279                }
1280            }
1281            _ => {
1282                return Self::response_error(
1283                    application,
1284                    adu,
1285                    response,
1286                    &get_error_by_code(ErrorCode::IllegalDataValue),
1287                )
1288                .await;
1289            }
1290        }
1291
1292        match model.read_device_identification().await {
1293            Ok(identification) => {
1294                let mut objects: Vec<(u8, String)> = vec![
1295                    (0x00, "null".to_string()),
1296                    (0x01, "null".to_string()),
1297                    (0x02, "null".to_string()),
1298                ];
1299                for (k, v) in identification {
1300                    if let Some(pos) = objects.iter().position(|(id, _)| *id == k) {
1301                        objects[pos] = (k, v);
1302                    } else {
1303                        objects.push((k, v));
1304                    }
1305                }
1306                objects.sort_by_key(|(id, _)| *id);
1307
1308                let has_object_id = objects.iter().any(|(id, _)| *id == object_id);
1309                if !has_object_id {
1310                    if read_device_id_code == 0x04 {
1311                        return Self::response_error(
1312                            application,
1313                            adu,
1314                            response,
1315                            &get_error_by_code(ErrorCode::IllegalDataAddress),
1316                        )
1317                        .await;
1318                    }
1319                    object_id = 0x00;
1320                }
1321
1322                let max_id = objects.last().map(|(id, _)| *id).unwrap_or(0);
1323                // Per Modbus V1.1b3 §6.21, Extended range is 0x80..=0xFF
1324                // (inclusive at 0x80). Off-by-one here meant an Extended
1325                // object at exactly 0x80 was under-reported as 0x82.
1326                let conformity_level = if max_id >= 0x80 {
1327                    0x83
1328                } else if max_id > 0x02 {
1329                    0x82
1330                } else {
1331                    0x81
1332                };
1333
1334                let mut ids = Vec::new();
1335                let mut total_length = 10usize;
1336                let mut last_id = 0u8;
1337
1338                for &(id, ref value) in &objects {
1339                    if id < object_id {
1340                        continue;
1341                    }
1342                    if value.len() > 245 {
1343                        return Self::response_error(
1344                            application,
1345                            adu,
1346                            response,
1347                            &get_error_by_code(ErrorCode::ServerDeviceFailure),
1348                        )
1349                        .await;
1350                    }
1351                    if total_length + 2 + value.len() > 253 {
1352                        if last_id == 0 {
1353                            last_id = id;
1354                        }
1355                        continue;
1356                    }
1357                    total_length += 2 + value.len();
1358                    ids.push(id);
1359                    if read_device_id_code == 0x04 {
1360                        break;
1361                    }
1362                }
1363
1364                let mut data = vec![
1365                    0x0e,
1366                    read_device_id_code,
1367                    conformity_level,
1368                    if last_id == 0 { 0x00 } else { 0xff },
1369                    last_id,
1370                    ids.len() as u8,
1371                ];
1372                for id in ids {
1373                    if let Some((_, value)) = objects.iter().find(|(oid, _)| *oid == id) {
1374                        data.push(id);
1375                        data.push(value.len() as u8);
1376                        data.extend_from_slice(value.as_bytes());
1377                    }
1378                }
1379
1380                let response_adu = ApplicationDataUnit {
1381                    transaction: adu.transaction,
1382                    unit: adu.unit,
1383                    fc: adu.fc,
1384                    data,
1385                };
1386                response(application.encode(&response_adu)).await
1387            }
1388            Err(e) => Self::response_error(application, adu, response, &e).await,
1389        }
1390    }
1391
1392    pub fn add_custom_function_code(&self, cfc: CustomFunctionCode) {
1393        self.application.add_custom_function_code(cfc.clone());
1394        self.custom_function_codes
1395            .lock()
1396            .unwrap()
1397            .insert(cfc.fc, cfc);
1398    }
1399
1400    pub fn remove_custom_function_code(&self, fc: u8) {
1401        self.application.remove_custom_function_code(fc);
1402        self.custom_function_codes.lock().unwrap().remove(&fc);
1403    }
1404}