fluxus_api/io/
collection_source.rs

1use async_trait::async_trait;
2use fluxus_sources::Source;
3use fluxus_utils::models::{Record, StreamResult};
4use std::collections::VecDeque;
5use std::time::{SystemTime, UNIX_EPOCH};
6
/// A source that produces elements from an in-memory collection.
///
/// Elements are kept in a queue and handed out front-to-back, one per call to
/// `next`. Once the queue has been drained, `next` yields `None`.
#[derive(Debug, Clone)]
pub struct CollectionSource<T> {
    /// Elements that have not been emitted yet, in emission order.
    data: VecDeque<T>,
}
11
12impl<T> CollectionSource<T> {
13    pub fn new(data: impl IntoIterator<Item = T>) -> Self {
14        Self {
15            data: data.into_iter().collect(),
16        }
17    }
18}
19
20#[async_trait]
21impl<T> Source<T> for CollectionSource<T>
22where
23    T: Clone + Send + Sync + 'static,
24{
25    async fn init(&mut self) -> StreamResult<()> {
26        Ok(())
27    }
28
29    async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
30        let value = self.data.pop_front();
31        Ok(value.map(|data| Record {
32            data,
33            timestamp: SystemTime::now()
34                .duration_since(UNIX_EPOCH)
35                .unwrap_or_default()
36                .as_millis() as i64,
37        }))
38    }
39
40    async fn close(&mut self) -> StreamResult<()> {
41        Ok(())
42    }
43}