fluxus_api/io/
collection_source.rs1use async_trait::async_trait;
2use fluxus_sources::Source;
3use fluxus_utils::models::{Record, StreamResult};
4use std::collections::VecDeque;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7pub struct CollectionSource<T> {
9 data: VecDeque<T>,
10}
11
12impl<T> CollectionSource<T> {
13 pub fn new(data: impl IntoIterator<Item = T>) -> Self {
14 Self {
15 data: data.into_iter().collect(),
16 }
17 }
18}
19
20#[async_trait]
21impl<T> Source<T> for CollectionSource<T>
22where
23 T: Clone + Send + Sync + 'static,
24{
25 async fn init(&mut self) -> StreamResult<()> {
26 Ok(())
27 }
28
29 async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
30 let value = self.data.pop_front();
31 Ok(value.map(|data| Record {
32 data,
33 timestamp: SystemTime::now()
34 .duration_since(UNIX_EPOCH)
35 .unwrap_or_default()
36 .as_millis() as i64,
37 }))
38 }
39
40 async fn close(&mut self) -> StreamResult<()> {
41 Ok(())
42 }
43}