fluxus_transformers/
transform_source_with_operator.rs1use async_trait::async_trait;
2use fluxus_sources::Source;
3use fluxus_utils::models::{Record, StreamResult};
4use std::sync::Arc;
5
6use crate::{InnerOperator, InnerSource, Operator, TransformBase};
7
8#[derive(Clone)]
10pub struct TransformSourceWithOperator<T, R>
11where
12 T: Clone,
13 R: Clone,
14{
15 base: TransformBase<T>,
16 operator: Arc<InnerOperator<T, R>>,
17 buffer: Vec<Record<R>>,
18}
19
20impl<T, R> TransformSourceWithOperator<T, R>
21where
22 T: Clone + Send + Sync + 'static,
23 R: Clone + Send + Sync + 'static,
24{
25 pub fn new<O>(
26 inner: Arc<InnerSource<T>>,
27 operator: O,
28 operators: Vec<Arc<InnerOperator<T, T>>>,
29 ) -> Self
30 where
31 O: Operator<T, R> + Send + Sync + 'static,
32 {
33 let mut base = TransformBase::new(inner);
34 base.set_operators(operators);
35 Self {
36 base,
37 operator: Arc::new(operator),
38 buffer: Vec::new(),
39 }
40 }
41}
42
43#[async_trait]
44impl<T, R> Source<R> for TransformSourceWithOperator<T, R>
45where
46 T: Clone + Send + Sync + 'static,
47 R: Clone + Send + Sync + 'static,
48{
49 async fn init(&mut self) -> StreamResult<()> {
50 Ok(())
51 }
52
53 async fn next(&mut self) -> StreamResult<Option<Record<R>>> {
54 if !self.buffer.is_empty() {
55 return Ok(self.buffer.pop());
56 }
57 let record = self.base.get_next_record().await?;
58
59 let Some(record) = record else {
61 return Ok(None);
62 };
63
64 let records = self.base.process_operators(record).await?;
65
66 if records.is_empty() {
67 return self.next().await;
68 }
69
70 let mut final_results = Vec::new();
71 for rec in records {
72 final_results.extend(unsafe {
73 let op = &mut *(Arc::as_ptr(&self.operator) as *mut InnerOperator<T, R>);
74 op.process(rec).await?
75 });
76 }
77 self.buffer = final_results;
78 self.buffer.reverse();
79
80 Ok(self.buffer.pop())
81 }
82
83 async fn close(&mut self) -> StreamResult<()> {
84 self.base.close_inner().await
85 }
86}