Skip to main content

aimdb_core/transform/
single.rs

1use core::fmt::Debug;
2use core::marker::PhantomData;
3
4use alloc::{
5    boxed::Box,
6    string::{String, ToString},
7    sync::Arc,
8    vec,
9};
10
11use crate::transform::TransformDescriptor;
12
13// ============================================================================
14// TransformBuilder → TransformPipeline
15// ============================================================================
16
17/// Configures a single-input transform pipeline.
18///
19/// Created by `RecordRegistrar::transform_raw()`. Use `.map()` for stateless
20/// transforms or `.with_state()` for stateful transforms.
21pub struct TransformBuilder<I, O, R: aimdb_executor::Spawn + 'static> {
22    input_key: String,
23    _phantom: PhantomData<(I, O, R)>,
24}
25
26impl<I, O, R> TransformBuilder<I, O, R>
27where
28    I: Send + Sync + Clone + Debug + 'static,
29    O: Send + Sync + Clone + Debug + 'static,
30    R: aimdb_executor::Spawn + 'static,
31{
32    pub(crate) fn new(input_key: String) -> Self {
33        Self {
34            input_key,
35            _phantom: PhantomData,
36        }
37    }
38
39    /// Stateless 1:1 map. Returning `None` skips output for this input value.
40    pub fn map<F>(self, f: F) -> TransformPipeline<I, O, R>
41    where
42        F: Fn(&I) -> Option<O> + Send + Sync + 'static,
43    {
44        TransformPipeline {
45            input_key: self.input_key,
46            spawn_factory: Box::new(move |input_key| {
47                let transform_fn = move |val: &I, _state: &mut ()| f(val);
48                create_single_transform_descriptor::<I, O, (), R>(input_key, (), transform_fn)
49            }),
50            _phantom_i: PhantomData,
51        }
52    }
53
54    /// Begin configuring a stateful transform. `S` is the user-defined state type.
55    pub fn with_state<S: Send + Sync + 'static>(
56        self,
57        initial: S,
58    ) -> StatefulTransformBuilder<I, O, S, R> {
59        StatefulTransformBuilder {
60            input_key: self.input_key,
61            initial_state: initial,
62            _phantom: PhantomData,
63        }
64    }
65}
66
67/// Intermediate builder for stateful single-input transforms.
68pub struct StatefulTransformBuilder<I, O, S, R: aimdb_executor::Spawn + 'static> {
69    input_key: String,
70    initial_state: S,
71    _phantom: PhantomData<(I, O, R)>,
72}
73
74impl<I, O, S, R> StatefulTransformBuilder<I, O, S, R>
75where
76    I: Send + Sync + Clone + Debug + 'static,
77    O: Send + Sync + Clone + Debug + 'static,
78    S: Send + Sync + 'static,
79    R: aimdb_executor::Spawn + 'static,
80{
81    /// Called for each input value. Receives mutable state, returns optional output.
82    pub fn on_value<F>(self, f: F) -> TransformPipeline<I, O, R>
83    where
84        F: Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
85    {
86        let initial = self.initial_state;
87        TransformPipeline {
88            input_key: self.input_key,
89            spawn_factory: Box::new(move |input_key| {
90                create_single_transform_descriptor::<I, O, S, R>(input_key, initial, f)
91            }),
92            _phantom_i: PhantomData,
93        }
94    }
95}
96
97/// Completed single-input transform pipeline, ready to be stored in `TypedRecord`.
98pub struct TransformPipeline<
99    I,
100    O: Send + Sync + Clone + Debug + 'static,
101    R: aimdb_executor::Spawn + 'static,
102> {
103    pub(crate) input_key: String,
104    pub(crate) spawn_factory: Box<dyn FnOnce(String) -> TransformDescriptor<O, R> + Send + Sync>,
105    pub(crate) _phantom_i: PhantomData<I>,
106}
107
108impl<I, O, R> TransformPipeline<I, O, R>
109where
110    I: Send + Sync + Clone + Debug + 'static,
111    O: Send + Sync + Clone + Debug + 'static,
112    R: aimdb_executor::Spawn + 'static,
113{
114    pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
115        (self.spawn_factory)(self.input_key)
116    }
117}
118
119fn create_single_transform_descriptor<I, O, S, R>(
120    input_key: String,
121    initial_state: S,
122    transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
123) -> TransformDescriptor<O, R>
124where
125    I: Send + Sync + Clone + Debug + 'static,
126    O: Send + Sync + Clone + Debug + 'static,
127    S: Send + Sync + 'static,
128    R: aimdb_executor::Spawn + 'static,
129{
130    let input_key_clone = input_key.clone();
131    let input_keys = vec![input_key];
132
133    TransformDescriptor {
134        input_keys,
135        spawn_fn: Box::new(move |producer, db, _ctx| {
136            Box::pin(run_single_transform::<I, O, S, R>(
137                db,
138                input_key_clone,
139                producer,
140                initial_state,
141                transform_fn,
142            ))
143        }),
144    }
145}
146
147// ============================================================================
148// Transform Task Runner
149// ============================================================================
150
151#[allow(unused_variables)]
152pub(crate) async fn run_single_transform<I, O, S, R>(
153    db: Arc<crate::AimDb<R>>,
154    input_key: String,
155    producer: crate::Producer<O, R>,
156    mut state: S,
157    transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
158) where
159    I: Send + Sync + Clone + Debug + 'static,
160    O: Send + Sync + Clone + Debug + 'static,
161    S: Send + 'static,
162    R: aimdb_executor::Spawn + 'static,
163{
164    let output_key = producer.key().to_string();
165
166    #[cfg(feature = "tracing")]
167    tracing::info!("🔄 Transform started: '{}' → '{}'", input_key, output_key);
168
169    let consumer = crate::typed_api::Consumer::<I, R>::new(db, input_key.clone());
170    let mut reader = match consumer.subscribe() {
171        Ok(r) => r,
172        Err(_e) => {
173            #[cfg(feature = "tracing")]
174            tracing::error!(
175                "🔄 Transform '{}' → '{}' FATAL: failed to subscribe to input: {:?}",
176                input_key,
177                output_key,
178                _e
179            );
180            #[cfg(all(feature = "std", not(feature = "tracing")))]
181            eprintln!(
182                "AIMDB TRANSFORM ERROR: '{}' → '{}' failed to subscribe to input: {:?}",
183                input_key, output_key, _e
184            );
185            return;
186        }
187    };
188
189    #[cfg(feature = "tracing")]
190    tracing::debug!(
191        "✅ Transform '{}' → '{}' subscribed, entering event loop",
192        input_key,
193        output_key
194    );
195
196    loop {
197        match reader.recv().await {
198            Ok(input_value) => {
199                if let Some(output_value) = transform_fn(&input_value, &mut state) {
200                    let _ = producer.produce(output_value).await;
201                }
202            }
203            Err(crate::DbError::BufferLagged { .. }) => {
204                #[cfg(feature = "tracing")]
205                tracing::warn!(
206                    "🔄 Transform '{}' → '{}' lagged behind, some values skipped",
207                    input_key,
208                    output_key
209                );
210                continue;
211            }
212            Err(_) => {
213                #[cfg(feature = "tracing")]
214                tracing::warn!(
215                    "🔄 Transform '{}' → '{}' input closed, task exiting",
216                    input_key,
217                    output_key
218                );
219                break;
220            }
221        }
222    }
223}