fluxus_api/source/
transform_source_with_operator.rs

1use async_trait::async_trait;
2use fluxus_core::{Operator, Record, Source, StreamResult};
3use std::sync::Arc;
4
5use crate::{InnerOperator, InnerSource};
6
7/// A source that applies a single operator transformation
8#[derive(Clone)]
9pub struct TransformSourceWithOperator<T, R>
10where
11    T: Clone,
12    R: Clone,
13{
14    inner: Arc<InnerSource<T>>,
15    operator: Arc<InnerOperator<T, R>>,
16}
17
18impl<T, R> TransformSourceWithOperator<T, R>
19where
20    T: Clone + Send + Sync + 'static,
21    R: Clone + Send + Sync + 'static,
22{
23    pub fn new<O>(inner: Arc<InnerSource<T>>, operator: O) -> Self
24    where
25        O: Operator<T, R> + Send + Sync + 'static,
26    {
27        Self {
28            inner,
29            operator: Arc::new(operator),
30        }
31    }
32}
33
34#[async_trait]
35impl<T, R> Source<R> for TransformSourceWithOperator<T, R>
36where
37    T: Clone + Send + Sync + 'static,
38    R: Clone + Send + Sync + 'static,
39{
40    async fn init(&mut self) -> StreamResult<()> {
41        Ok(())
42    }
43
44    async fn next(&mut self) -> StreamResult<Option<Record<R>>> {
45        let inner = Arc::clone(&self.inner);
46        let record = unsafe {
47            // Safe because we have exclusive access through &mut self
48            let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
49            source.next().await?
50        };
51
52        match record {
53            Some(record) => {
54                let operator = Arc::clone(&self.operator);
55                let output = unsafe {
56                    // Safe because we have exclusive access through &mut self
57                    let op = &mut *(Arc::as_ptr(&operator) as *mut InnerOperator<T, R>);
58                    op.process(record).await?
59                };
60                Ok(output.into_iter().next())
61            }
62            None => Ok(None),
63        }
64    }
65
66    async fn close(&mut self) -> StreamResult<()> {
67        let inner = Arc::clone(&self.inner);
68        unsafe {
69            // Safe because we have exclusive access through &mut self
70            let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
71            source.close().await
72        }
73    }
74}