fluxus_api/io/
collection_source.rs1use async_trait::async_trait;
2use fluxus_core::{Record, Source, StreamResult};
3use std::collections::VecDeque;
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// An in-memory source backed by a fixed collection of items.
///
/// The items are stored in a [`VecDeque`] and handed out from the front, so
/// they are emitted in the same order in which they were supplied.
#[derive(Debug, Clone)]
pub struct CollectionSource<T> {
    /// Items that have not been emitted yet, in emission order.
    data: VecDeque<T>,
}
10
11impl<T> CollectionSource<T> {
12 pub fn new(data: impl IntoIterator<Item = T>) -> Self {
13 Self {
14 data: data.into_iter().collect(),
15 }
16 }
17}
18
19#[async_trait]
20impl<T> Source<T> for CollectionSource<T>
21where
22 T: Clone + Send + Sync + 'static,
23{
24 async fn init(&mut self) -> StreamResult<()> {
25 Ok(())
26 }
27
28 async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
29 let value = self.data.pop_front();
30 Ok(value.map(|data| Record {
31 data,
32 timestamp: SystemTime::now()
33 .duration_since(UNIX_EPOCH)
34 .unwrap_or_default()
35 .as_millis() as i64,
36 }))
37 }
38
39 async fn close(&mut self) -> StreamResult<()> {
40 Ok(())
41 }
42}