medea_reactive/collections/
vec.rs1use std::{marker::PhantomData, mem, slice::Iter, vec::Vec as StdVec};
4
5use futures::stream::{self, LocalBoxStream};
6
7use crate::subscribers_store::{
8 SubscribersStore, common, progressable,
9 progressable::{Processed, processed::AllProcessed},
10};
11
12pub type ProgressableVec<T> =
19 Vec<T, progressable::SubStore<T>, progressable::Guarded<T>>;
20
21pub type ObservableVec<T> = Vec<T, common::SubStore<T>, T>;
23
24#[derive(Debug)]
83pub struct Vec<T, S: SubscribersStore<T, O>, O> {
84 store: StdVec<T>,
86
87 on_push_subs: S,
89
90 on_remove_subs: S,
92
93 _output: PhantomData<O>,
95}
96
97impl<T> ProgressableVec<T>
98where
99 T: Clone + 'static,
100{
101 pub fn when_push_processed(&self) -> Processed<'static> {
104 self.on_push_subs.when_all_processed()
105 }
106
107 pub fn when_remove_processed(&self) -> Processed<'static> {
110 self.on_remove_subs.when_all_processed()
111 }
112
113 pub fn when_all_processed(&self) -> AllProcessed<'static> {
116 crate::when_all_processed(vec![
117 self.when_remove_processed().into(),
118 self.when_push_processed().into(),
119 ])
120 }
121}
122
123impl<T, S: SubscribersStore<T, O>, O> Vec<T, S, O> {
124 #[must_use]
126 pub fn new() -> Self {
127 Self::default()
128 }
129
130 pub fn iter(&self) -> impl Iterator<Item = &T> {
133 self.into_iter()
134 }
135
136 pub fn on_push(&self) -> LocalBoxStream<'static, O> {
140 self.on_push_subs.subscribe()
141 }
142
143 pub fn on_remove(&self) -> LocalBoxStream<'static, O> {
150 self.on_remove_subs.subscribe()
151 }
152}
153
154impl<T, S, O> Vec<T, S, O>
155where
156 T: Clone,
157 S: SubscribersStore<T, O>,
158 O: 'static,
159{
160 pub fn push(&mut self, value: T) {
164 self.store.push(value.clone());
165
166 self.on_push_subs.send_update(value);
167 }
168
169 pub fn remove(&mut self, index: usize) -> T {
174 let value = self.store.remove(index);
175 self.on_remove_subs.send_update(value.clone());
176
177 value
178 }
179
180 #[expect(clippy::needless_collect, reason = "false positive: lifetimes")]
189 pub fn replay_on_push(&self) -> LocalBoxStream<'static, O> {
190 Box::pin(stream::iter(
191 self.store
192 .clone()
193 .into_iter()
194 .map(|val| self.on_push_subs.wrap(val))
195 .collect::<StdVec<_>>(),
196 ))
197 }
198}
199
200impl<T, S: SubscribersStore<T, O>, O> Default for Vec<T, S, O> {
203 fn default() -> Self {
204 Self {
205 store: StdVec::new(),
206 on_push_subs: S::default(),
207 on_remove_subs: S::default(),
208 _output: PhantomData,
209 }
210 }
211}
212
213impl<T, S: SubscribersStore<T, O>, O> From<StdVec<T>> for Vec<T, S, O> {
214 fn from(from: StdVec<T>) -> Self {
215 Self {
216 store: from,
217 on_push_subs: S::default(),
218 on_remove_subs: S::default(),
219 _output: PhantomData,
220 }
221 }
222}
223
224impl<'a, T, S: SubscribersStore<T, O>, O> IntoIterator for &'a Vec<T, S, O> {
225 type IntoIter = Iter<'a, T>;
226 type Item = &'a T;
227
228 fn into_iter(self) -> Self::IntoIter {
229 self.store.iter()
230 }
231}
232
233impl<T, S: SubscribersStore<T, O>, O> Drop for Vec<T, S, O> {
234 fn drop(&mut self) {
237 for value in mem::take(&mut self.store) {
238 self.on_remove_subs.send_update(value);
239 }
240 }
241}
242
243impl<T, S, O> AsRef<[T]> for Vec<T, S, O>
244where
245 T: Clone,
246 S: SubscribersStore<T, O>,
247{
248 fn as_ref(&self) -> &[T] {
249 &self.store
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use futures::{StreamExt as _, poll, task::Poll};
256
257 use super::ProgressableVec;
258
259 #[tokio::test]
260 async fn replay_on_push() {
261 let mut vec = ProgressableVec::from(vec![1, 2, 3]);
262
263 let replay_on_push = vec.replay_on_push();
264 let on_push = vec.on_push();
265
266 vec.push(4);
267
268 assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
269 let replayed: Vec<_> = replay_on_push.collect().await;
270 assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
271
272 let replayed: Vec<_> =
273 replayed.into_iter().map(|val| val.into_inner()).collect();
274
275 assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
276 drop(on_push);
277 assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
278
279 assert_eq!(replayed.len(), 3);
280 assert!(replayed.contains(&1));
281 assert!(replayed.contains(&2));
282 assert!(replayed.contains(&3));
283 }
284
285 #[tokio::test]
286 async fn when_push_processed() {
287 let mut vec = ProgressableVec::new();
288 vec.push(0);
289
290 let mut on_push = vec.on_push();
291
292 assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
293 vec.push(1);
294 assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
295 let (val, guard) = on_push.next().await.unwrap().into_parts();
297
298 assert_eq!(val, 1);
299 assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
300 drop(guard);
301 assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
302 }
303
304 #[tokio::test]
305 async fn multiple_when_push_processed_subs() {
306 let mut vec = ProgressableVec::new();
307 vec.push(0);
308
309 let mut on_push1 = vec.on_push();
310 let mut on_push2 = vec.on_push();
311
312 assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
313 vec.push(0);
314 assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
315
316 assert_eq!(on_push1.next().await.unwrap().into_inner(), 0);
317 assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);
318 assert_eq!(on_push2.next().await.unwrap().into_inner(), 0);
319
320 assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
321 }
322
323 #[tokio::test]
324 async fn when_remove_processed() {
325 let mut vec = ProgressableVec::new();
326 vec.push(10);
327
328 let mut on_remove = vec.on_remove();
329
330 assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
331 assert_eq!(vec.remove(0), 10);
332 assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);
333
334 let (val, guard) = on_remove.next().await.unwrap().into_parts();
335
336 assert_eq!(val, 10);
337 assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);
338 drop(guard);
339 assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
340 }
341
342 #[tokio::test]
343 async fn multiple_when_remove_processed_subs() {
344 let mut vec = ProgressableVec::new();
345 vec.push(10);
346
347 let mut on_remove1 = vec.on_remove();
348 let mut on_remove2 = vec.on_remove();
349
350 assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
351 assert_eq!(vec.remove(0), 10);
352 assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);
353
354 assert_eq!(on_remove1.next().await.unwrap().into_inner(), 10);
355 assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);
356 assert_eq!(on_remove2.next().await.unwrap().into_inner(), 10);
357
358 assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
359 }
360}