pantry/lib.rs
//! The [`Pantry`] temporarily stores values for later use, where those values
//! might "decay" (become unusable) over time.
3//!
4//! Create a [`Pantry`] value and use its [`store`] function to store values
5//! for later use. A key is provided along with the value, so that the value
6//! can be retrieved later using the same key. A worker thread is spawned
7//! which monitors the values and automatically drops any which have "decayed".
8//! Values must implement the [`Perishable`] trait, whose [`perished`] function
9//! asynchronously completes once the value has decayed.
10//!
11//! Use the [`fetch`] asynchronous function on the [`Pantry`] with a key to
12//! retrieve a value previously stored using that key. A value is only
13//! returned if it was stored using the same key and has not decayed since it
14//! was stored.
15//!
16//! Multiple values may have the same key, with the caveat that the values
17//! stored under a given key may not be returned in the same order in which
18//! they were stored.
19//!
20//! [`fetch`]: struct.Pantry.html#method.fetch
21//! [`Pantry`]: struct.Pantry.html
22//! [`Perishable`]: trait.Perishable.html
23//! [`perished`]: trait.Perishable.html#method.perished
24//! [`store`]: struct.Pantry.html#method.store
25
26#![warn(clippy::pedantic)]
27#![warn(missing_docs)]
28
29use async_trait::async_trait;
30use futures::{
31 channel::{
32 mpsc,
33 oneshot::{
34 self,
35 Receiver,
36 Sender,
37 },
38 },
39 executor,
40 future,
41 future::LocalBoxFuture,
42 FutureExt as _,
43 StreamExt as _,
44};
45use std::{
46 collections::{
47 hash_map,
48 HashMap,
49 },
50 hash::Hash,
51 thread,
52};
53
54/// This is the trait that values must implement in order to be stored in the
55/// [`Pantry`]. Note that since the trait has an asynchronous method,
56/// currently the [`async_trait`] attribute from the [`async-trait`] crate must
57/// be added to implementations.
58///
59/// # Examples
60///
61/// ```rust
62/// # extern crate async_std;
63/// # extern crate async_trait;
64/// # extern crate pantry;
65/// use async_trait::async_trait;
66/// use pantry::Perishable;
67///
68/// struct SpyOrders(String);
69///
70/// #[async_trait]
71/// impl Perishable for SpyOrders {
72/// async fn perished(&mut self) {
73/// // This message will self-destruct after
74/// // sitting in the pantry for 5 seconds!
75/// async_std::future::timeout(
76/// std::time::Duration::from_secs(5),
77/// futures::future::pending::<()>(),
78/// )
79/// .await
80/// .unwrap_or(())
81/// }
82/// }
83/// ```
84///
85/// [`Pantry`]: struct.Pantry.html
86/// [`async_trait`]: https://docs.rs/async-trait/0.1.41/async_trait/index.html
87/// [`async-trait`]: https://docs.rs/async-trait/0.1.41/async_trait/
88#[async_trait]
89pub trait Perishable: Send + 'static {
90 /// This asynchronous function should complete once the value has "decayed"
91 /// or become unusable or unsuitable for reuse. The worker thread of the
92 /// [`Pantry`] runs this future to completion, automatically dropping the
93 /// value if it completes.
94 ///
95 /// Note that this is an asynchronous trait method. Currently, the
96 /// [`async_trait`] attribute from the [`async-trait`] crate is used
97 /// to realize this specification.
98 ///
99 /// [`Pantry`]: struct.Pantry.html
100 /// [`async_trait`]: https://docs.rs/async-trait/0.1.41/async_trait/index.html
101 /// [`async-trait`]: https://docs.rs/async-trait/0.1.41/async_trait/
102 async fn perished(&mut self);
103}
104
105// These are the types of message that may be sent to the worker thread
106// of the pantry.
107enum WorkerMessage<K, V> {
108 // This takes a value and stores it in the pantry.
109 Take {
110 key: K,
111 value: V,
112 },
113
114 // This requests that a previously-stored value be retrieved
115 // and delivered back through the provided oneshot sender.
116 Give {
117 key: K,
118 return_sender: Sender<V>,
119 },
120}
121
122// This is the type of value returned by a completed monitor, indicating
123// what (if any) work should be done as a consequence.
124enum MonitorKind<K, V> {
125 // This means the worker was sent a message. The message receiver
126 // is also provided back so that it can be used to await the next
127 // message.
128 Message {
129 message: WorkerMessage<K, V>,
130 receiver: mpsc::UnboundedReceiver<WorkerMessage<K, V>>,
131 },
132
133 // This means the message sender was closed, indicating the worker
134 // thread should be joined.
135 Stop,
136
137 // This means a value was dropped or removed from the pantry.
138 // No further work is required.
139 Value,
140}
141
142// These types are used for the channels used to pass a value requester
143// into a value monitor, so that the monitor can transfer the value back
144// out to the requester.
145type Requester<V> = Sender<Sender<V>>;
146type Requestee<V> = Receiver<Sender<V>>;
147
148// This is used to create a monitor which holds onto a value and watches
149// to see if it perishes. The monitor can also receive a request to pass
150// the value back out before it perishes.
151async fn monitor_value<K, V>(
152 mut value: V,
153 requestee: Requestee<V>,
154) -> MonitorKind<K, V>
155where
156 V: Perishable,
157{
158 // The monitor completes if either of the following completes:
159 #![allow(clippy::mut_mut)]
160 futures::select!(
161 // The value has perished.
162 _ = value.perished().fuse() => (),
163
164 // The value has been requested to be fetched back out,
165 // or the requester for the value has been dropped.
166 return_sender = requestee.fuse() => {
167 // We should get a return sender, because we shouldn't drop the
168 // requester before sending a return sender.
169 //
170 // A failure to send the value means the value requester gave up
171 // waiting for the value.
172 let _ = return_sender
173 .expect("requester dropped before sending return sender")
174 .send(value);
175 },
176 );
177
178 // The output indicates that this future dealt with a value stored
179 // in the pantry (by either dropping it or sending it back out).
180 MonitorKind::Value
181}
182
183struct ParkedValuePool<K, V> {
184 requesters: Vec<Requester<V>>,
185 monitors: Vec<LocalBoxFuture<'static, MonitorKind<K, V>>>,
186}
187
188impl<K, V> ParkedValuePool<K, V>
189where
190 K: 'static,
191 V: Perishable,
192{
193 fn add(
194 &mut self,
195 value: V,
196 ) {
197 // Adding a value actually adds two different things: a requester and a
198 // monitor.
199 let (sender, receiver) = oneshot::channel();
200
201 // First we store the oneshot sender as the "requester" we can
202 // use later to retrieve the value from the monitor.
203 self.requesters.push(sender);
204
205 // Then we store a monitor, which is a future the worker thread
206 // will try to drive to completion in order to fetch or drop
207 // the value. It holds the value as well as the receiver matching
208 // the "requester", so that the value can be fetched back out of it.
209 //
210 // Note that the monitor will be taken out by the worker thread
211 // the next time it loops.
212 self.monitors.push(monitor_value(value, receiver).boxed_local());
213 }
214
215 fn new() -> Self {
216 Self {
217 requesters: Vec::new(),
218 monitors: Vec::new(),
219 }
220 }
221
222 fn remove(&mut self) -> Option<Requester<V>> {
223 // To drop a value early, we need only drop the "requester" for it.
224 // Doing so will wake the monitor since it "cancels" the oneshot.
225 self.requesters.pop()
226 }
227}
228
229async fn await_next_message<K, V>(
230 receiver: mpsc::UnboundedReceiver<WorkerMessage<K, V>>
231) -> MonitorKind<K, V> {
232 let (message, receiver) = receiver.into_future().await;
233 message.map_or(MonitorKind::Stop, |message| MonitorKind::Message {
234 message,
235 receiver,
236 })
237}
238
239async fn worker<K, V>(receiver: mpsc::UnboundedReceiver<WorkerMessage<K, V>>)
240where
241 K: Eq + Hash + 'static,
242 V: Perishable,
243{
244 let mut pools: HashMap<K, ParkedValuePool<K, V>> = HashMap::new();
245 let mut monitors = Vec::new();
246 let mut receiver = Some(receiver);
247 loop {
248 // Add to our collection any monitors that have been created since the
249 // last loop. The first loop picks up any monitors created before the
250 // worker thread actually started.
251 monitors.extend(pools.iter_mut().flat_map(|(_, pool)| {
252 pool.requesters.retain(|requester| !requester.is_canceled());
253 pool.monitors.drain(..)
254 }));
255
256 // Add a special monitor to receive the next worker message,
257 // if the receiver is idle.
258 if let Some(receiver) = receiver.take() {
259 monitors.push(await_next_message(receiver).boxed_local());
260 }
261
262 // Wait until a monitor completes. If it indicates a message
263 // received, handle the message.
264 let (monitor_kind, _, monitors_left) =
265 future::select_all(monitors.into_iter()).await;
266 monitors = monitors_left;
267 match monitor_kind {
268 MonitorKind::Message {
269 message,
270 receiver: receiver_back,
271 } => {
272 receiver = Some(receiver_back);
273 match message {
274 // Taking a value to store in the pantry is easy.
275 WorkerMessage::Take {
276 key,
277 value,
278 } => {
279 pools
280 .entry(key)
281 .or_insert_with(ParkedValuePool::new)
282 .add(value);
283 },
284
285 // Giving a value back out is more difficult. The monitor
286 // created for it when the value was stored owns the value.
287 // Getting it back out requires that we signal the monitor
288 // to pass back ownership. Once we get it we deliver it
289 // back through the oneshot sender provided with the `Give`
290 // message. It's possible we have nothing to give back, so
291 // what we send back is an `Option<V>` not a `V`.
292 WorkerMessage::Give {
293 key,
294 return_sender,
295 } => {
296 // Attempt to remove a requester from the pools. If we
297 // get a requestor, send the return sender through it
298 // for the monitor to use for returning the value
299 // back out of the pantry.
300 if let hash_map::Entry::Occupied(mut entry) =
301 pools.entry(key)
302 {
303 let pool = entry.get_mut();
304 if let Some(requester) = pool.remove() {
305 // It's possible for this to fail if the
306 // value perished during this loop.
307 let _ = requester.send(return_sender);
308 };
309 if pool.requesters.is_empty() {
310 entry.remove();
311 }
312 };
313 },
314 }
315 },
316 MonitorKind::Stop => break,
317 MonitorKind::Value => (),
318 }
319 }
320}
321
322/// Each value of this type maintains a collection of stored values that might
323/// "decay" or become unusable over time. Values are added to the collection
324/// by calling [`store`] and providing the value along with a key that can be
325/// used to retrieve the value later. Values added to the collection are
326/// monitored by a worker thread and dropped if the futures returned by their
327/// [`perished`] functions complete.
328///
329/// As long as the [`perished`] future remains uncompleted for a value, the
330/// value remains held in the collection and can be retrieved by calling
331/// [`fetch`] with the same key used to store the value in the first place.
332///
333/// # Examples
334///
335/// ```rust
336/// # extern crate async_std;
337/// # extern crate async_trait;
338/// # extern crate pantry;
339/// use async_trait::async_trait;
340/// use futures::executor::block_on;
341/// use pantry::{
342/// Pantry,
343/// Perishable,
344/// };
345/// use std::time::Duration;
346///
347/// async fn delay_async(duration: Duration) {
348/// async_std::future::timeout(duration, futures::future::pending::<()>())
349/// .await
350/// .unwrap_or(())
351/// }
352///
353/// fn delay(duration: Duration) {
354/// block_on(delay_async(duration));
355/// }
356///
357/// struct SpyOrders(&'static str);
358///
359/// #[async_trait]
360/// impl Perishable for SpyOrders {
361/// async fn perished(&mut self) {
362/// // This message will self-destruct after
363/// // sitting in the pantry for 150 milliseconds!
364/// delay_async(Duration::from_millis(150)).await
365/// }
366/// }
367///
368/// fn main() {
369/// let pantry = Pantry::new();
370/// let for_james = SpyOrders("Steal Dr. Evil's cat");
371/// let for_jason = SpyOrders("Save the Queen");
372/// let key = "spies";
373/// pantry.store(key, for_james);
374/// delay(Duration::from_millis(100));
375/// pantry.store(key, for_jason);
376/// delay(Duration::from_millis(100));
377/// let value1 = block_on(async { pantry.fetch(key).await });
378/// let value2 = block_on(async { pantry.fetch(key).await });
379/// assert!(value1.is_some());
380/// assert_eq!("Save the Queen", value1.unwrap().0);
381/// assert!(value2.is_none());
382/// }
383/// ```
384///
385/// [`perished`]: trait.Perishable.html#method.perished
386/// [`fetch`]: #method.fetch
387/// [`store`]: #method.store
388pub struct Pantry<K, V> {
389 // This sender is used to deliver messages to the worker thread.
390 work_in: mpsc::UnboundedSender<WorkerMessage<K, V>>,
391
392 // This is our handle to join the worker thread when dropped.
393 worker: Option<std::thread::JoinHandle<()>>,
394}
395
396impl<K, V> Pantry<K, V>
397where
398 K: Eq + Hash + Send + 'static,
399 V: Perishable,
400{
401 /// Create a new `Pantry` with no values in it. This spawns the worker
402 /// thread which monitors values added to it and detects when they should
403 /// be dropped.
404 #[must_use]
405 pub fn new() -> Self {
406 // Make the channel used to communicate with the worker thread.
407 let (sender, receiver) = mpsc::unbounded();
408
409 // Store the sender end of the channel and spawn the worker thread,
410 // giving it the receiver end.
411 Self {
412 work_in: sender,
413 worker: Some(thread::spawn(|| {
414 executor::block_on(worker(receiver))
415 })),
416 }
417 }
418
419 /// Transfer ownership of the given value to the `Pantry`, associating with
420 /// it the given key, which can be used later with [`fetch`] to transfer
421 /// ownership of the value back out.
422 ///
423 /// [`fetch`]: #method.fetch
424 pub fn store(
425 &self,
426 key: K,
427 value: V,
428 ) {
429 // Tell the worker thread to take the value and associate the key with
430 // it.
431 //
432 // It shouldn't be possible for this to fail, since the worker holds
433 // the receiver for this channel, and isn't dropped until the client
434 // itself is dropped. So if it does fail, we want to know about it
435 // since it would mean we have a bug.
436 self.work_in
437 .unbounded_send(WorkerMessage::Take {
438 key,
439 value,
440 })
441 .expect("worker messager receiver dropped unexpectedly");
442 }
443
444 /// Attempt to retrieve the value previously stored with [`store`] using
445 /// the given key. If no value was stored with that key, or the
446 /// [`perished`] future for the value stored with that key has completed,
447 /// `None` is returned.
448 ///
449 /// This function is asynchronous because ownership must be completely
450 /// transfered from the worker thread of the `Pantry`.
451 ///
452 /// [`perished`]: trait.Perishable.html#method.perished
453 /// [`store`]: #method.store
454 pub async fn fetch(
455 &self,
456 key: K,
457 ) -> Option<V> {
458 let (sender, receiver) = oneshot::channel();
459
460 // Tell the worker thread to give us a value matching the key.
461 //
462 // It shouldn't be possible for this to fail, since the worker holds
463 // the receiver for this channel, and isn't dropped until the client
464 // itself is dropped. So if it does fail, we want to know about it
465 // since it would mean we have a bug.
466 self.work_in
467 .unbounded_send(WorkerMessage::Give {
468 key,
469 return_sender: sender,
470 })
471 .expect("worker messager receiver dropped unexpectedly");
472
473 // Wait for the worker thread to either give us the value back or tell
474 // us (via error) that it didn't have one to give us.
475 receiver.await.ok()
476 }
477}
478
479impl<K, V> Default for Pantry<K, V>
480where
481 K: Eq + Hash + Send + 'static,
482 V: Perishable,
483{
484 fn default() -> Self {
485 Self::new()
486 }
487}
488
489impl<K, V> Drop for Pantry<K, V> {
490 fn drop(&mut self) {
491 // Closing the worker message sender should cause the worker thread to
492 // complete.
493 self.work_in.close_channel();
494
495 // Join the worker thread.
496 //
497 // This shouldn't fail unless the worker panics or we dropped ths
498 // thread join handle.
499 self.worker
500 .take()
501 .expect("worker thread join handle dropped unexpectedly")
502 .join()
503 .expect("worker thread could not be joined");
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    // Test double whose decay is controlled by the test. `perish` is the
    // signal that makes the value perish (a value without one never
    // perishes), and `dropped` reports when the value has been dropped.
    struct MockPerishable {
        num: usize,
        perish: Option<Receiver<()>>,
        dropped: Option<Sender<()>>,
    }

    impl MockPerishable {
        // Makes a value along with the sender that makes it perish and the
        // receiver that is signalled once the value is dropped.
        fn perishable(num: usize) -> (Self, Sender<()>, Receiver<()>) {
            let (perish_sender, perish) = oneshot::channel();
            let (dropped, dropped_receiver) = oneshot::channel();
            (
                Self {
                    num,
                    perish: Some(perish),
                    dropped: Some(dropped),
                },
                perish_sender,
                dropped_receiver,
            )
        }

        // Makes a value that never perishes and reports nothing on drop.
        fn not_perishable(num: usize) -> Self {
            Self {
                num,
                perish: None,
                dropped: None,
            }
        }
    }

    impl Drop for MockPerishable {
        fn drop(&mut self) {
            if let Some(dropped) = self.dropped.take() {
                // Nobody listening for the drop is not an error.
                let _ = dropped.send(());
            }
        }
    }

    #[async_trait]
    impl Perishable for MockPerishable {
        async fn perished(&mut self) {
            match self.perish.take() {
                Some(perish) => {
                    // Both the signal and a dropped sender count as perished.
                    let _ = perish.await;
                },
                None => futures::future::pending::<()>().await,
            }
        }
    }

    // Runs a fetch to completion on the current thread.
    fn fetch_now<K>(
        pantry: &Pantry<K, MockPerishable>,
        key: K,
    ) -> Option<MockPerishable>
    where
        K: Eq + Hash + Send + 'static,
    {
        futures::executor::block_on(pantry.fetch(key))
    }

    // Blocks until the drop notification resolves; true if it was signalled.
    fn wait_dropped(dropped: Receiver<()>) -> bool {
        futures::executor::block_on(dropped).is_ok()
    }

    #[test]
    fn store_then_fetch() {
        let pantry = Pantry::new();
        let key = 42;
        pantry.store(key, MockPerishable::not_perishable(1337));
        let fetched = fetch_now(&pantry, key);
        assert!(fetched.is_some());
        assert_eq!(1337, fetched.unwrap().num);
    }

    #[test]
    fn fetch_without_store() {
        let pantry: Pantry<usize, MockPerishable> = Pantry::new();
        let key = 42;
        let fetched = fetch_now(&pantry, key);
        assert!(fetched.is_none());
    }

    #[test]
    fn store_then_double_fetch() {
        let pantry = Pantry::new();
        let key = 42;
        pantry.store(key, MockPerishable::not_perishable(1337));
        let first = fetch_now(&pantry, key);
        assert!(first.is_some());
        assert_eq!(1337, first.unwrap().num);
        let second = fetch_now(&pantry, key);
        assert!(second.is_none());
    }

    #[test]
    fn double_store_then_double_fetch_same_key() {
        let pantry = Pantry::new();
        let key = 42;
        pantry.store(key, MockPerishable::not_perishable(1337));
        pantry.store(key, MockPerishable::not_perishable(85));
        let first = fetch_now(&pantry, key);
        let second = fetch_now(&pantry, key);
        assert!(first.is_some());
        assert!(second.is_some());
        // Values under one key may come back in either order.
        assert!(matches!(
            (first.unwrap().num, second.unwrap().num),
            (1337, 85) | (85, 1337)
        ));
    }

    #[test]
    fn double_store_then_double_fetch_different_keys() {
        let pantry = Pantry::new();
        let key1 = 42;
        let key2 = 33;
        pantry.store(key1, MockPerishable::not_perishable(1337));
        pantry.store(key2, MockPerishable::not_perishable(85));
        let first = fetch_now(&pantry, key1);
        let second = fetch_now(&pantry, key2);
        assert!(first.is_some());
        assert!(second.is_some());
        assert!(matches!(
            (first.unwrap().num, second.unwrap().num),
            (1337, 85)
        ));
    }

    #[test]
    fn store_then_perish_then_fetch() {
        let pantry = Pantry::new();
        let (value, perish, dropped) = MockPerishable::perishable(1337);
        let key = 42;
        pantry.store(key, value);
        assert!(perish.send(()).is_ok());
        assert!(wait_dropped(dropped));
        let fetched = fetch_now(&pantry, key);
        assert!(fetched.is_none());
    }

    #[test]
    fn store_perishible_then_fetch_without_perish() {
        let pantry = Pantry::new();
        let (value, perish, dropped) = MockPerishable::perishable(1337);
        let key = 42;
        pantry.store(key, value);
        let fetched = fetch_now(&pantry, key);
        // The fetched value is still held here, so it must not have been
        // reported as dropped.
        assert!(dropped.now_or_never().is_none());
        drop(perish);
        assert!(fetched.is_some());
        assert_eq!(1337, fetched.unwrap().num);
    }

    #[test]
    fn double_store_then_one_perishes_then_double_fetch_same_key() {
        let pantry = Pantry::new();
        let (value1, perish, dropped) = MockPerishable::perishable(1337);
        let value2 = MockPerishable::not_perishable(85);
        let key = 42;
        pantry.store(key, value1);
        pantry.store(key, value2);
        assert!(perish.send(()).is_ok());
        assert!(wait_dropped(dropped));
        let first = fetch_now(&pantry, key);
        let second = fetch_now(&pantry, key);
        assert!(first.is_some());
        assert!(second.is_none());
        assert_eq!(85, first.unwrap().num);
    }
}
679}