drmem_drv_sump/
lib.rs

1use drmem_api::{
2    device,
3    driver::{self, DriverConfig},
4    Error, Result,
5};
6use std::future::Future;
7use std::net::SocketAddrV4;
8use std::sync::Arc;
9use std::{convert::Infallible, pin::Pin};
10use tokio::{
11    io::{self, AsyncReadExt},
12    net::{
13        tcp::{OwnedReadHalf, OwnedWriteHalf},
14        TcpStream,
15    },
16    sync::Mutex,
17    time,
18};
19use tracing::{debug, error, info, warn, Span};
20
21// The sump pump monitor uses a state machine to decide when to
22// calculate the duty cycle and in-flow.
23
24#[cfg_attr(test, derive(Debug, PartialEq))]
25enum State {
26    Unknown,
27    Off { off_time: u64 },
28    On { off_time: u64, on_time: u64 },
29}
30
31// This interface allows a State value to update itself when an event
32// occurs.
33
34impl State {
35    // This method is called when an off event occurs. The timestamp
36    // of the off event needs to be provided. If the state machine has
37    // enough information of the previous pump cycle, it will return
38    // the duty cycle and in-flow rate. If the state machine is still
39    // sync-ing with the state, the state will get updated, but `None`
40    // will be returned.
41
42    pub fn off_event(
43        &mut self,
44        stamp: u64,
45        gpm: f64,
46    ) -> Option<(u64, f64, f64)> {
47        match *self {
48            State::Unknown => {
49                info!("sync-ed with OFF state");
50                *self = State::Off { off_time: stamp };
51                None
52            }
53
54            State::Off { .. } => {
55                warn!("ignoring duplicate OFF event");
56                None
57            }
58
59            State::On { off_time, on_time } => {
60                // The time stamp of the OFF time should come after
61                // the ON time. If it isn't, the sump pump task has a
62                // problem (i.e. system time was adjusted.) We can't
63                // give a decent computation, so just go into the DOWN
64                // state.
65
66                if on_time >= stamp {
67                    warn!(
68                        "timestamp for OFF event is {} ms ahead of ON event",
69                        on_time - stamp
70                    );
71                    *self = State::Off { off_time: stamp };
72                    return None;
73                }
74
75                let on_time = (stamp - on_time) as f64;
76
77                // After the first storm, there was one entry that
78                // glitched. The state of the motor registered "ON"
79                // for 50 ms, turned off, turned on 400ms later, and
80                // then stayed on for the rest of the normal,
81                // six-second cycle.
82                //
83                // I'm going under the assumption that the pump wasn't
84                // drawing enough current at the start of the cycle so
85                // the current switch's detection "faded" in and out.
86                // This could be due to not setting the sensitivity of
87                // the switch high enough or, possibly, the pump
88                // failing (once in a great while, we hear the pump go
89                // through a strange-sounding cycle.)
90                //
91                // If the ON cycle is less than a half second, we'll
92                // ignore it and stay in the ON state.
93
94                if on_time > 500.0 {
95                    let off_time = stamp - off_time;
96                    let duty = on_time * 1000.0 / (off_time as f64);
97                    let in_flow = (gpm * duty / 10.0).round() / 100.0;
98
99                    *self = State::Off { off_time: stamp };
100                    Some((off_time, duty.round() / 10.0, in_flow))
101                } else {
102                    warn!("ignoring short ON time -- {:.0} ms", on_time);
103                    None
104                }
105            }
106        }
107    }
108
109    // This method is called when updating the state with an on
110    // event. The timestamp of the on event needs to be provided. If
111    // the on event actually caused a state change, `true` is
112    // returned.
113
114    pub fn on_event(&mut self, stamp: u64) -> bool {
115        match *self {
116            State::Unknown => false,
117
118            State::Off { off_time } => {
119                // Make sure the ON time occurred *after* the OFF
120                // time. This is necessary for the computations to
121                // yield valid results.
122
123                if stamp > off_time {
124                    *self = State::On {
125                        off_time,
126                        on_time: stamp,
127                    };
128                    true
129                } else {
130                    warn!(
131                        "timestamp for ON event is {} ms ahead of OFF event",
132                        off_time - stamp
133                    );
134                    false
135                }
136            }
137
138            State::On { .. } => {
139                warn!("ignoring duplicate ON event");
140                false
141            }
142        }
143    }
144}
145
146pub struct Instance {
147    state: State,
148    gpm: f64,
149    rx: OwnedReadHalf,
150    _tx: OwnedWriteHalf,
151}
152
153pub struct Devices {
154    d_service: driver::ReadOnlyDevice<bool>,
155    d_state: driver::ReadOnlyDevice<bool>,
156    d_duty: driver::ReadOnlyDevice<f64>,
157    d_inflow: driver::ReadOnlyDevice<f64>,
158    d_duration: driver::ReadOnlyDevice<f64>,
159}
160
161impl Instance {
162    pub const NAME: &'static str = "sump-gpio";
163
164    pub const SUMMARY: &'static str =
165        "monitors and computes parameters for a sump pump";
166
167    pub const DESCRIPTION: &'static str = include_str!("../README.md");
168
169    fn elapsed(millis: u64) -> String {
170        match (millis + 500) / 1000 {
171            dur if dur >= 3600 * 24 - 30 => {
172                let dur = dur + 30;
173
174                format!(
175                    "{}d{}h{}m",
176                    dur / (3600 * 24),
177                    (dur / 3600) % 24,
178                    (dur / 60) % 60
179                )
180            }
181            dur if dur >= 3570 => {
182                let dur = dur + 30;
183
184                format!("{}h{}m", dur / 3600, (dur / 60) % 60)
185            }
186            dur if dur >= 60 => {
187                format!("{}m{}s", dur / 60, dur % 60)
188            }
189            dur => {
190                format!("{}s", dur)
191            }
192        }
193    }
194
195    // Attempts to pull the hostname/port for the remote process.
196
197    fn get_cfg_address(cfg: &DriverConfig) -> Result<SocketAddrV4> {
198        match cfg.get("addr") {
199            Some(toml::value::Value::String(addr)) => {
200                if let Ok(addr) = addr.parse::<SocketAddrV4>() {
201                    Ok(addr)
202                } else {
203                    Err(Error::ConfigError(String::from(
204                        "'addr' not in hostname:port format",
205                    )))
206                }
207            }
208            Some(_) => Err(Error::ConfigError(String::from(
209                "'addr' config parameter should be a string",
210            ))),
211            None => Err(Error::ConfigError(String::from(
212                "missing 'addr' parameter in config",
213            ))),
214        }
215    }
216
217    // Attempts to pull the gal-per-min parameter from the driver's
218    // configuration. The value can be specified as an integer or
219    // floating point. It gets returned only as an `f64`.
220
221    fn get_cfg_gpm(cfg: &DriverConfig) -> Result<f64> {
222        match cfg.get("gpm") {
223            Some(toml::value::Value::Integer(gpm)) => Ok(*gpm as f64),
224            Some(toml::value::Value::Float(gpm)) => Ok(*gpm),
225            Some(_) => Err(Error::ConfigError(String::from(
226                "'gpm' config parameter should be a number",
227            ))),
228            None => Err(Error::ConfigError(String::from(
229                "missing 'gpm' parameter in config",
230            ))),
231        }
232    }
233
234    fn connect(addr: &SocketAddrV4) -> Result<TcpStream> {
235        use socket2::{Domain, Socket, TcpKeepalive, Type};
236
237        let keepalive = TcpKeepalive::new()
238            .with_time(time::Duration::from_secs(5))
239            .with_interval(time::Duration::from_secs(5));
240        let socket = Socket::new(Domain::IPV4, Type::STREAM, None)
241            .expect("couldn't create socket");
242
243        socket
244            .set_tcp_keepalive(&keepalive)
245            .expect("couldn't enable keep-alive on sump socket");
246
247        match socket.connect_timeout(
248            &<SocketAddrV4 as Into<socket2::SockAddr>>::into(*addr),
249            time::Duration::from_millis(100),
250        ) {
251            Ok(()) => {
252                info!("connected");
253
254                // Before we move the socket into `tokio`'s control,
255                // it must be placed in non-blocking mode.
256
257                socket
258                    .set_nonblocking(true)
259                    .expect("couldn't make socket nonblocking");
260
261                TcpStream::from_std(socket.into()).map_err(|_| {
262                    error!("couldn't convert to tokio::TcpStream");
263                    Error::MissingPeer(String::from("sump pump"))
264                })
265            }
266            Err(_) => {
267                error!("couldn't connect to {}", addr);
268                Err(Error::MissingPeer(String::from("sump pump")))
269            }
270        }
271    }
272
273    // This function reads the next frame from the sump pump process.
274    // It either returns `Ok()` with the two fields' values or `Err()`
275    // if a socket error occurred.
276
277    async fn get_reading(&mut self) -> io::Result<(u64, bool)> {
278        let stamp = self.rx.read_u64().await?;
279        let value = self.rx.read_u32().await?;
280
281        Ok((stamp, value != 0))
282    }
283}
284
285impl driver::API for Instance {
286    type DeviceSet = Devices;
287
288    fn register_devices(
289        core: driver::RequestChan,
290        _: &DriverConfig,
291        max_history: Option<usize>,
292    ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>> {
293        let service_name = "service".parse::<device::Base>().unwrap();
294        let state_name = "state".parse::<device::Base>().unwrap();
295        let duty_name = "duty".parse::<device::Base>().unwrap();
296        let in_flow_name = "in-flow".parse::<device::Base>().unwrap();
297        let dur_name = "duration".parse::<device::Base>().unwrap();
298
299        Box::pin(async move {
300            // Define the devices managed by this driver.
301
302            let d_service =
303                core.add_ro_device(service_name, None, max_history).await?;
304            let d_state =
305                core.add_ro_device(state_name, None, max_history).await?;
306            let d_duty = core
307                .add_ro_device(duty_name, Some("%"), max_history)
308                .await?;
309            let d_inflow = core
310                .add_ro_device(in_flow_name, Some("gpm"), max_history)
311                .await?;
312            let d_duration = core
313                .add_ro_device(dur_name, Some("min"), max_history)
314                .await?;
315
316            Ok(Devices {
317                d_service,
318                d_state,
319                d_duty,
320                d_inflow,
321                d_duration,
322            })
323        })
324    }
325
326    fn create_instance(
327        cfg: &DriverConfig,
328    ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>> {
329        let addr = Instance::get_cfg_address(cfg);
330        let gpm = Instance::get_cfg_gpm(cfg);
331
332        let fut = async move {
333            // Validate the configuration.
334
335            let addr = addr?;
336            let gpm = gpm?;
337
338            Span::current().record("cfg", addr.to_string());
339
340            // Connect with the remote process that is connected to
341            // the sump pump.
342
343            let (rx, _tx) = Instance::connect(&addr)?.into_split();
344
345            Ok(Box::new(Instance {
346                state: State::Unknown,
347                gpm,
348                rx,
349                _tx,
350            }))
351        };
352
353        Box::pin(fut)
354    }
355
356    fn run<'a>(
357        &'a mut self,
358        devices: Arc<Mutex<Devices>>,
359    ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>> {
360        let fut = async move {
361            // Record the peer's address in the "cfg" field of the
362            // span.
363
364            {
365                let addr = self
366                    .rx
367                    .peer_addr()
368                    .map(|v| format!("{}", v))
369                    .unwrap_or_else(|_| String::from("**unknown**"));
370
371                Span::current().record("cfg", addr.as_str());
372            }
373
374            let mut devices = devices.lock().await;
375
376            devices.d_service.report_update(true).await;
377
378            loop {
379                match self.get_reading().await {
380                    Ok((stamp, true)) => {
381                        if self.state.on_event(stamp) {
382                            devices.d_state.report_update(true).await;
383                        }
384                    }
385
386                    Ok((stamp, false)) => {
387                        let gpm = self.gpm;
388
389                        if let Some((cycle, duty, in_flow)) =
390                            self.state.off_event(stamp, gpm)
391                        {
392                            debug!(
393                                "cycle: {}, duty: {:.1}%, inflow: {:.2} gpm",
394                                Instance::elapsed(cycle),
395                                duty,
396                                in_flow
397                            );
398
399                            devices.d_state.report_update(false).await;
400                            devices.d_duty.report_update(duty).await;
401                            devices.d_inflow.report_update(in_flow).await;
402                            devices
403                                .d_duration
404                                .report_update(
405                                    ((cycle as f64) / 600.0).round() / 100.0,
406                                )
407                                .await;
408                        }
409                    }
410
411                    Err(e) => {
412                        devices.d_state.report_update(false).await;
413                        devices.d_service.report_update(false).await;
414                        panic!("couldn't read sump state -- {:?}", e);
415                    }
416                }
417            }
418        };
419
420        Box::pin(fut)
421    }
422}
423
424#[cfg(test)]
425mod tests {
426    use super::*;
427
428    #[test]
429    fn test_states() {
430        let mut state = State::Unknown;
431
432        assert_eq!(state.on_event(0), false);
433        assert_eq!(state, State::Unknown);
434
435        state = State::Off { off_time: 100 };
436
437        assert_eq!(state.on_event(0), false);
438        assert_eq!(state, State::Off { off_time: 100 });
439        assert_eq!(state.on_event(200), true);
440        assert_eq!(
441            state,
442            State::On {
443                off_time: 100,
444                on_time: 200
445            }
446        );
447
448        assert_eq!(state.on_event(200), false);
449        assert_eq!(
450            state,
451            State::On {
452                off_time: 100,
453                on_time: 200
454            }
455        );
456
457        state = State::Unknown;
458
459        assert_eq!(state.off_event(1000, 50.0), None);
460        assert_eq!(state, State::Off { off_time: 1000 });
461        assert_eq!(state.off_event(1100, 50.0), None);
462        assert_eq!(state, State::Off { off_time: 1000 });
463
464        state = State::On {
465            off_time: 1000,
466            on_time: 101000,
467        };
468
469        assert_eq!(state.off_event(1000, 50.0), None);
470        assert_eq!(state, State::Off { off_time: 1000 });
471
472        state = State::On {
473            off_time: 1000,
474            on_time: 101000,
475        };
476
477        assert_eq!(state.off_event(101500, 50.0), None);
478        assert_eq!(
479            state,
480            State::On {
481                off_time: 1000,
482                on_time: 101000
483            }
484        );
485
486        assert!(state.off_event(101501, 50.0).is_some());
487        assert_eq!(state, State::Off { off_time: 101501 });
488
489        state = State::On {
490            off_time: 0,
491            on_time: 540000,
492        };
493
494        assert_eq!(state.off_event(600000, 50.0), Some((600000, 10.0, 5.0)));
495        assert_eq!(state, State::Off { off_time: 600000 });
496
497        state = State::On {
498            off_time: 0,
499            on_time: 54000,
500        };
501
502        assert_eq!(state.off_event(60000, 60.0), Some((60000, 10.0, 6.0)));
503        assert_eq!(state, State::Off { off_time: 60000 });
504    }
505
506    #[test]
507    fn test_elapsed() {
508        assert_eq!(Instance::elapsed(0), "0s");
509        assert_eq!(Instance::elapsed(1000), "1s");
510        assert_eq!(Instance::elapsed(59000), "59s");
511        assert_eq!(Instance::elapsed(60000), "1m0s");
512
513        assert_eq!(Instance::elapsed(3569000), "59m29s");
514        assert_eq!(Instance::elapsed(3570000), "1h0m");
515        assert_eq!(Instance::elapsed(3599000), "1h0m");
516        assert_eq!(Instance::elapsed(3600000), "1h0m");
517
518        assert_eq!(Instance::elapsed(3600000 * 24 - 31000), "23h59m");
519        assert_eq!(Instance::elapsed(3600000 * 24 - 30000), "1d0h0m");
520        assert_eq!(Instance::elapsed(3600000 * 24 - 1000), "1d0h0m");
521        assert_eq!(Instance::elapsed(3600000 * 24), "1d0h0m");
522    }
523}