Skip to main content

aimdb_core/
transform.rs

1//! Reactive transform primitives for derived records
2//!
3//! This module provides the `.transform()` and `.transform_join()` API for declaring
4//! reactive derivations from one or more input records to an output record.
5//!
6//! # Transform Archetypes
7//!
8//! - **Map** (1:1, stateless): Transform each input value to zero-or-one output value
9//! - **Accumulate** (N:1, stateful): Aggregate a stream of values with persistent state
10//! - **Join** (M×N:1, stateful, multi-input): Combine values from multiple input records
11//!
12//! All three are handled by a unified API surface:
13//! - Single-input: `.transform()` with `TransformBuilder`
14//! - Multi-input: `.transform_join()` with `JoinBuilder`
15//!
16//! # Design Principles
17//!
18//! - Transforms are **owned by AimDB** — visible in the dependency graph
19//! - Transforms are **mutually exclusive** with `.source()` on the same record
20//! - Multiple `.tap()` observers can still be attached to a transform's output
21//! - Input subscriptions use existing `Consumer<T, R>` / `BufferReader<T>` API
22//! - Build-time validation catches missing input keys and cyclic dependencies
23
24use core::any::Any;
25use core::fmt::Debug;
26use core::marker::PhantomData;
27
28extern crate alloc;
29use alloc::{
30    boxed::Box,
31    string::{String, ToString},
32    vec::Vec,
33};
34
35use alloc::sync::Arc;
36
37use crate::typed_record::BoxFuture;
38
39// ============================================================================
40// TransformDescriptor — stored per output record in TypedRecord
41// ============================================================================
42
43/// Transform descriptor stored in `TypedRecord`.
44///
45/// Contains the input record keys and a type-erased spawn function that captures
46/// all type information (input types, state type) in its closure. At spawn time
47/// it receives a `Producer<T, R>` and the `AimDb<R>` handle.
48///
49/// This follows the same pattern as `ProducerServiceFn<T, R>`.
50pub(crate) struct TransformDescriptor<T, R: aimdb_executor::Spawn + 'static>
51where
52    T: Send + 'static + Debug + Clone,
53{
54    /// Record keys this transform subscribes to (for build-time validation).
55    pub input_keys: Vec<String>,
56
57    /// Spawn function: takes (Producer<T, R>, Arc<AimDb<R>>, Arc<dyn Any + Send + Sync>) → Future.
58    ///
59    /// The closure captures input types, state, and user logic. At spawn time it
60    /// receives:
61    /// - `Producer<T, R>` bound to the output record
62    /// - `Arc<AimDb<R>>` for subscribing to input records
63    /// - `Arc<dyn Any + Send + Sync>` runtime context (same as source/tap)
64    #[allow(clippy::type_complexity)]
65    pub spawn_fn: Box<
66        dyn FnOnce(
67                crate::Producer<T, R>,
68                Arc<crate::AimDb<R>>,
69                Arc<dyn Any + Send + Sync>,
70            ) -> BoxFuture<'static, ()>
71            + Send
72            + Sync,
73    >,
74}
75
76// ============================================================================
77// Single-Input Transform: TransformBuilder → TransformPipeline
78// ============================================================================
79
80/// Configures a single-input transform pipeline.
81///
82/// Created by `RecordRegistrar::transform_raw()`. Use `.map()` for stateless
83/// transforms or `.with_state()` for stateful transforms.
84pub struct TransformBuilder<I, O, R: aimdb_executor::Spawn + 'static> {
85    input_key: String,
86    _phantom: PhantomData<(I, O, R)>,
87}
88
89impl<I, O, R> TransformBuilder<I, O, R>
90where
91    I: Send + Sync + Clone + Debug + 'static,
92    O: Send + Sync + Clone + Debug + 'static,
93    R: aimdb_executor::Spawn + 'static,
94{
95    pub(crate) fn new(input_key: String) -> Self {
96        Self {
97            input_key,
98            _phantom: PhantomData,
99        }
100    }
101
102    /// Stateless 1:1 map. Returning `None` skips output for this input value.
103    pub fn map<F>(self, f: F) -> TransformPipeline<I, O, R>
104    where
105        F: Fn(&I) -> Option<O> + Send + Sync + 'static,
106    {
107        // A stateless map is a stateful transform with () state
108        TransformPipeline {
109            input_key: self.input_key,
110            spawn_factory: Box::new(move |input_key| {
111                let transform_fn = move |val: &I, _state: &mut ()| f(val);
112                create_single_transform_descriptor::<I, O, (), R>(input_key, (), transform_fn)
113            }),
114            _phantom_i: PhantomData,
115        }
116    }
117
118    /// Begin configuring a stateful transform. `S` is the user-defined state type.
119    pub fn with_state<S: Send + Sync + 'static>(
120        self,
121        initial: S,
122    ) -> StatefulTransformBuilder<I, O, S, R> {
123        StatefulTransformBuilder {
124            input_key: self.input_key,
125            initial_state: initial,
126            _phantom: PhantomData,
127        }
128    }
129}
130
131/// Intermediate builder for stateful single-input transforms.
132pub struct StatefulTransformBuilder<I, O, S, R: aimdb_executor::Spawn + 'static> {
133    input_key: String,
134    initial_state: S,
135    _phantom: PhantomData<(I, O, R)>,
136}
137
138impl<I, O, S, R> StatefulTransformBuilder<I, O, S, R>
139where
140    I: Send + Sync + Clone + Debug + 'static,
141    O: Send + Sync + Clone + Debug + 'static,
142    S: Send + Sync + 'static,
143    R: aimdb_executor::Spawn + 'static,
144{
145    /// Called for each input value. Receives mutable state, returns optional output.
146    pub fn on_value<F>(self, f: F) -> TransformPipeline<I, O, R>
147    where
148        F: Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
149    {
150        let initial = self.initial_state;
151        TransformPipeline {
152            input_key: self.input_key,
153            spawn_factory: Box::new(move |input_key| {
154                create_single_transform_descriptor::<I, O, S, R>(input_key, initial, f)
155            }),
156            _phantom_i: PhantomData,
157        }
158    }
159}
160
161/// Completed single-input transform pipeline, ready to be stored in `TypedRecord`.
162pub struct TransformPipeline<
163    I,
164    O: Send + Sync + Clone + Debug + 'static,
165    R: aimdb_executor::Spawn + 'static,
166> {
167    pub(crate) input_key: String,
168    /// Factory that produces a TransformDescriptor given the input key.
169    /// This indirection lets the pipeline be constructed before we have the runtime.
170    pub(crate) spawn_factory: Box<dyn FnOnce(String) -> TransformDescriptor<O, R> + Send + Sync>,
171    _phantom_i: PhantomData<I>,
172}
173
174impl<I, O, R> TransformPipeline<I, O, R>
175where
176    I: Send + Sync + Clone + Debug + 'static,
177    O: Send + Sync + Clone + Debug + 'static,
178    R: aimdb_executor::Spawn + 'static,
179{
180    /// Consume this pipeline and produce the `TransformDescriptor` for storage.
181    pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
182        (self.spawn_factory)(self.input_key)
183    }
184}
185
186/// Helper: create a single-input TransformDescriptor from types and closure.
187fn create_single_transform_descriptor<I, O, S, R>(
188    input_key: String,
189    initial_state: S,
190    transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
191) -> TransformDescriptor<O, R>
192where
193    I: Send + Sync + Clone + Debug + 'static,
194    O: Send + Sync + Clone + Debug + 'static,
195    S: Send + Sync + 'static,
196    R: aimdb_executor::Spawn + 'static,
197{
198    let input_key_clone = input_key.clone();
199    let input_keys = alloc::vec![input_key];
200
201    TransformDescriptor {
202        input_keys,
203        spawn_fn: Box::new(move |producer, db, _ctx| {
204            Box::pin(run_single_transform::<I, O, S, R>(
205                db,
206                input_key_clone,
207                producer,
208                initial_state,
209                transform_fn,
210            ))
211        }),
212    }
213}
214
215// ============================================================================
216// Multi-Input Join: JoinBuilder → JoinPipeline
217// ============================================================================
218
/// Identifies which join input produced a value, and carries that value.
///
/// `index` matches the order in which inputs were registered with `.input()`,
/// starting at 0. The value is type-erased. Recover it with
/// [`JoinTrigger::as_input`] (borrowed) or [`JoinTrigger::into_input`] (owned).
#[derive(Debug)]
pub enum JoinTrigger {
    /// The input at position `index` emitted `value`.
    Input {
        index: usize,
        value: Box<dyn Any + Send>,
    },
}

