drmem_db_redis/
lib.rs

1use async_trait::async_trait;
2use chrono::*;
3use drmem_api::{
4    client, device,
5    driver::{ReportReading, RxDeviceSetting, TxDeviceSetting},
6    Error, Result, Store,
7};
8use futures::task::{Context, Poll};
9use futures::Future;
10use redis::{
11    aio,
12    streams::{StreamId, StreamInfoStreamReply},
13};
14use std::collections::HashMap;
15use std::convert::TryInto;
16use std::pin::Pin;
17use std::time;
18use tokio::sync::{mpsc, oneshot};
19use tokio_stream::{self, Stream, StreamExt};
20use tracing::{debug, error, info, info_span, warn};
21use tracing_futures::Instrument;
22
// Shorthand for the multiplexed async redis connection. The store
// clones this handle for each report function it hands out.
type AioMplexConnection = aio::MultiplexedConnection;
// Shorthand for the single-user async redis connection, which the
// reading stream threads through its futures by value.
type AioConnection = aio::Connection;
// Maps a device name to the sending end of its settings channel.
type SettingTable = HashMap<device::Name, TxDeviceSetting>;
26
27pub mod config;
28
29// Translates a Redis error into a DrMem error. The translation is
30// slightly lossy in that we lose the exact Redis error that occurred
31// and, instead map it into a more general "backend" error. We
32// propagate the associated message so, hopefully, that's enough to
33// rebuild the context of the error.
34//
35// This is a job for `impl From<RedisError> for Error`, but it won't
36// work here because neither of those types are defined in this
37// module. We'd have to put the trait implementation in the
38// `drmem-api` crate which, then, requires all projects to build the
39// `redis` crate. Since we only need to do the translationin this
40// module, this function will be the translater.
41
42fn xlat_err(e: redis::RedisError) -> Error {
43    match e.kind() {
44        redis::ErrorKind::ResponseError
45        | redis::ErrorKind::ClusterDown
46        | redis::ErrorKind::CrossSlot
47        | redis::ErrorKind::MasterDown
48        | redis::ErrorKind::IoError => Error::DbCommunicationError,
49
50        redis::ErrorKind::AuthenticationFailed
51        | redis::ErrorKind::InvalidClientConfig => Error::AuthenticationError,
52
53        redis::ErrorKind::TypeError => Error::TypeError,
54
55        redis::ErrorKind::ExecAbortError
56        | redis::ErrorKind::BusyLoadingError
57        | redis::ErrorKind::TryAgain
58        | redis::ErrorKind::ClientError
59        | redis::ErrorKind::ExtensionError
60        | redis::ErrorKind::ReadOnly => Error::OperationError,
61
62        redis::ErrorKind::NoScriptError
63        | redis::ErrorKind::Moved
64        | redis::ErrorKind::Ask => Error::NotFound,
65
66        _ => Error::UnknownError,
67    }
68}
69
70// Encodes a `device::Value` into a binary which gets stored in
71// redis. This encoding lets us store type information in redis so
72// there's no rounding errors or misinterpretation of the data.
73
74fn to_redis(val: &device::Value) -> Vec<u8> {
75    match val {
76        device::Value::Bool(false) => vec![b'B', b'F'],
77        device::Value::Bool(true) => vec![b'B', b'T'],
78
79        // Integers start with an 'I' followed by 4 bytes.
80        device::Value::Int(v) => {
81            let mut buf: Vec<u8> = Vec::with_capacity(9);
82
83            buf.push(b'I');
84            buf.extend_from_slice(&v.to_be_bytes());
85            buf
86        }
87
88        // Floating point values start with a 'D' and are followed by
89        // 8 bytes.
90        device::Value::Flt(v) => {
91            let mut buf: Vec<u8> = Vec::with_capacity(9);
92
93            buf.push(b'D');
94            buf.extend_from_slice(&v.to_be_bytes());
95            buf
96        }
97
98        // Strings start with an 'S', followed by a 4-byte length
99        // field, and then followed by the string content.
100        device::Value::Str(s) => {
101            let s = s.as_bytes();
102            let mut buf: Vec<u8> = Vec::with_capacity(5 + s.len());
103
104            buf.push(b'S');
105            buf.extend_from_slice(&(s.len() as u32).to_be_bytes());
106            buf.extend_from_slice(s);
107            buf
108        }
109    }
110}
111
112// Decodes an `i32` from an 4-byte buffer.
113
114fn decode_integer(buf: &[u8]) -> Result<device::Value> {
115    if buf.len() >= 4 {
116        let buf = buf[..4].try_into().unwrap();
117
118        return Ok(device::Value::Int(i32::from_be_bytes(buf)));
119    }
120    Err(Error::TypeError)
121}
122
123// Decodes an `f64` from an 8-byte buffer.
124
125fn decode_float(buf: &[u8]) -> Result<device::Value> {
126    if buf.len() >= 8 {
127        let buf = buf[..8].try_into().unwrap();
128
129        return Ok(device::Value::Flt(f64::from_be_bytes(buf)));
130    }
131    Err(Error::TypeError)
132}
133
134// Decodes a UTF-8 encoded string from a raw, u8 buffer.
135
136fn decode_string(buf: &[u8]) -> Result<device::Value> {
137    if buf.len() >= 4 {
138        let len_buf = buf[..4].try_into().unwrap();
139        let len = u32::from_be_bytes(len_buf) as usize;
140
141        if buf.len() >= (4 + len) {
142            let str_vec = buf[4..4 + len].to_vec();
143
144            return match String::from_utf8(str_vec) {
145                Ok(s) => Ok(device::Value::Str(s)),
146                Err(_) => Err(Error::TypeError),
147            };
148        }
149    }
150    Err(Error::TypeError)
151}
152
153// Returns a `device::Value` from a `redis::Value`. The only
154// enumeration we support is the `redis::Value::Data` form since
155// that's the one used to return redis data.
156
157fn from_value(v: &redis::Value) -> Result<device::Value> {
158    if let redis::Value::Data(buf) = v {
159        // The buffer has to have at least one character in order to
160        // be decoded.
161
162        if !buf.is_empty() {
163            match buf[0] as char {
164                'B' if buf.len() > 1 => match buf[1] {
165                    b'F' => Ok(device::Value::Bool(false)),
166                    b'T' => Ok(device::Value::Bool(true)),
167                    _ => Err(Error::TypeError),
168                },
169                'I' => decode_integer(&buf[1..]),
170                'D' => decode_float(&buf[1..]),
171                'S' => decode_string(&buf[1..]),
172
173                // Any other character in the tag field is unknown and
174                // can't be decoded as a `device::Value`.
175                _ => Err(Error::TypeError),
176            }
177        } else {
178            Err(Error::TypeError)
179        }
180    } else {
181        Err(Error::TypeError)
182    }
183}
184
// Moves a SystemTime value back by 1 microsecond. If the
// subtraction isn't possible (the result would be out of range), the
// value is returned unchanged.

