medea_reactive/collections/
vec.rs

1//! Reactive vector based on [`Vec`].
2
3use std::{marker::PhantomData, mem, slice::Iter, vec::Vec as StdVec};
4
5use futures::stream::{self, LocalBoxStream};
6
7use crate::subscribers_store::{
8    SubscribersStore, common, progressable,
9    progressable::{Processed, processed::AllProcessed},
10};
11
/// Reactive vector based on [`Vec`] with additional functionality of tracking
/// progress made by its subscribers.
///
/// Its [`Vec::on_push()`] and [`Vec::on_remove()`] subscriptions return values
/// wrapped in a [`progressable::Guarded`], and the implementation tracks all
/// [`progressable::Guard`]s.
///
/// Use [`Vec::when_push_processed()`], [`Vec::when_remove_processed()`] or
/// [`Vec::when_all_processed()`] to wait until the emitted updates are
/// processed by the subscribers.
pub type ProgressableVec<T> =
    Vec<T, progressable::SubStore<T>, progressable::Guarded<T>>;
20
/// Reactive vector based on [`Vec`].
///
/// Its [`Vec::on_push()`] and [`Vec::on_remove()`] subscriptions yield the
/// values themselves (the output type is `T`), and its subscribers are kept in
/// a [`common::SubStore`].
pub type ObservableVec<T> = Vec<T, common::SubStore<T>, T>;
23
/// Reactive vector based on [`std::vec::Vec`].
///
/// Type parameters:
/// - `T` is the type of the stored values;
/// - `S` is the subscribers store used for both the [`Vec::on_push()`] and
///   the [`Vec::on_remove()`] subscriptions;
/// - `O` is the type of the items yielded by those subscriptions.
///
/// # Usage
///
/// ```rust
/// # use futures::{executor, StreamExt as _, Stream};
/// use medea_reactive::collections::ObservableVec;
///
/// # executor::block_on(async {
/// let mut vec = ObservableVec::new();
///
/// // You can subscribe on push event:
/// let mut pushes = vec.on_push();
///
/// vec.push("foo");
///
/// let pushed_item = pushes.next().await.unwrap();
/// assert_eq!(pushed_item, "foo");
///
/// // Also you can subscribe on remove event:
/// let mut removals = vec.on_remove();
///
/// vec.remove(0);
///
/// let removed_item = removals.next().await.unwrap();
/// assert_eq!(removed_item, "foo");
///
/// // On Vec structure drop, all items will be sent to the on_remove stream:
/// vec.push("foo-1");
/// vec.push("foo-2");
/// drop(vec);
/// let removed_items: Vec<_> = removals.take(2).collect().await;
/// assert_eq!(removed_items[0], "foo-1");
/// assert_eq!(removed_items[1], "foo-2");
/// # });
/// ```
///
/// # Waiting for subscribers to complete
///
/// ```rust
/// # use futures::{executor, StreamExt as _, Stream};
/// use medea_reactive::collections::ProgressableVec;
///
/// # executor::block_on(async {
/// let mut vec = ProgressableVec::new();
///
/// let mut on_push = vec.on_push();
/// vec.push(1);
///
/// // vec.when_push_processed().await; <- wouldn't be resolved
/// let value = on_push.next().await.unwrap();
/// // vec.when_push_processed().await; <- wouldn't be resolved
/// drop(value);
///
/// vec.when_push_processed().await; // will be resolved
///
/// # });
/// ```
// NOTE: The `S: SubscribersStore<T, O>` bound has to be declared on the struct
//       itself, because the `Drop` implementation of this type uses it, and a
//       `Drop` implementation cannot have stricter bounds than its type.
#[derive(Debug)]
pub struct Vec<T, S: SubscribersStore<T, O>, O> {
    /// Data stored by this [`Vec`].
    store: StdVec<T>,

    /// Subscribers of the [`Vec::on_push()`] method.
    on_push_subs: S,

    /// Subscribers of the [`Vec::on_remove()`] method.
    on_remove_subs: S,

