fluxus_api/source/
transform_source.rs1use async_trait::async_trait;
2use fluxus_core::{Record, Source, StreamResult};
3use std::sync::Arc;
4
5use crate::{InnerOperator, InnerSource};
6
7#[derive(Clone)]
8pub struct TransformSource<T: Clone> {
9 inner: Arc<InnerSource<T>>,
10 operators: Vec<Arc<InnerOperator<T, T>>>,
11}
12
13impl<T: Clone + Send + Sync + 'static> TransformSource<T> {
14 pub fn new(inner: Arc<InnerSource<T>>) -> Self {
15 Self {
16 inner,
17 operators: Vec::new(),
18 }
19 }
20
21 pub fn set_operators(&mut self, operators: Vec<Arc<InnerOperator<T, T>>>) {
22 self.operators = operators;
23 }
24}
25
26#[async_trait]
27impl<T: Clone + Send + Sync + 'static> Source<T> for TransformSource<T> {
28 async fn init(&mut self) -> StreamResult<()> {
29 Ok(())
30 }
31
32 async fn next(&mut self) -> StreamResult<Option<Record<T>>> {
33 let inner = Arc::clone(&self.inner);
34 unsafe {
35 let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
37 source.next().await
38 }
39 }
40
41 async fn close(&mut self) -> StreamResult<()> {
42 let inner = Arc::clone(&self.inner);
43 unsafe {
44 let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
46 source.close().await
47 }
48 }
49}