drmem_db_simple/
lib.rs

//! Provides a simple storage back-end for the DrMem control system.
2//!
3//! This is the simplest data-store available. It only saves the last
4//! value for each device. It also doesn't provide persistent storage
5//! for device meta-information so, after a restart, that information
6//! is reset to its default state.
7//!
8//! This back-end is useful for installations that don't require
9//! historical information but, instead, are doing real-time control
10//! with current values.
11
12use async_trait::async_trait;
13use chrono::*;
14use drmem_api::{
15    client, device,
16    driver::{ReportReading, RxDeviceSetting, TxDeviceSetting},
17    Error, Result, Store,
18};
19use std::collections::{hash_map, HashMap};
20use std::{
21    sync::{Arc, Mutex},
22    time,
23};
24use tokio::sync::{broadcast, mpsc, oneshot};
25use tokio_stream::{
26    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
27    StreamExt,
28};
29use tracing::{error, warn};
30
// Capacity used for every channel created in this module: the
// per-device broadcast channel that carries readings and the `mpsc`
// channel that carries settings to a driver.
const CHAN_SIZE: usize = 20;

// Per-device state that is protected by a mutex. The tuple holds, in
// order:
//
//   0. the broadcast sender used to publish each new reading,
//   1. the most recent reading (`None` until a value is reported),
//   2. the timestamp of the most recent reading (`UNIX_EPOCH` until a
//      value is reported).
type ReadingState = (
    broadcast::Sender<device::Reading>,
    Option<device::Reading>,
    time::SystemTime,
);
38
39pub mod config;
40mod glob;
41
42struct DeviceInfo {
43    owner: String,
44    units: Option<String>,
45    tx_setting: Option<TxDeviceSetting>,
46    reading: Arc<Mutex<ReadingState>>,
47}
48
49impl DeviceInfo {
50    pub fn create(
51        owner: String,
52        units: Option<String>,
53        tx_setting: Option<TxDeviceSetting>,
54    ) -> DeviceInfo {
55        let (tx, _) = broadcast::channel(CHAN_SIZE);
56
57        // Build the entry and insert it in the table.
58
59        DeviceInfo {
60            owner,
61            units,
62            tx_setting,
63            reading: Arc::new(Mutex::new((tx, None, time::UNIX_EPOCH))),
64        }
65    }
66}
67
68struct SimpleStore(HashMap<device::Name, DeviceInfo>);
69
70pub async fn open(_cfg: &config::Config) -> Result<impl Store> {
71    Ok(SimpleStore(HashMap::new()))
72}
73
74// Builds the `ReportReading` function. Drivers will call specialized
75// instances of this function to record the latest value of a device.
76
77fn mk_report_func(
78    di: &DeviceInfo,
79    name: &device::Name,
80) -> ReportReading<device::Value> {
81    let reading = di.reading.clone();
82    let name = name.to_string();
83
84    Box::new(move |v| {
85        // Determine the timestamp *before* we take the mutex. The
86        // timing shouldn't pay the price of waiting for the mutex so
87        // we grab it right away.
88
89        let mut ts = time::SystemTime::now();
90
91        // If a lock is obtained, update the current value. The only
92        // way a lock can fail is if it's "poisoned", which means
93        // another thread panicked while holding the lock. This module
94        // holds the only code that uses the mutex and all accesses
95        // are short and infallible, so the error message shouldn't
96        // ever get displayed.
97
98        if let Ok(mut data) = reading.lock() {
99            // At this point, we have access to the previous
100            // timestamp. If the new timestamp is *before* the
101            // previous, then we fudge the timestamp to be 1 𝜇s later
102            // (DrMem doesn't allow data values to be inserted in
103            // random order.) If, somehow, the timestamp will exceed
104            // the range of the `SystemTime` type, the maxmimum
105            // timestamp will be used for this sample (as well as
106            // future samples.)
107
108            if ts <= data.2 {
109                if let Some(nts) =
110                    data.2.checked_add(time::Duration::from_micros(1))
111                {
112                    ts = nts
113                } else {
114                    ts = time::UNIX_EPOCH
115                        .checked_add(time::Duration::new(
116                            i64::MAX as u64,
117                            999_999_999,
118                        ))
119                        .unwrap()
120                }
121            }
122
123            let reading = device::Reading { ts, value: v };
124            let _ = data.0.send(reading.clone());
125
126            // Update the device's state.
127
128            data.1 = Some(reading);
129            data.2 = ts
130        } else {
131            error!("couldn't set current value of {}", &name)
132        }
133        Box::pin(async {})
134    })
135}
136
137#[async_trait]
138impl Store for SimpleStore {
139    /// Handle read-only devices registration. This function creates
140    /// an association between the device name and its associated
141    /// resources. Since the driver is registering a read-only device,
142    /// this function doesn't allocate a channel to provide settings.
143
144    async fn register_read_only_device(
145        &mut self,
146        driver: &str,
147        name: &device::Name,
148        units: &Option<String>,
149        _max_history: &Option<usize>,
150    ) -> Result<(ReportReading<device::Value>, Option<device::Value>)> {
151        // Check to see if the device name already exists.
152
153        match self.0.entry((*name).clone()) {
154            // The device didn't exist. Create it and associate it
155            // with the driver.
156            hash_map::Entry::Vacant(e) => {
157                // Build the entry and insert it in the table.
158
159                let di = e.insert(DeviceInfo::create(
160                    String::from(driver),
161                    units.clone(),
162                    None,
163                ));
164
165                // Create and return the closure that the driver will
166                // use to report updates.
167
168                Ok((mk_report_func(di, name), None))
169            }
170
171            // The device already exists. If it was created from a
172            // previous instance of the driver, allow the registration
173            // to succeed.
174            hash_map::Entry::Occupied(e) => {
175                let dev_info = e.get();
176
177                if dev_info.owner == driver {
178                    let func = mk_report_func(dev_info, name);
179                    let guard = dev_info.reading.lock();
180
181                    Ok((
182                        func,
183                        if let Ok(data) = guard {
184                            data.1.as_ref().map(
185                                |device::Reading { value, .. }| value.clone(),
186                            )
187                        } else {
188                            None
189                        },
190                    ))
191                } else {
192                    Err(Error::InUse)
193                }
194            }
195        }
196    }
197
198    /// Handle read-write devices registration. This function creates
199    /// an association between the device name and its associated
200    /// resources.
201
202    async fn register_read_write_device(
203        &mut self,
204        driver: &str,
205        name: &device::Name,
206        units: &Option<String>,
207        _max_history: &Option<usize>,
208    ) -> Result<(
209        ReportReading<device::Value>,
210        RxDeviceSetting,
211        Option<device::Value>,
212    )> {
213        // Check to see if the device name already exists.
214
215        match self.0.entry((*name).clone()) {
216            // The device didn't exist. Create it and associate it
217            // with the driver.
218            hash_map::Entry::Vacant(e) => {
219                // Create a channel with which to send settings.
220
221                let (tx_sets, rx_sets) = mpsc::channel(CHAN_SIZE);
222
223                // Build the entry and insert it in the table.
224
225                let di = e.insert(DeviceInfo::create(
226                    String::from(driver),
227                    units.clone(),
228                    Some(tx_sets),
229                ));
230
231                // Create and return the closure that the driver will
232                // use to report updates.
233
234                Ok((mk_report_func(di, name), rx_sets, None))
235            }
236
237            // The device already exists. If it was created from a
238            // previous instance of the driver, allow the registration
239            // to succeed.
240            hash_map::Entry::Occupied(mut e) => {
241                let dev_info = e.get_mut();
242
243                if dev_info.owner == driver {
244                    // Create a channel with which to send settings.
245
246                    let (tx_sets, rx_sets) = mpsc::channel(CHAN_SIZE);
247
248                    dev_info.tx_setting = Some(tx_sets);
249
250                    let func = mk_report_func(dev_info, name);
251                    let guard = dev_info.reading.lock();
252
253                    Ok((
254                        func,
255                        rx_sets,
256                        if let Ok(data) = guard {
257                            data.1.as_ref().map(
258                                |device::Reading { value, .. }| value.clone(),
259                            )
260                        } else {
261                            None
262                        },
263                    ))
264                } else {
265                    Err(Error::InUse)
266                }
267            }
268        }
269    }
270
271    async fn get_device_info(
272        &mut self,
273        pattern: &Option<String>,
274    ) -> Result<Vec<client::DevInfoReply>> {
275        let pred: Box<dyn FnMut(&(&device::Name, &DeviceInfo)) -> bool> =
276            if let Some(pattern) = pattern {
277                if let Ok(pattern) = pattern.parse::<device::Name>() {
278                    Box::new(move |(k, _)| pattern == **k)
279                } else {
280                    Box::new(move |(k, _)| {
281                        glob::Pattern::create(pattern).matches(&k.to_string())
282                    })
283                }
284            } else {
285                Box::new(|_| true)
286            };
287        let res: Vec<client::DevInfoReply> = self
288            .0
289            .iter()
290            .filter(pred)
291            .map(|(k, v)| {
292                let (tot, rdg) = if let Ok(data) = v.reading.lock() {
293                    if data.1.is_some() {
294                        (1, data.1.clone())
295                    } else {
296                        (0, None)
297                    }
298                } else {
299                    (0, None)
300                };
301
302                client::DevInfoReply {
303                    name: k.clone(),
304                    units: v.units.clone(),
305                    settable: v.tx_setting.is_some(),
306                    driver: v.owner.clone(),
307                    total_points: tot,
308                    first_point: rdg.clone(),
309                    last_point: rdg,
310                }
311            })
312            .collect();
313
314        Ok(res)
315    }
316
317    async fn set_device(
318        &self,
319        name: device::Name,
320        value: device::Value,
321    ) -> Result<device::Value> {
322        if let Some(di) = self.0.get(&name) {
323            if let Some(tx) = &di.tx_setting {
324                let (tx_rpy, rx_rpy) = oneshot::channel();
325
326                match tx.send((value, tx_rpy)).await {
327                    Ok(()) => match rx_rpy.await {
328                        Ok(reply) => reply,
329                        Err(_) => Err(Error::MissingPeer(
330                            "driver broke connection".to_string(),
331                        )),
332                    },
333                    Err(_) => Err(Error::MissingPeer(
334                        "driver is ignoring settings".to_string(),
335                    )),
336                }
337            } else {
338                Err(Error::OperationError)
339            }
340        } else {
341            Err(Error::NotFound)
342        }
343    }
344
345    async fn get_setting_chan(
346        &self,
347        name: device::Name,
348        _own: bool,
349    ) -> Result<TxDeviceSetting> {
350        if let Some(di) = self.0.get(&name) {
351            if let Some(tx) = &di.tx_setting {
352                return Ok(tx.clone());
353            }
354        }
355        Err(Error::NotFound)
356    }
357
358    // Handles a request to monitor a device's changing value. The
359    // caller must pass in the name of the device. Returns a stream
360    // which returns the last value reported for the device followed
361    // by all new updates.
362
363    async fn monitor_device(
364        &mut self,
365        name: device::Name,
366        start: Option<DateTime<Utc>>,
367        end: Option<DateTime<Utc>>,
368    ) -> Result<device::DataStream<device::Reading>> {
369        // Look-up the name of the device. If it doesn't exist, return
370        // an error.
371
372        if let Some(di) = self.0.get(&name) {
373            // Lock the mutex which protects the broadcast channel and
374            // the device's last values.
375
376            if let Ok(guard) = di.reading.lock() {
377                let chan = guard.0.subscribe();
378
379                // Convert the broadcast channel into a broadcast
380                // stream. Broadcast channels report when a client is
381                // too slow in reading values, by returning an error.
382                // The DrMem core doesn't know (or care) about these
383                // low-level details and doesn't expect them so we
384                // filter the errors, but report them to the log.
385
386                let strm =
387                    BroadcastStream::new(chan).filter_map(move |entry| {
388                        match entry {
389                            Ok(v) => Some(v),
390                            Err(BroadcastStreamRecvError::Lagged(count)) => {
391                                warn!("missed {} readings of {}", count, &name);
392                                None
393                            }
394                        }
395                    });
396
397                match (start.map(|v| v.into()), end.map(|v| v.into())) {
398                    (None, None) => {
399                        if let Some(prev) = &guard.1 {
400                            Ok(Box::pin(
401                                tokio_stream::once(prev.clone()).chain(strm),
402                            ))
403                        } else {
404                            Ok(Box::pin(strm))
405                        }
406                    }
407                    (None, Some(end)) => {
408                        let not_end = move |v: &device::Reading| v.ts <= end;
409
410                        if let Some(prev) = &guard.1 {
411                            Ok(Box::pin(
412                                tokio_stream::once(prev.clone())
413                                    .chain(strm)
414                                    .take_while(not_end),
415                            ))
416                        } else {
417                            Ok(Box::pin(strm.take_while(not_end)))
418                        }
419                    }
420                    (Some(start), None) => {
421                        let valid = move |v: &device::Reading| v.ts >= start;
422
423                        if let Some(prev) = &guard.1 {
424                            Ok(Box::pin(
425                                tokio_stream::once(prev.clone())
426                                    .chain(strm)
427                                    .filter(valid),
428                            ))
429                        } else {
430                            Ok(Box::pin(strm.filter(valid)))
431                        }
432                    }
433                    (Some(start_tmp), Some(end_tmp)) => {
434                        // Make sure the `start` time is before the
435                        // `end` time.
436
437                        let start: time::SystemTime =
438                            std::cmp::min(start_tmp, end_tmp);
439                        let end: time::SystemTime =
440                            std::cmp::max(start_tmp, end_tmp);
441
442                        // Define predicates for filters.
443
444                        let valid = move |v: &device::Reading| v.ts >= start;
445                        let not_end = move |v: &device::Reading| v.ts <= end;
446
447                        if let Some(prev) = &guard.1 {
448                            Ok(Box::pin(
449                                tokio_stream::once(prev.clone())
450                                    .chain(strm)
451                                    .filter(valid)
452                                    .take_while(not_end),
453                            ))
454                        } else {
455                            Ok(Box::pin(strm.filter(valid).take_while(not_end)))
456                        }
457                    }
458                }
459            } else {
460                Err(Error::OperationError)
461            }
462        } else {
463            Err(Error::NotFound)
464        }
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use crate::{mk_report_func, DeviceInfo, SimpleStore};
471    use drmem_api::{device, Store};
472    use std::{collections::HashMap, time};
473    use tokio::sync::{mpsc::error::TryRecvError, oneshot};
474    use tokio::time::interval;
475    use tokio_stream::StreamExt;
476
477    #[test]
478    fn test_timestamp() {
479        assert!(time::UNIX_EPOCH
480            .checked_add(time::Duration::new(i64::MAX as u64, 999_999_999))
481            .is_some())
482    }
483
484    #[tokio::test]
485    async fn test_read_live_stream() {
486        let mut db = SimpleStore(HashMap::new());
487        let name = "test:device".parse::<device::Name>().unwrap();
488
489        if let Ok((f, None)) = db
490            .register_read_only_device("test", &name, &None, &None)
491            .await
492        {
493            // Test that priming the history with one value returns
494            // the entire sequence.
495
496            {
497                let data = vec![1, 2, 3];
498
499                f(device::Value::Int(data[0]));
500
501                let s = db
502                    .monitor_device(name.clone(), None, None)
503                    .await
504                    .unwrap()
505                    .timeout(time::Duration::from_millis(100));
506
507                tokio::pin!(s);
508
509                for ii in &data[1..] {
510                    f(device::Value::Int(*ii));
511                }
512
513                assert_eq!(
514                    s.try_next().await.unwrap().unwrap().value,
515                    device::Value::Int(1)
516                );
517                assert_eq!(
518                    s.try_next().await.unwrap().unwrap().value,
519                    device::Value::Int(2)
520                );
521                assert_eq!(
522                    s.try_next().await.unwrap().unwrap().value,
523                    device::Value::Int(3)
524                );
525                assert!(s.try_next().await.is_err());
526            }
527
528            // Test that priming the history with two values only
529            // returns the latest and all remaining.
530
531            {
532                let data = vec![1, 2, 3, 4];
533
534                f(device::Value::Int(data[0]));
535                f(device::Value::Int(data[1]));
536
537                let s = db
538                    .monitor_device(name.clone(), None, None)
539                    .await
540                    .unwrap()
541                    .timeout(time::Duration::from_millis(100));
542
543                tokio::pin!(s);
544
545                for ii in &data[2..] {
546                    f(device::Value::Int(*ii));
547                }
548
549                assert_eq!(
550                    s.try_next().await.unwrap().unwrap().value,
551                    device::Value::Int(2)
552                );
553                assert_eq!(
554                    s.try_next().await.unwrap().unwrap().value,
555                    device::Value::Int(3)
556                );
557                assert_eq!(
558                    s.try_next().await.unwrap().unwrap().value,
559                    device::Value::Int(4)
560                );
561                assert!(s.try_next().await.is_err());
562            }
563        } else {
564            panic!("error registering read-only device on empty database")
565        }
566    }
567
568    #[tokio::test]
569    async fn test_read_start_stream() {
570        let mut db = SimpleStore(HashMap::new());
571        let name = "test:device".parse::<device::Name>().unwrap();
572
573        if let Ok((f, None)) = db
574            .register_read_only_device("test", &name, &None, &None)
575            .await
576        {
577            // Verify that monitoring device, starting now, picks up
578            // all future inserted data.
579
580            {
581                let data = vec![1, 2, 3];
582
583                let s = db
584                    .monitor_device(
585                        name.clone(),
586                        Some(time::SystemTime::now().into()),
587                        None,
588                    )
589                    .await
590                    .unwrap()
591                    .timeout(time::Duration::from_millis(100));
592
593                tokio::pin!(s);
594
595                assert!(s.try_next().await.is_err());
596
597                for ii in data {
598                    f(device::Value::Int(ii));
599                }
600
601                assert_eq!(
602                    s.try_next().await.unwrap().unwrap().value,
603                    device::Value::Int(1)
604                );
605                assert_eq!(
606                    s.try_next().await.unwrap().unwrap().value,
607                    device::Value::Int(2)
608                );
609                assert_eq!(
610                    s.try_next().await.unwrap().unwrap().value,
611                    device::Value::Int(3)
612                );
613                assert!(s.try_next().await.is_err());
614            }
615
616            // Verify that, if the latest point is before the starting
617            // timestamp, it doesn't get returned.
618
619            {
620                let data = vec![1, 2, 3];
621
622                f(device::Value::Int(data[0]));
623
624                let s = db
625                    .monitor_device(
626                        name.clone(),
627                        Some(time::SystemTime::now().into()),
628                        None,
629                    )
630                    .await
631                    .unwrap()
632                    .timeout(time::Duration::from_millis(100));
633
634                tokio::pin!(s);
635
636                for ii in &data[1..] {
637                    f(device::Value::Int(*ii));
638                }
639
640                assert_eq!(
641                    s.try_next().await.unwrap().unwrap().value,
642                    device::Value::Int(2)
643                );
644                assert_eq!(
645                    s.try_next().await.unwrap().unwrap().value,
646                    device::Value::Int(3)
647                );
648                assert!(s.try_next().await.is_err());
649            }
650        } else {
651            panic!("error registering read-only device on empty database")
652        }
653    }
654
655    #[tokio::test]
656    async fn test_read_end_stream() {
657        let mut db = SimpleStore(HashMap::new());
658        let name = "test:device".parse::<device::Name>().unwrap();
659
660        if let Ok((f, None)) = db
661            .register_read_only_device("test", &name, &None, &None)
662            .await
663        {
664            // Verify that, if the latest point is before the starting
665            // timestamp, it doesn't get returned.
666
667            {
668                let data = vec![1, 2, 3];
669
670                f(device::Value::Int(data[0]));
671
672                let s = db
673                    .monitor_device(
674                        name.clone(),
675                        None,
676                        Some(time::SystemTime::now().into()),
677                    )
678                    .await
679                    .unwrap()
680                    .timeout(time::Duration::from_millis(100));
681
682                tokio::pin!(s);
683
684                for ii in &data[1..] {
685                    f(device::Value::Int(*ii));
686                }
687
688                assert_eq!(
689                    s.try_next().await.unwrap().unwrap().value,
690                    device::Value::Int(1)
691                );
692                assert_eq!(s.try_next().await.unwrap(), None);
693            }
694        } else {
695            panic!("error registering read-only device on empty database")
696        }
697    }
698
699    #[tokio::test]
700    async fn test_read_start_end_stream() {
701        let mut db = SimpleStore(HashMap::new());
702        let name = "test:device".parse::<device::Name>().unwrap();
703
704        if let Ok((f, None)) = db
705            .register_read_only_device("test", &name, &None, &None)
706            .await
707        {
708            // Verify that, if both times are before the data, nothing
709            // is returned.
710
711            {
712                let data = vec![1, 2, 3, 4, 5];
713
714                let mut interval = interval(time::Duration::from_millis(100));
715
716                let now = time::SystemTime::now();
717                let s = db
718                    .monitor_device(
719                        name.clone(),
720                        Some(
721                            now.checked_sub(time::Duration::from_millis(500))
722                                .unwrap()
723                                .into(),
724                        ),
725                        Some(
726                            now.checked_sub(time::Duration::from_millis(250))
727                                .unwrap()
728                                .into(),
729                        ),
730                    )
731                    .await
732                    .unwrap()
733                    .timeout(time::Duration::from_millis(100));
734
735                tokio::pin!(s);
736
737                for ii in data {
738                    interval.tick().await;
739                    f(device::Value::Int(ii));
740                }
741
742                assert_eq!(s.try_next().await.unwrap(), None);
743            }
744
745            // Verify that, if the latest point is before the starting
746            // timestamp, it doesn't get returned.
747
748            {
749                let data = vec![1, 2, 3, 4, 5];
750
751                f(device::Value::Int(data[0]));
752                let mut interval = interval(time::Duration::from_millis(100));
753
754                let now = time::SystemTime::now();
755                let s = db
756                    .monitor_device(
757                        name.clone(),
758                        Some(now.into()),
759                        Some(
760                            now.checked_add(time::Duration::from_millis(250))
761                                .unwrap()
762                                .into(),
763                        ),
764                    )
765                    .await
766                    .unwrap()
767                    .timeout(time::Duration::from_millis(100));
768
769                tokio::pin!(s);
770
771                for ii in &data[1..] {
772                    interval.tick().await;
773                    f(device::Value::Int(*ii));
774                }
775
776                assert_eq!(
777                    s.try_next().await.unwrap().unwrap().value,
778                    device::Value::Int(2)
779                );
780                assert_eq!(
781                    s.try_next().await.unwrap().unwrap().value,
782                    device::Value::Int(3)
783                );
784                assert_eq!(
785                    s.try_next().await.unwrap().unwrap().value,
786                    device::Value::Int(4)
787                );
788                assert_eq!(s.try_next().await.unwrap(), None);
789            }
790
791            // Verify that, if the latest point is after the starting
792            // timestamp, it isn't part of the results.
793
794            {
795                let data = vec![1, 2, 3, 4, 5];
796
797                let mut interval = interval(time::Duration::from_millis(100));
798
799                let now = time::SystemTime::now();
800                let s = db
801                    .monitor_device(
802                        name.clone(),
803                        Some(now.into()),
804                        Some(
805                            now.checked_add(time::Duration::from_millis(250))
806                                .unwrap()
807                                .into(),
808                        ),
809                    )
810                    .await
811                    .unwrap()
812                    .timeout(time::Duration::from_millis(100));
813
814                tokio::pin!(s);
815
816                for ii in data {
817                    interval.tick().await;
818                    f(device::Value::Int(ii));
819                }
820
821                assert_eq!(
822                    s.try_next().await.unwrap().unwrap().value,
823                    device::Value::Int(1)
824                );
825                assert_eq!(
826                    s.try_next().await.unwrap().unwrap().value,
827                    device::Value::Int(2)
828                );
829                assert_eq!(
830                    s.try_next().await.unwrap().unwrap().value,
831                    device::Value::Int(3)
832                );
833                assert_eq!(s.try_next().await.unwrap(), None);
834            }
835
836            // Verify that, if the latest point is after the starting
837            // timestamp, it isn't part of the results.
838
839            {
840                let data = vec![1, 2, 3, 4, 5];
841
842                let mut interval = interval(time::Duration::from_millis(100));
843
844                let now = time::SystemTime::now();
845                let s = db
846                    .monitor_device(
847                        name.clone(),
848                        Some(
849                            now.checked_add(time::Duration::from_millis(150))
850                                .unwrap()
851                                .into(),
852                        ),
853                        Some(
854                            now.checked_add(time::Duration::from_millis(350))
855                                .unwrap()
856                                .into(),
857                        ),
858                    )
859                    .await
860                    .unwrap()
861                    .timeout(time::Duration::from_millis(100));
862
863                tokio::pin!(s);
864
865                for ii in data {
866                    interval.tick().await;
867                    f(device::Value::Int(ii));
868                }
869
870                assert_eq!(
871                    s.try_next().await.unwrap().unwrap().value,
872                    device::Value::Int(3)
873                );
874                assert_eq!(
875                    s.try_next().await.unwrap().unwrap().value,
876                    device::Value::Int(4)
877                );
878                assert_eq!(s.try_next().await.unwrap(), None);
879            }
880        } else {
881            panic!("error registering read-only device on empty database")
882        }
883    }
884
885    #[tokio::test]
886    async fn test_ro_registration() {
887        let mut db = SimpleStore(HashMap::new());
888        let name = "misc:junk".parse::<device::Name>().unwrap();
889
890        // Register a device named "junk" and associate it with the
891        // driver named "test". We don't define units for this device.
892
893        if let Ok((f, None)) = db
894            .register_read_only_device("test", &name, &None, &None)
895            .await
896        {
897            // Make sure the device was defined and the setting
898            // channel is `None`.
899
900            assert!(db.0.get(&name).unwrap().tx_setting.is_none());
901
902            // Report a value.
903
904            f(device::Value::Int(1)).await;
905
906            // Create a receiving handle for device updates.
907
908            let mut rx =
909                db.0.get(&name)
910                    .unwrap()
911                    .reading
912                    .lock()
913                    .unwrap()
914                    .0
915                    .subscribe();
916
917            // Assert that re-registering this device with a different
918            // driver name results in an error.
919
920            assert!(db
921                .register_read_only_device("test2", &name, &None, &None)
922                .await
923                .is_err());
924
925            // Assert that re-registering this device with the same
926            // driver name is successful.
927
928            if let Ok((f, Some(device::Value::Int(1)))) = db
929                .register_read_only_device("test", &name, &None, &None)
930                .await
931            {
932                // Also, verify that the device update channel wasn't
933                // disrupted by sending a value and receiving it from
934                // the receive handle we opened before re-registering.
935
936                f(device::Value::Int(2)).await;
937                assert_eq!(rx.try_recv().unwrap().value, device::Value::Int(2));
938            } else {
939                panic!("error registering read-only device from same driver")
940            }
941        } else {
942            panic!("error registering read-only device on empty database")
943        }
944    }
945
946    #[tokio::test]
947    async fn test_rw_registration() {
948        let mut db = SimpleStore(HashMap::new());
949        let name = "misc:junk".parse::<device::Name>().unwrap();
950
951        // Register a device named "junk" and associate it with the
952        // driver named "test". We don't define units for this device.
953
954        if let Ok((f, mut set_chan, None)) = db
955            .register_read_write_device("test", &name, &None, &None)
956            .await
957        {
958            // Make sure the device was defined and a setting channel
959            // has been created.
960
961            assert!(db.0.get(&name).unwrap().tx_setting.is_some());
962
963            // Make sure the setting channel is valid.
964
965            {
966                let tx_set =
967                    db.0.get(&name).unwrap().tx_setting.clone().unwrap();
968
969                assert_eq!(tx_set.is_closed(), false);
970
971                let (tx_os, _rx_os) = oneshot::channel();
972
973                assert!(tx_set
974                    .send((device::Value::Int(2), tx_os))
975                    .await
976                    .is_ok());
977                assert_eq!(
978                    set_chan.try_recv().unwrap().0,
979                    device::Value::Int(2)
980                );
981            }
982
983            // Report a value.
984
985            f(device::Value::Int(1)).await;
986
987            // Create a receiving handle for device updates.
988
989            let mut rx =
990                db.0.get(&name)
991                    .unwrap()
992                    .reading
993                    .lock()
994                    .unwrap()
995                    .0
996                    .subscribe();
997
998            // Assert that re-registering this device with a different
999            // driver name results in an error. Also verify that it
1000            // didn't affect the setting channel.
1001
1002            assert!(db
1003                .register_read_only_device("test2", &name, &None, &None)
1004                .await
1005                .is_err());
1006            assert_eq!(
1007                Err(TryRecvError::Empty),
1008                set_chan.try_recv().map(|_| ())
1009            );
1010
1011            // Assert that re-registering this device with the same
1012            // driver name is successful.
1013
1014            if let Ok((f, _, Some(device::Value::Int(1)))) = db
1015                .register_read_write_device("test", &name, &None, &None)
1016                .await
1017            {
1018                assert_eq!(
1019                    Err(TryRecvError::Disconnected),
1020                    set_chan.try_recv().map(|_| ())
1021                );
1022
1023                // Also, verify that the device update channel wasn't
1024                // disrupted by sending a value and receiving it from
1025                // the receive handle we opened before re-registering.
1026
1027                f(device::Value::Int(2)).await;
1028                assert_eq!(rx.try_recv().unwrap().value, device::Value::Int(2));
1029            } else {
1030                panic!("error registering read-only device from same driver")
1031            }
1032        } else {
1033            panic!("error registering read-only device on empty database")
1034        }
1035    }
1036
1037    #[tokio::test]
1038    async fn test_closure() {
1039        let di = DeviceInfo::create(String::from("test"), None, None);
1040        let name = "misc:junk".parse::<device::Name>().unwrap();
1041        let f = mk_report_func(&di, &name);
1042
1043        assert_eq!(di.reading.lock().unwrap().1, None);
1044        f(device::Value::Int(1)).await;
1045        assert_eq!(
1046            di.reading.lock().unwrap().1.as_ref().unwrap().value,
1047            device::Value::Int(1)
1048        );
1049
1050        {
1051            let ts1 = di.reading.lock().unwrap().1.as_ref().unwrap().ts;
1052            let mut rx = di.reading.lock().unwrap().0.subscribe();
1053
1054            f(device::Value::Int(2)).await;
1055            assert_eq!(rx.try_recv().unwrap().value, device::Value::Int(2));
1056            assert_eq!(
1057                di.reading.lock().unwrap().1.as_ref().unwrap().value,
1058                device::Value::Int(2)
1059            );
1060            assert!(ts1 < di.reading.lock().unwrap().1.as_ref().unwrap().ts);
1061        }
1062
1063        f(device::Value::Int(3)).await;
1064        assert_eq!(
1065            di.reading.lock().unwrap().1.as_ref().unwrap().value,
1066            device::Value::Int(3)
1067        );
1068
1069        {
1070            let mut rx1 = di.reading.lock().unwrap().0.subscribe();
1071            let mut rx2 = di.reading.lock().unwrap().0.subscribe();
1072
1073            f(device::Value::Int(4)).await;
1074            assert_eq!(rx1.try_recv().unwrap().value, device::Value::Int(4));
1075            assert_eq!(rx2.try_recv().unwrap().value, device::Value::Int(4));
1076            assert_eq!(
1077                di.reading.lock().unwrap().1.as_ref().unwrap().value,
1078                device::Value::Int(4)
1079            );
1080        }
1081    }
1082}