impl JoinTrigger {
    /// Borrows the carried value as `T`.
    ///
    /// Returns `None` if the value is of a different type.
    pub fn as_input<T: 'static>(&self) -> Option<&T> {
        match self {
            JoinTrigger::Input { value, .. } => value.downcast_ref::<T>(),
        }
    }

    /// Takes ownership of the carried value as `T`.
    ///
    /// Handlers receive the trigger by value, so this avoids the clone that going
    /// through [`JoinTrigger::as_input`] would require.
    ///
    /// # Errors
    ///
    /// If the value is not a `T`, the trigger is handed back unchanged in `Err`, so
    /// it can be tried against another type.
    pub fn into_input<T: 'static>(self) -> Result<T, Self> {
        match self {
            JoinTrigger::Input { index, value } => value
                .downcast::<T>()
                .map(|boxed| *boxed)
                .map_err(|value| JoinTrigger::Input { index, value }),
        }
    }

    /// Returns the position (in `.input()` registration order) of the input that fired.
    pub fn index(&self) -> usize {
        match self {
            JoinTrigger::Input { index, .. } => *index,
        }
    }
}
246
/// Configures a multi-input join transform.
///
/// Created by `RecordRegistrar::transform_join_raw()`. Register inputs with
/// `.input()`; their call order defines the trigger indices. Then supply the
/// state and handler with `.with_state(..).on_trigger(..)`.
#[cfg(feature = "std")]
pub struct JoinBuilder<O, R: aimdb_executor::Spawn + 'static> {
    /// Registered inputs in `.input()` order: record key plus its forwarder factory.
    inputs: Vec<(String, JoinInputFactory<R>)>,
    _phantom: PhantomData<(O, R)>,
}