fn st_minus_1us(st: time::SystemTime) -> time::SystemTime {
    let one_us = time::Duration::from_micros(1);

    st.checked_sub(one_us).unwrap_or(st)
}
196
197// Converts millisecond/microsecond values to SystemTime. The
198// microsecond parameter is clipped to the range 0 - 999.
199
200fn msus_to_st(ms: u64, us: u64) -> Result<time::SystemTime> {
201    let ts = time::UNIX_EPOCH.checked_add(time::Duration::from_millis(ms));
202
203    if let Some(ts) = ts {
204        let ts = ts.checked_add(time::Duration::from_micros(std::cmp::min(
205            us, 999u64,
206        )));
207
208        if let Some(ts) = ts {
209            return Ok(ts);
210        }
211    }
212    Err(Error::InvArgument(String::from("bad timestamp value")))
213}
214
215// Converts a redis stream ID ("###-###") into a `SystemTime`. In
216// DrMem, we follow the redis convention that the main portion of the
217// ID is milliseconds from 1970. The secondary portion is used to hold
218// microseconds ("0" - "999", not "000" - "999").
219
220fn id_to_ts(id: &str) -> Result<time::SystemTime> {
221    let fields: Vec<&str> = id.split('-').collect();
222
223    if let &[a, b] = &fields[..] {
224        // The redis stream id has the form "#-#" where the first
225        // number is a 64-bit value representing milliseconds since
226        // 1970. The second portion is a sequence number which is only
227        // used if the first number has a duplicate.  This keeps the
228        // timestamps increasing in value. DrMem has a base time of 20
229        // Hz (50 ms), so we should never have more than one timestamp
230        // occur in the same millisecond. However, some may want to
231        // push the boundaries, so we'll use the second number as a
232        // microsecond field. This code will accept the second field
233        // to be 0 - 999. If it exceeds 999, we'll clip it.
234
235        if let (Ok(ms), Ok(us)) = (a.parse::<u64>(), b.parse::<u64>()) {
236            return msus_to_st(ms, us);
237        }
238    }
239
240    Err(Error::InvArgument(String::from("unknown timestamp format")))
241}
242
// A boxed future that owns a redis connection while it runs and,
// when it completes, hands the connection back together with the
// result of the redis request.
type ReadFuture = Pin<
    Box<
        dyn Future<Output = (AioConnection, redis::RedisResult<redis::Value>)>
            + Send,
    >,
>;