    /// Phantom type of [`Vec::on_push()`] and [`Vec::on_remove()`] output.
    _output: PhantomData<O>,
}
96
97impl<T> ProgressableVec<T>
98where
99    T: Clone + 'static,
100{
101    /// Returns [`Future`] resolving when all push updates will be processed by
102    /// [`Vec::on_push()`] subscribers.
103    pub fn when_push_processed(&self) -> Processed<'static> {
104        self.on_push_subs.when_all_processed()
105    }
106
107    /// Returns [`Future`] resolving when all remove updates will be processed
108    /// by [`Vec::on_remove()`] subscribers.
109    pub fn when_remove_processed(&self) -> Processed<'static> {
110        self.on_remove_subs.when_all_processed()
111    }
112
113    /// Returns [`Future`] resolving when all push and remove updates will be
114    /// processed by subscribers.
115    pub fn when_all_processed(&self) -> AllProcessed<'static> {
116        crate::when_all_processed(vec![
117            self.when_remove_processed().into(),
118            self.when_push_processed().into(),
119        ])
120    }
121}
122
123impl<T, S: SubscribersStore<T, O>, O> Vec<T, S, O> {
124    /// Returns new empty [`Vec`].
125    #[must_use]
126    pub fn new() -> Self {
127        Self::default()
128    }
129
130    /// An iterator visiting all elements in arbitrary order. The iterator
131    /// element type is `&'a T`.
132    pub fn iter(&self) -> impl Iterator<Item = &T> {
133        self.into_iter()
134    }
135
136    /// Returns the [`Stream`] to which the pushed values will be sent.
137    ///
138    /// [`Stream`]: futures::Stream
139    pub fn on_push(&self) -> LocalBoxStream<'static, O> {
140        self.on_push_subs.subscribe()
141    }
142
143    /// Returns the [`Stream`] to which the removed values will be sent.
144    ///
145    /// Note that to this [`Stream`] will be sent all items of the
146    /// [`Vec`] on drop.
147    ///
148    /// [`Stream`]: futures::Stream
149    pub fn on_remove(&self) -> LocalBoxStream<'static, O> {
150        self.on_remove_subs.subscribe()
151    }
152}
153
154impl<T, S, O> Vec<T, S, O>
155where
156    T: Clone,
157    S: SubscribersStore<T, O>,
158    O: 'static,
159{
160    /// Appends a value to the back of this [`Vec`].
161    ///
162    /// This will produce [`Vec::on_push()`] event.
163    pub fn push(&mut self, value: T) {
164        self.store.push(value.clone());
165
166        self.on_push_subs.send_update(value);
167    }
168
169    /// Removes and returns the value at position `index` within this [`Vec`],
170    /// shifting all values after it to the left.
171    ///
172    /// This will produce [`Vec::on_remove()`] event.
173    pub fn remove(&mut self, index: usize) -> T {
174        let value = self.store.remove(index);
175        self.on_remove_subs.send_update(value.clone());
176
177        value
178    }
179
180    /// Returns [`Stream`] containing values from this [`Vec`].
181    ///
182    /// Returned [`Stream`] contains only current values. It won't update on new
183    /// pushes, but you can merge returned [`Stream`] with a [`Vec::on_push`]
184    /// [`Stream`] if you want to process current values and values that will be
185    /// inserted.
186    ///
187    /// [`Stream`]: futures::Stream
188    #[expect(clippy::needless_collect, reason = "false positive: lifetimes")]
189    pub fn replay_on_push(&self) -> LocalBoxStream<'static, O> {
190        Box::pin(stream::iter(
191            self.store
192                .clone()
193                .into_iter()
194                .map(|val| self.on_push_subs.wrap(val))
195                .collect::<StdVec<_>>(),
196        ))
197    }
198}
199
200// Implemented manually to omit redundant `: Default` trait bounds, imposed by
201// `#[derive(Default)]`.
202impl<T, S: SubscribersStore<T, O>, O> Default for Vec<T, S, O> {
203    fn default() -> Self {
204        Self {
205            store: StdVec::new(),
206            on_push_subs: S::default(),
207            on_remove_subs: S::default(),
208            _output: PhantomData,
209        }
210    }
211}
212
213impl<T, S: SubscribersStore<T, O>, O> From<StdVec<T>> for Vec<T, S, O> {
214    fn from(from: StdVec<T>) -> Self {
215        Self {
216            store: from,
217            on_push_subs: S::default(),
218            on_remove_subs: S::default(),
219            _output: PhantomData,
220        }
221    }
222}
223
224impl<'a, T, S: SubscribersStore<T, O>, O> IntoIterator for &'a Vec<T, S, O> {
225    type IntoIter = Iter<'a, T>;
226    type Item = &'a T;
227
228    fn into_iter(self) -> Self::IntoIter {
229        self.store.iter()
230    }
231}
232
233impl<T, S: SubscribersStore<T, O>, O> Drop for Vec<T, S, O> {
234    /// Sends all items of a dropped [`Vec`] to the [`Vec::on_remove()`]
235    /// subscriptions.
236    fn drop(&mut self) {
237        for value in mem::take(&mut self.store) {
238            self.on_remove_subs.send_update(value);
239        }
240    }
241}
242
243impl<T, S, O> AsRef<[T]> for Vec<T, S, O>
244where
245    T: Clone,
246    S: SubscribersStore<T, O>,
247{
248    fn as_ref(&self) -> &[T] {
249        &self.store
250    }
251}
252
#[cfg(test)]
mod tests {
    use futures::{StreamExt as _, poll, task::Poll};

