fluxus_transformers/
transform_source_with_operator.rs

1use async_trait::async_trait;
2use fluxus_sources::Source;
3use fluxus_utils::models::{Record, StreamResult};
4use std::sync::Arc;
5
6use crate::{InnerOperator, InnerSource, Operator, TransformBase};
7
8/// A source that applies a single operator transformation
9#[derive(Clone)]
10pub struct TransformSourceWithOperator<T, R>
11where
12    T: Clone,
13    R: Clone,
14{
15    base: TransformBase<T>,
16    operator: Arc<InnerOperator<T, R>>,
17    buffer: Vec<Record<R>>,
18}
19
20impl<T, R> TransformSourceWithOperator<T, R>
21where
22    T: Clone + Send + Sync + 'static,
23    R: Clone + Send + Sync + 'static,
24{
25    pub fn new<O>(
26        inner: Arc<InnerSource<T>>,
27        operator: O,
28        operators: Vec<Arc<InnerOperator<T, T>>>,
29    ) -> Self
30    where
31        O: Operator<T, R> + Send + Sync + 'static,
32    {
33        let mut base = TransformBase::new(inner);
34        base.set_operators(operators);
35        Self {
36            base,
37            operator: Arc::new(operator),
38            buffer: Vec::new(),
39        }
40    }
41}
42
43#[async_trait]
44impl<T, R> Source<R> for TransformSourceWithOperator<T, R>
45where
46    T: Clone + Send + Sync + 'static,
47    R: Clone + Send + Sync + 'static,
48{
49    async fn init(&mut self) -> StreamResult<()> {
50        Ok(())
51    }
52
53    async fn next(&mut self) -> StreamResult<Option<Record<R>>> {
54        if !self.buffer.is_empty() {
55            return Ok(self.buffer.pop());
56        }
57        let record = self.base.get_next_record().await?;
58
59        // If there's no next record, return None
60        let Some(record) = record else {
61            return Ok(None);
62        };
63
64        let records = self.base.process_operators(record).await?;
65
66        if records.is_empty() {
67            return self.next().await;
68        }
69
70        let mut final_results = Vec::new();
71        for rec in records {
72            final_results.extend(unsafe {
73                let op = &mut *(Arc::as_ptr(&self.operator) as *mut InnerOperator<T, R>);
74                op.process(rec).await?
75            });
76        }
77        self.buffer = final_results;
78        self.buffer.reverse();
79
80        Ok(self.buffer.pop())
81    }
82
83    async fn close(&mut self) -> StreamResult<()> {
84        self.base.close_inner().await
85    }
86}