1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use crate::tx2::tx2_utils::*;
use crate::*;

struct Inner<T: 'static + Send> {
    bucket: Vec<T>,
    notify: Arc<tokio::sync::Notify>,
}

/// Control efficient access to shared resource pool.
pub struct ResourceBucket<T: 'static + Send>(Arc<Share<Inner<T>>>);

impl<T: 'static + Send> Clone for ResourceBucket<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T: 'static + Send> Default for ResourceBucket<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: 'static + Send> ResourceBucket<T> {
    /// Create a new resource bucket.
    pub fn new() -> Self {
        Self(Arc::new(Share::new(Inner {
            bucket: Vec::new(),
            notify: Arc::new(tokio::sync::Notify::new()),
        })))
    }

    /// Add a resource to the bucket.
    /// Could be a new resource, or a previously acquired resource.
    pub fn release(&self, t: T) {
        let _ = self.0.share_mut(move |i, _| {
            i.bucket.push(t);
            i.notify.notify_one();
            Ok(())
        });
    }

    /// Acquire a resource that is immediately available from the bucket
    /// or generate a new one.
    pub fn acquire_or_else<F>(&self, f: F) -> T
    where
        F: FnOnce() -> T + 'static + Send,
    {
        if let Ok(t) = self.0.share_mut(|i, _| {
            if !i.bucket.is_empty() {
                return Ok(i.bucket.remove(0));
            }
            Err(().into())
        }) {
            return t;
        }
        f()
    }

    /// Acquire a resource from the bucket.
    pub fn acquire(
        &self,
        timeout: Option<KitsuneTimeout>,
    ) -> impl std::future::Future<Output = KitsuneResult<T>> + 'static + Send {
        let inner = self.0.clone();
        async move {
            // first, see if there is a resource to return immediately
            // or, capture a notifier for when there might be again
            let notify = match inner.share_mut(|i, _| {
                if !i.bucket.is_empty() {
                    return Ok((Some(i.bucket.remove(0)), None));
                }
                Ok((None, Some(i.notify.clone())))
            }) {
                Err(e) => return Err(e),
                Ok((Some(t), None)) => return Ok(t),
                Ok((None, Some(notify))) => notify,
                _ => unreachable!(),
            };

            loop {
                // capture the notifier future
                let n = notify.notified();

                // mix with timeout if appropriate
                match timeout {
                    Some(timeout) => {
                        timeout
                            .mix(async move {
                                n.await;
                                Ok(())
                            })
                            .await
                    }
                    None => {
                        n.await;
                        Ok(())
                    }
                }?;

                // we've been notified, see if there is data
                match inner.share_mut(|i, _| {
                    if !i.bucket.is_empty() {
                        return Ok(Some(i.bucket.remove(0)));
                    }
                    Ok(None)
                }) {
                    Err(e) => return Err(e),
                    Ok(Some(t)) => return Ok(t),
                    _ => (),
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_async_bucket_timeout() {
        let t = Some(KitsuneTimeout::from_millis(10));
        let bucket = <ResourceBucket<&'static str>>::new();
        let j1 = metric_task(bucket.acquire(t));
        let j2 = metric_task(bucket.acquire(t));
        assert!(j1.await.unwrap().is_err());
        assert!(j2.await.unwrap().is_err());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_async_bucket() {
        let bucket = <ResourceBucket<&'static str>>::new();
        let j1 = metric_task(bucket.acquire(None));
        let j2 = metric_task(bucket.acquire(None));
        bucket.release("1");
        bucket.release("2");
        let j1 = j1.await.unwrap().unwrap();
        let j2 = j2.await.unwrap().unwrap();
        assert!((j1 == "1" && j2 == "2") || (j2 == "1" && j1 == "2"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_async_bucket_acquire_or_else() {
        let bucket = <ResourceBucket<&'static str>>::new();
        let j1 = metric_task(bucket.acquire(None));
        let j2 = bucket.acquire_or_else(|| "2");
        bucket.release("1");
        let j1 = j1.await.unwrap().unwrap();
        assert_eq!(j1, "1");
        assert_eq!(j2, "2");
    }
}