    use super::ProgressableVec;

    /// Checks that `replay_on_push()` yields exactly the values stored at the
    /// moment of the call, and how it interacts with `when_push_processed()`.
    #[tokio::test]
    async fn replay_on_push() {
        let mut vec = ProgressableVec::from(vec![1, 2, 3]);

        // The replay is created before the push of `4`, so `4` is not expected
        // to be a part of it (see the assertions at the end).
        let replay_on_push = vec.replay_on_push();
        let on_push = vec.on_push();

        vec.push(4);

        // The `on_push` subscriber hasn't received the pushed `4` yet.
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
        let replayed: Vec<_> = replay_on_push.collect().await;
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        // Unwrap the replayed items into the plain values.
        let replayed: Vec<_> =
            replayed.into_iter().map(|val| val.into_inner()).collect();

        // Still pending while the `on_push` subscription is alive; dropping
        // it is expected to resolve the future.
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
        drop(on_push);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));

        assert_eq!(replayed.len(), 3);
        assert!(replayed.contains(&1));
        assert!(replayed.contains(&2));
        assert!(replayed.contains(&3));
    }

    /// Checks that `when_push_processed()` resolves only after the guard of
    /// the received push update is dropped.
    #[tokio::test]
    async fn when_push_processed() {
        let mut vec = ProgressableVec::new();
        // Pushed before there is any subscription.
        vec.push(0);

        let mut on_push = vec.on_push();

        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
        vec.push(1);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
        // Receiving the update alone is not enough: the guard is still held.
        let (val, guard) = on_push.next().await.unwrap().into_parts();

        assert_eq!(val, 1);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
        drop(guard);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
    }

    /// Checks that `when_push_processed()` waits for every `on_push()`
    /// subscriber, not only for the first one.
    #[tokio::test]
    async fn multiple_when_push_processed_subs() {
        let mut vec = ProgressableVec::new();
        vec.push(0);

        let mut on_push1 = vec.on_push();
        let mut on_push2 = vec.on_push();

        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
        vec.push(0);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        // `into_inner()` keeps only the value, so the guard is presumably
        // released right away.
        assert_eq!(on_push1.next().await.unwrap().into_inner(), 0);
        // The second subscriber hasn't processed the update yet.
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
        assert_eq!(on_push2.next().await.unwrap().into_inner(), 0);

        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
    }

    /// Checks that `when_remove_processed()` resolves only after the guard of
    /// the received remove update is dropped.
    #[tokio::test]
    async fn when_remove_processed() {
        let mut vec = ProgressableVec::new();
        vec.push(10);

        let mut on_remove = vec.on_remove();

        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
        assert_eq!(vec.remove(0), 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);

        // Receiving the update alone is not enough: the guard is still held.
        let (val, guard) = on_remove.next().await.unwrap().into_parts();

        assert_eq!(val, 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);
        drop(guard);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
    }

    /// Checks that `when_remove_processed()` waits for every `on_remove()`
    /// subscriber, not only for the first one.
    #[tokio::test]
    async fn multiple_when_remove_processed_subs() {
        let mut vec = ProgressableVec::new();
        vec.push(10);

        let mut on_remove1 = vec.on_remove();
        let mut on_remove2 = vec.on_remove();

        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
        assert_eq!(vec.remove(0), 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);

        assert_eq!(on_remove1.next().await.unwrap().into_inner(), 10);
        // The second subscriber hasn't processed the update yet.
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);
        assert_eq!(on_remove2.next().await.unwrap().into_inner(), 10);

        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
    }
}