1use drmem_api::{
2 device,
3 driver::{self, DriverConfig},
4 Error, Result,
5};
6use std::future::Future;
7use std::net::SocketAddrV4;
8use std::sync::Arc;
9use std::{convert::Infallible, pin::Pin};
10use tokio::{
11 io::{self, AsyncReadExt},
12 net::{
13 tcp::{OwnedReadHalf, OwnedWriteHalf},
14 TcpStream,
15 },
16 sync::Mutex,
17 time,
18};
19use tracing::{debug, error, info, warn, Span};
20
21#[cfg_attr(test, derive(Debug, PartialEq))]
25enum State {
26 Unknown,
27 Off { off_time: u64 },
28 On { off_time: u64, on_time: u64 },
29}
30
31impl State {
35 pub fn off_event(
43 &mut self,
44 stamp: u64,
45 gpm: f64,
46 ) -> Option<(u64, f64, f64)> {
47 match *self {
48 State::Unknown => {
49 info!("sync-ed with OFF state");
50 *self = State::Off { off_time: stamp };
51 None
52 }
53
54 State::Off { .. } => {
55 warn!("ignoring duplicate OFF event");
56 None
57 }
58
59 State::On { off_time, on_time } => {
60 if on_time >= stamp {
67 warn!(
68 "timestamp for OFF event is {} ms ahead of ON event",
69 on_time - stamp
70 );
71 *self = State::Off { off_time: stamp };
72 return None;
73 }
74
75 let on_time = (stamp - on_time) as f64;
76
77 if on_time > 500.0 {
95 let off_time = stamp - off_time;
96 let duty = on_time * 1000.0 / (off_time as f64);
97 let in_flow = (gpm * duty / 10.0).round() / 100.0;
98
99 *self = State::Off { off_time: stamp };
100 Some((off_time, duty.round() / 10.0, in_flow))
101 } else {
102 warn!("ignoring short ON time -- {:.0} ms", on_time);
103 None
104 }
105 }
106 }
107 }
108
109 pub fn on_event(&mut self, stamp: u64) -> bool {
115 match *self {
116 State::Unknown => false,
117
118 State::Off { off_time } => {
119 if stamp > off_time {
124 *self = State::On {
125 off_time,
126 on_time: stamp,
127 };
128 true
129 } else {
130 warn!(
131 "timestamp for ON event is {} ms ahead of OFF event",
132 off_time - stamp
133 );
134 false
135 }
136 }
137
138 State::On { .. } => {
139 warn!("ignoring duplicate ON event");
140 false
141 }
142 }
143 }
144}
145
146pub struct Instance {
147 state: State,
148 gpm: f64,
149 rx: OwnedReadHalf,
150 _tx: OwnedWriteHalf,
151}
152
153pub struct Devices {
154 d_service: driver::ReadOnlyDevice<bool>,
155 d_state: driver::ReadOnlyDevice<bool>,
156 d_duty: driver::ReadOnlyDevice<f64>,
157 d_inflow: driver::ReadOnlyDevice<f64>,
158 d_duration: driver::ReadOnlyDevice<f64>,
159}
160
161impl Instance {
162 pub const NAME: &'static str = "sump-gpio";
163
164 pub const SUMMARY: &'static str =
165 "monitors and computes parameters for a sump pump";
166
167 pub const DESCRIPTION: &'static str = include_str!("../README.md");
168
169 fn elapsed(millis: u64) -> String {
170 match (millis + 500) / 1000 {
171 dur if dur >= 3600 * 24 - 30 => {
172 let dur = dur + 30;
173
174 format!(
175 "{}d{}h{}m",
176 dur / (3600 * 24),
177 (dur / 3600) % 24,
178 (dur / 60) % 60
179 )
180 }
181 dur if dur >= 3570 => {
182 let dur = dur + 30;
183
184 format!("{}h{}m", dur / 3600, (dur / 60) % 60)
185 }
186 dur if dur >= 60 => {
187 format!("{}m{}s", dur / 60, dur % 60)
188 }
189 dur => {
190 format!("{}s", dur)
191 }
192 }
193 }
194
195 fn get_cfg_address(cfg: &DriverConfig) -> Result<SocketAddrV4> {
198 match cfg.get("addr") {
199 Some(toml::value::Value::String(addr)) => {
200 if let Ok(addr) = addr.parse::<SocketAddrV4>() {
201 Ok(addr)
202 } else {
203 Err(Error::ConfigError(String::from(
204 "'addr' not in hostname:port format",
205 )))
206 }
207 }
208 Some(_) => Err(Error::ConfigError(String::from(
209 "'addr' config parameter should be a string",
210 ))),
211 None => Err(Error::ConfigError(String::from(
212 "missing 'addr' parameter in config",
213 ))),
214 }
215 }
216
217 fn get_cfg_gpm(cfg: &DriverConfig) -> Result<f64> {
222 match cfg.get("gpm") {
223 Some(toml::value::Value::Integer(gpm)) => Ok(*gpm as f64),
224 Some(toml::value::Value::Float(gpm)) => Ok(*gpm),
225 Some(_) => Err(Error::ConfigError(String::from(
226 "'gpm' config parameter should be a number",
227 ))),
228 None => Err(Error::ConfigError(String::from(
229 "missing 'gpm' parameter in config",
230 ))),
231 }
232 }
233
234 fn connect(addr: &SocketAddrV4) -> Result<TcpStream> {
235 use socket2::{Domain, Socket, TcpKeepalive, Type};
236
237 let keepalive = TcpKeepalive::new()
238 .with_time(time::Duration::from_secs(5))
239 .with_interval(time::Duration::from_secs(5));
240 let socket = Socket::new(Domain::IPV4, Type::STREAM, None)
241 .expect("couldn't create socket");
242
243 socket
244 .set_tcp_keepalive(&keepalive)
245 .expect("couldn't enable keep-alive on sump socket");
246
247 match socket.connect_timeout(
248 &<SocketAddrV4 as Into<socket2::SockAddr>>::into(*addr),
249 time::Duration::from_millis(100),
250 ) {
251 Ok(()) => {
252 info!("connected");
253
254 socket
258 .set_nonblocking(true)
259 .expect("couldn't make socket nonblocking");
260
261 TcpStream::from_std(socket.into()).map_err(|_| {
262 error!("couldn't convert to tokio::TcpStream");
263 Error::MissingPeer(String::from("sump pump"))
264 })
265 }
266 Err(_) => {
267 error!("couldn't connect to {}", addr);
268 Err(Error::MissingPeer(String::from("sump pump")))
269 }
270 }
271 }
272
273 async fn get_reading(&mut self) -> io::Result<(u64, bool)> {
278 let stamp = self.rx.read_u64().await?;
279 let value = self.rx.read_u32().await?;
280
281 Ok((stamp, value != 0))
282 }
283}
284
285impl driver::API for Instance {
286 type DeviceSet = Devices;
287
288 fn register_devices(
289 core: driver::RequestChan,
290 _: &DriverConfig,
291 max_history: Option<usize>,
292 ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>> {
293 let service_name = "service".parse::<device::Base>().unwrap();
294 let state_name = "state".parse::<device::Base>().unwrap();
295 let duty_name = "duty".parse::<device::Base>().unwrap();
296 let in_flow_name = "in-flow".parse::<device::Base>().unwrap();
297 let dur_name = "duration".parse::<device::Base>().unwrap();
298
299 Box::pin(async move {
300 let d_service =
303 core.add_ro_device(service_name, None, max_history).await?;
304 let d_state =
305 core.add_ro_device(state_name, None, max_history).await?;
306 let d_duty = core
307 .add_ro_device(duty_name, Some("%"), max_history)
308 .await?;
309 let d_inflow = core
310 .add_ro_device(in_flow_name, Some("gpm"), max_history)
311 .await?;
312 let d_duration = core
313 .add_ro_device(dur_name, Some("min"), max_history)
314 .await?;
315
316 Ok(Devices {
317 d_service,
318 d_state,
319 d_duty,
320 d_inflow,
321 d_duration,
322 })
323 })
324 }
325
326 fn create_instance(
327 cfg: &DriverConfig,
328 ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>> {
329 let addr = Instance::get_cfg_address(cfg);
330 let gpm = Instance::get_cfg_gpm(cfg);
331
332 let fut = async move {
333 let addr = addr?;
336 let gpm = gpm?;
337
338 Span::current().record("cfg", addr.to_string());
339
340 let (rx, _tx) = Instance::connect(&addr)?.into_split();
344
345 Ok(Box::new(Instance {
346 state: State::Unknown,
347 gpm,
348 rx,
349 _tx,
350 }))
351 };
352
353 Box::pin(fut)
354 }
355
356 fn run<'a>(
357 &'a mut self,
358 devices: Arc<Mutex<Devices>>,
359 ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>> {
360 let fut = async move {
361 {
365 let addr = self
366 .rx
367 .peer_addr()
368 .map(|v| format!("{}", v))
369 .unwrap_or_else(|_| String::from("**unknown**"));
370
371 Span::current().record("cfg", addr.as_str());
372 }
373
374 let mut devices = devices.lock().await;
375
376 devices.d_service.report_update(true).await;
377
378 loop {
379 match self.get_reading().await {
380 Ok((stamp, true)) => {
381 if self.state.on_event(stamp) {
382 devices.d_state.report_update(true).await;
383 }
384 }
385
386 Ok((stamp, false)) => {
387 let gpm = self.gpm;
388
389 if let Some((cycle, duty, in_flow)) =
390 self.state.off_event(stamp, gpm)
391 {
392 debug!(
393 "cycle: {}, duty: {:.1}%, inflow: {:.2} gpm",
394 Instance::elapsed(cycle),
395 duty,
396 in_flow
397 );
398
399 devices.d_state.report_update(false).await;
400 devices.d_duty.report_update(duty).await;
401 devices.d_inflow.report_update(in_flow).await;
402 devices
403 .d_duration
404 .report_update(
405 ((cycle as f64) / 600.0).round() / 100.0,
406 )
407 .await;
408 }
409 }
410
411 Err(e) => {
412 devices.d_state.report_update(false).await;
413 devices.d_service.report_update(false).await;
414 panic!("couldn't read sump state -- {:?}", e);
415 }
416 }
417 }
418 };
419
420 Box::pin(fut)
421 }
422}
423
424#[cfg(test)]
425mod tests {
426 use super::*;
427
428 #[test]
429 fn test_states() {
430 let mut state = State::Unknown;
431
432 assert_eq!(state.on_event(0), false);
433 assert_eq!(state, State::Unknown);
434
435 state = State::Off { off_time: 100 };
436
437 assert_eq!(state.on_event(0), false);
438 assert_eq!(state, State::Off { off_time: 100 });
439 assert_eq!(state.on_event(200), true);
440 assert_eq!(
441 state,
442 State::On {
443 off_time: 100,
444 on_time: 200
445 }
446 );
447
448 assert_eq!(state.on_event(200), false);
449 assert_eq!(
450 state,
451 State::On {
452 off_time: 100,
453 on_time: 200
454 }
455 );
456
457 state = State::Unknown;
458
459 assert_eq!(state.off_event(1000, 50.0), None);
460 assert_eq!(state, State::Off { off_time: 1000 });
461 assert_eq!(state.off_event(1100, 50.0), None);
462 assert_eq!(state, State::Off { off_time: 1000 });
463
464 state = State::On {
465 off_time: 1000,
466 on_time: 101000,
467 };
468
469 assert_eq!(state.off_event(1000, 50.0), None);
470 assert_eq!(state, State::Off { off_time: 1000 });
471
472 state = State::On {
473 off_time: 1000,
474 on_time: 101000,
475 };
476
477 assert_eq!(state.off_event(101500, 50.0), None);
478 assert_eq!(
479 state,
480 State::On {
481 off_time: 1000,
482 on_time: 101000
483 }
484 );
485
486 assert!(state.off_event(101501, 50.0).is_some());
487 assert_eq!(state, State::Off { off_time: 101501 });
488
489 state = State::On {
490 off_time: 0,
491 on_time: 540000,
492 };
493
494 assert_eq!(state.off_event(600000, 50.0), Some((600000, 10.0, 5.0)));
495 assert_eq!(state, State::Off { off_time: 600000 });
496
497 state = State::On {
498 off_time: 0,
499 on_time: 54000,
500 };
501
502 assert_eq!(state.off_event(60000, 60.0), Some((60000, 10.0, 6.0)));
503 assert_eq!(state, State::Off { off_time: 60000 });
504 }
505
506 #[test]
507 fn test_elapsed() {
508 assert_eq!(Instance::elapsed(0), "0s");
509 assert_eq!(Instance::elapsed(1000), "1s");
510 assert_eq!(Instance::elapsed(59000), "59s");
511 assert_eq!(Instance::elapsed(60000), "1m0s");
512
513 assert_eq!(Instance::elapsed(3569000), "59m29s");
514 assert_eq!(Instance::elapsed(3570000), "1h0m");
515 assert_eq!(Instance::elapsed(3599000), "1h0m");
516 assert_eq!(Instance::elapsed(3600000), "1h0m");
517
518 assert_eq!(Instance::elapsed(3600000 * 24 - 31000), "23h59m");
519 assert_eq!(Instance::elapsed(3600000 * 24 - 30000), "1d0h0m");
520 assert_eq!(Instance::elapsed(3600000 * 24 - 1000), "1d0h0m");
521 assert_eq!(Instance::elapsed(3600000 * 24), "1d0h0m");
522 }
523}