1use async_trait::async_trait;
2use chrono::*;
3use drmem_api::{
4 client, device,
5 driver::{ReportReading, RxDeviceSetting, TxDeviceSetting},
6 Error, Result, Store,
7};
8use futures::task::{Context, Poll};
9use futures::Future;
10use redis::{
11 aio,
12 streams::{StreamId, StreamInfoStreamReply},
13};
14use std::collections::HashMap;
15use std::convert::TryInto;
16use std::pin::Pin;
17use std::time;
18use tokio::sync::{mpsc, oneshot};
19use tokio_stream::{self, Stream, StreamExt};
20use tracing::{debug, error, info, info_span, warn};
21use tracing_futures::Instrument;
22
23type AioMplexConnection = aio::MultiplexedConnection;
24type AioConnection = aio::Connection;
25type SettingTable = HashMap<device::Name, TxDeviceSetting>;
26
27pub mod config;
28
29fn xlat_err(e: redis::RedisError) -> Error {
43 match e.kind() {
44 redis::ErrorKind::ResponseError
45 | redis::ErrorKind::ClusterDown
46 | redis::ErrorKind::CrossSlot
47 | redis::ErrorKind::MasterDown
48 | redis::ErrorKind::IoError => Error::DbCommunicationError,
49
50 redis::ErrorKind::AuthenticationFailed
51 | redis::ErrorKind::InvalidClientConfig => Error::AuthenticationError,
52
53 redis::ErrorKind::TypeError => Error::TypeError,
54
55 redis::ErrorKind::ExecAbortError
56 | redis::ErrorKind::BusyLoadingError
57 | redis::ErrorKind::TryAgain
58 | redis::ErrorKind::ClientError
59 | redis::ErrorKind::ExtensionError
60 | redis::ErrorKind::ReadOnly => Error::OperationError,
61
62 redis::ErrorKind::NoScriptError
63 | redis::ErrorKind::Moved
64 | redis::ErrorKind::Ask => Error::NotFound,
65
66 _ => Error::UnknownError,
67 }
68}
69
70fn to_redis(val: &device::Value) -> Vec<u8> {
75 match val {
76 device::Value::Bool(false) => vec![b'B', b'F'],
77 device::Value::Bool(true) => vec![b'B', b'T'],
78
79 device::Value::Int(v) => {
81 let mut buf: Vec<u8> = Vec::with_capacity(9);
82
83 buf.push(b'I');
84 buf.extend_from_slice(&v.to_be_bytes());
85 buf
86 }
87
88 device::Value::Flt(v) => {
91 let mut buf: Vec<u8> = Vec::with_capacity(9);
92
93 buf.push(b'D');
94 buf.extend_from_slice(&v.to_be_bytes());
95 buf
96 }
97
98 device::Value::Str(s) => {
101 let s = s.as_bytes();
102 let mut buf: Vec<u8> = Vec::with_capacity(5 + s.len());
103
104 buf.push(b'S');
105 buf.extend_from_slice(&(s.len() as u32).to_be_bytes());
106 buf.extend_from_slice(s);
107 buf
108 }
109 }
110}
111
112fn decode_integer(buf: &[u8]) -> Result<device::Value> {
115 if buf.len() >= 4 {
116 let buf = buf[..4].try_into().unwrap();
117
118 return Ok(device::Value::Int(i32::from_be_bytes(buf)));
119 }
120 Err(Error::TypeError)
121}
122
123fn decode_float(buf: &[u8]) -> Result<device::Value> {
126 if buf.len() >= 8 {
127 let buf = buf[..8].try_into().unwrap();
128
129 return Ok(device::Value::Flt(f64::from_be_bytes(buf)));
130 }
131 Err(Error::TypeError)
132}
133
134fn decode_string(buf: &[u8]) -> Result<device::Value> {
137 if buf.len() >= 4 {
138 let len_buf = buf[..4].try_into().unwrap();
139 let len = u32::from_be_bytes(len_buf) as usize;
140
141 if buf.len() >= (4 + len) {
142 let str_vec = buf[4..4 + len].to_vec();
143
144 return match String::from_utf8(str_vec) {
145 Ok(s) => Ok(device::Value::Str(s)),
146 Err(_) => Err(Error::TypeError),
147 };
148 }
149 }
150 Err(Error::TypeError)
151}
152
153fn from_value(v: &redis::Value) -> Result<device::Value> {
158 if let redis::Value::Data(buf) = v {
159 if !buf.is_empty() {
163 match buf[0] as char {
164 'B' if buf.len() > 1 => match buf[1] {
165 b'F' => Ok(device::Value::Bool(false)),
166 b'T' => Ok(device::Value::Bool(true)),
167 _ => Err(Error::TypeError),
168 },
169 'I' => decode_integer(&buf[1..]),
170 'D' => decode_float(&buf[1..]),
171 'S' => decode_string(&buf[1..]),
172
173 _ => Err(Error::TypeError),
176 }
177 } else {
178 Err(Error::TypeError)
179 }
180 } else {
181 Err(Error::TypeError)
182 }
183}
184
/// Returns the timestamp one microsecond earlier than `st`. If that
/// instant can't be represented, `st` is returned unchanged.
fn st_minus_1us(st: time::SystemTime) -> time::SystemTime {
    st.checked_sub(time::Duration::from_micros(1)).unwrap_or(st)
}
196
197fn msus_to_st(ms: u64, us: u64) -> Result<time::SystemTime> {
201 let ts = time::UNIX_EPOCH.checked_add(time::Duration::from_millis(ms));
202
203 if let Some(ts) = ts {
204 let ts = ts.checked_add(time::Duration::from_micros(std::cmp::min(
205 us, 999u64,
206 )));
207
208 if let Some(ts) = ts {
209 return Ok(ts);
210 }
211 }
212 Err(Error::InvArgument(String::from("bad timestamp value")))
213}
214
215fn id_to_ts(id: &str) -> Result<time::SystemTime> {
221 let fields: Vec<&str> = id.split('-').collect();
222
223 if let &[a, b] = &fields[..] {
224 if let (Ok(ms), Ok(us)) = (a.parse::<u64>(), b.parse::<u64>()) {
236 return msus_to_st(ms, us);
237 }
238 }
239
240 Err(Error::InvArgument(String::from("unknown timestamp format")))
241}
242
243type ReadFuture = Pin<
244 Box<
245 dyn Future<Output = (AioConnection, redis::RedisResult<redis::Value>)>
246 + Send,
247 >,
248>;
249
250type ReadingResult = ((String, ((String, HashMap<String, redis::Value>),)),);
251
252struct ReadingStream {
253 key: String,
254 id: String,
255 fut: ReadFuture,
256}
257
258impl ReadingStream {
/// Value used for the `BLOCK` option of each stream read. When a read
/// times out, a new one is issued with the same ID.
const TIMEOUT: usize = 5 * 1_000;
260
/// Converts a timestamp into a redis stream ID.
///
/// The ID has the form `<ms>-<us>`, where `<ms>` is the number of whole
/// milliseconds since the UNIX epoch and `<us>` is the remaining
/// microseconds (0 - 999).
///
/// Timestamps earlier than the epoch can't be expressed as a stream ID.
/// They are clamped to `0-0` (the start of any stream) rather than
/// panicking, since the timestamp may originate from a client request.
fn ts_to_id(ts: time::SystemTime) -> String {
    let us = ts
        .duration_since(time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_micros();

    format!("{}-{}", us / 1000, us % 1000)
}
275
276 fn read_next_cmd(key: &str, id: &str) -> redis::Cmd {
277 let opts = redis::streams::StreamReadOptions::default()
278 .block(Self::TIMEOUT)
279 .count(1);
280
281 redis::Cmd::xread_options(&[key], &[id], &opts)
282 }
283
284 fn mk_fut(mut con: AioConnection, key: String, id: String) -> ReadFuture {
291 Box::pin(async move {
292 let result =
293 Self::read_next_cmd(&key, &id).query_async(&mut con).await;
294
295 (con, result)
296 })
297 }
298
299 pub fn new(
300 con: AioConnection,
301 key: &str,
302 id: Option<time::SystemTime>,
303 ) -> Self {
304 let key = key.to_string();
305 let id = id.map(Self::ts_to_id).unwrap_or_else(|| String::from("$"));
306 let fut = Self::mk_fut(con, key.clone(), id.clone());
307
308 ReadingStream { key, id, fut }
309 }
310
311 fn parse_reading(data: &redis::Value) -> Option<(String, device::Reading)> {
312 let result: redis::RedisResult<ReadingResult> =
313 redis::from_redis_value(data);
314
315 match result {
316 Ok(((_, ((ref new_id, ref rmap),)),)) => {
317 let reading = device::Reading {
318 ts: id_to_ts(new_id).ok()?,
319 value: from_value(rmap.get("value")?).ok()?,
320 };
321
322 Some((new_id.to_string(), reading))
323 }
324 Err(e) => {
325 error!("couldn't parse reading: {:?}", &e);
326 None
327 }
328 }
329 }
330}
331
332impl Stream for ReadingStream {
341 type Item = device::Reading;
342
343 fn poll_next(
349 mut self: Pin<&mut Self>,
350 cx: &mut Context<'_>,
351 ) -> Poll<Option<Self::Item>> {
352 loop {
353 if let Poll::Ready(result) = Pin::new(&mut self.fut).poll(cx) {
357 let (con, result) = match result {
361 (con, Ok(v)) => (con, v),
362 (_, Err(e)) => {
363 warn!("read error -- {}", &e);
364 break Poll::Ready(None);
365 }
366 };
367
368 if result != redis::Value::Nil {
374 if let Some((id, reading)) = Self::parse_reading(&result) {
375 self.id = id;
379 self.fut = Self::mk_fut(
380 con,
381 self.key.clone(),
382 self.id.clone(),
383 );
384
385 break Poll::Ready(Some(reading));
388 } else {
389 break Poll::Ready(None);
390 }
391 } else {
392 self.fut =
396 Self::mk_fut(con, self.key.clone(), self.id.clone());
397 }
398 } else {
399 break Poll::Pending;
400 }
401 }
402 }
403}
404
405pub struct RedisStore {
407 db_con: AioMplexConnection,
409 table: SettingTable,
410 cfg: config::Config,
411}
412
413impl RedisStore {
414 fn make_client(
415 cfg: &config::Config,
416 name: &Option<String>,
417 pword: &Option<String>,
418 ) -> Result<redis::Client> {
419 use redis::{ConnectionAddr, ConnectionInfo, RedisConnectionInfo};
420
421 let addr = cfg.get_addr();
422
423 let ci = ConnectionInfo {
424 addr: ConnectionAddr::Tcp(addr.ip().to_string(), addr.port()),
425 redis: RedisConnectionInfo {
426 db: cfg.get_dbn(),
427 username: name.clone(),
428 password: pword.clone(),
429 },
430 };
431
432 redis::Client::open(ci).map_err(xlat_err)
433 }
434
435 async fn make_connection(
438 cfg: &config::Config,
439 name: Option<String>,
440 pword: Option<String>,
441 ) -> Result<AioConnection> {
442 let client = Self::make_client(cfg, &name, &pword)?;
443
444 debug!("creating new redis connection");
445
446 client.get_tokio_connection().await.map_err(|e| {
447 error!("redis error: {}", &e);
448 xlat_err(e)
449 })
450 }
451
452 async fn make_mplex_connection(
455 cfg: &config::Config,
456 name: Option<String>,
457 pword: Option<String>,
458 ) -> Result<AioMplexConnection> {
459 let client = Self::make_client(cfg, &name, &pword)?;
460
461 debug!("creating new, shared redis connection");
462
463 client
464 .get_multiplexed_tokio_connection()
465 .await
466 .map_err(|e| {
467 error!("redis error: {}", &e);
468 xlat_err(e)
469 })
470 }
471
472 pub async fn new(
478 cfg: &config::Config,
479 name: Option<String>,
480 pword: Option<String>,
481 ) -> Result<Self> {
482 let db_con = Self::make_mplex_connection(cfg, name, pword).await?;
483
484 Ok(RedisStore {
485 db_con,
486 table: HashMap::new(),
487 cfg: cfg.clone(),
488 })
489 }
490
/// Returns the redis key that holds the device's information hash:
/// the device name followed by `#info`.
fn info_key(name: &str) -> String {
    [name, "#info"].concat()
}
496
/// Returns the redis key that holds the device's history stream:
/// the device name followed by `#hist`.
fn hist_key(name: &str) -> String {
    [name, "#hist"].concat()
}
503
504 fn init_device_cmd(
505 name: &str,
506 driver: &str,
507 units: &Option<String>,
508 ) -> redis::Pipeline {
509 let hist_key = Self::hist_key(name);
510 let info_key = Self::info_key(name);
511
512 let mut fields: Vec<(&str, String)> =
515 vec![("driver", String::from(driver))];
516
517 if let Some(units) = units {
520 fields.push(("units", units.clone()))
521 };
522
523 redis::pipe()
527 .atomic()
528 .del(&hist_key)
529 .ignore()
530 .xadd(&hist_key, "1", &[("value", &[1u8])])
531 .ignore()
532 .xdel(&hist_key, &["1"])
533 .ignore()
534 .del(&info_key)
535 .ignore()
536 .hset_multiple(&info_key, &fields)
537 .ignore()
538 .clone()
539 }
540
541 fn last_value_cmd(name: &str) -> redis::Cmd {
545 let name = Self::hist_key(name);
546
547 redis::Cmd::xrevrange_count(name, "+", "-", 1usize)
548 }
549
550 fn match_pattern_cmd(pattern: &Option<String>) -> redis::Cmd {
551 let pattern = pattern
555 .as_ref()
556 .map(|v| Self::info_key(v))
557 .unwrap_or_else(|| String::from("*#info"));
558
559 redis::Cmd::keys(pattern)
562 }
563
564 fn info_type_cmd(name: &str) -> redis::Cmd {
568 let key = Self::info_key(name);
569
570 redis::cmd("TYPE").arg(&key).clone()
571 }
572
573 fn hist_type_cmd(name: &str) -> redis::Cmd {
577 let key = Self::hist_key(name);
578
579 redis::cmd("TYPE").arg(&key).clone()
580 }
581
582 fn device_info_cmd(name: &str) -> redis::Cmd {
586 let info_key = Self::info_key(name);
587
588 redis::Cmd::hgetall(info_key)
589 }
590
591 fn xinfo_cmd(name: &str) -> redis::Cmd {
595 let hist_key = Self::hist_key(name);
596
597 redis::Cmd::xinfo_stream(hist_key)
598 }
599
600 fn report_new_value_cmd(key: &str, val: &device::Value) -> redis::Cmd {
604 let data = [("value", to_redis(val))];
605
606 redis::Cmd::xadd(key, "*", &data)
607 }
608
609 fn report_bounded_new_value_cmd(
610 key: &str,
611 val: &device::Value,
612 mh: usize,
613 ) -> redis::Cmd {
614 let opts = redis::streams::StreamMaxlen::Approx(mh);
615 let data = [("value", to_redis(val))];
616
617 redis::Cmd::xadd_maxlen(key, opts, "*", &data)
618 }
619
620 fn hash_to_info(
621 st: &SettingTable,
622 name: &device::Name,
623 hmap: &HashMap<String, String>,
624 ) -> Result<client::DevInfoReply> {
625 if !hmap.is_empty() {
630 let units = hmap.get("units").map(String::clone);
634
635 let driver = hmap
639 .get("driver")
640 .map(String::clone)
641 .unwrap_or_else(|| String::from("*missing*"));
642
643 Ok(client::DevInfoReply {
644 name: name.clone(),
645 units,
646 settable: st.contains_key(name),
647 driver,
648 total_points: 0,
649 first_point: None,
650 last_point: None,
651 })
652 } else {
653 Err(Error::NotFound)
654 }
655 }
656
657 fn stream_id_to_reading(sid: &StreamId) -> Result<device::Reading> {
661 if let Some(val) = sid.map.get("value") {
662 Ok(device::Reading {
663 ts: id_to_ts(sid.id.as_str())?,
664 value: from_value(val)?,
665 })
666 } else {
667 Err(Error::TypeError)
668 }
669 }
670
671 async fn lookup_device(
675 &mut self,
676 name: device::Name,
677 ) -> Result<client::DevInfoReply> {
678 let info = Self::device_info_cmd(name.to_string().as_str())
679 .query_async::<AioMplexConnection, HashMap<String, String>>(
680 &mut self.db_con,
681 )
682 .await
683 .map_err(xlat_err)
684 .and_then(|v| Self::hash_to_info(&self.table, &name, &v))?;
685
686 Self::xinfo_cmd(name.to_string().as_str())
687 .query_async::<AioMplexConnection, StreamInfoStreamReply>(
688 &mut self.db_con,
689 )
690 .await
691 .map_err(xlat_err)
692 .and_then(move |v| {
693 let info = if v.length > 0 {
694 client::DevInfoReply {
695 total_points: v.length as u32,
696 first_point: Some(Self::stream_id_to_reading(
697 &v.first_entry,
698 )?),
699 last_point: Some(Self::stream_id_to_reading(
700 &v.last_entry,
701 )?),
702 ..info
703 }
704 } else {
705 client::DevInfoReply {
706 total_points: 0,
707 first_point: None,
708 last_point: None,
709 ..info
710 }
711 };
712
713 Ok(info)
714 })
715 }
716
717 fn parse_last_value(
718 name: &str,
719 reply: &redis::Value,
720 ) -> Option<device::Reading> {
721 if redis::Value::Bulk(vec![]) == *reply {
722 warn!("no previous value for {}", name);
723 return None;
724 }
725
726 let data: redis::RedisResult<((
727 String,
728 HashMap<String, redis::Value>,
729 ),)> = redis::from_redis_value(reply);
730
731 match data {
732 Ok(((key, m),)) => {
733 if let Ok(ts) = id_to_ts(&key) {
734 if let Some(val) = m.get("value") {
735 if let Ok(val) = from_value(val) {
736 return Some(device::Reading { ts, value: val });
737 } else {
738 error!(
739 "last value for {} is in an unknown format",
740 name
741 );
742 }
743 } else {
744 error!(
745 "last value for {} doesn't have a \"value\" field",
746 name
747 );
748 }
749 } else {
750 error!("couldn't parse timestamp, {}, for {}", key, name)
751 }
752 }
753 Err(e) => {
754 error!(
755 "redis error ({}) when converting last value of {}",
756 e, name
757 )
758 }
759 }
760 None
761 }
762
763 async fn last_value(&mut self, name: &str) -> Option<device::Reading> {
767 let result: redis::RedisResult<redis::Value> =
768 Self::last_value_cmd(name)
769 .query_async(&mut self.db_con)
770 .await;
771
772 match result {
773 Ok(reply) => Self::parse_last_value(name, &reply),
774 Err(e) => {
775 error!(
776 "redis error ({}) when getting last value of {}",
777 e, name
778 );
779 None
780 }
781 }
782 }
783
784 async fn validate_device(&mut self, name: &str) -> Result<()> {
788 {
792 let cmd = Self::info_type_cmd(name);
793 let result: redis::RedisResult<String> =
794 cmd.query_async(&mut self.db_con).await;
795
796 match result {
797 Ok(data_type) if data_type.as_str() == "hash" => (),
798 Ok(_) => {
799 error!("{} info is of the wrong key type", name);
800 return Err(Error::TypeError);
801 }
802 Err(_) => {
803 warn!("{} info doesn't exist", name);
804 return Err(Error::NotFound);
805 }
806 }
807 }
808
809 {
813 let cmd = Self::hist_type_cmd(name);
814 let result: redis::RedisResult<String> =
815 cmd.query_async(&mut self.db_con).await;
816
817 match result {
818 Ok(data_type) if data_type.as_str() == "stream" => Ok(()),
819 Ok(_) => {
820 error!("{} history is of the wrong key type", name);
821 Err(Error::TypeError)
822 }
823 Err(_) => {
824 warn!("{} history doesn't exist", name);
825 Err(Error::NotFound)
826 }
827 }
828 }
829 }
830
831 async fn init_device(
839 &mut self,
840 name: &str,
841 driver: &str,
842 units: &Option<String>,
843 ) -> Result<()> {
844 debug!("initializing {}", name);
845 Self::init_device_cmd(name, driver, units)
846 .query_async(&mut self.db_con)
847 .await
848 .map_err(xlat_err)
849 }
850
851 fn mk_report_func(
855 &self,
856 name: &str,
857 max_history: &Option<usize>,
858 ) -> ReportReading<device::Value> {
859 let db_con = self.db_con.clone();
860 let name = String::from(name);
861
862 if let Some(mh) = *max_history {
863 Box::new(move |v| {
864 let mut db_con = db_con.clone();
865 let hist_key = Self::hist_key(&name);
866 let name = name.clone();
867
868 Box::pin(async move {
869 if let Err(e) =
870 Self::report_bounded_new_value_cmd(&hist_key, &v, mh)
871 .query_async::<AioMplexConnection, ()>(&mut db_con)
872 .await
873 {
874 warn!("couldn't save {} data to redis ... {}", &name, e)
875 }
876 })
877 })
878 } else {
879 Box::new(move |v| {
880 let mut db_con = db_con.clone();
881 let hist_key = Self::hist_key(&name);
882 let name = name.clone();
883
884 Box::pin(async move {
885 if let Err(e) = Self::report_new_value_cmd(&hist_key, &v)
886 .query_async::<AioMplexConnection, ()>(&mut db_con)
887 .await
888 {
889 warn!("couldn't save {} data to redis ... {}", &name, e)
890 }
891 })
892 })
893 }
894 }
895}
896
897#[async_trait]
898impl Store for RedisStore {
899 async fn register_read_only_device(
902 &mut self,
903 driver_name: &str,
904 name: &device::Name,
905 units: &Option<String>,
906 max_history: &Option<usize>,
907 ) -> Result<(ReportReading<device::Value>, Option<device::Value>)> {
908 let name = name.to_string();
909
910 debug!("registering '{}' as read-only", &name);
911
912 if self.validate_device(&name).await.is_err() {
913 self.init_device(&name, driver_name, units).await?;
914
915 info!("'{}' has been successfully created", &name);
916 }
917 Ok((
918 self.mk_report_func(&name, max_history),
919 self.last_value(&name).await.map(|v| v.value),
920 ))
921 }
922
923 async fn register_read_write_device(
924 &mut self,
925 driver_name: &str,
926 name: &device::Name,
927 units: &Option<String>,
928 max_history: &Option<usize>,
929 ) -> Result<(
930 ReportReading<device::Value>,
931 RxDeviceSetting,
932 Option<device::Value>,
933 )> {
934 let sname = name.to_string();
935
936 debug!("registering '{}' as read-write", &sname);
937
938 if self.validate_device(&sname).await.is_err() {
939 self.init_device(&sname, driver_name, units).await?;
940
941 info!("'{}' has been successfully created", &sname);
942 }
943
944 let (tx, rx) = mpsc::channel(20);
945
946 if self.table.insert(name.clone(), tx).is_some() {
947 warn!("{} already had a setting channel", &name);
948 }
949
950 Ok((
951 self.mk_report_func(&sname, max_history),
952 rx,
953 self.last_value(&sname).await.map(|v| v.value),
954 ))
955 }
956
957 async fn get_device_info(
962 &mut self,
963 pattern: &Option<String>,
964 ) -> Result<Vec<client::DevInfoReply>> {
965 let result: Vec<String> = Self::match_pattern_cmd(pattern)
969 .query_async(&mut self.db_con)
970 .await
971 .map_err(xlat_err)?;
972
973 let mut devices = vec![];
976
977 for key in result {
982 if let Ok(name) =
985 key.trim_end_matches("#info").parse::<device::Name>()
986 {
987 let dev_info = self.lookup_device(name).await?;
988
989 devices.push(dev_info)
990 }
991 }
992 Ok(devices)
993 }
994
995 async fn set_device(
999 &self,
1000 name: device::Name,
1001 value: device::Value,
1002 ) -> Result<device::Value> {
1003 if let Some(tx) = self.table.get(&name) {
1004 let (tx_rpy, rx_rpy) = oneshot::channel();
1005
1006 if let Ok(()) = tx.send((value, tx_rpy)).await {
1011 if let Ok(reply) = rx_rpy.await {
1012 return reply;
1013 }
1014 }
1015
1016 Err(Error::MissingPeer(
1019 "cannot communicate with driver".to_string(),
1020 ))
1021 } else {
1022 Err(Error::NotFound)
1023 }
1024 }
1025
1026 async fn get_setting_chan(
1027 &self,
1028 name: device::Name,
1029 _own: bool,
1030 ) -> Result<TxDeviceSetting> {
1031 if let Some(tx) = self.table.get(&name) {
1032 Ok(tx.clone())
1033 } else {
1034 Err(Error::NotFound)
1035 }
1036 }
1037
1038 async fn monitor_device(
1039 &mut self,
1040 name: device::Name,
1041 start: Option<DateTime<Utc>>,
1042 end: Option<DateTime<Utc>>,
1043 ) -> Result<device::DataStream<device::Reading>> {
1044 match Self::make_connection(&self.cfg, None, None).await {
1045 Ok(con) => {
1046 let name = name.to_string();
1047 let key = RedisStore::hist_key(&name);
1048
1049 match (start.map(|v| v.into()), end.map(|v| v.into())) {
1050 (None, end) => {
1055 let ts = self
1056 .last_value(&name)
1057 .await
1058 .map(|tmp| st_minus_1us(tmp.ts));
1059
1060 if let Some(end) = end {
1065 let date_test =
1066 move |v: &device::Reading| v.ts <= end;
1067
1068 Ok(Box::pin(
1069 ReadingStream::new(con, &key, ts)
1070 .take_while(date_test),
1071 )
1072 as device::DataStream<device::Reading>)
1073 } else {
1074 Ok(Box::pin(ReadingStream::new(con, &key, ts))
1075 as device::DataStream<device::Reading>)
1076 }
1077 }
1078
1079 (Some(start), None) => Ok(Box::pin(ReadingStream::new(
1082 con,
1083 &key,
1084 Some(st_minus_1us(start)),
1085 ))
1086 as device::DataStream<device::Reading>),
1087
1088 (Some(start_tmp), Some(end_tmp)) => {
1091 let start = std::cmp::min(start_tmp, end_tmp);
1092 let end = std::cmp::max(start_tmp, end_tmp);
1093 let date_test = move |v: &device::Reading| v.ts <= end;
1094
1095 Ok(Box::pin(
1096 ReadingStream::new(
1097 con,
1098 &key,
1099 Some(st_minus_1us(start)),
1100 )
1101 .take_while(date_test),
1102 )
1103 as device::DataStream<device::Reading>)
1104 }
1105 }
1106 }
1107 Err(e) => {
1108 error!("couldn't make a connection : {}", e);
1109
1110 Ok(Box::pin(tokio_stream::empty())
1111 as device::DataStream<device::Reading>)
1112 }
1113 }
1114 }
1115}
1116
1117pub async fn open(cfg: &config::Config) -> Result<impl Store> {
1118 RedisStore::new(cfg, None, None)
1119 .instrument(
1120 info_span!("redis-db", addr=?cfg.get_addr(), db=cfg.get_dbn()),
1121 )
1122 .await
1123}
1124
1125#[cfg(test)]
1141mod tests {
1142 use super::*;
1143 use drmem_api::device;
1144
1145 #[test]
1149 fn test_reject_invalid_forms() {
1150 if let Ok(v) = from_value(&redis::Value::Int(0)) {
1151 panic!("Value::Int incorrectly translated to {:?}", v);
1152 }
1153 if let Ok(v) = from_value(&redis::Value::Bulk(vec![])) {
1154 panic!("Value::Bulk incorrectly translated to {:?}", v);
1155 }
1156 if let Ok(v) = from_value(&redis::Value::Status(String::from(""))) {
1157 panic!("Value::Status incorrectly translated to {:?}", v);
1158 }
1159 if let Ok(v) = from_value(&redis::Value::Okay) {
1160 panic!("Value::Okay incorrectly translated to {:?}", v);
1161 }
1162 }
1163
1164 #[test]
1167 fn test_bool_decoder() {
1168 assert_eq!(
1169 Ok(device::Value::Bool(false)),
1170 from_value(&redis::Value::Data(vec![b'B', b'F']))
1171 );
1172 assert_eq!(
1173 Ok(device::Value::Bool(true)),
1174 from_value(&redis::Value::Data(vec![b'B', b'T']))
1175 );
1176 }
1177
1178 #[test]
1181 fn test_bool_encoder() {
1182 assert_eq!(vec![b'B', b'F'], to_redis(&device::Value::Bool(false)));
1183 assert_eq!(vec![b'B', b'T'], to_redis(&device::Value::Bool(true)));
1184 }
1185
1186 const INT_TEST_CASES: &[(i32, &[u8])] = &[
1187 (0, &[b'I', 0x00, 0x00, 0x00, 0x00]),
1188 (1, &[b'I', 0x00, 0x00, 0x00, 0x01]),
1189 (-1, &[b'I', 0xff, 0xff, 0xff, 0xff]),
1190 (0x7fffffff, &[b'I', 0x7f, 0xff, 0xff, 0xff]),
1191 (-0x80000000, &[b'I', 0x80, 0x00, 0x00, 0x00]),
1192 (0x01234567, &[b'I', 0x01, 0x23, 0x45, 0x67]),
1193 ];
1194
1195 #[test]
1198 fn test_int_encoder() {
1199 for (v, rv) in INT_TEST_CASES {
1200 assert_eq!(*rv, to_redis(&device::Value::Int(*v)));
1201 }
1202 }
1203
1204 #[test]
1207 fn test_int_decoder() {
1208 assert!(from_value(&redis::Value::Data(vec![])).is_err());
1209 assert!(from_value(&redis::Value::Data(vec![b'I'])).is_err());
1210 assert!(from_value(&redis::Value::Data(vec![b'I', 0u8])).is_err());
1211 assert!(from_value(&redis::Value::Data(vec![b'I', 0u8, 0u8])).is_err());
1212 assert!(
1213 from_value(&redis::Value::Data(vec![b'I', 0u8, 0u8, 0u8])).is_err()
1214 );
1215
1216 for (v, rv) in INT_TEST_CASES {
1217 let data = redis::Value::Data(rv.to_vec());
1218
1219 assert_eq!(Ok(device::Value::Int(*v)), from_value(&data));
1220 }
1221 }
1222
1223 const FLT_TEST_CASES: &[(f64, &[u8])] = &[
1224 (0.0, &[b'D', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
1225 (
1226 -0.0,
1227 &[b'D', 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
1228 ),
1229 (1.0, &[b'D', 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
1230 (
1231 -1.0,
1232 &[b'D', 0xbf, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
1233 ),
1234 (
1235 9007199254740991.0,
1236 &[b'D', 67, 63, 255, 255, 255, 255, 255, 255],
1237 ),
1238 (9007199254740992.0, &[b'D', 67, 64, 0, 0, 0, 0, 0, 0]),
1239 ];
1240
1241 #[test]
1244 fn test_float_encoder() {
1245 for (v, rv) in FLT_TEST_CASES {
1246 assert_eq!(*rv, to_redis(&device::Value::Flt(*v)));
1247 }
1248 }
1249
1250 #[test]
1253 fn test_float_decoder() {
1254 assert!(from_value(&redis::Value::Data(vec![])).is_err());
1255 assert!(from_value(&redis::Value::Data(vec![b'D'])).is_err());
1256 assert!(from_value(&redis::Value::Data(vec![b'D', 0u8])).is_err());
1257 assert!(from_value(&redis::Value::Data(vec![b'D', 0u8, 0u8])).is_err());
1258 assert!(
1259 from_value(&redis::Value::Data(vec![b'D', 0u8, 0u8, 0u8])).is_err()
1260 );
1261 assert!(from_value(&redis::Value::Data(vec![
1262 b'D', 0u8, 0u8, 0u8, 0u8
1263 ]))
1264 .is_err());
1265 assert!(from_value(&redis::Value::Data(vec![
1266 b'D', 0u8, 0u8, 0u8, 0u8, 0u8
1267 ]))
1268 .is_err());
1269 assert!(from_value(&redis::Value::Data(vec![
1270 b'D', 0u8, 0u8, 0u8, 0u8, 0u8, 0u8
1271 ]))
1272 .is_err());
1273 assert!(from_value(&redis::Value::Data(vec![
1274 b'D', 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8
1275 ]))
1276 .is_err());
1277
1278 for (v, rv) in FLT_TEST_CASES {
1279 let data = redis::Value::Data(rv.to_vec());
1280
1281 assert_eq!(Ok(device::Value::Flt(*v)), from_value(&data));
1282 }
1283 }
1284
1285 const STR_TEST_CASES: &[(&str, &[u8])] = &[
1286 ("", &[b'S', 0u8, 0u8, 0u8, 0u8]),
1287 ("ABC", &[b'S', 0u8, 0u8, 0u8, 3u8, b'A', b'B', b'C']),
1288 ];
1289
1290 #[test]
1293 fn test_string_encoder() {
1294 for (v, rv) in STR_TEST_CASES {
1295 assert_eq!(*rv, to_redis(&device::Value::Str(String::from(*v))));
1296 }
1297 }
1298
1299 #[test]
1302 fn test_string_decoder() {
1303 assert!(from_value(&redis::Value::Data(vec![])).is_err());
1306 assert!(from_value(&redis::Value::Data(vec![b'S'])).is_err());
1307 assert!(from_value(&redis::Value::Data(vec![b'S', 0u8])).is_err());
1308 assert!(from_value(&redis::Value::Data(vec![b'S', 0u8, 0u8])).is_err());
1309 assert!(
1310 from_value(&redis::Value::Data(vec![b'S', 0u8, 0u8, 0u8])).is_err()
1311 );
1312
1313 for (v, rv) in STR_TEST_CASES {
1316 let data = redis::Value::Data(rv.to_vec());
1317
1318 assert_eq!(
1319 Ok(device::Value::Str(String::from(*v))),
1320 from_value(&data)
1321 );
1322 }
1323
1324 assert!(from_value(&redis::Value::Data(vec![
1328 b'S', 0u8, 0u8, 0u8, 1u8
1329 ]))
1330 .is_err());
1331 assert!(from_value(&redis::Value::Data(vec![
1332 b'S', 0u8, 0u8, 0u8, 2u8, b'A'
1333 ]))
1334 .is_err());
1335 assert_eq!(
1336 Ok(device::Value::Str(String::from("AB"))),
1337 from_value(&redis::Value::Data(vec![
1338 b'S', 0u8, 0u8, 0u8, 2u8, b'A', b'B', 0, 0
1339 ]))
1340 );
1341 }
1342
1343 #[test]
1344 fn test_pattern_cmd() {
1345 assert_eq!(
1346 &RedisStore::match_pattern_cmd(&None).get_packed_command(),
1347 b"*2\r
1348$4\r\nKEYS\r
1349$6\r\n*#info\r\n"
1350 );
1351 assert_eq!(
1352 &RedisStore::match_pattern_cmd(&Some(String::from("device")))
1353 .get_packed_command(),
1354 b"*2\r
1355$4\r\nKEYS\r
1356$11\r\ndevice#info\r\n"
1357 );
1358 assert_eq!(
1359 &RedisStore::match_pattern_cmd(&Some(String::from("*weather*")))
1360 .get_packed_command(),
1361 b"*2\r
1362$4\r\nKEYS\r
1363$14\r\n*weather*#info\r\n"
1364 );
1365 }
1366
1367 #[test]
1368 fn test_ts_to_id() {
1369 let dur = time::Duration::from_secs(1000);
1370 let ts = time::UNIX_EPOCH + dur;
1371
1372 assert_eq!(ReadingStream::ts_to_id(ts), "1000000-0");
1373
1374 let dur = time::Duration::from_micros(1234567);
1375 let ts = time::UNIX_EPOCH + dur;
1376
1377 assert_eq!(ReadingStream::ts_to_id(ts), "1234-567");
1378 }
1379
1380 #[test]
1381 fn test_read_next_cmd() {
1382 let cmd = ReadingStream::read_next_cmd("device#hist", "$");
1383
1384 assert_eq!(
1385 &cmd.get_packed_command(),
1386 b"*8\r
1387$5\r\nXREAD\r
1388$5\r\nBLOCK\r
1389$4\r\n5000\r
1390$5\r\nCOUNT\r
1391$1\r\n1\r
1392$7\r\nSTREAMS\r
1393$11\r\ndevice#hist\r
1394$1\r\n$\r\n"
1395 );
1396 }
1397
1398 #[test]
1399 fn test_info_type_cmd() {
1400 let cmd = RedisStore::info_type_cmd("device");
1401
1402 assert_eq!(
1403 &cmd.get_packed_command(),
1404 b"*2\r
1405$4\r\nTYPE\r
1406$11\r\ndevice#info\r\n"
1407 );
1408 }
1409
1410 #[test]
1411 fn test_hist_type_cmd() {
1412 let cmd = RedisStore::hist_type_cmd("device");
1413
1414 assert_eq!(
1415 &cmd.get_packed_command(),
1416 b"*2\r
1417$4\r\nTYPE\r
1418$11\r\ndevice#hist\r\n"
1419 );
1420 }
1421
1422 #[test]
1423 fn test_dev_info_cmd() {
1424 let cmd = RedisStore::device_info_cmd("device");
1425
1426 assert_eq!(
1427 &cmd.get_packed_command(),
1428 b"*2\r
1429$7\r\nHGETALL\r
1430$11\r\ndevice#info\r\n"
1431 );
1432 }
1433
1434 #[test]
1435 fn test_last_value_cmd() {
1436 let pipe = RedisStore::last_value_cmd("device");
1437
1438 assert_eq!(
1439 &pipe.get_packed_command(),
1440 b"*6\r
1441$9\r\nXREVRANGE\r
1442$11\r\ndevice#hist\r
1443$1\r\n+\r
1444$1\r\n-\r
1445$5\r\nCOUNT\r
1446$1\r\n1\r\n"
1447 );
1448 }
1449
1450 #[test]
1451 fn test_parsing_last_value() {
1452 const NAME: &str = "device";
1453
1454 assert_eq!(
1455 RedisStore::parse_last_value(NAME, &redis::Value::Nil),
1456 None
1457 );
1458 assert_eq!(
1459 RedisStore::parse_last_value(NAME, &redis::Value::Bulk(vec![])),
1460 None
1461 );
1462
1463 let val = redis::Value::Bulk(vec![redis::Value::Bulk(vec![
1464 redis::Value::Data(b"1000000-0".to_vec()),
1465 redis::Value::Bulk(vec![
1466 redis::Value::Data(b"value".to_vec()),
1467 redis::Value::Data(b"BT".to_vec()),
1468 ]),
1469 ])]);
1470
1471 assert_eq!(
1472 RedisStore::parse_last_value(NAME, &val),
1473 Some(device::Reading {
1474 ts: time::UNIX_EPOCH + time::Duration::from_secs(1000),
1475 value: device::Value::Bool(true)
1476 })
1477 );
1478
1479 let val = redis::Value::Bulk(vec![redis::Value::Bulk(vec![
1480 redis::Value::Data(b"1234-567".to_vec()),
1481 redis::Value::Bulk(vec![
1482 redis::Value::Data(b"value".to_vec()),
1483 redis::Value::Data(b"BF".to_vec()),
1484 ]),
1485 ])]);
1486
1487 assert_eq!(
1488 RedisStore::parse_last_value(NAME, &val),
1489 Some(device::Reading {
1490 ts: time::UNIX_EPOCH + time::Duration::from_micros(1234567),
1491 value: device::Value::Bool(false)
1492 })
1493 );
1494 }
1495
1496 #[test]
1497 fn test_xinfo_cmd() {
1498 assert_eq!(
1499 &RedisStore::xinfo_cmd("junk").get_packed_command(),
1500 b"*3\r
1501$5\r\nXINFO\r
1502$6\r\nSTREAM\r
1503$9\r\njunk#hist\r\n"
1504 );
1505 }
1506
/// Verifies the packed form of the `XADD` commands built for reporting a
/// new value, both the plain variant and the `MAXLEN ~ <n>` bounded
/// variant, using boolean, integer, floating point and string samples.
#[test]
fn test_report_value_cmd() {
    // One sample per value kind, shared by both halves of the test.
    let flag: device::Value = true.into();
    let low_int: device::Value = 0x00010203i32.into();
    let high_int: device::Value = 0x12345678i32.into();
    let real: device::Value = 1.0.into();
    let text: device::Value = String::from("hello").into();

    // Thin wrappers so each assertion only states the input and the
    // expected bytes.
    let plain = |value: &device::Value| {
        RedisStore::report_new_value_cmd("key", value).get_packed_command()
    };
    let capped = |value: &device::Value, max_len| {
        RedisStore::report_bounded_new_value_cmd("key", value, max_len)
            .get_packed_command()
    };

    // Plain form: XADD key * value <payload>

    assert_eq!(
        &plain(&flag),
        b"*5\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $2\r\nBT\r\n"
    );
    assert_eq!(
        &plain(&low_int),
        b"*5\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $5\r\nI\x00\x01\x02\x03\r\n"
    );
    assert_eq!(
        &plain(&high_int),
        b"*5\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $5\r\nI\x12\x34\x56\x78\r\n"
    );
    assert_eq!(
        &plain(&real),
        b"*5\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $9\r\nD\x3f\xf0\x00\x00\x00\x00\x00\x00\r\n"
    );
    assert_eq!(
        &plain(&text),
        b"*5\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $10\r\nS\x00\x00\x00\x05hello\r\n"
    );

    // Bounded form: XADD key MAXLEN ~ <n> * value <payload>

    assert_eq!(
        &capped(&flag, 0),
        b"*8\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $6\r\nMAXLEN\r\n\
          $1\r\n~\r\n\
          $1\r\n0\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $2\r\nBT\r\n"
    );
    assert_eq!(
        &capped(&low_int, 1),
        b"*8\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $6\r\nMAXLEN\r\n\
          $1\r\n~\r\n\
          $1\r\n1\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $5\r\nI\x00\x01\x02\x03\r\n"
    );
    assert_eq!(
        &capped(&high_int, 2),
        b"*8\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $6\r\nMAXLEN\r\n\
          $1\r\n~\r\n\
          $1\r\n2\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $5\r\nI\x12\x34\x56\x78\r\n"
    );
    assert_eq!(
        &capped(&real, 3),
        b"*8\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $6\r\nMAXLEN\r\n\
          $1\r\n~\r\n\
          $1\r\n3\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $9\r\nD\x3f\xf0\x00\x00\x00\x00\x00\x00\r\n"
    );
    assert_eq!(
        &capped(&text, 4),
        b"*8\r\n\
          $4\r\nXADD\r\n\
          $3\r\nkey\r\n\
          $6\r\nMAXLEN\r\n\
          $1\r\n~\r\n\
          $1\r\n4\r\n\
          $1\r\n*\r\n\
          $5\r\nvalue\r\n\
          $10\r\nS\x00\x00\x00\x05hello\r\n"
    );
}
1641
/// Verifies the packed pipeline produced by `init_device_cmd`, once
/// without units and once with units supplied.
#[test]
fn test_init_dev() {
    // Frames expected ahead of the HMSET in both transactions: MULTI,
    // DEL of the history key, an XADD of a placeholder entry with ID `1`
    // followed by its XDEL, and a DEL of the info key.
    const SETUP: &str = "*1\r\n\
                         $5\r\nMULTI\r\n\
                         *2\r\n\
                         $3\r\nDEL\r\n\
                         $11\r\ndevice#hist\r\n\
                         *5\r\n\
                         $4\r\nXADD\r\n\
                         $11\r\ndevice#hist\r\n\
                         $1\r\n1\r\n\
                         $5\r\nvalue\r\n\
                         $1\r\n\x01\r\n\
                         *3\r\n\
                         $4\r\nXDEL\r\n\
                         $11\r\ndevice#hist\r\n\
                         $1\r\n1\r\n\
                         *2\r\n\
                         $3\r\nDEL\r\n\
                         $11\r\ndevice#info\r\n";

    // Closing frame of the transaction.
    const COMMIT: &str = "*1\r\n\
                          $4\r\nEXEC\r\n";

    // Assembles a full expected transaction around the given HMSET frame.
    let transaction =
        |hmset: &str| -> String { format!("{}{}{}", SETUP, hmset, COMMIT) };

    // No units: the info hash only receives the driver name.
    let without_units =
        RedisStore::init_device_cmd("device", "mem", &None).get_packed_pipeline();
    let expected = transaction(
        "*4\r\n\
         $5\r\nHMSET\r\n\
         $11\r\ndevice#info\r\n\
         $6\r\ndriver\r\n\
         $3\r\nmem\r\n",
    );

    assert_eq!(String::from_utf8_lossy(&without_units), expected.as_str());

    // With units: the info hash receives both driver and units.
    let with_units = RedisStore::init_device_cmd(
        "device",
        "pump",
        &Some(String::from("gpm")),
    )
    .get_packed_pipeline();
    let expected = transaction(
        "*6\r\n\
         $5\r\nHMSET\r\n\
         $11\r\ndevice#info\r\n\
         $6\r\ndriver\r\n\
         $4\r\npump\r\n\
         $5\r\nunits\r\n\
         $3\r\ngpm\r\n",
    );

    assert_eq!(String::from_utf8_lossy(&with_units), expected.as_str());
}
1713
/// Exercises `stream_id_to_reading`: entries lacking a decodable `value`
/// field must be rejected, and well-formed entries must yield a reading
/// whose timestamp is the millisecond part of the stream ID.
#[test]
fn test_streamid_to_reading() {
    // Field maps that must not decode: no fields at all, a field under
    // the wrong name, and a `value` field whose payload is not a valid
    // encoding.
    let malformed: [HashMap<String, redis::Value>; 3] = [
        HashMap::new(),
        HashMap::from([(
            "junk".into(),
            redis::Value::Data(b"10".to_vec()),
        )]),
        HashMap::from([(
            "value".into(),
            redis::Value::Data(b"10".to_vec()),
        )]),
    ];

    for map in malformed {
        let entry = StreamId {
            id: "1000-0".into(),
            map,
        };

        assert!(RedisStore::stream_id_to_reading(&entry).is_err());
    }

    // (stream ID, expected milliseconds since the epoch, value). Each
    // value is encoded with `to_redis` and must come back unchanged.
    let round_trips = [
        ("1000-0", 1000, device::Value::Bool(true)),
        ("1500-0", 1500, device::Value::Int(123)),
        ("2500-0", 2500, device::Value::Int(-321)),
        ("2500-0", 2500, device::Value::Flt(1.0)),
        ("2500-0", 2500, device::Value::Flt(-1.0)),
        ("2500-0", 2500, device::Value::Flt(1.0e100)),
        ("2500-0", 2500, device::Value::Flt(1.0e-100)),
        ("2500-0", 2500, device::Value::Str("Hello".into())),
    ];

    for (id, millis, value) in round_trips {
        let entry = StreamId {
            id: id.into(),
            map: HashMap::from([(
                "value".into(),
                redis::Value::Data(to_redis(&value)),
            )]),
        };

        assert_eq!(
            RedisStore::stream_id_to_reading(&entry),
            Ok(device::Reading {
                ts: time::UNIX_EPOCH + time::Duration::from_millis(millis),
                value,
            })
        );
    }
}
1849
/// Walks `hash_to_info` through progressively richer inputs: an empty
/// field map, a map with only units, a map with units and driver, and
/// finally a settings table that contains the device.
#[test]
fn test_hash_to_info() {
    let device = "path:junk".parse::<device::Name>().unwrap();
    let mut st = HashMap::new();
    let mut fm = HashMap::new();

    // Expected reply; only the driver name and the settable flag vary
    // between the stages below.
    let reply = |driver: &str, settable: bool| client::DevInfoReply {
        name: device.clone(),
        units: Some(String::from("gpm")),
        settable,
        driver: String::from(driver),
        total_points: 0,
        first_point: None,
        last_point: None,
    };

    // An empty field map is reported as a missing device.
    assert_eq!(
        RedisStore::hash_to_info(&st, &device, &fm),
        Err(Error::NotFound)
    );

    // With only units present, the driver falls back to a placeholder.
    let _ = fm.insert(String::from("units"), String::from("gpm"));

    assert_eq!(
        RedisStore::hash_to_info(&st, &device, &fm),
        Ok(reply("*missing*", false))
    );

    // Once a driver field exists, its value is reported.
    let _ = fm.insert(String::from("driver"), String::from("sump"));

    assert_eq!(
        RedisStore::hash_to_info(&st, &device, &fm),
        Ok(reply("sump", false))
    );

    // A device with an entry in the settings table is flagged settable.
    let (tx, _) = mpsc::channel(10);
    let _ = st.insert(device.clone(), tx);

    assert_eq!(
        RedisStore::hash_to_info(&st, &device, &fm),
        Ok(reply("sump", true))
    );
}
1911}