fluxus_api/source/
transform_source.rs

1use async_trait::async_trait;
2use fluxus_core::{Record, Source, StreamResult};
3use std::sync::Arc;
4
5use crate::{InnerOperator, InnerSource};
6
7#[derive(Clone)]
8pub struct TransformSource<T: Clone> {
9    inner: Arc<InnerSource<T>>,
10    operators: Vec<Arc<InnerOperator<T, T>>>,
11}
12
13impl<T: Clone + Send + Sync + 'static> TransformSource<T> {
14    pub fn new(inner: Arc<InnerSource<T>>) -> Self {
15        Self {
16            inner,
17            operators: Vec::new(),
18        }
19    }
20
21    pub fn set_operators(&mut self, operators: Vec<Arc<InnerOperator<T, T>>>) {
22        self.operators = operators;
23    }
24}
25
26#[async_trait]
27impl<T: Clone + Send + Sync + 'static> Source<T> for TransformSource<T> {
28    async fn init(&mut self) -> StreamResult<()> {
29        Ok(())
30    }
31
32    async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
33        let inner = Arc::clone(&self.inner);
34        unsafe {
35            // Safe because we have exclusive access through &mut self
36            let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
37            source.next().await
38        }
39    }
40
41    async fn close(&mut self) -> StreamResult<()> {
42        let inner = Arc::clone(&self.inner);
43        unsafe {
44            // Safe because we have exclusive access through &mut self
45            let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
46            source.close().await
47        }
48    }
49}