eva_sim_modbus/
lib.rs

1use busrt::client::AsyncClient;
2use busrt::{rpc::Rpc, Frame, QoS};
3use eva_common::{EResult, Error};
4use log::error;
5use once_cell::sync::OnceCell;
6use rmodbus::{
7    server::{context::ModbusContext, ModbusFrame},
8    ModbusFrameBuf, ModbusProto,
9};
10use uuid::Uuid;
11
12static BUS_TOPIC_IN: OnceCell<String> = OnceCell::new();
13static BUS_TOPIC_OUT: OnceCell<String> = OnceCell::new();
14
15pub async fn init(port_svc: &str, client: &mut dyn AsyncClient) -> EResult<()> {
16    let bus_topic_in = format!("SVE/{}/bus/in/", port_svc);
17    let bus_topic_out = format!("SVE/{}/bus/out/", port_svc);
18    client
19        .subscribe(&format!("{}#", bus_topic_in), QoS::Processed)
20        .await?;
21    BUS_TOPIC_IN
22        .set(bus_topic_in)
23        .map_err(|_| Error::core("Unable to set BUS_TOPIC_IN"))?;
24    BUS_TOPIC_OUT
25        .set(bus_topic_out)
26        .map_err(|_| Error::core("Unable to set BUS_TOPIC_OUT"))?;
27    Ok(())
28}
29
30pub async fn process_modbus_frame<
31    const C: usize,
32    const D: usize,
33    const I: usize,
34    const H: usize,
35>(
36    frame: Frame,
37    ctx: &mut ModbusContext<C, D, I, H>,
38    unit: u8,
39    rpc: &impl Rpc,
40) {
41    if let Some(topic) = frame.topic() {
42        if let Some(cid) = topic.strip_prefix(BUS_TOPIC_IN.get().unwrap()) {
43            match cid.parse::<Uuid>() {
44                Ok(client_id) => {
45                    let mut response = Vec::new();
46                    let mut buf = frame.payload().to_vec();
47                    buf.resize(256, 0);
48                    let frame_buf: ModbusFrameBuf = buf.try_into().unwrap();
49                    let mut frame =
50                        ModbusFrame::new(unit, &frame_buf, ModbusProto::TcpUdp, &mut response);
51                    if let Err(e) = frame.parse() {
52                        error!("client {} frame parse error: {}", client_id, e);
53                        return;
54                    }
55                    if frame.processing_required {
56                        let result = if frame.readonly {
57                            frame.process_read(ctx)
58                        } else {
59                            frame.process_write(ctx)
60                        };
61                        if let Err(e) = result {
62                            error!("client {} frame process error: {}", client_id, e);
63                            return;
64                        }
65                    }
66                    if frame.response_required {
67                        frame.finalize_response().unwrap();
68                        let _ = rpc
69                            .client()
70                            .lock()
71                            .await
72                            .publish(
73                                &format!("{}{}", BUS_TOPIC_OUT.get().unwrap(), client_id),
74                                response.into(),
75                                QoS::Processed,
76                            )
77                            .await;
78                    }
79                }
80                Err(e) => {
81                    error!("invalid incoming topic {}: {}", topic, e);
82                }
83            }
84        }
85    }
86}