// The shape `parse_reading` decodes a reply into: a 1-tuple holding
// a string paired with a 1-tuple of (entry id, field map). The outer
// string is ignored by `parse_reading` -- presumably it is the
// stream's key; confirm against the XREAD reply format.
type ReadingResult = ((String, ((String, HashMap<String, redis::Value>),)),);
251
// State of a stream of device readings pulled from a redis stream.
struct ReadingStream {
    // Key of the redis stream being read.
    key: String,
    // ID passed to the next read request. Starts as the caller's
    // timestamp (or "$" if none was given) and is replaced by the ID
    // of each reading that gets returned.
    id: String,
    // The read request currently in flight.
    fut: ReadFuture,
}
257
258impl ReadingStream {
    // Value given to the `block` option of the read request built by
    // `read_next_cmd`. Redis documents XREAD's BLOCK argument as
    // milliseconds, so each request waits up to 5 seconds.
    const TIMEOUT: usize = 5_000;
260
261    // Converts a `time::SystemTime` into a redis stream id.
262    // Microseconds are mapping into the secondary portion of the id.
263    //
264    // XXX: This function uses `.unwrap()`, which means this function
265    // could `panic!`. However, the timestamps are being generated by
266    // redis or DrMem so they should always be in range. If we allow
267    // drivers to specify their own timestamps, this may need to be
268    // revisited.
269
270    fn ts_to_id(ts: time::SystemTime) -> String {
271        let us = ts.duration_since(time::UNIX_EPOCH).unwrap().as_micros();
272
273        format!("{}-{}", us / 1000, us % 1000)
274    }
275
276    fn read_next_cmd(key: &str, id: &str) -> redis::Cmd {
277        let opts = redis::streams::StreamReadOptions::default()
278            .block(Self::TIMEOUT)
279            .count(1);
280
281        redis::Cmd::xread_options(&[key], &[id], &opts)
282    }
283
284    // Create a future that returns the next device reading from a
285    // redis stream (or times-out trying.) The connection is
286    // "threaded" through the future (i.e. it takes ownership and
287    // returns it with the result.) This is necessary because an
288    // AioConnection isn't clonable.
289
290    fn mk_fut(mut con: AioConnection, key: String, id: String) -> ReadFuture {
291        Box::pin(async move {
292            let result =
293                Self::read_next_cmd(&key, &id).query_async(&mut con).await;
294
295            (con, result)
296        })
297    }
298
299    pub fn new(
300        con: AioConnection,
301        key: &str,
302        id: Option<time::SystemTime>,
303    ) -> Self {
304        let key = key.to_string();
305        let id = id.map(Self::ts_to_id).unwrap_or_else(|| String::from("$"));
306        let fut = Self::mk_fut(con, key.clone(), id.clone());
307
308        ReadingStream { key, id, fut }
309    }
310
311    fn parse_reading(data: &redis::Value) -> Option<(String, device::Reading)> {
312        let result: redis::RedisResult<ReadingResult> =
313            redis::from_redis_value(data);
314
315        match result {
316            Ok(((_, ((ref new_id, ref rmap),)),)) => {
317                let reading = device::Reading {
318                    ts: id_to_ts(new_id).ok()?,
319                    value: from_value(rmap.get("value")?).ok()?,
320                };
321
322                Some((new_id.to_string(), reading))
323            }
324            Err(e) => {
325                error!("couldn't parse reading: {:?}", &e);
326                None
327            }
328        }
329    }
330}
331
332// Implements a stream. Note this stream is probably not cancel-safe;
333// without knowing the internals of the Future that returns the redis
334// result, we can only assume the connection can be put in a bad state
335// if this future (poll_next) is canceled. In DrMem, we typically
336// consume the entire stream and, in the case of a client canceling
337// its use of the stream, we've going to tear down the redis
338// connection.
339
340impl Stream for ReadingStream {
341    type Item = device::Reading;
342
343    // Polls the redis connection (via XREAD) for another reading and
344    // returns it through the stream. The redis command set doesn't
345    // have an infinite blocking command so this future uses the
346    // timeout parameter to periodically wake up and retry the read.
347
348    fn poll_next(
349        mut self: Pin<&mut Self>,
350        cx: &mut Context<'_>,
351    ) -> Poll<Option<Self::Item>> {
352        loop {
353            // If there is a `Poll::Ready` return value, then redis
354            // sent an update.
355
356            if let Poll::Ready(result) = Pin::new(&mut self.fut).poll(cx) {
357                // If redis returned an error, report it and close the
358                // stream.
359
360                let (con, result) = match result {
361                    (con, Ok(v)) => (con, v),
362                    (_, Err(e)) => {
363                        warn!("read error -- {}", &e);
364                        break Poll::Ready(None);
365                    }
366                };
367
368                // If there's no data, the timeout occurred. Redis has
369                // no "block forever" request, so the best that can be
370                // done is to wake up periodically and retry the
371                // request.
372
373                if result != redis::Value::Nil {
374                    if let Some((id, reading)) = Self::parse_reading(&result) {
375                        // This future is no longer good. Create a new
376                        // future using the updated `id`.
377
378                        self.id = id;
379                        self.fut = Self::mk_fut(
380                            con,
381                            self.key.clone(),
382                            self.id.clone(),
383                        );
384
385                        // Return the reading data.
386
387                        break Poll::Ready(Some(reading));
388                    } else {
389                        break Poll::Ready(None);
390                    }
391                } else {
392                    // The read command timed out. Re-issue the future
393                    // using the same `id` and loop.
394
395                    self.fut =
396                        Self::mk_fut(con, self.key.clone(), self.id.clone());
397                }
398            } else {
399                break Poll::Pending;
400            }
401        }
402    }
403}
404
/// Defines a context that uses redis for the back-end storage.
pub struct RedisStore {
    /// This connection is used for interacting with the database.
    db_con: AioMplexConnection,
    /// Setting channels of the devices that were registered as
    /// read-write, keyed by device name.
    table: SettingTable,
    /// Copy of the configuration that was used to create the store.
    cfg: config::Config,
}
412
413impl RedisStore {
414    fn make_client(
415        cfg: &config::Config,
416        name: &Option<String>,
417        pword: &Option<String>,
418    ) -> Result<redis::Client> {
419        use redis::{ConnectionAddr, ConnectionInfo, RedisConnectionInfo};
420
421        let addr = cfg.get_addr();
422
423        let ci = ConnectionInfo {
424            addr: ConnectionAddr::Tcp(addr.ip().to_string(), addr.port()),
425            redis: RedisConnectionInfo {
426                db: cfg.get_dbn(),
427                username: name.clone(),
428                password: pword.clone(),
429            },
430        };
431
432        redis::Client::open(ci).map_err(xlat_err)
433    }
434
435    // Creates a single-user connection to redis.
436
437    async fn make_connection(
438        cfg: &config::Config,
439        name: Option<String>,
440        pword: Option<String>,
441    ) -> Result<AioConnection> {
442        let client = Self::make_client(cfg, &name, &pword)?;
443
444        debug!("creating new redis connection");
445
446        client.get_tokio_connection().await.map_err(|e| {
447            error!("redis error: {}", &e);
448            xlat_err(e)
449        })
450    }
451
452    // Creates a mulitplexed connection to redis.
453
454    async fn make_mplex_connection(
455        cfg: &config::Config,
456        name: Option<String>,
457        pword: Option<String>,
458    ) -> Result<AioMplexConnection> {
459        let client = Self::make_client(cfg, &name, &pword)?;
460
461        debug!("creating new, shared redis connection");
462
463        client
464            .get_multiplexed_tokio_connection()
465            .await
466            .map_err(|e| {
467                error!("redis error: {}", &e);
468                xlat_err(e)
469            })
470    }
471
472    /// Builds a new backend context which interacts with `redis`.
473    /// The parameters in `cfg` will be used to locate the `redis`
474    /// instance. If `name` and `pword` are not `None`, they will be
475    /// used for credentials when connecting to `redis`.
476
477    pub async fn new(
478        cfg: &config::Config,
479        name: Option<String>,
480        pword: Option<String>,
481    ) -> Result<Self> {
482        let db_con = Self::make_mplex_connection(cfg, name, pword).await?;
483
484        Ok(RedisStore {
485            db_con,
486            table: HashMap::new(),
487            cfg: cfg.clone(),
488        })
489    }
490
491    // Returns the key that returns meta information for the device.
492
493    fn info_key(name: &str) -> String {
494        format!("{}#info", name)
495    }
496
497    // Returns the key that returns time-series information for the
498    // device.
499
500    fn hist_key(name: &str) -> String {
501        format!("{}#hist", name)
502    }
503
504    fn init_device_cmd(
505        name: &str,
506        driver: &str,
507        units: &Option<String>,
508    ) -> redis::Pipeline {
509        let hist_key = Self::hist_key(name);
510        let info_key = Self::info_key(name);
511
512        // Start an array of required fields.
513
514        let mut fields: Vec<(&str, String)> =
515            vec![("driver", String::from(driver))];
516
517        // Optionally add a "units" field.
518
519        if let Some(units) = units {
520            fields.push(("units", units.clone()))
521        };
522
523        // Create a command pipeline that deletes the two keys and
524        // then creates them properly with default values.
525
526        redis::pipe()
527            .atomic()
528            .del(&hist_key)
529            .ignore()
530            .xadd(&hist_key, "1", &[("value", &[1u8])])
531            .ignore()
532            .xdel(&hist_key, &["1"])
533            .ignore()
534            .del(&info_key)
535            .ignore()
536            .hset_multiple(&info_key, &fields)
537            .ignore()
538            .clone()
539    }
540
541    // Builds the low-level command that returns the last value of the
542    // device.
543
544    fn last_value_cmd(name: &str) -> redis::Cmd {
545        let name = Self::hist_key(name);
546
547        redis::Cmd::xrevrange_count(name, "+", "-", 1usize)
548    }
549
550    fn match_pattern_cmd(pattern: &Option<String>) -> redis::Cmd {
551        // Take the pattern from the caller and append "#info" since
552        // we only want to look at device information keys.
553
554        let pattern = pattern
555            .as_ref()
556            .map(|v| Self::info_key(v))
557            .unwrap_or_else(|| String::from("*#info"));
558
559        // Query REDIS to return all keys that match our pattern.
560
561        redis::Cmd::keys(pattern)
562    }
563
564    // Builds the low-level command that returns the type of the
565    // device's meta record.
566
567    fn info_type_cmd(name: &str) -> redis::Cmd {
568        let key = Self::info_key(name);
569
570        redis::cmd("TYPE").arg(&key).clone()
571    }
572
573    // Builds the low-level command that returns the type of the
574    // device's history record.
575
576    fn hist_type_cmd(name: &str) -> redis::Cmd {
577        let key = Self::hist_key(name);
578
579        redis::cmd("TYPE").arg(&key).clone()
580    }
581
582    // Creates a redis command pipeline which returns the standard,
583    // meta-data for a device.
584
585    fn device_info_cmd(name: &str) -> redis::Cmd {
586        let info_key = Self::info_key(name);
587
588        redis::Cmd::hgetall(info_key)
589    }
590
591    // Creates the redis command to pull stream information associated
592    // with the device's history.
593
594    fn xinfo_cmd(name: &str) -> redis::Cmd {
595        let hist_key = Self::hist_key(name);
596
597        redis::Cmd::xinfo_stream(hist_key)
598    }
599
600    // Generates a redis command pipeline that adds a value to a
601    // device's history.
602
603    fn report_new_value_cmd(key: &str, val: &device::Value) -> redis::Cmd {
604        let data = [("value", to_redis(val))];
605
606        redis::Cmd::xadd(key, "*", &data)
607    }
608
609    fn report_bounded_new_value_cmd(
610        key: &str,
611        val: &device::Value,
612        mh: usize,
613    ) -> redis::Cmd {
614        let opts = redis::streams::StreamMaxlen::Approx(mh);
615        let data = [("value", to_redis(val))];
616
617        redis::Cmd::xadd_maxlen(key, opts, "*", &data)
618    }
619
620    fn hash_to_info(
621        st: &SettingTable,
622        name: &device::Name,
623        hmap: &HashMap<String, String>,
624    ) -> Result<client::DevInfoReply> {
625        // Redis doesn't return an error if the key doesn't exist; it
626        // returns an empty array. So if our HashMap is empty, the key
627        // didn't exist.
628
629        if !hmap.is_empty() {
630            // If a "units" field exists and it's a string, we can
631            // save it in the `units` field of the reply.
632
633            let units = hmap.get("units").map(String::clone);
634
635            // If a "driver" field exists and it's a string, save it
636            // in the "drivers" field of the reply.
637
638            let driver = hmap
639                .get("driver")
640                .map(String::clone)
641                .unwrap_or_else(|| String::from("*missing*"));
642
643            Ok(client::DevInfoReply {
644                name: name.clone(),
645                units,
646                settable: st.contains_key(name),
647                driver,
648                total_points: 0,
649                first_point: None,
650                last_point: None,
651            })
652        } else {
653            Err(Error::NotFound)
654        }
655    }
656
657    // Converts the `StreamId` type, from redis, into our
658    // `device::Reading` type.
659
660    fn stream_id_to_reading(sid: &StreamId) -> Result<device::Reading> {
661        if let Some(val) = sid.map.get("value") {
662            Ok(device::Reading {
663                ts: id_to_ts(sid.id.as_str())?,
664                value: from_value(val)?,
665            })
666        } else {
667            Err(Error::TypeError)
668        }
669    }
670
671    // Looks up a device in the redis store and, if found, returns a
672    // `client::DevInfoReply` containing the information.
673
674    async fn lookup_device(
675        &mut self,
676        name: device::Name,
677    ) -> Result<client::DevInfoReply> {
678        let info = Self::device_info_cmd(name.to_string().as_str())
679            .query_async::<AioMplexConnection, HashMap<String, String>>(
680                &mut self.db_con,
681            )
682            .await
683            .map_err(xlat_err)
684            .and_then(|v| Self::hash_to_info(&self.table, &name, &v))?;
685
686        Self::xinfo_cmd(name.to_string().as_str())
687            .query_async::<AioMplexConnection, StreamInfoStreamReply>(
688                &mut self.db_con,
689            )
690            .await
691            .map_err(xlat_err)
692            .and_then(move |v| {
693                let info = if v.length > 0 {
694                    client::DevInfoReply {
695                        total_points: v.length as u32,
696                        first_point: Some(Self::stream_id_to_reading(
697                            &v.first_entry,
698                        )?),
699                        last_point: Some(Self::stream_id_to_reading(
700                            &v.last_entry,
701                        )?),
702                        ..info
703                    }
704                } else {
705                    client::DevInfoReply {
706                        total_points: 0,
707                        first_point: None,
708                        last_point: None,
709                        ..info
710                    }
711                };
712
713                Ok(info)
714            })
715    }
716
717    fn parse_last_value(
718        name: &str,
719        reply: &redis::Value,
720    ) -> Option<device::Reading> {
721        if redis::Value::Bulk(vec![]) == *reply {
722            warn!("no previous value for {}", name);
723            return None;
724        }
725
726        let data: redis::RedisResult<((
727            String,
728            HashMap<String, redis::Value>,
729        ),)> = redis::from_redis_value(reply);
730
731        match data {
732            Ok(((key, m),)) => {
733                if let Ok(ts) = id_to_ts(&key) {
734                    if let Some(val) = m.get("value") {
735                        if let Ok(val) = from_value(val) {
736                            return Some(device::Reading { ts, value: val });
737                        } else {
738                            error!(
739                                "last value for {} is in an unknown format",
740                                name
741                            );
742                        }
743                    } else {
744                        error!(
745                            "last value for {} doesn't have a \"value\" field",
746                            name
747                        );
748                    }
749                } else {
750                    error!("couldn't parse timestamp, {}, for {}", key, name)
751                }
752            }
753            Err(e) => {
754                error!(
755                    "redis error ({}) when converting last value of {}",
756                    e, name
757                )
758            }
759        }
760        None
761    }
762
763    // Obtains the last value reported for a device, or `None` if
764    // there is no history for it.
765
766    async fn last_value(&mut self, name: &str) -> Option<device::Reading> {
767        let result: redis::RedisResult<redis::Value> =
768            Self::last_value_cmd(name)
769                .query_async(&mut self.db_con)
770                .await;
771
772        match result {
773            Ok(reply) => Self::parse_last_value(name, &reply),
774            Err(e) => {
775                error!(
776                    "redis error ({}) when getting last value of {}",
777                    e, name
778                );
779                None
780            }
781        }
782    }
783
784    // Does some sanity checks on a device to see if it appears to be
785    // valid.
786
787    async fn validate_device(&mut self, name: &str) -> Result<()> {
788        // This section verifies the device has a NAME#info key that
789        // is a hash map.
790
791        {
792            let cmd = Self::info_type_cmd(name);
793            let result: redis::RedisResult<String> =
794                cmd.query_async(&mut self.db_con).await;
795
796            match result {
797                Ok(data_type) if data_type.as_str() == "hash" => (),
798                Ok(_) => {
799                    error!("{} info is of the wrong key type", name);
800                    return Err(Error::TypeError);
801                }
802                Err(_) => {
803                    warn!("{} info doesn't exist", name);
804                    return Err(Error::NotFound);
805                }
806            }
807        }
808
809        // This section verifies the device has a NAME#hist key that
810        // is a time-series stream.
811
812        {
813            let cmd = Self::hist_type_cmd(name);
814            let result: redis::RedisResult<String> =
815                cmd.query_async(&mut self.db_con).await;
816
817            match result {
818                Ok(data_type) if data_type.as_str() == "stream" => Ok(()),
819                Ok(_) => {
820                    error!("{} history is of the wrong key type", name);
821                    Err(Error::TypeError)
822                }
823                Err(_) => {
824                    warn!("{} history doesn't exist", name);
825                    Err(Error::NotFound)
826                }
827            }
828        }
829    }
830
831    // Initializes the state of a DrMem device in the REDIS database.
832    // It creates two keys: one key is appended with "#info" and
833    // addresses a hash table which will contain device meta
834    // information; the other key is appended with "#hist" and is a
835    // time-series stream which holds recent history of a device's
836    // values.
837
838    async fn init_device(
839        &mut self,
840        name: &str,
841        driver: &str,
842        units: &Option<String>,
843    ) -> Result<()> {
844        debug!("initializing {}", name);
845        Self::init_device_cmd(name, driver, units)
846            .query_async(&mut self.db_con)
847            .await
848            .map_err(xlat_err)
849    }
850
851    // Creates a closure for a driver to report a device's changing
852    // values.
853
854    fn mk_report_func(
855        &self,
856        name: &str,
857        max_history: &Option<usize>,
858    ) -> ReportReading<device::Value> {
859        let db_con = self.db_con.clone();
860        let name = String::from(name);
861
862        if let Some(mh) = *max_history {
863            Box::new(move |v| {
864                let mut db_con = db_con.clone();
865                let hist_key = Self::hist_key(&name);
866                let name = name.clone();
867
868                Box::pin(async move {
869                    if let Err(e) =
870                        Self::report_bounded_new_value_cmd(&hist_key, &v, mh)
871                            .query_async::<AioMplexConnection, ()>(&mut db_con)
872                            .await
873                    {
874                        warn!("couldn't save {} data to redis ... {}", &name, e)
875                    }
876                })
877            })
878        } else {
879            Box::new(move |v| {
880                let mut db_con = db_con.clone();
881                let hist_key = Self::hist_key(&name);
882                let name = name.clone();
883
884                Box::pin(async move {
885                    if let Err(e) = Self::report_new_value_cmd(&hist_key, &v)
886                        .query_async::<AioMplexConnection, ()>(&mut db_con)
887                        .await
888                    {
889                        warn!("couldn't save {} data to redis ... {}", &name, e)
890                    }
891                })
892            })
893        }
894    }
895}
896
897#[async_trait]
898impl Store for RedisStore {
899    /// Registers a device in the redis backend.
900
901    async fn register_read_only_device(
902        &mut self,
903        driver_name: &str,
904        name: &device::Name,
905        units: &Option<String>,
906        max_history: &Option<usize>,
907    ) -> Result<(ReportReading<device::Value>, Option<device::Value>)> {
908        let name = name.to_string();
909
910        debug!("registering '{}' as read-only", &name);
911
912        if self.validate_device(&name).await.is_err() {
913            self.init_device(&name, driver_name, units).await?;
914
915            info!("'{}' has been successfully created", &name);
916        }
917        Ok((
918            self.mk_report_func(&name, max_history),
919            self.last_value(&name).await.map(|v| v.value),
920        ))
921    }
922
923    async fn register_read_write_device(
924        &mut self,
925        driver_name: &str,
926        name: &device::Name,
927        units: &Option<String>,
928        max_history: &Option<usize>,
929    ) -> Result<(
930        ReportReading<device::Value>,
931        RxDeviceSetting,
932        Option<device::Value>,
933    )> {
934        let sname = name.to_string();
935
936        debug!("registering '{}' as read-write", &sname);
937
938        if self.validate_device(&sname).await.is_err() {
939            self.init_device(&sname, driver_name, units).await?;
940
941            info!("'{}' has been successfully created", &sname);
942        }
943
944        let (tx, rx) = mpsc::channel(20);
945
946        if self.table.insert(name.clone(), tx).is_some() {
947            warn!("{} already had a setting channel", &name);
948        }
949
950        Ok((
951            self.mk_report_func(&sname, max_history),
952            rx,
953            self.last_value(&sname).await.map(|v| v.value),
954        ))
955    }
956
957    // Implement the request to pull device information. Any task with
958    // a client channel can make this request although the primary
959    // client will be from GraphQL requests.
960
961    async fn get_device_info(
962        &mut self,
963        pattern: &Option<String>,
964    ) -> Result<Vec<client::DevInfoReply>> {
965        // Get a list of all the keys that match the pattern. For
966        // Redis, these keys will have "#info" appended at the end.
967
968        let result: Vec<String> = Self::match_pattern_cmd(pattern)
969            .query_async(&mut self.db_con)
970            .await
971            .map_err(xlat_err)?;
972
973        // Create an empty container to hold the device info records.
974
975        let mut devices = vec![];
976
977        // Loop through the results and pull all the device
978        // information. Strip off the trailing "#info" before getting
979        // the device information.
980
981        for key in result {
982            // Only process keys that are valid device names.
983
984            if let Ok(name) =
985                key.trim_end_matches("#info").parse::<device::Name>()
986            {
987                let dev_info = self.lookup_device(name).await?;
988
989                devices.push(dev_info)
990            }
991        }
992        Ok(devices)
993    }
994
995    // This method implements the set_device mutation in the GraphQL
996    // API.
997
998    async fn set_device(
999        &self,
1000        name: device::Name,
1001        value: device::Value,
1002    ) -> Result<device::Value> {
1003        if let Some(tx) = self.table.get(&name) {
1004            let (tx_rpy, rx_rpy) = oneshot::channel();
1005
1006            // Send the request and return from the function with the
1007            // reply. If any error occurs during communication, fall
1008            // through to report it.
1009
1010            if let Ok(()) = tx.send((value, tx_rpy)).await {
1011                if let Ok(reply) = rx_rpy.await {
1012                    return reply;
1013                }
1014            }
1015
1016            // Some portion of the RPC failed. Return an error.
1017
1018            Err(Error::MissingPeer(
1019                "cannot communicate with driver".to_string(),
1020            ))
1021        } else {
1022            Err(Error::NotFound)
1023        }
1024    }
1025
1026    async fn get_setting_chan(
1027        &self,
1028        name: device::Name,
1029        _own: bool,
1030    ) -> Result<TxDeviceSetting> {
1031        if let Some(tx) = self.table.get(&name) {
1032            Ok(tx.clone())
1033        } else {
1034            Err(Error::NotFound)
1035        }
1036    }
1037
1038    async fn monitor_device(
1039        &mut self,
1040        name: device::Name,
1041        start: Option<DateTime<Utc>>,
1042        end: Option<DateTime<Utc>>,
1043    ) -> Result<device::DataStream<device::Reading>> {
1044        match Self::make_connection(&self.cfg, None, None).await {
1045            Ok(con) => {
1046                let name = name.to_string();
1047                let key = RedisStore::hist_key(&name);
1048
1049                match (start.map(|v| v.into()), end.map(|v| v.into())) {
1050                    // With no start time, use the latest value of the
1051                    // device. If there's an end time, add a stream
1052                    // combinator that ends the stream once the date
1053                    // reaches it.
1054                    (None, end) => {
1055                        let ts = self
1056                            .last_value(&name)
1057                            .await
1058                            .map(|tmp| st_minus_1us(tmp.ts));
1059
1060                        // If there's an end date, append a filter to
1061                        // the stream so it stops once the timestamp
1062                        // reach it.
1063
1064                        if let Some(end) = end {
1065                            let date_test =
1066                                move |v: &device::Reading| v.ts <= end;
1067
1068                            Ok(Box::pin(
1069                                ReadingStream::new(con, &key, ts)
1070                                    .take_while(date_test),
1071                            )
1072                                as device::DataStream<device::Reading>)
1073                        } else {
1074                            Ok(Box::pin(ReadingStream::new(con, &key, ts))
1075                                as device::DataStream<device::Reading>)
1076                        }
1077                    }
1078
1079                    // Given a start time with no end time, start
1080                    // reading the redis stream at that point.
1081                    (Some(start), None) => Ok(Box::pin(ReadingStream::new(
1082                        con,
1083                        &key,
1084                        Some(st_minus_1us(start)),
1085                    ))
1086                        as device::DataStream<device::Reading>),
1087
1088                    // Start reading at the start time and stop the
1089                    // stream at the end time.
1090                    (Some(start_tmp), Some(end_tmp)) => {
1091                        let start = std::cmp::min(start_tmp, end_tmp);
1092                        let end = std::cmp::max(start_tmp, end_tmp);
1093                        let date_test = move |v: &device::Reading| v.ts <= end;
1094
1095                        Ok(Box::pin(
1096                            ReadingStream::new(
1097                                con,
1098                                &key,
1099                                Some(st_minus_1us(start)),
1100                            )
1101                            .take_while(date_test),
1102                        )
1103                            as device::DataStream<device::Reading>)
1104                    }
1105                }
1106            }
1107            Err(e) => {
1108                error!("couldn't make a connection : {}", e);
1109
1110                Ok(Box::pin(tokio_stream::empty())
1111                    as device::DataStream<device::Reading>)
1112            }
1113        }
1114    }
1115}
1116
1117pub async fn open(cfg: &config::Config) -> Result<impl Store> {
1118    RedisStore::new(cfg, None, None)
1119        .instrument(
1120            info_span!("redis-db", addr=?cfg.get_addr(), db=cfg.get_dbn()),
1121        )
1122        .await
1123}
1124
1125// This is the test module to make sure the redis backend works
1126// correctly. We're not testing redis itself -- we assume the redis
1127// project is verifying its behavior. Many functions have been broken
1128// out into smaller, helper functions so that we can create tests
1129// for them without the need of a redis installation.
1130//
1131// That being said, there are some requirements which are hard to test
1132// because they're dependent on redis' behavior.
1133//
1134// For instance, when monitoring a device, we want to immediately
1135// return any "current" value before blocking for future values. If
1136// there is a "last value", we need to use its timestamp for the next,
1137// blocking call. We should test this, but it doesn't seem like an
1138// easy thing to do (without requiring a redis instance.)
1139
#[cfg(test)]
mod tests {
    use super::*;
    use drmem_api::device;