/// Type-erased factory that builds the forwarder task for one join input.
///
/// It is called with the `AimDb` handle, the input's index and the shared trigger
/// sender. The returned future subscribes to the input record's buffer and forwards
/// each value, wrapped in a [`JoinTrigger`], into the shared
/// `tokio::sync::mpsc::UnboundedSender`.
#[cfg(feature = "std")]
type JoinInputFactory<R> = Box<
    dyn FnOnce(
            Arc<crate::AimDb<R>>,
            usize,
            tokio::sync::mpsc::UnboundedSender<JoinTrigger>,
        ) -> BoxFuture<'static, ()>
        + Send
        + Sync,
>;
274
#[cfg(feature = "std")]
impl<O, R> JoinBuilder<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Creates a join builder with no inputs registered yet.
    pub(crate) fn new() -> Self {
        Self {
            inputs: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Add a typed input to the join.
    ///
    /// The input index corresponds to the order of `.input()` calls,
    /// starting from 0. Values from this input reach the handler as
    /// `JoinTrigger::Input { index, .. }` carrying a boxed `I`.
    ///
    /// The forwarder created for this input runs while the input buffer stays
    /// open and the join task keeps its receiver. Falling behind the buffer
    /// (`DbError::BufferLagged`) only skips the missed values and does not stop
    /// forwarding, matching the single-input transform behaviour.
    pub fn input<I>(mut self, key: impl crate::RecordKey) -> Self
    where
        I: Send + Sync + Clone + Debug + 'static,
    {
        let key_str = key.as_str().to_string();
        let key_for_factory = key_str.clone();

        let factory: JoinInputFactory<R> = Box::new(
            move |db: Arc<crate::AimDb<R>>,
                  index: usize,
                  tx: tokio::sync::mpsc::UnboundedSender<JoinTrigger>| {
                Box::pin(async move {
                    // Create consumer and subscribe to the input buffer
                    let consumer =
                        crate::typed_api::Consumer::<I, R>::new(db, key_for_factory.clone());
                    let mut reader = match consumer.subscribe() {
                        Ok(r) => r,
                        Err(e) => {
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "🔄 Join input '{}' (index {}) subscription failed: {:?}",
                                key_for_factory,
                                index,
                                e
                            );
                            // Defense-in-depth: always emit something on subscription failure
                            #[cfg(all(feature = "std", not(feature = "tracing")))]
                            eprintln!(
                                "AIMDB TRANSFORM ERROR: Join input '{}' (index {}) subscription failed: {:?}",
                                key_for_factory, index, e
                            );
                            return;
                        }
                    };

                    // Forward loop: recv from buffer, send as JoinTrigger
                    loop {
                        match reader.recv().await {
                            Ok(value) => {
                                let trigger = JoinTrigger::Input {
                                    index,
                                    value: Box::new(value),
                                };
                                if tx.send(trigger).is_err() {
                                    // Main join task dropped its receiver — exit
                                    break;
                                }
                            }
                            Err(crate::DbError::BufferLagged { .. }) => {
                                #[cfg(feature = "tracing")]
                                tracing::warn!(
                                    "🔄 Join input '{}' (index {}) lagged behind, some values skipped",
                                    key_for_factory,
                                    index
                                );
                                // Lag is not fatal: keep forwarding newer values.
                            }
                            Err(_) => {
                                // Buffer closed or other error — this input is finished.
                                #[cfg(feature = "tracing")]
                                tracing::debug!(
                                    "🔄 Join input '{}' (index {}) closed, forwarder exiting",
                                    key_for_factory,
                                    index
                                );
                                break;
                            }
                        }
                    }
                }) as BoxFuture<'static, ()>
            },
        );

        self.inputs.push((key_str, factory));
        self
    }

    /// Set the join state and begin configuring the trigger handler.
    ///
    /// `initial` is moved into the join task and handed to the handler mutably on
    /// every trigger.
    pub fn with_state<S: Send + Sync + 'static>(self, initial: S) -> JoinStateBuilder<O, S, R> {
        JoinStateBuilder {
            inputs: self.inputs,
            initial_state: initial,
            _phantom: PhantomData,
        }
    }
}
355
/// Final stage of a join builder.
///
/// Owns the registered inputs and the initial state, and waits for the trigger handler.
#[cfg(feature = "std")]
pub struct JoinStateBuilder<O, S, R: aimdb_executor::Spawn + 'static> {
    /// Registered inputs in `.input()` order.
    inputs: Vec<(String, JoinInputFactory<R>)>,
    /// State value handed to the join task when it starts.
    initial_state: S,
    _phantom: PhantomData<(O, R)>,
}

