1use busrt::client::AsyncClient;
2use busrt::{rpc::Rpc, Frame, QoS};
3use eva_common::{EResult, Error};
4use log::error;
5use once_cell::sync::OnceCell;
6use rmodbus::{
7 server::{context::ModbusContext, ModbusFrame},
8 ModbusFrameBuf, ModbusProto,
9};
10use uuid::Uuid;
11
12static BUS_TOPIC_IN: OnceCell<String> = OnceCell::new();
13static BUS_TOPIC_OUT: OnceCell<String> = OnceCell::new();
14
15pub async fn init(port_svc: &str, client: &mut dyn AsyncClient) -> EResult<()> {
16 let bus_topic_in = format!("SVE/{}/bus/in/", port_svc);
17 let bus_topic_out = format!("SVE/{}/bus/out/", port_svc);
18 client
19 .subscribe(&format!("{}#", bus_topic_in), QoS::Processed)
20 .await?;
21 BUS_TOPIC_IN
22 .set(bus_topic_in)
23 .map_err(|_| Error::core("Unable to set BUS_TOPIC_IN"))?;
24 BUS_TOPIC_OUT
25 .set(bus_topic_out)
26 .map_err(|_| Error::core("Unable to set BUS_TOPIC_OUT"))?;
27 Ok(())
28}
29
30pub async fn process_modbus_frame<
31 const C: usize,
32 const D: usize,
33 const I: usize,
34 const H: usize,
35>(
36 frame: Frame,
37 ctx: &mut ModbusContext<C, D, I, H>,
38 unit: u8,
39 rpc: &impl Rpc,
40) {
41 if let Some(topic) = frame.topic() {
42 if let Some(cid) = topic.strip_prefix(BUS_TOPIC_IN.get().unwrap()) {
43 match cid.parse::<Uuid>() {
44 Ok(client_id) => {
45 let mut response = Vec::new();
46 let mut buf = frame.payload().to_vec();
47 buf.resize(256, 0);
48 let frame_buf: ModbusFrameBuf = buf.try_into().unwrap();
49 let mut frame =
50 ModbusFrame::new(unit, &frame_buf, ModbusProto::TcpUdp, &mut response);
51 if let Err(e) = frame.parse() {
52 error!("client {} frame parse error: {}", client_id, e);
53 return;
54 }
55 if frame.processing_required {
56 let result = if frame.readonly {
57 frame.process_read(ctx)
58 } else {
59 frame.process_write(ctx)
60 };
61 if let Err(e) = result {
62 error!("client {} frame process error: {}", client_id, e);
63 return;
64 }
65 }
66 if frame.response_required {
67 frame.finalize_response().unwrap();
68 let _ = rpc
69 .client()
70 .lock()
71 .await
72 .publish(
73 &format!("{}{}", BUS_TOPIC_OUT.get().unwrap(), client_id),
74 response.into(),
75 QoS::Processed,
76 )
77 .await;
78 }
79 }
80 Err(e) => {
81 error!("invalid incoming topic {}: {}", topic, e);
82 }
83 }
84 }
85 }
86}