huawei_inverter/
client.rs1use crate::{
2 error::ReadError, helpers::request_failure_actions, lock::get_connection,
3 pointrange::PointRanges, Config, DynValue, ModbusError, Model, Point, PointDefinition,
4 PointRange, Value, Word, WriteError, WriteQueue,
5};
6use itertools::Itertools;
7use std::{net::SocketAddr, time::Duration};
8use tokio::time::sleep;
9use tokio_modbus::{Slave, SlaveId};
10
11#[derive(Debug, Clone, Copy)]
12pub struct Client {
13 config: Config,
14}
15
16#[derive(Debug, thiserror::Error)]
17pub enum ClientError {
18 #[error(transparent)]
19 Modbus(#[from] ModbusError),
20 #[error(transparent)]
21 Read(#[from] ReadError),
22 #[error(transparent)]
23 Write(#[from] WriteError),
24 #[error("IO error: {0}")]
25 IO(#[from] std::io::Error),
26}
27
28impl Client {
29 pub fn new(config: Config) -> Self {
30 Self { config }
31 }
32
33 pub fn socket_addr(&self) -> SocketAddr {
34 self.config.socket_addr
35 }
36
37 pub fn slave(&self) -> Slave {
38 self.config.slave
39 }
40
41 pub fn slave_id(&self) -> SlaveId {
42 self.config.slave.0
43 }
44
45 pub fn connect_timeout(&self) -> Duration {
46 self.config.connect_timeout
47 }
48
49 pub fn read_timeout(&self) -> Duration {
50 self.config.read_timeout
51 }
52
53 pub fn write_timeout(&self) -> Duration {
54 self.config.write_timeout
55 }
56
57 pub fn num_read_retries(&self) -> usize {
58 self.config.num_read_retries
59 }
60
61 pub fn num_write_retries(&self) -> usize {
62 self.config.num_write_retries
63 }
64
65 pub async fn read_model<M: Model>(&self) -> Result<M, ClientError> {
66 let max_read_length = 125;
69 let point_ranges = M::point_ranges();
70 let ranges = if max_read_length == point_ranges.max_read_words() {
71 point_ranges
72 } else {
73 PointRanges::new(M::READABLE_POINT_DEFINITIONS.to_owned(), max_read_length)
74 };
75 let raw = self.read_raw(&ranges).await?;
77 Ok(M::parse(&raw)?)
78 }
79 pub async fn read_point<V: Value>(&self, point: &Point<V>) -> Result<V, ClientError> {
80 let ranges = PointRanges::new(vec![point.def], 125);
82 let data = self.read_raw(&ranges).await?;
83 Ok(ranges
84 .parse_one(point.to_owned(), &data)
85 .map_err(ReadError::from)?)
86 }
87
88 pub async fn read_point_dyn(&self, def: PointDefinition) -> Result<DynValue, ClientError> {
89 let ranges = PointRanges::new(vec![def], 125);
90 let data = self.read_raw(&ranges).await?;
91 Ok(def
92 .decode_dyn(&data[0..def.quantity as usize])
93 .map_err(ReadError::from)?)
94 }
95
96 pub async fn write_point<V: Value + std::fmt::Debug>(
97 &self,
98 point: Point<V>,
99 value: V,
100 ) -> Result<(), ClientError> {
101 value.validate().map_err(WriteError::from)?;
103
104 let raw_value = value.encode();
105 self.write_raw(&[(point.def, raw_value)]).await?;
106 Ok(())
107 }
108
109 pub async fn write_multiple(&self, writer: &mut WriteQueue) -> Result<(), ClientError> {
110 let point_values = writer
111 .dequeue_all()
112 .into_iter()
113 .map(|w| (w.point_def(), w.raw_value().to_owned()))
114 .collect_vec();
115 self.write_raw(&point_values).await?;
116 Ok(())
117 }
118
119 pub async fn read_raw(&self, ranges: &PointRanges) -> Result<Vec<u16>, ClientError> {
121 let mut data: Vec<u16> = Vec::with_capacity(ranges.address_len());
122 let num_retries = self.num_read_retries();
123
124 let connection = get_connection(self.config).await?;
125 let mut conn_guard = connection.lock().await;
126
127 for range in ranges.ranges() {
128 match range {
129 PointRange::Gap(range) => {
130 tracing::trace!("Filling range {}..{} with zeroes…", range.start, range.end);
131 data.extend((range.start..range.end).map(|_| 0));
132 }
133 PointRange::Definitions { range, .. } => {
134 let start = range.start;
135 let quantity = range.end - range.start;
136 for retry_no in 0..=num_retries {
137 let last_retry = retry_no == num_retries;
138 match conn_guard.read_registers(start, quantity).await {
139 Ok(chunk) => {
140 data.extend(chunk);
141 break;
142 }
143 Err(error) => {
144 if last_retry {
145 return Err(error.into());
146 }
147 tracing::debug!(
148 "Modbus request failed with error: {:?}, retrying...",
149 error
150 );
151 let actions = request_failure_actions(error)?;
152 for action in actions {
153 match action {
154 crate::helpers::RequestFailureAction::Sleep(secs) => {
155 sleep(Duration::from_secs(secs.into())).await
156 }
157 crate::helpers::RequestFailureAction::NewContext => {
158 conn_guard.setup().await?;
159 }
160 }
161 }
162 }
163 };
164 }
165 }
166 }
167 }
168 Ok(data)
169 }
170
171 pub async fn write_raw(
172 &self,
173 point_values: &[(PointDefinition, Box<[Word]>)],
174 ) -> Result<(), ClientError> {
175 let num_retries = self.num_write_retries();
176
177 for (def, _) in point_values {
178 if !def.is_writable() {
179 return Err(WriteError::NotWritable.into());
180 }
181 }
182
183 let connection = get_connection(self.config).await?;
184 let mut conn_guard = connection.lock().await;
185
186 for (point_def, raw_value) in point_values {
187 if raw_value.len() > point_def.quantity as usize {
188 return Err(WriteError::ValueTooLarge.into());
189 }
190
191 for retry_no in 0..=num_retries {
192 let last_retry = retry_no == num_retries;
193 match conn_guard
194 .write_registers(point_def.address, raw_value)
195 .await
196 {
197 Ok(_) => {
198 tracing::debug!(
199 "Wrote {:?} word(s) to point {:?}",
200 raw_value.len(),
201 point_def.address
202 );
203 break;
204 }
205 Err(error) => {
206 if last_retry {
207 return Err(error.into());
208 }
209 tracing::debug!(
210 "Modbus request failed with error: {:?}, retrying...",
211 error
212 );
213 let actions = request_failure_actions(error)?;
214 for action in actions {
215 match action {
216 crate::helpers::RequestFailureAction::Sleep(secs) => {
217 sleep(Duration::from_secs(secs.into())).await
218 }
219 crate::helpers::RequestFailureAction::NewContext => {
220 conn_guard.setup().await?;
221 }
222 }
223 }
224 }
225 };
226 }
227 }
228 Ok(())
229 }
230}