Skip to main content

huawei_inverter/
client.rs

1use crate::{
2    error::ReadError, helpers::request_failure_actions, lock::get_connection,
3    pointrange::PointRanges, Config, DynValue, ModbusError, Model, Point, PointDefinition,
4    PointRange, Value, Word, WriteError, WriteQueue,
5};
6use itertools::Itertools;
7use std::{net::SocketAddr, time::Duration};
8use tokio::time::sleep;
9use tokio_modbus::{Slave, SlaveId};
10
11#[derive(Debug, Clone, Copy)]
12pub struct Client {
13    config: Config,
14}
15
16#[derive(Debug, thiserror::Error)]
17pub enum ClientError {
18    #[error(transparent)]
19    Modbus(#[from] ModbusError),
20    #[error(transparent)]
21    Read(#[from] ReadError),
22    #[error(transparent)]
23    Write(#[from] WriteError),
24    #[error("IO error: {0}")]
25    IO(#[from] std::io::Error),
26}
27
28impl Client {
29    pub fn new(config: Config) -> Self {
30        Self { config }
31    }
32
33    pub fn socket_addr(&self) -> SocketAddr {
34        self.config.socket_addr
35    }
36
37    pub fn slave(&self) -> Slave {
38        self.config.slave
39    }
40
41    pub fn slave_id(&self) -> SlaveId {
42        self.config.slave.0
43    }
44
45    pub fn connect_timeout(&self) -> Duration {
46        self.config.connect_timeout
47    }
48
49    pub fn read_timeout(&self) -> Duration {
50        self.config.read_timeout
51    }
52
53    pub fn write_timeout(&self) -> Duration {
54        self.config.write_timeout
55    }
56
57    pub fn num_read_retries(&self) -> usize {
58        self.config.num_read_retries
59    }
60
61    pub fn num_write_retries(&self) -> usize {
62        self.config.num_write_retries
63    }
64
65    pub async fn read_model<M: Model>(&self) -> Result<M, ClientError> {
66        // "125" seems to be a hard limit for how much can be read from Huawei inverter
67        // at the same time.
68        let max_read_length = 125;
69        let point_ranges = M::point_ranges();
70        let ranges = if max_read_length == point_ranges.max_read_words() {
71            point_ranges
72        } else {
73            PointRanges::new(M::READABLE_POINT_DEFINITIONS.to_owned(), max_read_length)
74        };
75        // First read gets timeout without waiting at least one second after connect.
76        let raw = self.read_raw(&ranges).await?;
77        Ok(M::parse(&raw)?)
78    }
79    pub async fn read_point<V: Value>(&self, point: &Point<V>) -> Result<V, ClientError> {
80        // First read gets timeout without waiting at least one second after connect.
81        let ranges = PointRanges::new(vec![point.def], 125);
82        let data = self.read_raw(&ranges).await?;
83        Ok(ranges
84            .parse_one(point.to_owned(), &data)
85            .map_err(ReadError::from)?)
86    }
87
88    pub async fn read_point_dyn(&self, def: PointDefinition) -> Result<DynValue, ClientError> {
89        let ranges = PointRanges::new(vec![def], 125);
90        let data = self.read_raw(&ranges).await?;
91        Ok(def
92            .decode_dyn(&data[0..def.quantity as usize])
93            .map_err(ReadError::from)?)
94    }
95
96    pub async fn write_point<V: Value + std::fmt::Debug>(
97        &self,
98        point: Point<V>,
99        value: V,
100    ) -> Result<(), ClientError> {
101        // Validate that value is correct before trying to insert
102        value.validate().map_err(WriteError::from)?;
103
104        let raw_value = value.encode();
105        self.write_raw(&[(point.def, raw_value)]).await?;
106        Ok(())
107    }
108
109    pub async fn write_multiple(&self, writer: &mut WriteQueue) -> Result<(), ClientError> {
110        let point_values = writer
111            .dequeue_all()
112            .into_iter()
113            .map(|w| (w.point_def(), w.raw_value().to_owned()))
114            .collect_vec();
115        self.write_raw(&point_values).await?;
116        Ok(())
117    }
118
119    /// Read raw data from modbus
120    pub async fn read_raw(&self, ranges: &PointRanges) -> Result<Vec<u16>, ClientError> {
121        let mut data: Vec<u16> = Vec::with_capacity(ranges.address_len());
122        let num_retries = self.num_read_retries();
123
124        let connection = get_connection(self.config).await?;
125        let mut conn_guard = connection.lock().await;
126
127        for range in ranges.ranges() {
128            match range {
129                PointRange::Gap(range) => {
130                    tracing::trace!("Filling range {}..{} with zeroes…", range.start, range.end);
131                    data.extend((range.start..range.end).map(|_| 0));
132                }
133                PointRange::Definitions { range, .. } => {
134                    let start = range.start;
135                    let quantity = range.end - range.start;
136                    for retry_no in 0..=num_retries {
137                        let last_retry = retry_no == num_retries;
138                        match conn_guard.read_registers(start, quantity).await {
139                            Ok(chunk) => {
140                                data.extend(chunk);
141                                break;
142                            }
143                            Err(error) => {
144                                if last_retry {
145                                    return Err(error.into());
146                                }
147                                tracing::debug!(
148                                    "Modbus request failed with error: {:?}, retrying...",
149                                    error
150                                );
151                                let actions = request_failure_actions(error)?;
152                                for action in actions {
153                                    match action {
154                                        crate::helpers::RequestFailureAction::Sleep(secs) => {
155                                            sleep(Duration::from_secs(secs.into())).await
156                                        }
157                                        crate::helpers::RequestFailureAction::NewContext => {
158                                            conn_guard.setup().await?;
159                                        }
160                                    }
161                                }
162                            }
163                        };
164                    }
165                }
166            }
167        }
168        Ok(data)
169    }
170
171    pub async fn write_raw(
172        &self,
173        point_values: &[(PointDefinition, Box<[Word]>)],
174    ) -> Result<(), ClientError> {
175        let num_retries = self.num_write_retries();
176
177        for (def, _) in point_values {
178            if !def.is_writable() {
179                return Err(WriteError::NotWritable.into());
180            }
181        }
182
183        let connection = get_connection(self.config).await?;
184        let mut conn_guard = connection.lock().await;
185
186        for (point_def, raw_value) in point_values {
187            if raw_value.len() > point_def.quantity as usize {
188                return Err(WriteError::ValueTooLarge.into());
189            }
190
191            for retry_no in 0..=num_retries {
192                let last_retry = retry_no == num_retries;
193                match conn_guard
194                    .write_registers(point_def.address, raw_value)
195                    .await
196                {
197                    Ok(_) => {
198                        tracing::debug!(
199                            "Wrote {:?} word(s) to point {:?}",
200                            raw_value.len(),
201                            point_def.address
202                        );
203                        break;
204                    }
205                    Err(error) => {
206                        if last_retry {
207                            return Err(error.into());
208                        }
209                        tracing::debug!(
210                            "Modbus request failed with error: {:?}, retrying...",
211                            error
212                        );
213                        let actions = request_failure_actions(error)?;
214                        for action in actions {
215                            match action {
216                                crate::helpers::RequestFailureAction::Sleep(secs) => {
217                                    sleep(Duration::from_secs(secs.into())).await
218                                }
219                                crate::helpers::RequestFailureAction::NewContext => {
220                                    conn_guard.setup().await?;
221                                }
222                            }
223                        }
224                    }
225                };
226            }
227        }
228        Ok(())
229    }
230}