    // Wraps a byte slice in the `Value::Data` form of a redis value.

    fn raw(bytes: &[u8]) -> redis::Value {
        redis::Value::Data(bytes.to_vec())
    }

    // We only want to convert Value::Data() forms. This test makes
    // sure the other variants don't translate.

    #[test]
    fn test_reject_invalid_forms() {
        let rejected = [
            ("Int", redis::Value::Int(0)),
            ("Bulk", redis::Value::Bulk(vec![])),
            ("Status", redis::Value::Status(String::from(""))),
            ("Okay", redis::Value::Okay),
        ];

        for (label, form) in rejected.iter() {
            if let Ok(v) = from_value(form) {
                panic!("Value::{} incorrectly translated to {:?}", label, v);
            }
        }
    }

    // Each boolean paired with the byte that follows the 'B' tag in
    // its encoded form.

    const BOOL_TEST_CASES: &[(bool, u8)] = &[(false, b'F'), (true, b'T')];

    // Test correct decoding of device::Value::Bool values.

    #[test]
    fn test_bool_decoder() {
        for &(flag, code) in BOOL_TEST_CASES {
            assert_eq!(
                Ok(device::Value::Bool(flag)),
                from_value(&raw(&[b'B', code]))
            );
        }
    }

    // Test correct encoding of device::Value::Bool values.

