pantry/
lib.rs

1//! The [`Pantry`] is useful for temporarily storing for later use values that
2//! might "decay" (become unusable) over time.
3//!
4//! Create a [`Pantry`] value and use its [`store`] function to store values
5//! for later use.  A key is provided along with the value, so that the value
6//! can be retrieved later using the same key.  A worker thread is spawned
7//! which monitors the values and automatically drops any which have "decayed".
8//! Values must implement the [`Perishable`] trait, whose [`perished`] function
9//! asynchronously completes once the value has decayed.
10//!
11//! Use the [`fetch`] asynchronous function on the [`Pantry`] with a key to
12//! retrieve a value previously stored using that key.  A value is only
13//! returned if it was stored using the same key and has not decayed since it
14//! was stored.
15//!
16//! Multiple values may have the same key, with the caveat that the values
17//! stored under a given key may not be returned in the same order in which
18//! they were stored.
19//!
20//! [`fetch`]: struct.Pantry.html#method.fetch
21//! [`Pantry`]: struct.Pantry.html
22//! [`Perishable`]: trait.Perishable.html
23//! [`perished`]: trait.Perishable.html#method.perished
24//! [`store`]: struct.Pantry.html#method.store
25
26#![warn(clippy::pedantic)]
27#![warn(missing_docs)]
28
29use async_trait::async_trait;
30use futures::{
31    channel::{
32        mpsc,
33        oneshot::{
34            self,
35            Receiver,
36            Sender,
37        },
38    },
39    executor,
40    future,
41    future::LocalBoxFuture,
42    FutureExt as _,
43    StreamExt as _,
44};
45use std::{
46    collections::{
47        hash_map,
48        HashMap,
49    },
50    hash::Hash,
51    thread,
52};
53
54/// This is the trait that values must implement in order to be stored in the
55/// [`Pantry`].  Note that since the trait has an asynchronous method,
56/// currently the [`async_trait`] attribute from the [`async-trait`] crate must
57/// be added to implementations.
58///
59/// # Examples
60///
61/// ```rust
62/// # extern crate async_std;
63/// # extern crate async_trait;
64/// # extern crate pantry;
65/// use async_trait::async_trait;
66/// use pantry::Perishable;
67///
68/// struct SpyOrders(String);
69///
70/// #[async_trait]
71/// impl Perishable for SpyOrders {
72///     async fn perished(&mut self) {
73///         // This message will self-destruct after
74///         // sitting in the pantry for 5 seconds!
75///         async_std::future::timeout(
76///             std::time::Duration::from_secs(5),
77///             futures::future::pending::<()>(),
78///         )
79///         .await
80///         .unwrap_or(())
81///     }
82/// }
83/// ```
84///
85/// [`Pantry`]: struct.Pantry.html
86/// [`async_trait`]: https://docs.rs/async-trait/0.1.41/async_trait/index.html
87/// [`async-trait`]: https://docs.rs/async-trait/0.1.41/async_trait/
88#[async_trait]
89pub trait Perishable: Send + 'static {
90    /// This asynchronous function should complete once the value has "decayed"
91    /// or become unusable or unsuitable for reuse.  The worker thread of the
92    /// [`Pantry`] runs this future to completion, automatically dropping the
93    /// value if it completes.
94    ///
95    /// Note that this is an asynchronous trait method.  Currently, the
96    /// [`async_trait`] attribute from the [`async-trait`] crate is used
97    /// to realize this specification.
98    ///
99    /// [`Pantry`]: struct.Pantry.html
100    /// [`async_trait`]: https://docs.rs/async-trait/0.1.41/async_trait/index.html
101    /// [`async-trait`]: https://docs.rs/async-trait/0.1.41/async_trait/
102    async fn perished(&mut self);
103}
104
105// These are the types of message that may be sent to the worker thread
106// of the pantry.
107enum WorkerMessage<K, V> {
108    // This takes a value and stores it in the pantry.
109    Take {
110        key: K,
111        value: V,
112    },
113
114    // This requests that a previously-stored value be retrieved
115    // and delivered back through the provided oneshot sender.
116    Give {
117        key: K,
118        return_sender: Sender<V>,
119    },
120}
121
122// This is the type of value returned by a completed monitor, indicating
123// what (if any) work should be done as a consequence.
124enum MonitorKind<K, V> {
125    // This means the worker was sent a message.  The message receiver
126    // is also provided back so that it can be used to await the next
127    // message.
128    Message {
129        message: WorkerMessage<K, V>,
130        receiver: mpsc::UnboundedReceiver<WorkerMessage<K, V>>,
131    },
132
133    // This means the message sender was closed, indicating the worker
134    // thread should be joined.
135    Stop,
136
137    // This means a value was dropped or removed from the pantry.
138    // No further work is required.
139    Value,
140}
141
142// These types are used for the channels used to pass a value requester
143// into a value monitor, so that the monitor can transfer the value back
144// out to the requester.
145type Requester<V> = Sender<Sender<V>>;
146type Requestee<V> = Receiver<Sender<V>>;
147
148// This is used to create a monitor which holds onto a value and watches
149// to see if it perishes.  The monitor can also receive a request to pass
150// the value back out before it perishes.
151async fn monitor_value<K, V>(
152    mut value: V,
153    requestee: Requestee<V>,
154) -> MonitorKind<K, V>
155where
156    V: Perishable,
157{
158    // The monitor completes if either of the following completes:
159    #![allow(clippy::mut_mut)]
160    futures::select!(
161        // The value has perished.
162        _ = value.perished().fuse() => (),
163
164        // The value has been requested to be fetched back out,
165        // or the requester for the value has been dropped.
166        return_sender = requestee.fuse() => {
167            // We should get a return sender, because we shouldn't drop the
168            // requester before sending a return sender.
169            //
170            // A failure to send the value means the value requester gave up
171            // waiting for the value.
172            let _ = return_sender
173                .expect("requester dropped before sending return sender")
174                .send(value);
175        },
176    );
177
178    // The output indicates that this future dealt with a value stored
179    // in the pantry (by either dropping it or sending it back out).
180    MonitorKind::Value
181}
182
183struct ParkedValuePool<K, V> {
184    requesters: Vec<Requester<V>>,
185    monitors: Vec<LocalBoxFuture<'static, MonitorKind<K, V>>>,
186}
187
188impl<K, V> ParkedValuePool<K, V>
189where
190    K: 'static,
191    V: Perishable,
192{
193    fn add(
194        &mut self,
195        value: V,
196    ) {
197        // Adding a value actually adds two different things: a requester and a
198        // monitor.
199        let (sender, receiver) = oneshot::channel();
200
201        // First we store the oneshot sender as the "requester" we can
202        // use later to retrieve the value from the monitor.
203        self.requesters.push(sender);
204
205        // Then we store a monitor, which is a future the worker thread
206        // will try to drive to completion in order to fetch or drop
207        // the value.  It holds the value as well as the receiver matching
208        // the "requester", so that the value can be fetched back out of it.
209        //
210        // Note that the monitor will be taken out by the worker thread
211        // the next time it loops.
212        self.monitors.push(monitor_value(value, receiver).boxed_local());
213    }
214
215    fn new() -> Self {
216        Self {
217            requesters: Vec::new(),
218            monitors: Vec::new(),
219        }
220    }
221
222    fn remove(&mut self) -> Option<Requester<V>> {
223        // To drop a value early, we need only drop the "requester" for it.
224        // Doing so will wake the monitor since it "cancels" the oneshot.
225        self.requesters.pop()
226    }
227}
228
229async fn await_next_message<K, V>(
230    receiver: mpsc::UnboundedReceiver<WorkerMessage<K, V>>
231) -> MonitorKind<K, V> {
232    let (message, receiver) = receiver.into_future().await;
233    message.map_or(MonitorKind::Stop, |message| MonitorKind::Message {
234        message,
235        receiver,
236    })
237}
238
239async fn worker<K, V>(receiver: mpsc::UnboundedReceiver<WorkerMessage<K, V>>)
240where
241    K: Eq + Hash + 'static,
242    V: Perishable,
243{
244    let mut pools: HashMap<K, ParkedValuePool<K, V>> = HashMap::new();
245    let mut monitors = Vec::new();
246    let mut receiver = Some(receiver);
247    loop {
248        // Add to our collection any monitors that have been created since the
249        // last loop.  The first loop picks up any monitors created before the
250        // worker thread actually started.
251        monitors.extend(pools.iter_mut().flat_map(|(_, pool)| {
252            pool.requesters.retain(|requester| !requester.is_canceled());
253            pool.monitors.drain(..)
254        }));
255
256        // Add a special monitor to receive the next worker message,
257        // if the receiver is idle.
258        if let Some(receiver) = receiver.take() {
259            monitors.push(await_next_message(receiver).boxed_local());
260        }
261
262        // Wait until a monitor completes.  If it indicates a message
263        // received, handle the message.
264        let (monitor_kind, _, monitors_left) =
265            future::select_all(monitors.into_iter()).await;
266        monitors = monitors_left;
267        match monitor_kind {
268            MonitorKind::Message {
269                message,
270                receiver: receiver_back,
271            } => {
272                receiver = Some(receiver_back);
273                match message {
274                    // Taking a value to store in the pantry is easy.
275                    WorkerMessage::Take {
276                        key,
277                        value,
278                    } => {
279                        pools
280                            .entry(key)
281                            .or_insert_with(ParkedValuePool::new)
282                            .add(value);
283                    },
284
285                    // Giving a value back out is more difficult.  The monitor
286                    // created for it when the value was stored owns the value.
287                    // Getting it back out requires that we signal the monitor
288                    // to pass back ownership.  Once we get it we deliver it
289                    // back through the oneshot sender provided with the `Give`
290                    // message.  It's possible we have nothing to give back, so
291                    // what we send back is an `Option<V>` not a `V`.
292                    WorkerMessage::Give {
293                        key,
294                        return_sender,
295                    } => {
296                        // Attempt to remove a requester from the pools.  If we
297                        // get a requestor, send the return sender through it
298                        // for the monitor to use for returning the value
299                        // back out of the pantry.
300                        if let hash_map::Entry::Occupied(mut entry) =
301                            pools.entry(key)
302                        {
303                            let pool = entry.get_mut();
304                            if let Some(requester) = pool.remove() {
305                                // It's possible for this to fail if the
306                                // value perished during this loop.
307                                let _ = requester.send(return_sender);
308                            };
309                            if pool.requesters.is_empty() {
310                                entry.remove();
311                            }
312                        };
313                    },
314                }
315            },
316            MonitorKind::Stop => break,
317            MonitorKind::Value => (),
318        }
319    }
320}
321
322/// Each value of this type maintains a collection of stored values that might
323/// "decay" or become unusable over time.  Values are added to the collection
324/// by calling [`store`] and providing the value along with a key that can be
325/// used to retrieve the value later.  Values added to the collection are
326/// monitored by a worker thread and dropped if the futures returned by their
327/// [`perished`] functions complete.
328///
329/// As long as the [`perished`] future remains uncompleted for a value, the
330/// value remains held in the collection and can be retrieved by calling
331/// [`fetch`] with the same key used to store the value in the first place.
332///
333/// # Examples
334///
335/// ```rust
336/// # extern crate async_std;
337/// # extern crate async_trait;
338/// # extern crate pantry;
339/// use async_trait::async_trait;
340/// use futures::executor::block_on;
341/// use pantry::{
342///     Pantry,
343///     Perishable,
344/// };
345/// use std::time::Duration;
346///
347/// async fn delay_async(duration: Duration) {
348///     async_std::future::timeout(duration, futures::future::pending::<()>())
349///         .await
350///         .unwrap_or(())
351/// }
352///
353/// fn delay(duration: Duration) {
354///     block_on(delay_async(duration));
355/// }
356///
357/// struct SpyOrders(&'static str);
358///
359/// #[async_trait]
360/// impl Perishable for SpyOrders {
361///     async fn perished(&mut self) {
362///         // This message will self-destruct after
363///         // sitting in the pantry for 150 milliseconds!
364///         delay_async(Duration::from_millis(150)).await
365///     }
366/// }
367///
368/// fn main() {
369///     let pantry = Pantry::new();
370///     let for_james = SpyOrders("Steal Dr. Evil's cat");
371///     let for_jason = SpyOrders("Save the Queen");
372///     let key = "spies";
373///     pantry.store(key, for_james);
374///     delay(Duration::from_millis(100));
375///     pantry.store(key, for_jason);
376///     delay(Duration::from_millis(100));
377///     let value1 = block_on(async { pantry.fetch(key).await });
378///     let value2 = block_on(async { pantry.fetch(key).await });
379///     assert!(value1.is_some());
380///     assert_eq!("Save the Queen", value1.unwrap().0);
381///     assert!(value2.is_none());
382/// }
383/// ```
384///
385/// [`perished`]: trait.Perishable.html#method.perished
386/// [`fetch`]: #method.fetch
387/// [`store`]: #method.store
388pub struct Pantry<K, V> {
389    // This sender is used to deliver messages to the worker thread.
390    work_in: mpsc::UnboundedSender<WorkerMessage<K, V>>,
391
392    // This is our handle to join the worker thread when dropped.
393    worker: Option<std::thread::JoinHandle<()>>,
394}
395
396impl<K, V> Pantry<K, V>
397where
398    K: Eq + Hash + Send + 'static,
399    V: Perishable,
400{
401    /// Create a new `Pantry` with no values in it.  This spawns the worker
402    /// thread which monitors values added to it and detects when they should
403    /// be dropped.
404    #[must_use]
405    pub fn new() -> Self {
406        // Make the channel used to communicate with the worker thread.
407        let (sender, receiver) = mpsc::unbounded();
408
409        // Store the sender end of the channel and spawn the worker thread,
410        // giving it the receiver end.
411        Self {
412            work_in: sender,
413            worker: Some(thread::spawn(|| {
414                executor::block_on(worker(receiver))
415            })),
416        }
417    }
418
419    /// Transfer ownership of the given value to the `Pantry`, associating with
420    /// it the given key, which can be used later with [`fetch`] to transfer
421    /// ownership of the value back out.
422    ///
423    /// [`fetch`]: #method.fetch
424    pub fn store(
425        &self,
426        key: K,
427        value: V,
428    ) {
429        // Tell the worker thread to take the value and associate the key with
430        // it.
431        //
432        // It shouldn't be possible for this to fail, since the worker holds
433        // the receiver for this channel, and isn't dropped until the client
434        // itself is dropped.  So if it does fail, we want to know about it
435        // since it would mean we have a bug.
436        self.work_in
437            .unbounded_send(WorkerMessage::Take {
438                key,
439                value,
440            })
441            .expect("worker messager receiver dropped unexpectedly");
442    }
443
444    /// Attempt to retrieve the value previously stored with [`store`] using
445    /// the given key.  If no value was stored with that key, or the
446    /// [`perished`] future for the value stored with that key has completed,
447    /// `None` is returned.
448    ///
449    /// This function is asynchronous because ownership must be completely
450    /// transfered from the worker thread of the `Pantry`.
451    ///
452    /// [`perished`]: trait.Perishable.html#method.perished
453    /// [`store`]: #method.store
454    pub async fn fetch(
455        &self,
456        key: K,
457    ) -> Option<V> {
458        let (sender, receiver) = oneshot::channel();
459
460        // Tell the worker thread to give us a value matching the key.
461        //
462        // It shouldn't be possible for this to fail, since the worker holds
463        // the receiver for this channel, and isn't dropped until the client
464        // itself is dropped.  So if it does fail, we want to know about it
465        // since it would mean we have a bug.
466        self.work_in
467            .unbounded_send(WorkerMessage::Give {
468                key,
469                return_sender: sender,
470            })
471            .expect("worker messager receiver dropped unexpectedly");
472
473        // Wait for the worker thread to either give us the value back or tell
474        // us (via error) that it didn't have one to give us.
475        receiver.await.ok()
476    }
477}
478
479impl<K, V> Default for Pantry<K, V>
480where
481    K: Eq + Hash + Send + 'static,
482    V: Perishable,
483{
484    fn default() -> Self {
485        Self::new()
486    }
487}
488
489impl<K, V> Drop for Pantry<K, V> {
490    fn drop(&mut self) {
491        // Closing the worker message sender should cause the worker thread to
492        // complete.
493        self.work_in.close_channel();
494
495        // Join the worker thread.
496        //
497        // This shouldn't fail unless the worker panics or we dropped ths
498        // thread join handle.
499        self.worker
500            .take()
501            .expect("worker thread join handle dropped unexpectedly")
502            .join()
503            .expect("worker thread could not be joined");
504    }
505}
506
507#[cfg(test)]
508mod tests {
509    use super::*;
510
511    struct MockPerishable {
512        num: usize,
513        perish: Option<Receiver<()>>,
514        dropped: Option<Sender<()>>,
515    }
516
517    impl MockPerishable {
518        fn perishable(num: usize) -> (Self, Sender<()>, Receiver<()>) {
519            let (perish_sender, perish_receiver) = oneshot::channel();
520            let (dropped_sender, dropped_receiver) = oneshot::channel();
521            let value = Self {
522                num,
523                perish: Some(perish_receiver),
524                dropped: Some(dropped_sender),
525            };
526            (value, perish_sender, dropped_receiver)
527        }
528
529        fn not_perishable(num: usize) -> Self {
530            Self {
531                num,
532                perish: None,
533                dropped: None,
534            }
535        }
536    }
537
538    impl Drop for MockPerishable {
539        fn drop(&mut self) {
540            if let Some(dropped) = self.dropped.take() {
541                dropped.send(()).unwrap_or(());
542            }
543        }
544    }
545
546    #[async_trait]
547    impl Perishable for MockPerishable {
548        async fn perished(&mut self) {
549            if let Some(perish) = self.perish.take() {
550                perish.await.unwrap_or(());
551            } else {
552                futures::future::pending().await
553            }
554        }
555    }
556
557    #[test]
558    fn store_then_fetch() {
559        let pantry = Pantry::new();
560        let value = MockPerishable::not_perishable(1337);
561        let key = 42;
562        pantry.store(key, value);
563        let value =
564            futures::executor::block_on(async { pantry.fetch(key).await });
565        assert!(value.is_some());
566        assert_eq!(1337, value.unwrap().num);
567    }
568
569    #[test]
570    fn fetch_without_store() {
571        let pantry: Pantry<usize, MockPerishable> = Pantry::new();
572        let key = 42;
573        let value =
574            futures::executor::block_on(async { pantry.fetch(key).await });
575        assert!(value.is_none());
576    }
577
578    #[test]
579    fn store_then_double_fetch() {
580        let pantry = Pantry::new();
581        let value = MockPerishable::not_perishable(1337);
582        let key = 42;
583        pantry.store(key, value);
584        let value =
585            futures::executor::block_on(async { pantry.fetch(key).await });
586        assert!(value.is_some());
587        assert_eq!(1337, value.unwrap().num);
588        let value =
589            futures::executor::block_on(async { pantry.fetch(key).await });
590        assert!(value.is_none());
591    }
592
593    #[test]
594    fn double_store_then_double_fetch_same_key() {
595        let pantry = Pantry::new();
596        let value1 = MockPerishable::not_perishable(1337);
597        let value2 = MockPerishable::not_perishable(85);
598        let key = 42;
599        pantry.store(key, value1);
600        pantry.store(key, value2);
601        let value1 =
602            futures::executor::block_on(async { pantry.fetch(key).await });
603        let value2 =
604            futures::executor::block_on(async { pantry.fetch(key).await });
605        assert!(value1.is_some());
606        assert!(value2.is_some());
607        assert!(matches!(
608            (value1.unwrap().num, value2.unwrap().num),
609            (1337, 85) | (85, 1337)
610        ));
611    }
612
613    #[test]
614    fn double_store_then_double_fetch_different_keys() {
615        let pantry = Pantry::new();
616        let value1 = MockPerishable::not_perishable(1337);
617        let value2 = MockPerishable::not_perishable(85);
618        let key1 = 42;
619        let key2 = 33;
620        pantry.store(key1, value1);
621        pantry.store(key2, value2);
622        let value1 =
623            futures::executor::block_on(async { pantry.fetch(key1).await });
624        let value2 =
625            futures::executor::block_on(async { pantry.fetch(key2).await });
626        assert!(value1.is_some());
627        assert!(value2.is_some());
628        assert!(matches!(
629            (value1.unwrap().num, value2.unwrap().num),
630            (1337, 85)
631        ));
632    }
633
634    #[test]
635    fn store_then_perish_then_fetch() {
636        let pantry = Pantry::new();
637        let (value, perish, dropped) = MockPerishable::perishable(1337);
638        let key = 42;
639        pantry.store(key, value);
640        assert!(perish.send(()).is_ok());
641        assert!(futures::executor::block_on(async { dropped.await }).is_ok());
642        let value =
643            futures::executor::block_on(async { pantry.fetch(key).await });
644        assert!(value.is_none());
645    }
646
647    #[test]
648    fn store_perishible_then_fetch_without_perish() {
649        let pantry = Pantry::new();
650        let (value, perish, dropped) = MockPerishable::perishable(1337);
651        let key = 42;
652        pantry.store(key, value);
653        let value =
654            futures::executor::block_on(async { pantry.fetch(key).await });
655        assert!(dropped.now_or_never().is_none());
656        drop(perish);
657        assert!(value.is_some());
658        assert_eq!(1337, value.unwrap().num);
659    }
660
661    #[test]
662    fn double_store_then_one_perishes_then_double_fetch_same_key() {
663        let pantry = Pantry::new();
664        let (value1, perish, dropped) = MockPerishable::perishable(1337);
665        let value2 = MockPerishable::not_perishable(85);
666        let key = 42;
667        pantry.store(key, value1);
668        pantry.store(key, value2);
669        assert!(perish.send(()).is_ok());
670        assert!(futures::executor::block_on(async { dropped.await }).is_ok());
671        let value1 =
672            futures::executor::block_on(async { pantry.fetch(key).await });
673        let value2 =
674            futures::executor::block_on(async { pantry.fetch(key).await });
675        assert!(value1.is_some());
676        assert!(value2.is_none());
677        assert_eq!(85, value1.unwrap().num);
678    }
679}