1use crate::error::{get_code_by_error, get_error_by_code, ErrorCode, ModbusError};
2use crate::layers::application::{ApplicationLayer, ApplicationProtocol, ApplicationRole};
3use crate::layers::physical::{ConnectionId, PhysicalLayer, ResponseFn};
4use crate::types::{
5 AddressRange, ApplicationDataUnit, CustomFunctionCode, FramedDataUnit, ServerId,
6};
7use crate::utils::{check_range, pack_coils, pack_registers};
8use async_trait::async_trait;
9use std::collections::{HashMap, VecDeque};
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13use tokio::sync::broadcast::error::RecvError;
14
/// Shared, type-erased callback used by the request handlers to send a reply.
///
/// It takes the encoded response bytes and returns a boxed `Send` future that resolves to
/// `Ok(())` or a [`ModbusError`].
type SlaveResponseFn = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>> + Send + Sync,
>;
18
19#[async_trait]
20pub trait ModbusSlaveModel: Send + Sync {
21 fn unit(&self) -> u8;
22 fn address_range(&self) -> AddressRange;
23
24 async fn intercept(&self, _fc: u8, _data: &[u8]) -> Result<Option<Vec<u8>>, ModbusError> {
25 Ok(None)
26 }
27
28 async fn read_coils(&self, _address: u16, _length: u16) -> Result<Vec<bool>, ModbusError> {
29 Err(ModbusError::IllegalFunction)
30 }
31 async fn write_single_coil(&self, _address: u16, _value: bool) -> Result<(), ModbusError> {
32 Err(ModbusError::IllegalFunction)
33 }
34 async fn write_multiple_coils(&self, address: u16, values: &[bool]) -> Result<(), ModbusError> {
38 for (i, &v) in values.iter().enumerate() {
39 self.write_single_coil(address + i as u16, v).await?;
40 }
41 Ok(())
42 }
43
44 async fn read_discrete_inputs(
45 &self,
46 _address: u16,
47 _length: u16,
48 ) -> Result<Vec<bool>, ModbusError> {
49 Err(ModbusError::IllegalFunction)
50 }
51
52 async fn read_holding_registers(
53 &self,
54 _address: u16,
55 _length: u16,
56 ) -> Result<Vec<u16>, ModbusError> {
57 Err(ModbusError::IllegalFunction)
58 }
59 async fn write_single_register(&self, _address: u16, _value: u16) -> Result<(), ModbusError> {
60 Err(ModbusError::IllegalFunction)
61 }
62 async fn write_multiple_registers(
64 &self,
65 address: u16,
66 values: &[u16],
67 ) -> Result<(), ModbusError> {
68 for (i, &v) in values.iter().enumerate() {
69 self.write_single_register(address + i as u16, v).await?;
70 }
71 Ok(())
72 }
73 async fn mask_write_register(
76 &self,
77 address: u16,
78 and_mask: u16,
79 or_mask: u16,
80 ) -> Result<(), ModbusError> {
81 let regs = self.read_holding_registers(address, 1).await?;
82 let current = *regs.first().ok_or(ModbusError::ServerDeviceFailure)?;
83 let new = (current & and_mask) | (or_mask & !and_mask);
84 self.write_single_register(address, new).await
85 }
86
87 async fn read_input_registers(
88 &self,
89 _address: u16,
90 _length: u16,
91 ) -> Result<Vec<u16>, ModbusError> {
92 Err(ModbusError::IllegalFunction)
93 }
94
95 async fn report_server_id(&self) -> Result<ServerId, ModbusError> {
96 Err(ModbusError::IllegalFunction)
97 }
98 async fn read_device_identification(&self) -> Result<HashMap<u8, String>, ModbusError> {
99 Err(ModbusError::IllegalFunction)
100 }
101}
102
/// Construction-time options for a Modbus slave.
#[derive(Debug, Default, Clone, Copy)]
pub struct ModbusSlaveOptions {
    /// When `true`, every incoming frame is handled on its own spawned task instead of being
    /// queued per connection. Defaults to `false`.
    pub concurrent: bool,
}
115
/// Per-connection backlog used when frames are processed sequentially.
struct QueueEntry {
    /// Frames waiting to be processed, each paired with the callback used to answer it.
    items: VecDeque<(FramedDataUnit, ResponseFn)>,
    /// Set once a drain task has been spawned for this connection, so that only one is started.
    processing: bool,
}
120
/// Modbus slave (server) that routes frames from an application layer to registered
/// [`ModbusSlaveModel`]s and sends the replies back through the frame's response callback.
pub struct ModbusSlave<A: ApplicationLayer, P: PhysicalLayer> {
    /// Application layer used to subscribe to incoming frames and to encode responses.
    application: Arc<A>,
    /// Physical layer that is opened, closed and destroyed together with the slave.
    physical: Arc<P>,
    /// Registered models keyed by unit identifier.
    pub models: Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
    /// Whether frames are processed concurrently (one task per frame) rather than queued.
    pub concurrent: bool,
    /// Per-connection queues of pending frames, used when `concurrent` is `false`.
    queues: Arc<tokio::sync::Mutex<HashMap<ConnectionId, QueueEntry>>>,
    /// Handles of the listener tasks spawned by `open`; aborted by `destroy`.
    tasks: tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// `true` between `open` and the next `close`/`destroy`; frames are ignored while `false`.
    is_open: Arc<std::sync::atomic::AtomicBool>,
    /// Handlers for function codes that the built-in dispatch does not cover.
    custom_function_codes: std::sync::Mutex<HashMap<u8, CustomFunctionCode>>,
    /// Cleanup state: 0 after `open`/construction, 1 after `close`, 2 after `destroy`.
    clean_level: std::sync::atomic::AtomicU8,
    /// Per-register mutexes used to serialise the register writes of FC 0x16 and FC 0x17.
    address_locks: Arc<tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>>,
}
136
137impl<A: ApplicationLayer + 'static, P: PhysicalLayer + 'static> ModbusSlave<A, P> {
138 pub fn new(application: Arc<A>, physical: Arc<P>) -> Self {
139 Self::with_options(application, physical, ModbusSlaveOptions::default())
140 }
141
142 pub fn with_options(
143 application: Arc<A>,
144 physical: Arc<P>,
145 options: ModbusSlaveOptions,
146 ) -> Self {
147 if options.concurrent && application.protocol() != ApplicationProtocol::Tcp {
148 panic!("concurrent mode requires a Modbus TCP application layer");
149 }
150 application
151 .set_role(ApplicationRole::Slave)
152 .expect("application layer is already bound to a different role");
153 Self {
154 application,
155 physical,
156 models: Arc::new(std::sync::Mutex::new(HashMap::new())),
157 concurrent: options.concurrent,
158 queues: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
159 tasks: tokio::sync::Mutex::new(Vec::new()),
160 is_open: Arc::new(std::sync::atomic::AtomicBool::new(false)),
161 custom_function_codes: std::sync::Mutex::new(HashMap::new()),
162 address_locks: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
163 clean_level: std::sync::atomic::AtomicU8::new(0),
164 }
165 }
166
167 pub fn add(&self, model: Box<dyn ModbusSlaveModel>) {
168 let unit = model.unit();
169 let arc: Arc<dyn ModbusSlaveModel> = Arc::from(model);
170 self.models.lock().unwrap().insert(unit, arc);
171 }
172
173 pub fn remove(&self, unit: u8) {
174 self.models.lock().unwrap().remove(&unit);
175 }
176
177 pub fn is_open(&self) -> bool {
178 self.is_open.load(std::sync::atomic::Ordering::Acquire)
179 }
180
181 pub fn is_destroyed(&self) -> bool {
182 self.clean_level.load(std::sync::atomic::Ordering::Acquire) == 2
183 || self.physical.is_destroyed()
184 }
185
186 pub async fn open(&self, options: P::OpenOptions) -> Result<(), ModbusError> {
187 self.is_open
188 .store(true, std::sync::atomic::Ordering::Release);
189 self.clean_level
190 .store(0, std::sync::atomic::Ordering::Release);
191 self.queues.lock().await.clear();
193
194 let application = Arc::clone(&self.application);
195 let models = Arc::clone(&self.models);
196 let queues = Arc::clone(&self.queues);
197 let address_locks = Arc::clone(&self.address_locks);
198 let custom_fcs: Arc<HashMap<u8, CustomFunctionCode>> =
199 Arc::new(self.custom_function_codes.lock().unwrap().clone());
200 let concurrent = self.concurrent;
201 let is_open = Arc::clone(&self.is_open);
202 let mut framing_rx = self.application.subscribe_framing();
203 let framing_task = tokio::spawn(async move {
204 loop {
205 match framing_rx.recv().await {
206 Ok(framing) => {
207 if !is_open.load(std::sync::atomic::Ordering::Acquire) {
208 continue;
209 }
210 let frame = FramedDataUnit {
211 adu: framing.adu,
212 raw: framing.raw,
213 };
214 if concurrent {
215 let app = Arc::clone(&application);
218 let mdls = Arc::clone(&models);
219 let cfs = Arc::clone(&custom_fcs);
220 let locks = Arc::clone(&address_locks);
221 tokio::spawn(async move {
222 Self::process_frame(
223 &app,
224 &mdls,
225 &cfs,
226 &locks,
227 frame,
228 framing.response,
229 )
230 .await;
231 });
232 } else {
233 Self::enqueue_and_drain(
237 Arc::clone(&queues),
238 Arc::clone(&application),
239 Arc::clone(&models),
240 Arc::clone(&custom_fcs),
241 Arc::clone(&address_locks),
242 framing.connection,
243 frame,
244 framing.response,
245 )
246 .await;
247 }
248 }
249 Err(RecvError::Lagged(_)) => continue,
250 Err(RecvError::Closed) => break,
251 }
252 }
253 });
254
255 let queues_for_close = Arc::clone(&self.queues);
261 let mut conn_close_rx = self.physical.subscribe_connection_close();
262 let conn_close_task = tokio::spawn(async move {
263 loop {
264 match conn_close_rx.recv().await {
265 Ok(conn_id) => {
266 let mut g = queues_for_close.lock().await;
267 if let Some(entry) = g.get_mut(&conn_id) {
268 entry.items.clear();
269 if !entry.processing {
270 g.remove(&conn_id);
271 }
272 }
273 }
274 Err(RecvError::Lagged(_)) => continue,
275 Err(RecvError::Closed) => break,
276 }
277 }
278 });
279
280 let queues_for_full_close = Arc::clone(&self.queues);
282 let mut close_rx = self.physical.subscribe_close();
283 let close_task = tokio::spawn(async move {
284 loop {
285 match close_rx.recv().await {
286 Ok(()) => {
287 queues_for_full_close.lock().await.clear();
288 }
289 Err(RecvError::Lagged(_)) => continue,
290 Err(RecvError::Closed) => break,
291 }
292 }
293 });
294
295 self.tasks
296 .lock()
297 .await
298 .extend([framing_task, conn_close_task, close_task]);
299
300 self.physical.open(options).await?;
301 Ok(())
302 }
303
304 async fn clean(&self, level: u8) {
305 let current = self.clean_level.load(std::sync::atomic::Ordering::Acquire);
306 if current == 2 {
307 return;
308 }
309 if current == 1 && level == 1 {
310 return;
311 }
312 self.is_open
313 .store(false, std::sync::atomic::Ordering::Release);
314 self.queues.lock().await.clear();
315 self.address_locks.lock().await.clear();
316 if level == 2 {
317 self.custom_function_codes.lock().unwrap().clear();
318 self.models.lock().unwrap().clear();
319 }
320 self.clean_level
321 .store(level, std::sync::atomic::Ordering::Release);
322 }
323
324 pub async fn close(&self) -> Result<(), ModbusError> {
325 if self.clean_level.load(std::sync::atomic::Ordering::Acquire) == 2 {
326 return Ok(());
327 }
328 self.clean(1).await;
329 return self.physical.close().await;
330 }
331
332 pub async fn destroy(&self) {
333 if self.clean_level.load(std::sync::atomic::Ordering::Acquire) == 2 {
334 return;
335 }
336 self.clean(2).await;
337 {
338 let mut tasks = self.tasks.lock().await;
339 for task in tasks.drain(..) {
340 task.abort();
341 }
342 }
343 let _ = self.physical.destroy().await;
344 }
345
    /// Runs `f` while holding the per-address mutex of every address in `addresses`.
    ///
    /// The mutexes are created on demand in `address_locks`, acquired in ascending address
    /// order, released after `f` completes, and removed from the map again when no other caller
    /// holds a handle to them. Returns whatever `f`'s future resolves to.
    async fn with_address_lock<F, Fut, T>(
        address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
        addresses: &[u16],
        f: F,
    ) -> T
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = T>,
    {
        // Sorting gives every caller the same acquisition order; dedup avoids locking the same
        // mutex twice.
        let mut sorted: Vec<u16> = addresses.to_vec();
        sorted.sort_unstable();
        sorted.dedup();

        // Fetch (or create) the mutex handles under the map lock, then release the map lock
        // before waiting on the individual mutexes.
        let lock_arcs: Vec<Arc<tokio::sync::Mutex<()>>> = {
            let mut locks = address_locks.lock().await;
            sorted
                .iter()
                .map(|&addr| {
                    locks
                        .entry(addr)
                        .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
                        .clone()
                })
                .collect()
        };

        let mut guards: Vec<tokio::sync::MutexGuard<'_, ()>> = Vec::with_capacity(lock_arcs.len());
        for arc in &lock_arcs {
            guards.push(arc.lock().await);
        }

        let result = f().await;
        // Guards borrow from `lock_arcs`, so they are dropped first.
        drop(guards);
        drop(lock_arcs);

        {
            // Handles are only cloned while the map lock is held, so a strong count of 1 here
            // means the map owns the last reference and the entry can be removed.
            let mut locks = address_locks.lock().await;
            for &addr in &sorted {
                if let Some(arc) = locks.get(&addr) {
                    if Arc::strong_count(arc) == 1 {
                        locks.remove(&addr);
                    }
                }
            }
        }

        result
    }
404
405 #[allow(clippy::too_many_arguments)]
409 async fn enqueue_and_drain(
410 queues: Arc<tokio::sync::Mutex<HashMap<ConnectionId, QueueEntry>>>,
411 application: Arc<A>,
412 models: Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
413 custom_fcs: Arc<HashMap<u8, CustomFunctionCode>>,
414 address_locks: Arc<tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>>,
415 connection: ConnectionId,
416 frame: FramedDataUnit,
417 response: ResponseFn,
418 ) {
419 let should_spawn = {
420 let mut g = queues.lock().await;
421 let entry = g.entry(Arc::clone(&connection)).or_insert(QueueEntry {
422 items: VecDeque::new(),
423 processing: false,
424 });
425 entry.items.push_back((frame, response));
426 if entry.processing {
427 false
428 } else {
429 entry.processing = true;
430 true
431 }
432 };
433 if should_spawn {
434 tokio::spawn(async move {
435 Self::drain_loop(
436 queues,
437 application,
438 models,
439 custom_fcs,
440 address_locks,
441 connection,
442 )
443 .await;
444 });
445 }
446 }
447
448 async fn drain_loop(
454 queues: Arc<tokio::sync::Mutex<HashMap<ConnectionId, QueueEntry>>>,
455 application: Arc<A>,
456 models: Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
457 custom_fcs: Arc<HashMap<u8, CustomFunctionCode>>,
458 address_locks: Arc<tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>>,
459 connection: ConnectionId,
460 ) {
461 loop {
462 let next = {
463 let mut g = queues.lock().await;
464 match g.get_mut(&connection) {
465 Some(entry) => entry.items.pop_front(),
466 None => return,
467 }
468 };
469 match next {
470 Some((frame, response)) => {
471 Self::process_frame(
472 &application,
473 &models,
474 &custom_fcs,
475 &address_locks,
476 frame,
477 response,
478 )
479 .await;
480 }
481 None => {
482 let mut g = queues.lock().await;
483 if let Some(entry) = g.get_mut(&connection) {
484 if entry.items.is_empty() {
485 g.remove(&connection);
486 return;
487 }
488 } else {
491 return;
492 }
493 }
494 }
495 }
496 }
497
    /// Handles one received frame: selects the target model(s), runs the model's `intercept`
    /// hook, dispatches on the function code and sends the reply through `response_fn`.
    ///
    /// Unit 0 is treated as a broadcast: every registered model processes the request and the
    /// reply is discarded. A frame addressed to an unregistered unit is ignored.
    async fn process_frame(
        application: &Arc<A>,
        models: &Arc<std::sync::Mutex<HashMap<u8, Arc<dyn ModbusSlaveModel>>>>,
        custom_fcs: &HashMap<u8, CustomFunctionCode>,
        address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
        frame: FramedDataUnit,
        response_fn: ResponseFn,
    ) {
        let unit = frame.adu.unit;
        // Snapshot the target models so the std mutex is not held across any `.await`.
        let models_snapshot: Vec<Arc<dyn ModbusSlaveModel>> = {
            let g = models.lock().unwrap();
            if unit == 0 {
                g.values().map(Arc::clone).collect()
            } else {
                match g.get(&unit) {
                    Some(m) => vec![Arc::clone(m)],
                    None => return,
                }
            }
        };

        for model_arc in models_snapshot {
            let model: &dyn ModbusSlaveModel = &*model_arc;

            // Broadcasts get a no-op responder; unicast requests forward to `response_fn`.
            let response: SlaveResponseFn = if unit == 0 {
                Arc::new(|_| {
                    Box::pin(async { Ok(()) })
                        as Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>>
                })
            } else {
                let response_fn = Arc::clone(&response_fn);
                Arc::new(move |data| {
                    let response_fn = Arc::clone(&response_fn);
                    Box::pin(async move { response_fn(data).await })
                        as Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>>
                })
            };

            // Give the model a chance to answer the request itself before the built-in dispatch.
            match model.intercept(frame.adu.fc, &frame.adu.data).await {
                Ok(Some(data)) => {
                    let response_adu = ApplicationDataUnit {
                        transaction: frame.adu.transaction,
                        unit: frame.adu.unit,
                        fc: frame.adu.fc,
                        data,
                    };
                    let encoded = application.encode(&response_adu);
                    let _ = response(encoded).await;
                    continue;
                }
                Ok(None) => {}
                Err(err) => {
                    let _ = Self::response_error(application, &frame.adu, response, &err).await;
                    continue;
                }
            }

            let result = match frame.adu.fc {
                0x01 => {
                    Self::handle_fc1(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x02 => {
                    Self::handle_fc2(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x03 => {
                    Self::handle_fc3(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x04 => {
                    Self::handle_fc4(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x05 => {
                    Self::handle_fc5(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x06 => {
                    Self::handle_fc6(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x0f => {
                    Self::handle_fc15(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x10 => {
                    Self::handle_fc16(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x11 => {
                    Self::handle_fc17(application, model, &frame.adu, Arc::clone(&response)).await
                }
                0x16 => {
                    Self::handle_fc22(
                        application,
                        model,
                        address_locks,
                        &frame.adu,
                        Arc::clone(&response),
                    )
                    .await
                }
                0x17 => {
                    Self::handle_fc23(
                        application,
                        model,
                        address_locks,
                        &frame.adu,
                        Arc::clone(&response),
                    )
                    .await
                }
                0x2b => {
                    Self::handle_fc43_14(application, model, &frame.adu, Arc::clone(&response))
                        .await
                }
                _ => {
                    // Not a built-in function code: try a registered custom handler, otherwise
                    // answer with an IllegalFunction exception.
                    if let Some(cfc) = custom_fcs.get(&frame.adu.fc) {
                        if let Some(ref handler) = cfc.handle {
                            let handler_clone: std::sync::Arc<
                                dyn Fn(Vec<u8>, u8) -> crate::types::CustomFcHandleResult
                                    + Send
                                    + Sync,
                            > = Arc::clone(handler);
                            let pdu = frame.adu.data.clone();
                            match handler_clone(pdu, frame.adu.unit).await {
                                Ok(response_data) => {
                                    let response_adu = ApplicationDataUnit {
                                        transaction: frame.adu.transaction,
                                        unit: frame.adu.unit,
                                        fc: frame.adu.fc,
                                        data: response_data,
                                    };
                                    let _ = response(application.encode(&response_adu)).await;
                                    continue;
                                }
                                Err(e) => {
                                    let _ = Self::response_error(
                                        application,
                                        &frame.adu,
                                        Arc::clone(&response),
                                        &e,
                                    )
                                    .await;
                                    continue;
                                }
                            }
                        }
                    }
                    Self::response_error(
                        application,
                        &frame.adu,
                        Arc::clone(&response),
                        &get_error_by_code(ErrorCode::IllegalFunction),
                    )
                    .await
                }
            };

            // An `Err` that escapes a handler is reported back as an exception response; a
            // failure to send that response is ignored.
            if let Err(e) = result {
                let _ = Self::response_error(application, &frame.adu, response, &e).await;
            }
        }
    }
660
661 async fn response_error(
662 application: &Arc<A>,
663 adu: &ApplicationDataUnit,
664 response: SlaveResponseFn,
665 error: &ModbusError,
666 ) -> Result<(), ModbusError> {
667 let error_code = get_code_by_error(error) as u8;
668 let response_adu = ApplicationDataUnit {
669 transaction: adu.transaction,
670 unit: adu.unit,
671 fc: adu.fc | 0x80,
672 data: vec![error_code],
673 };
674 let encoded = application.encode(&response_adu);
675 response(encoded).await
676 }
677
678 async fn handle_fc1(
680 application: &Arc<A>,
681 model: &dyn ModbusSlaveModel,
682 adu: &ApplicationDataUnit,
683 response: SlaveResponseFn,
684 ) -> Result<(), ModbusError> {
685 if adu.data.len() != 4 {
686 return Ok(());
687 }
688 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
689 let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
690
691 if !(1..=0x07d0).contains(&length) {
692 return Self::response_error(
693 application,
694 adu,
695 response,
696 &get_error_by_code(ErrorCode::IllegalDataValue),
697 )
698 .await;
699 }
700
701 if !check_range(&[address, address + length], &model.address_range().coils) {
702 return Self::response_error(
703 application,
704 adu,
705 response,
706 &get_error_by_code(ErrorCode::IllegalDataAddress),
707 )
708 .await;
709 }
710
711 match model.read_coils(address, length).await {
712 Ok(coils) => {
713 let response_adu = ApplicationDataUnit {
714 transaction: adu.transaction,
715 unit: adu.unit,
716 fc: adu.fc,
717 data: pack_coils(&coils, length),
718 };
719 response(application.encode(&response_adu)).await
720 }
721 Err(e) => Self::response_error(application, adu, response, &e).await,
722 }
723 }
724
725 async fn handle_fc2(
727 application: &Arc<A>,
728 model: &dyn ModbusSlaveModel,
729 adu: &ApplicationDataUnit,
730 response: SlaveResponseFn,
731 ) -> Result<(), ModbusError> {
732 if adu.data.len() != 4 {
733 return Ok(());
734 }
735 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
736 let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
737
738 if !(1..=0x07d0).contains(&length) {
739 return Self::response_error(
740 application,
741 adu,
742 response,
743 &get_error_by_code(ErrorCode::IllegalDataValue),
744 )
745 .await;
746 }
747
748 if !check_range(
749 &[address, address + length],
750 &model.address_range().discrete_inputs,
751 ) {
752 return Self::response_error(
753 application,
754 adu,
755 response,
756 &get_error_by_code(ErrorCode::IllegalDataAddress),
757 )
758 .await;
759 }
760
761 match model.read_discrete_inputs(address, length).await {
762 Ok(inputs) => {
763 let response_adu = ApplicationDataUnit {
764 transaction: adu.transaction,
765 unit: adu.unit,
766 fc: adu.fc,
767 data: pack_coils(&inputs, length),
768 };
769 response(application.encode(&response_adu)).await
770 }
771 Err(e) => Self::response_error(application, adu, response, &e).await,
772 }
773 }
774
775 async fn handle_fc3(
777 application: &Arc<A>,
778 model: &dyn ModbusSlaveModel,
779 adu: &ApplicationDataUnit,
780 response: SlaveResponseFn,
781 ) -> Result<(), ModbusError> {
782 if adu.data.len() != 4 {
783 return Ok(());
784 }
785 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
786 let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
787
788 if !(1..=0x007d).contains(&length) {
789 return Self::response_error(
790 application,
791 adu,
792 response,
793 &get_error_by_code(ErrorCode::IllegalDataValue),
794 )
795 .await;
796 }
797
798 if !check_range(
799 &[address, address + length],
800 &model.address_range().holding_registers,
801 ) {
802 return Self::response_error(
803 application,
804 adu,
805 response,
806 &get_error_by_code(ErrorCode::IllegalDataAddress),
807 )
808 .await;
809 }
810
811 match model.read_holding_registers(address, length).await {
812 Ok(registers) => {
813 let response_adu = ApplicationDataUnit {
814 transaction: adu.transaction,
815 unit: adu.unit,
816 fc: adu.fc,
817 data: pack_registers(®isters, length),
818 };
819 response(application.encode(&response_adu)).await
820 }
821 Err(e) => Self::response_error(application, adu, response, &e).await,
822 }
823 }
824
825 async fn handle_fc4(
827 application: &Arc<A>,
828 model: &dyn ModbusSlaveModel,
829 adu: &ApplicationDataUnit,
830 response: SlaveResponseFn,
831 ) -> Result<(), ModbusError> {
832 if adu.data.len() != 4 {
833 return Ok(());
834 }
835 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
836 let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
837
838 if !(1..=0x007d).contains(&length) {
839 return Self::response_error(
840 application,
841 adu,
842 response,
843 &get_error_by_code(ErrorCode::IllegalDataValue),
844 )
845 .await;
846 }
847
848 if !check_range(
849 &[address, address + length],
850 &model.address_range().input_registers,
851 ) {
852 return Self::response_error(
853 application,
854 adu,
855 response,
856 &get_error_by_code(ErrorCode::IllegalDataAddress),
857 )
858 .await;
859 }
860
861 match model.read_input_registers(address, length).await {
862 Ok(registers) => {
863 let response_adu = ApplicationDataUnit {
864 transaction: adu.transaction,
865 unit: adu.unit,
866 fc: adu.fc,
867 data: pack_registers(®isters, length),
868 };
869 response(application.encode(&response_adu)).await
870 }
871 Err(e) => Self::response_error(application, adu, response, &e).await,
872 }
873 }
874
875 async fn handle_fc5(
877 application: &Arc<A>,
878 model: &dyn ModbusSlaveModel,
879 adu: &ApplicationDataUnit,
880 response: SlaveResponseFn,
881 ) -> Result<(), ModbusError> {
882 if adu.data.len() != 4 {
883 return Ok(());
884 }
885 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
886 let value = u16::from_be_bytes([adu.data[2], adu.data[3]]);
887
888 if value != 0x0000 && value != 0xff00 {
889 return Self::response_error(
890 application,
891 adu,
892 response,
893 &get_error_by_code(ErrorCode::IllegalDataValue),
894 )
895 .await;
896 }
897
898 if !check_range(&[address], &model.address_range().coils) {
899 return Self::response_error(
900 application,
901 adu,
902 response,
903 &get_error_by_code(ErrorCode::IllegalDataAddress),
904 )
905 .await;
906 }
907
908 match model.write_single_coil(address, value == 0xff00).await {
909 Ok(()) => response(application.encode(adu)).await,
910 Err(e) => Self::response_error(application, adu, response, &e).await,
911 }
912 }
913
914 async fn handle_fc6(
916 application: &Arc<A>,
917 model: &dyn ModbusSlaveModel,
918 adu: &ApplicationDataUnit,
919 response: SlaveResponseFn,
920 ) -> Result<(), ModbusError> {
921 if adu.data.len() != 4 {
922 return Ok(());
923 }
924 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
925 let value = u16::from_be_bytes([adu.data[2], adu.data[3]]);
926
927 if !check_range(&[address], &model.address_range().holding_registers) {
928 return Self::response_error(
929 application,
930 adu,
931 response,
932 &get_error_by_code(ErrorCode::IllegalDataAddress),
933 )
934 .await;
935 }
936
937 match model.write_single_register(address, value).await {
938 Ok(()) => response(application.encode(adu)).await,
939 Err(e) => Self::response_error(application, adu, response, &e).await,
940 }
941 }
942
943 async fn handle_fc15(
945 application: &Arc<A>,
946 model: &dyn ModbusSlaveModel,
947 adu: &ApplicationDataUnit,
948 response: SlaveResponseFn,
949 ) -> Result<(), ModbusError> {
950 if adu.data.len() < 6 || adu.data.len() != 5 + adu.data[4] as usize {
951 return Ok(());
952 }
953 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
954 let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
955 let byte_count = adu.data[4];
956
957 if !(1..=0x07b0).contains(&length) || byte_count as u16 != (length + 7) / 8 {
958 return Self::response_error(
959 application,
960 adu,
961 response,
962 &get_error_by_code(ErrorCode::IllegalDataValue),
963 )
964 .await;
965 }
966
967 if !check_range(&[address, address + length], &model.address_range().coils) {
968 return Self::response_error(
969 application,
970 adu,
971 response,
972 &get_error_by_code(ErrorCode::IllegalDataAddress),
973 )
974 .await;
975 }
976
977 let values: Vec<bool> = (0..length)
978 .map(|i| (adu.data[5 + i as usize / 8] >> (i % 8)) & 1 == 1)
979 .collect();
980
981 let result = model.write_multiple_coils(address, &values).await;
982
983 match result {
984 Ok(()) => {
985 let mut data = vec![0u8; 4];
986 data[0..2].copy_from_slice(&address.to_be_bytes());
987 data[2..4].copy_from_slice(&length.to_be_bytes());
988 let response_adu = ApplicationDataUnit {
989 transaction: adu.transaction,
990 unit: adu.unit,
991 fc: adu.fc,
992 data,
993 };
994 response(application.encode(&response_adu)).await
995 }
996 Err(e) => Self::response_error(application, adu, response, &e).await,
997 }
998 }
999
1000 async fn handle_fc16(
1002 application: &Arc<A>,
1003 model: &dyn ModbusSlaveModel,
1004 adu: &ApplicationDataUnit,
1005 response: SlaveResponseFn,
1006 ) -> Result<(), ModbusError> {
1007 if adu.data.len() < 6 || adu.data.len() != 5 + adu.data[4] as usize {
1008 return Ok(());
1009 }
1010 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
1011 let length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
1012 let byte_count = adu.data[4];
1013
1014 if !(1..=0x007b).contains(&length) || byte_count as u16 != length * 2 {
1015 return Self::response_error(
1016 application,
1017 adu,
1018 response,
1019 &get_error_by_code(ErrorCode::IllegalDataValue),
1020 )
1021 .await;
1022 }
1023
1024 if !check_range(
1025 &[address, address + length],
1026 &model.address_range().holding_registers,
1027 ) {
1028 return Self::response_error(
1029 application,
1030 adu,
1031 response,
1032 &get_error_by_code(ErrorCode::IllegalDataAddress),
1033 )
1034 .await;
1035 }
1036
1037 let values: Vec<u16> = (0..length)
1038 .map(|i| {
1039 u16::from_be_bytes([adu.data[5 + i as usize * 2], adu.data[6 + i as usize * 2]])
1040 })
1041 .collect();
1042
1043 let result = model.write_multiple_registers(address, &values).await;
1044
1045 match result {
1046 Ok(()) => {
1047 let mut data = vec![0u8; 4];
1048 data[0..2].copy_from_slice(&address.to_be_bytes());
1049 data[2..4].copy_from_slice(&length.to_be_bytes());
1050 let response_adu = ApplicationDataUnit {
1051 transaction: adu.transaction,
1052 unit: adu.unit,
1053 fc: adu.fc,
1054 data,
1055 };
1056 response(application.encode(&response_adu)).await
1057 }
1058 Err(e) => Self::response_error(application, adu, response, &e).await,
1059 }
1060 }
1061
1062 async fn handle_fc17(
1064 application: &Arc<A>,
1065 model: &dyn ModbusSlaveModel,
1066 adu: &ApplicationDataUnit,
1067 response: SlaveResponseFn,
1068 ) -> Result<(), ModbusError> {
1069 if !adu.data.is_empty() {
1070 return Ok(());
1071 }
1072
1073 match model.report_server_id().await {
1074 Ok(server_id) => {
1075 let server_id_bytes = if server_id.server_id.is_empty() {
1079 vec![model.unit()]
1080 } else {
1081 server_id.server_id.clone()
1082 };
1083 let byte_count = server_id_bytes.len() + 1 + server_id.additional_data.len();
1084 if byte_count > 255 {
1085 return Self::response_error(
1086 application,
1087 adu,
1088 response,
1089 &get_error_by_code(ErrorCode::ServerDeviceFailure),
1090 )
1091 .await;
1092 }
1093 let mut data = Vec::with_capacity(1 + byte_count);
1094 data.push(byte_count as u8);
1095 data.extend_from_slice(&server_id_bytes);
1096 data.push(if server_id.run_indicator_status {
1097 0xff
1098 } else {
1099 0x00
1100 });
1101 data.extend_from_slice(&server_id.additional_data);
1102 let response_adu = ApplicationDataUnit {
1103 transaction: adu.transaction,
1104 unit: adu.unit,
1105 fc: adu.fc,
1106 data,
1107 };
1108 response(application.encode(&response_adu)).await
1109 }
1110 Err(e) => Self::response_error(application, adu, response, &e).await,
1111 }
1112 }
1113
1114 async fn handle_fc22(
1116 application: &Arc<A>,
1117 model: &dyn ModbusSlaveModel,
1118 address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
1119 adu: &ApplicationDataUnit,
1120 response: SlaveResponseFn,
1121 ) -> Result<(), ModbusError> {
1122 if adu.data.len() != 6 {
1123 return Ok(());
1124 }
1125 let address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
1126 let and_mask = u16::from_be_bytes([adu.data[2], adu.data[3]]);
1127 let or_mask = u16::from_be_bytes([adu.data[4], adu.data[5]]);
1128
1129 if !check_range(&[address], &model.address_range().holding_registers) {
1130 return Self::response_error(
1131 application,
1132 adu,
1133 response,
1134 &get_error_by_code(ErrorCode::IllegalDataAddress),
1135 )
1136 .await;
1137 }
1138
1139 let result = Self::with_address_lock(address_locks, &[address], || async {
1140 model.mask_write_register(address, and_mask, or_mask).await
1141 })
1142 .await;
1143
1144 match result {
1145 Ok(()) => response(application.encode(adu)).await,
1146 Err(e) => Self::response_error(application, adu, response, &e).await,
1147 }
1148 }
1149
1150 async fn handle_fc23(
1152 application: &Arc<A>,
1153 model: &dyn ModbusSlaveModel,
1154 address_locks: &tokio::sync::Mutex<HashMap<u16, Arc<tokio::sync::Mutex<()>>>>,
1155 adu: &ApplicationDataUnit,
1156 response: SlaveResponseFn,
1157 ) -> Result<(), ModbusError> {
1158 if adu.data.len() < 10 || adu.data.len() != 9 + adu.data[8] as usize {
1159 return Ok(());
1160 }
1161 let read_address = u16::from_be_bytes([adu.data[0], adu.data[1]]);
1162 let read_length = u16::from_be_bytes([adu.data[2], adu.data[3]]);
1163 let write_address = u16::from_be_bytes([adu.data[4], adu.data[5]]);
1164 let write_length = u16::from_be_bytes([adu.data[6], adu.data[7]]);
1165 let byte_count = adu.data[8];
1166
1167 if !(1..=0x007d).contains(&read_length)
1168 || !(1..=0x0079).contains(&write_length)
1169 || byte_count as u16 != write_length * 2
1170 {
1171 return Self::response_error(
1172 application,
1173 adu,
1174 response,
1175 &get_error_by_code(ErrorCode::IllegalDataValue),
1176 )
1177 .await;
1178 }
1179
1180 if !check_range(
1181 &[
1182 read_address,
1183 read_address + read_length,
1184 write_address,
1185 write_address + write_length,
1186 ],
1187 &model.address_range().holding_registers,
1188 ) {
1189 return Self::response_error(
1190 application,
1191 adu,
1192 response,
1193 &get_error_by_code(ErrorCode::IllegalDataAddress),
1194 )
1195 .await;
1196 }
1197
1198 let write_values: Vec<u16> = (0..write_length)
1199 .map(|i| {
1200 u16::from_be_bytes([adu.data[9 + i as usize * 2], adu.data[10 + i as usize * 2]])
1201 })
1202 .collect();
1203
1204 let write_addresses: Vec<u16> = (0..write_length).map(|i| write_address + i).collect();
1205
1206 let write_result = Self::with_address_lock(address_locks, &write_addresses, || async {
1207 model
1208 .write_multiple_registers(write_address, &write_values)
1209 .await
1210 })
1211 .await;
1212
1213 if let Err(e) = write_result {
1214 return Self::response_error(application, adu, response, &e).await;
1215 }
1216
1217 match model
1218 .read_holding_registers(read_address, read_length)
1219 .await
1220 {
1221 Ok(registers) => {
1222 let response_adu = ApplicationDataUnit {
1223 transaction: adu.transaction,
1224 unit: adu.unit,
1225 fc: adu.fc,
1226 data: pack_registers(®isters, read_length),
1227 };
1228 response(application.encode(&response_adu)).await
1229 }
1230 Err(e) => Self::response_error(application, adu, response, &e).await,
1231 }
1232 }
1233
1234 async fn handle_fc43_14(
1236 application: &Arc<A>,
1237 model: &dyn ModbusSlaveModel,
1238 adu: &ApplicationDataUnit,
1239 response: SlaveResponseFn,
1240 ) -> Result<(), ModbusError> {
1241 if adu.data.len() != 3 || adu.data[0] != 0x0e {
1242 return Self::response_error(
1243 application,
1244 adu,
1245 response,
1246 &get_error_by_code(ErrorCode::IllegalFunction),
1247 )
1248 .await;
1249 }
1250
1251 let read_device_id_code = adu.data[1];
1252 let mut object_id = adu.data[2];
1253
1254 match read_device_id_code {
1255 0x01 => {
1256 if object_id > 0x02 || (object_id > 0x06 && object_id < 0x80) {
1257 object_id = 0x00;
1258 }
1259 }
1260 0x02 => {
1261 if object_id > 0x06 {
1262 object_id = 0x00;
1263 }
1264 }
1265 0x03 => {
1266 if object_id > 0x06 && object_id < 0x80 {
1267 object_id = 0x00;
1268 }
1269 }
1270 0x04 => {
1271 if object_id > 0x06 && object_id < 0x80 {
1272 return Self::response_error(
1273 application,
1274 adu,
1275 response,
1276 &get_error_by_code(ErrorCode::IllegalDataAddress),
1277 )
1278 .await;
1279 }
1280 }
1281 _ => {
1282 return Self::response_error(
1283 application,
1284 adu,
1285 response,
1286 &get_error_by_code(ErrorCode::IllegalDataValue),
1287 )
1288 .await;
1289 }
1290 }
1291
1292 match model.read_device_identification().await {
1293 Ok(identification) => {
1294 let mut objects: Vec<(u8, String)> = vec![
1295 (0x00, "null".to_string()),
1296 (0x01, "null".to_string()),
1297 (0x02, "null".to_string()),
1298 ];
1299 for (k, v) in identification {
1300 if let Some(pos) = objects.iter().position(|(id, _)| *id == k) {
1301 objects[pos] = (k, v);
1302 } else {
1303 objects.push((k, v));
1304 }
1305 }
1306 objects.sort_by_key(|(id, _)| *id);
1307
1308 let has_object_id = objects.iter().any(|(id, _)| *id == object_id);
1309 if !has_object_id {
1310 if read_device_id_code == 0x04 {
1311 return Self::response_error(
1312 application,
1313 adu,
1314 response,
1315 &get_error_by_code(ErrorCode::IllegalDataAddress),
1316 )
1317 .await;
1318 }
1319 object_id = 0x00;
1320 }
1321
1322 let max_id = objects.last().map(|(id, _)| *id).unwrap_or(0);
1323 let conformity_level = if max_id >= 0x80 {
1327 0x83
1328 } else if max_id > 0x02 {
1329 0x82
1330 } else {
1331 0x81
1332 };
1333
1334 let mut ids = Vec::new();
1335 let mut total_length = 10usize;
1336 let mut last_id = 0u8;
1337
1338 for &(id, ref value) in &objects {
1339 if id < object_id {
1340 continue;
1341 }
1342 if value.len() > 245 {
1343 return Self::response_error(
1344 application,
1345 adu,
1346 response,
1347 &get_error_by_code(ErrorCode::ServerDeviceFailure),
1348 )
1349 .await;
1350 }
1351 if total_length + 2 + value.len() > 253 {
1352 if last_id == 0 {
1353 last_id = id;
1354 }
1355 continue;
1356 }
1357 total_length += 2 + value.len();
1358 ids.push(id);
1359 if read_device_id_code == 0x04 {
1360 break;
1361 }
1362 }
1363
1364 let mut data = vec![
1365 0x0e,
1366 read_device_id_code,
1367 conformity_level,
1368 if last_id == 0 { 0x00 } else { 0xff },
1369 last_id,
1370 ids.len() as u8,
1371 ];
1372 for id in ids {
1373 if let Some((_, value)) = objects.iter().find(|(oid, _)| *oid == id) {
1374 data.push(id);
1375 data.push(value.len() as u8);
1376 data.extend_from_slice(value.as_bytes());
1377 }
1378 }
1379
1380 let response_adu = ApplicationDataUnit {
1381 transaction: adu.transaction,
1382 unit: adu.unit,
1383 fc: adu.fc,
1384 data,
1385 };
1386 response(application.encode(&response_adu)).await
1387 }
1388 Err(e) => Self::response_error(application, adu, response, &e).await,
1389 }
1390 }
1391
1392 pub fn add_custom_function_code(&self, cfc: CustomFunctionCode) {
1393 self.application.add_custom_function_code(cfc.clone());
1394 self.custom_function_codes
1395 .lock()
1396 .unwrap()
1397 .insert(cfc.fc, cfc);
1398 }
1399
1400 pub fn remove_custom_function_code(&self, fc: u8) {
1401 self.application.remove_custom_function_code(fc);
1402 self.custom_function_codes.lock().unwrap().remove(&fc);
1403 }
1404}