#[cfg(feature = "std")]
impl<O, S, R> JoinStateBuilder<O, S, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + Sync + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Finishes the join with the handler invoked for every value from any input.
    ///
    /// The handler receives:
    /// - the `JoinTrigger` (input index plus type-erased value),
    /// - mutable access to the join state,
    /// - the `Producer<O, R>` used to emit output.
    ///
    /// Each returned future is awaited before the next trigger is handled.
    pub fn on_trigger<F, Fut>(self, handler: F) -> JoinPipeline<O, R>
    where
        F: Fn(JoinTrigger, &mut S, &crate::Producer<O, R>) -> Fut + Send + Sync + 'static,
        Fut: core::future::Future<Output = ()> + Send + 'static,
    {
        let JoinStateBuilder {
            inputs,
            initial_state,
            ..
        } = self;

        // The key list is needed twice: on the pipeline and inside the descriptor.
        let pipeline_keys: Vec<String> = inputs.iter().map(|(key, _)| key.clone()).collect();
        let descriptor_keys = pipeline_keys.clone();

        JoinPipeline {
            _input_keys: pipeline_keys,
            spawn_factory: Box::new(move |()| TransformDescriptor {
                input_keys: descriptor_keys,
                spawn_fn: Box::new(move |producer, db, ctx| {
                    Box::pin(run_join_transform(
                        db,
                        inputs,
                        producer,
                        initial_state,
                        handler,
                        ctx,
                    ))
                }),
            }),
        }
    }
}
399
/// A fully configured multi-input join.
///
/// It is waiting to be turned into a `TransformDescriptor` and stored in `TypedRecord`.
#[cfg(feature = "std")]
pub struct JoinPipeline<
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
> {
    /// Keys of all join inputs, in `.input()` order.
    pub(crate) _input_keys: Vec<String>,
    /// Deferred constructor for the descriptor. Its unit argument carries no information.
    pub(crate) spawn_factory: Box<dyn FnOnce(()) -> TransformDescriptor<O, R> + Send + Sync>,
}