    #[test]
    fn test_bool_encoder() {
        for &(flag, code) in BOOL_TEST_CASES {
            assert_eq!(vec![b'B', code], to_redis(&device::Value::Bool(flag)));
        }
    }

    // Integers paired with their encoded form: an 'I' tag followed
    // by the four bytes of the value, most significant byte first.

    const INT_TEST_CASES: &[(i32, &[u8])] = &[
        (0, &[b'I', 0x00, 0x00, 0x00, 0x00]),
        (1, &[b'I', 0x00, 0x00, 0x00, 0x01]),
        (-1, &[b'I', 0xff, 0xff, 0xff, 0xff]),
        (i32::MAX, &[b'I', 0x7f, 0xff, 0xff, 0xff]),
        (i32::MIN, &[b'I', 0x80, 0x00, 0x00, 0x00]),
        (0x01234567, &[b'I', 0x01, 0x23, 0x45, 0x67]),
    ];

    // Test correct encoding of device::Value::Int values.

    #[test]
    fn test_int_encoder() {
        for &(value, encoded) in INT_TEST_CASES {
            assert_eq!(encoded, to_redis(&device::Value::Int(value)));
        }
    }

    // Test correct decoding of device::Value::Int values.

    #[test]
    fn test_int_decoder() {
        // Every buffer that is too short to hold the tag and four
        // bytes of data has to be rejected.

        let short = [b'I', 0u8, 0u8, 0u8];

        for len in 0..=short.len() {
            assert!(from_value(&raw(&short[..len])).is_err());
        }

        for &(value, encoded) in INT_TEST_CASES {
            assert_eq!(
                Ok(device::Value::Int(value)),
                from_value(&raw(encoded))
            );
        }
    }

