fluxus_api/io/
collection_source.rs

1use async_trait::async_trait;
2use fluxus_core::{Record, Source, StreamResult};
3use std::collections::VecDeque;
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// A source that produces elements from an in-memory collection.
///
/// The elements are stored in a [`VecDeque`] and handed out from the front,
/// one per call to `next`, so they are emitted in the order in which they
/// were supplied.
#[derive(Debug, Clone)]
pub struct CollectionSource<T> {
    /// Elements that have not been emitted yet; the front is emitted next.
    data: VecDeque<T>,
}
10
11impl<T> CollectionSource<T> {
12    pub fn new(data: impl IntoIterator<Item = T>) -> Self {
13        Self {
14            data: data.into_iter().collect(),
15        }
16    }
17}
18
19#[async_trait]
20impl<T> Source<T> for CollectionSource<T>
21where
22    T: Clone + Send + Sync + 'static,
23{
24    async fn init(&mut self) -> StreamResult<()> {
25        Ok(())
26    }
27
28    async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
29        let value = self.data.pop_front();
30        Ok(value.map(|data| Record {
31            data,
32            timestamp: SystemTime::now()
33                .duration_since(UNIX_EPOCH)
34                .unwrap_or_default()
35                .as_millis() as i64,
36        }))
37    }
38
39    async fn close(&mut self) -> StreamResult<()> {
40        Ok(())
41    }
42}