#[cfg(feature = "std")]
impl<O, R> JoinPipeline<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Consumes the pipeline, yielding the descriptor to store on the output record.
    pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
        let build = self.spawn_factory;
        build(())
    }
}
421
422// ============================================================================
423// Transform Task Runners
424// ============================================================================
425
426/// Spawned task for a single-input stateful transform.
427///
428/// Subscribes to the input record's buffer, calls the user closure per value,
429/// and produces output values to the output record's buffer.
430#[allow(unused_variables)]
431async fn run_single_transform<I, O, S, R>(
432    db: Arc<crate::AimDb<R>>,
433    input_key: String,
434    producer: crate::Producer<O, R>,
435    mut state: S,
436    transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
437) where
438    I: Send + Sync + Clone + Debug + 'static,
439    O: Send + Sync + Clone + Debug + 'static,
440    S: Send + 'static,
441    R: aimdb_executor::Spawn + 'static,
442{
443    let output_key = producer.key().to_string();
444
445    // OBSERVABILITY (Incident Lesson #1): Always confirm startup
446    #[cfg(feature = "tracing")]
447    tracing::info!("🔄 Transform started: '{}' → '{}'", input_key, output_key);
448
449    // Subscribe to the input record's buffer.
450    // Incident Lesson #3: subscription failure is FATAL for transforms.
451    let consumer = crate::typed_api::Consumer::<I, R>::new(db, input_key.clone());
452    let mut reader = match consumer.subscribe() {
453        Ok(r) => r,
454        Err(_e) => {
455            #[cfg(feature = "tracing")]
456            tracing::error!(
457                "🔄 Transform '{}' → '{}' FATAL: failed to subscribe to input: {:?}",
458                input_key,
459                output_key,
460                _e
461            );
462            // Defense-in-depth: always emit on subscription failure
463            #[cfg(all(feature = "std", not(feature = "tracing")))]
464            eprintln!(
465                "AIMDB TRANSFORM ERROR: '{}' → '{}' failed to subscribe to input: {:?}",
466                input_key, output_key, _e
467            );
468            return;
469        }
470    };
471
472    #[cfg(feature = "tracing")]
473    tracing::debug!(
474        "✅ Transform '{}' → '{}' subscribed, entering event loop",
475        input_key,
476        output_key
477    );
478
479    // React to each input value
480    loop {
481        match reader.recv().await {
482            Ok(input_value) => {
483                if let Some(output_value) = transform_fn(&input_value, &mut state) {
484                    let _ = producer.produce(output_value).await;
485                }
486            }
487            Err(crate::DbError::BufferLagged { .. }) => {
488                #[cfg(feature = "tracing")]
489                tracing::warn!(
490                    "🔄 Transform '{}' → '{}' lagged behind, some values skipped",
491                    input_key,
492                    output_key
493                );
494                // Continue processing — lag is not fatal
495                continue;
496            }
497            Err(_) => {
498                // Buffer closed or other error — exit
499                #[cfg(feature = "tracing")]
500                tracing::warn!(
501                    "🔄 Transform '{}' → '{}' input closed, task exiting",
502                    input_key,
503                    output_key
504                );
505                break;
506            }
507        }
508    }
509}
510
/// Spawned task for a multi-input join transform.
///
/// Spawns one forwarder task per input on the runtime taken from `runtime_ctx`. Each
/// forwarder subscribes to its input buffer and sends type-erased `JoinTrigger` values
/// into a shared `tokio::sync::mpsc` unbounded channel. This task reads that channel and
/// awaits `handler` for each trigger, one at a time. It stops once every forwarder has
/// exited and all senders are dropped.
///
/// Fatal setup failures are logged via `tracing`, or to stderr without it, and end
/// the task instead of panicking. These are:
/// - the runtime cannot be extracted from `runtime_ctx`;
/// - a forwarder cannot be spawned.
///
/// Forwarders already spawned before such a failure stop on their next send, because
/// the receiving end is dropped when this task returns.
#[cfg(feature = "std")]
// Some bindings (e.g. `input_keys`, `output_key`) are only read by logging macros.
#[allow(unused_variables)]
async fn run_join_transform<O, S, R, F, Fut>(
    db: Arc<crate::AimDb<R>>,
    inputs: Vec<(String, JoinInputFactory<R>)>,
    producer: crate::Producer<O, R>,
    mut state: S,
    handler: F,
    runtime_ctx: Arc<dyn Any + Send + Sync>,
) where
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + 'static,
    R: aimdb_executor::Spawn + 'static,
    F: Fn(JoinTrigger, &mut S, &crate::Producer<O, R>) -> Fut + Send + Sync + 'static,
    Fut: core::future::Future<Output = ()> + Send + 'static,
{
    let output_key = producer.key().to_string();
    let input_keys: Vec<String> = inputs.iter().map(|(k, _)| k.clone()).collect();

    // OBSERVABILITY: Always confirm startup
    #[cfg(feature = "tracing")]
    tracing::info!(
        "🔄 Join transform started: {:?} → '{}'",
        input_keys,
        output_key
    );

    // Extract the runtime used to spawn forwarder tasks. The context may hold either an
    // `Arc<R>` or an `R` directly. A mismatch is a fatal setup error, not a panic.
    let runtime: &R = match runtime_ctx
        .downcast_ref::<Arc<R>>()
        .map(|arc| arc.as_ref())
        .or_else(|| runtime_ctx.downcast_ref::<R>())
    {
        Some(runtime) => runtime,
        None => {
            #[cfg(feature = "tracing")]
            tracing::error!(
                "🔄 Join transform '{}' FATAL: failed to extract runtime from context",
                output_key
            );
            // Defense-in-depth: always emit on fatal setup failure
            #[cfg(all(feature = "std", not(feature = "tracing")))]
            eprintln!(
                "AIMDB TRANSFORM ERROR: Join transform '{}' failed to extract runtime from context",
                output_key
            );
            return;
        }
    };

    // Create the shared trigger channel
    let (trigger_tx, mut trigger_rx) = tokio::sync::mpsc::unbounded_channel();

    // Spawn per-input forwarder tasks
    for (index, (_key, factory)) in inputs.into_iter().enumerate() {
        let tx = trigger_tx.clone();
        let db = db.clone();

        // Each forwarder subscribes to one input and sends JoinTrigger values
        let forwarder_future = factory(db, index, tx);
        if let Err(_e) = runtime.spawn(forwarder_future) {
            #[cfg(feature = "tracing")]
            tracing::error!(
                "🔄 Join transform '{}' FATAL: failed to spawn forwarder for input index {}",
                output_key,
                index
            );
            // Defense-in-depth: always emit on fatal setup failure
            #[cfg(all(feature = "std", not(feature = "tracing")))]
            eprintln!(
                "AIMDB TRANSFORM ERROR: Join transform '{}' failed to spawn forwarder for input index {}",
                output_key, index
            );
            return;
        }
    }

    // Drop our copy of the sender — when all forwarders exit, the channel closes
    drop(trigger_tx);

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "✅ Join transform '{}' all forwarders spawned, entering event loop",
        output_key
    );

    // Event loop: dispatch typed triggers to the user handler
    while let Some(trigger) = trigger_rx.recv().await {
        handler(trigger, &mut state, &producer).await;
    }

    #[cfg(feature = "tracing")]
    tracing::warn!(
        "🔄 Join transform '{}' all inputs closed, task exiting",
        output_key
    );
}