fluxus_api/source/
transform_source_with_operator.rs1use async_trait::async_trait;
2use fluxus_core::{Operator, Record, Source, StreamResult};
3use std::sync::Arc;
4
5use crate::{InnerOperator, InnerSource};
6
7#[derive(Clone)]
9pub struct TransformSourceWithOperator<T, R>
10where
11 T: Clone,
12 R: Clone,
13{
14 inner: Arc<InnerSource<T>>,
15 operator: Arc<InnerOperator<T, R>>,
16}
17
18impl<T, R> TransformSourceWithOperator<T, R>
19where
20 T: Clone + Send + Sync + 'static,
21 R: Clone + Send + Sync + 'static,
22{
23 pub fn new<O>(inner: Arc<InnerSource<T>>, operator: O) -> Self
24 where
25 O: Operator<T, R> + Send + Sync + 'static,
26 {
27 Self {
28 inner,
29 operator: Arc::new(operator),
30 }
31 }
32}
33
34#[async_trait]
35impl<T, R> Source<R> for TransformSourceWithOperator<T, R>
36where
37 T: Clone + Send + Sync + 'static,
38 R: Clone + Send + Sync + 'static,
39{
40 async fn init(&mut self) -> StreamResult<()> {
41 Ok(())
42 }
43
44 async fn next(&mut self) -> StreamResult<Option<Record<R>>> {
45 let inner = Arc::clone(&self.inner);
46 let record = unsafe {
47 let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
49 source.next().await?
50 };
51
52 match record {
53 Some(record) => {
54 let operator = Arc::clone(&self.operator);
55 let output = unsafe {
56 let op = &mut *(Arc::as_ptr(&operator) as *mut InnerOperator<T, R>);
58 op.process(record).await?
59 };
60 Ok(output.into_iter().next())
61 }
62 None => Ok(None),
63 }
64 }
65
66 async fn close(&mut self) -> StreamResult<()> {
67 let inner = Arc::clone(&self.inner);
68 unsafe {
69 let source = &mut *(Arc::as_ptr(&inner) as *mut InnerSource<T>);
71 source.close().await
72 }
73 }
74}