    // Floating point values paired with their encoded form: a 'D'
    // tag followed by the eight bytes of the value, most significant
    // byte first.

    const FLT_TEST_CASES: &[(f64, &[u8])] = &[
        (0.0, &[b'D', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
        (
            -0.0,
            &[b'D', 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
        ),
        (1.0, &[b'D', 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
        (
            -1.0,
            &[b'D', 0xbf, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
        ),
        (
            9007199254740991.0,
            &[b'D', 0x43, 0x3f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff],
        ),
        (
            9007199254740992.0,
            &[b'D', 0x43, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
        ),
    ];

    // Test correct encoding of device::Value::Flt values.

    #[test]
    fn test_float_encoder() {
        for &(value, encoded) in FLT_TEST_CASES {
            assert_eq!(encoded, to_redis(&device::Value::Flt(value)));
        }
    }

    // Test correct decoding of device::Value::Flt values.

    #[test]
    fn test_float_decoder() {
        // Every buffer that is too short to hold the tag and eight
        // bytes of data has to be rejected.

        let short = [b'D', 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8];

        for len in 0..=short.len() {
            assert!(from_value(&raw(&short[..len])).is_err());
        }

        for &(value, encoded) in FLT_TEST_CASES {
            assert_eq!(
                Ok(device::Value::Flt(value)),
                from_value(&raw(encoded))
            );
        }
    }

    // Strings paired with their encoded form: an 'S' tag, four bytes
    // holding the length and then the bytes of the string.

    const STR_TEST_CASES: &[(&str, &[u8])] = &[
        ("", &[b'S', 0u8, 0u8, 0u8, 0u8]),
        ("ABC", &[b'S', 0u8, 0u8, 0u8, 3u8, b'A', b'B', b'C']),
    ];

    // Test correct encoding of device::Value::Str values.

    #[test]
    fn test_string_encoder() {
        for &(text, encoded) in STR_TEST_CASES {
            assert_eq!(
                encoded,
                to_redis(&device::Value::Str(String::from(text)))
            );
        }
    }

    // Test correct decoding of device::Value::Str values.

    #[test]
    fn test_string_decoder() {
        // Buffers smaller than 5 bytes are an error.

        let short = [b'S', 0u8, 0u8, 0u8];

        for len in 0..=short.len() {
            assert!(from_value(&raw(&short[..len])).is_err());
        }

        // Loop through the test cases.

        for &(text, encoded) in STR_TEST_CASES {
            assert_eq!(
                Ok(device::Value::Str(String::from(text))),
                from_value(&raw(encoded))
            );
        }

        // Verify proper response (both good and bad) when the buffer
        // doesn't match the size of the string. Too little data is an
        // error ...

        assert!(from_value(&raw(&[b'S', 0u8, 0u8, 0u8, 1u8])).is_err());
        assert!(from_value(&raw(&[b'S', 0u8, 0u8, 0u8, 2u8, b'A'])).is_err());

        // ... but extra, trailing data is ignored.

        assert_eq!(
            Ok(device::Value::Str(String::from("AB"))),
            from_value(&raw(&[b'S', 0u8, 0u8, 0u8, 2u8, b'A', b'B', 0, 0]))
        );
    }

    // Make sure the command that searches for devices appends
    // "#info" to the pattern (or to "*" when there's no pattern.)

    #[test]
    fn test_pattern_cmd() {
        fn packed(pattern: Option<String>) -> Vec<u8> {
            RedisStore::match_pattern_cmd(&pattern).get_packed_command()
        }

        assert_eq!(
            &packed(None),
            b"*2\r
$4\r\nKEYS\r
$6\r\n*#info\r\n"
        );
        assert_eq!(
            &packed(Some(String::from("device"))),
            b"*2\r
$4\r\nKEYS\r
$11\r\ndevice#info\r\n"
        );
        assert_eq!(
            &packed(Some(String::from("*weather*"))),
            b"*2\r
$4\r\nKEYS\r
$14\r\n*weather*#info\r\n"
        );
    }

    // Make sure timestamps get converted into the stream IDs that
    // redis expects.

    #[test]
    fn test_ts_to_id() {
        let cases = [
            (time::Duration::from_secs(1000), "1000000-0"),
            (time::Duration::from_micros(1234567), "1234-567"),
        ];

        for (offset, id) in cases.iter() {
            assert_eq!(
                ReadingStream::ts_to_id(time::UNIX_EPOCH + *offset),
                *id
            );
        }
    }

    #[test]
    fn test_read_next_cmd() {
        let packed =
            ReadingStream::read_next_cmd("device#hist", "$").get_packed_command();

        assert_eq!(
            &packed,
            b"*8\r
$5\r\nXREAD\r
$5\r\nBLOCK\r
$4\r\n5000\r
$5\r\nCOUNT\r
$1\r\n1\r
$7\r\nSTREAMS\r
$11\r\ndevice#hist\r
$1\r\n$\r\n"
        );
    }

    #[test]
    fn test_info_type_cmd() {
        let packed = RedisStore::info_type_cmd("device").get_packed_command();

        assert_eq!(
            &packed,
            b"*2\r
$4\r\nTYPE\r
$11\r\ndevice#info\r\n"
        );
    }

    #[test]
    fn test_hist_type_cmd() {
        let packed = RedisStore::hist_type_cmd("device").get_packed_command();

        assert_eq!(
            &packed,
            b"*2\r
$4\r\nTYPE\r
$11\r\ndevice#hist\r\n"
        );
    }

    #[test]
    fn test_dev_info_cmd() {
        let packed = RedisStore::device_info_cmd("device").get_packed_command();

        assert_eq!(
            &packed,
            b"*2\r
$7\r\nHGETALL\r
$11\r\ndevice#info\r\n"
        );
    }

    #[test]
    fn test_last_value_cmd() {
        let packed = RedisStore::last_value_cmd("device").get_packed_command();

        assert_eq!(
            &packed,
            b"*6\r
$9\r\nXREVRANGE\r
$11\r\ndevice#hist\r
$1\r\n+\r
$1\r\n-\r
$5\r\nCOUNT\r
$1\r\n1\r\n"
        );
    }

    // Make sure replies to the "last value" command get decoded
    // correctly.

    #[test]
    fn test_parsing_last_value() {
        const NAME: &str = "device";

        // Builds a reply holding one stream entry which has a single
        // "value" field.

        fn reply(id: &[u8], payload: &[u8]) -> redis::Value {
            redis::Value::Bulk(vec![redis::Value::Bulk(vec![
                redis::Value::Data(id.to_vec()),
                redis::Value::Bulk(vec![
                    redis::Value::Data(b"value".to_vec()),
                    redis::Value::Data(payload.to_vec()),
                ]),
            ])])
        }

        // Replies without any entries mean there isn't a last value.

        assert_eq!(
            RedisStore::parse_last_value(NAME, &redis::Value::Nil),
            None
        );
        assert_eq!(
            RedisStore::parse_last_value(NAME, &redis::Value::Bulk(vec![])),
            None
        );

        assert_eq!(
            RedisStore::parse_last_value(NAME, &reply(b"1000000-0", b"BT")),
            Some(device::Reading {
                ts: time::UNIX_EPOCH + time::Duration::from_secs(1000),
                value: device::Value::Bool(true)
            })
        );
        assert_eq!(
            RedisStore::parse_last_value(NAME, &reply(b"1234-567", b"BF")),
            Some(device::Reading {
                ts: time::UNIX_EPOCH + time::Duration::from_micros(1234567),
                value: device::Value::Bool(false)
            })
        );
    }

    #[test]
    fn test_xinfo_cmd() {
        let packed = RedisStore::xinfo_cmd("junk").get_packed_command();

        assert_eq!(
            &packed,
            b"*3\r
$5\r\nXINFO\r
$6\r\nSTREAM\r
$9\r\njunk#hist\r\n"
        );
    }

    // Make sure the commands that add a reading to a device's
    // history are built correctly for every type of value.

    #[test]
    fn test_report_value_cmd() {
        // Packs the command which adds to an unbounded history.

        fn unbounded(value: device::Value) -> Vec<u8> {
            RedisStore::report_new_value_cmd("key", &value).get_packed_command()
        }

        // Packs the command which adds to a history of limited size.

        fn bounded(value: device::Value, max: usize) -> Vec<u8> {
            RedisStore::report_bounded_new_value_cmd("key", &value, max)
                .get_packed_command()
        }

        assert_eq!(
            &unbounded(true.into()),
            b"*5\r
$4\r\nXADD\r
$3\r\nkey\r
$1\r\n*\r
$5\r\nvalue\r
$2\r\nBT\r\n"
        );
        assert_eq!(
            &unbounded(0x00010203i32.into()),
            b"*5\r
$4\r\nXADD\r
$3\r\nkey\r
$1\r\n*\r
$5\r\nvalue\r
$5\r\nI\x00\x01\x02\x03\r\n"
        );
        assert_eq!(
            &unbounded(0x12345678i32.into()),
            b"*5\r
$4\r\nXADD\r
$3\r\nkey\r
$1\r\n*\r
$5\r\nvalue\r
$5\r\nI\x12\x34\x56\x78\r\n"
        );
        assert_eq!(
            &unbounded(1.0.into()),
            b"*5\r
$4\r\nXADD\r
$3\r\nkey\r
$1\r\n*\r
$5\r\nvalue\r
$9\r\nD\x3f\xf0\x00\x00\x00\x00\x00\x00\r\n"
        );
        assert_eq!(
            &unbounded(String::from("hello").into()),
            b"*5\r
$4\r\nXADD\r
$3\r\nkey\r
$1\r\n*\r
$5\r\nvalue\r
$10\r\nS\x00\x00\x00\x05hello\r\n"
        );

        assert_eq!(
            &bounded(true.into(), 0),
            b"*8\r
$4\r\nXADD\r
$3\r\nkey\r
$6\r\nMAXLEN\r
$1\r\n~\r
$1\r\n0\r
$1\r\n*\r
$5\r\nvalue\r
$2\r\nBT\r\n"
        );
        assert_eq!(
            &bounded(0x00010203i32.into(), 1),
            b"*8\r
$4\r\nXADD\r
$3\r\nkey\r
$6\r\nMAXLEN\r
$1\r\n~\r
$1\r\n1\r
$1\r\n*\r
$5\r\nvalue\r
$5\r\nI\x00\x01\x02\x03\r\n"
        );
        assert_eq!(
            &bounded(0x12345678i32.into(), 2),
            b"*8\r
$4\r\nXADD\r
$3\r\nkey\r
$6\r\nMAXLEN\r
$1\r\n~\r
$1\r\n2\r
$1\r\n*\r
$5\r\nvalue\r
$5\r\nI\x12\x34\x56\x78\r\n"
        );
        assert_eq!(
            &bounded(1.0.into(), 3),
            b"*8\r
$4\r\nXADD\r
$3\r\nkey\r
$6\r\nMAXLEN\r
$1\r\n~\r
$1\r\n3\r
$1\r\n*\r
$5\r\nvalue\r
$9\r\nD\x3f\xf0\x00\x00\x00\x00\x00\x00\r\n"
        );
        assert_eq!(
            &bounded(String::from("hello").into(), 4),
            b"*8\r
$4\r\nXADD\r
$3\r\nkey\r
$6\r\nMAXLEN\r
$1\r\n~\r
$1\r\n4\r
$1\r\n*\r
$5\r\nvalue\r
$10\r\nS\x00\x00\x00\x05hello\r\n"
        );
    }

    // Make sure the transaction which creates a device is built
    // correctly, both with and without engineering units.

    #[test]
    fn test_init_dev() {
        let packed = RedisStore::init_device_cmd("device", "mem", &None)
            .get_packed_pipeline();

        assert_eq!(
            String::from_utf8_lossy(&packed),
            "*1\r
$5\r\nMULTI\r
*2\r
$3\r\nDEL\r
$11\r\ndevice#hist\r
*5\r
$4\r\nXADD\r
$11\r\ndevice#hist\r
$1\r\n1\r
$5\r\nvalue\r
$1\r\n\x01\r
*3\r
$4\r\nXDEL\r
$11\r\ndevice#hist\r
$1\r\n1\r
*2\r
$3\r\nDEL\r
$11\r\ndevice#info\r
*4\r
$5\r\nHMSET\r
$11\r\ndevice#info\r
$6\r\ndriver\r
$3\r\nmem\r
*1\r
$4\r\nEXEC\r\n"
        );

        let packed = RedisStore::init_device_cmd(
            "device",
            "pump",
            &Some(String::from("gpm")),
        )
        .get_packed_pipeline();

        assert_eq!(
            String::from_utf8_lossy(&packed),
            "*1\r
$5\r\nMULTI\r
*2\r
$3\r\nDEL\r
$11\r\ndevice#hist\r
*5\r
$4\r\nXADD\r
$11\r\ndevice#hist\r
$1\r\n1\r
$5\r\nvalue\r
$1\r\n\x01\r
*3\r
$4\r\nXDEL\r
$11\r\ndevice#hist\r
$1\r\n1\r
*2\r
$3\r\nDEL\r
$11\r\ndevice#info\r
*6\r
$5\r\nHMSET\r
$11\r\ndevice#info\r
$6\r\ndriver\r
$4\r\npump\r
$5\r\nunits\r
$3\r\ngpm\r
*1\r
$4\r\nEXEC\r\n"
        );
    }

    // Make sure entries from a redis stream get converted into
    // readings.

    #[test]
    fn test_streamid_to_reading() {
        // Builds a stream entry which has a single field.

        fn entry(id: &str, field: &str, payload: Vec<u8>) -> StreamId {
            StreamId {
                id: id.into(),
                map: HashMap::from([(
                    field.into(),
                    redis::Value::Data(payload),
                )]),
            }
        }

        // Look for various failure modes: no fields at all, a field
        // with the wrong name, and a field with undecodable data.

        assert!(RedisStore::stream_id_to_reading(&StreamId {
            id: "1000-0".into(),
            map: HashMap::new()
        })
        .is_err());
        assert!(RedisStore::stream_id_to_reading(&entry(
            "1000-0",
            "junk",
            b"10".to_vec()
        ))
        .is_err());
        assert!(RedisStore::stream_id_to_reading(&entry(
            "1000-0",
            "value",
            b"10".to_vec()
        ))
        .is_err());

        // Look for valid conversions. Each case holds the ID of the
        // entry, the timestamp (in milliseconds) it represents and
        // the value it carries.

        let cases = vec![
            ("1000-0", 1000u64, device::Value::Bool(true)),
            ("1500-0", 1500, device::Value::Int(123)),
            ("2500-0", 2500, device::Value::Int(-321)),
            ("2500-0", 2500, device::Value::Flt(1.0)),
            ("2500-0", 2500, device::Value::Flt(-1.0)),
            ("2500-0", 2500, device::Value::Flt(1.0e100)),
            ("2500-0", 2500, device::Value::Flt(1.0e-100)),
            ("2500-0", 2500, device::Value::Str("Hello".into())),
        ];

        for (id, millis, value) in cases {
            assert_eq!(
                RedisStore::stream_id_to_reading(&entry(
                    id,
                    "value",
                    to_redis(&value)
                )),
                Ok(device::Reading {
                    ts: time::UNIX_EPOCH + time::Duration::from_millis(millis),
                    value
                })
            );
        }
    }

    // Make sure the fields of a device's "#info" hash, along with
    // the table of setting channels, are used to build the device
    // information.

    #[test]
    fn test_hash_to_info() {
        let device = "path:junk".parse::<device::Name>().unwrap();
        let mut st = HashMap::new();
        let mut fm = HashMap::new();

        // Builds the reply we expect. Only the driver name and the
        // `settable` flag vary between the checks below.

        let expected = |driver: &str, settable: bool| client::DevInfoReply {
            name: device.clone(),
            units: Some(String::from("gpm")),
            settable,
            driver: String::from(driver),
            total_points: 0,
            first_point: None,
            last_point: None,
        };

        // An empty hash means the device doesn't exist.

        assert_eq!(
            RedisStore::hash_to_info(&st, &device, &fm),
            Err(Error::NotFound)
        );

        // With only the units defined, the driver is reported as
        // missing.

        let _ = fm.insert("units".to_string(), "gpm".to_string());

        assert_eq!(
            RedisStore::hash_to_info(&st, &device, &fm),
            Ok(expected("*missing*", false))
        );

        let _ = fm.insert("driver".to_string(), "sump".to_string());

        assert_eq!(
            RedisStore::hash_to_info(&st, &device, &fm),
            Ok(expected("sump", false))
        );

        // Once the device has a setting channel, it should be
        // reported as settable.

        let (tx, _) = mpsc::channel(10);
        let _ = st.insert(device.clone(), tx);

        assert_eq!(
            RedisStore::hash_to_info(&st, &device, &fm),
            Ok(expected("sump", true))
        );
    }
}