1use core::any::Any;
25use core::fmt::Debug;
26use core::marker::PhantomData;
27
28extern crate alloc;
29use alloc::{
30 boxed::Box,
31 string::{String, ToString},
32 vec::Vec,
33};
34
35use alloc::sync::Arc;
36
37use crate::typed_record::BoxFuture;
38
39pub(crate) struct TransformDescriptor<T, R: aimdb_executor::Spawn + 'static>
51where
52 T: Send + 'static + Debug + Clone,
53{
54 pub input_keys: Vec<String>,
56
57 #[allow(clippy::type_complexity)]
65 pub spawn_fn: Box<
66 dyn FnOnce(
67 crate::Producer<T, R>,
68 Arc<crate::AimDb<R>>,
69 Arc<dyn Any + Send + Sync>,
70 ) -> BoxFuture<'static, ()>
71 + Send
72 + Sync,
73 >,
74}
75
76pub struct TransformBuilder<I, O, R: aimdb_executor::Spawn + 'static> {
85 input_key: String,
86 _phantom: PhantomData<(I, O, R)>,
87}
88
89impl<I, O, R> TransformBuilder<I, O, R>
90where
91 I: Send + Sync + Clone + Debug + 'static,
92 O: Send + Sync + Clone + Debug + 'static,
93 R: aimdb_executor::Spawn + 'static,
94{
95 pub(crate) fn new(input_key: String) -> Self {
96 Self {
97 input_key,
98 _phantom: PhantomData,
99 }
100 }
101
102 pub fn map<F>(self, f: F) -> TransformPipeline<I, O, R>
104 where
105 F: Fn(&I) -> Option<O> + Send + Sync + 'static,
106 {
107 TransformPipeline {
109 input_key: self.input_key,
110 spawn_factory: Box::new(move |input_key| {
111 let transform_fn = move |val: &I, _state: &mut ()| f(val);
112 create_single_transform_descriptor::<I, O, (), R>(input_key, (), transform_fn)
113 }),
114 _phantom_i: PhantomData,
115 }
116 }
117
118 pub fn with_state<S: Send + Sync + 'static>(
120 self,
121 initial: S,
122 ) -> StatefulTransformBuilder<I, O, S, R> {
123 StatefulTransformBuilder {
124 input_key: self.input_key,
125 initial_state: initial,
126 _phantom: PhantomData,
127 }
128 }
129}
130
131pub struct StatefulTransformBuilder<I, O, S, R: aimdb_executor::Spawn + 'static> {
133 input_key: String,
134 initial_state: S,
135 _phantom: PhantomData<(I, O, R)>,
136}
137
138impl<I, O, S, R> StatefulTransformBuilder<I, O, S, R>
139where
140 I: Send + Sync + Clone + Debug + 'static,
141 O: Send + Sync + Clone + Debug + 'static,
142 S: Send + Sync + 'static,
143 R: aimdb_executor::Spawn + 'static,
144{
145 pub fn on_value<F>(self, f: F) -> TransformPipeline<I, O, R>
147 where
148 F: Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
149 {
150 let initial = self.initial_state;
151 TransformPipeline {
152 input_key: self.input_key,
153 spawn_factory: Box::new(move |input_key| {
154 create_single_transform_descriptor::<I, O, S, R>(input_key, initial, f)
155 }),
156 _phantom_i: PhantomData,
157 }
158 }
159}
160
161pub struct TransformPipeline<
163 I,
164 O: Send + Sync + Clone + Debug + 'static,
165 R: aimdb_executor::Spawn + 'static,
166> {
167 pub(crate) input_key: String,
168 pub(crate) spawn_factory: Box<dyn FnOnce(String) -> TransformDescriptor<O, R> + Send + Sync>,
171 _phantom_i: PhantomData<I>,
172}
173
174impl<I, O, R> TransformPipeline<I, O, R>
175where
176 I: Send + Sync + Clone + Debug + 'static,
177 O: Send + Sync + Clone + Debug + 'static,
178 R: aimdb_executor::Spawn + 'static,
179{
180 pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
182 (self.spawn_factory)(self.input_key)
183 }
184}
185
186fn create_single_transform_descriptor<I, O, S, R>(
188 input_key: String,
189 initial_state: S,
190 transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
191) -> TransformDescriptor<O, R>
192where
193 I: Send + Sync + Clone + Debug + 'static,
194 O: Send + Sync + Clone + Debug + 'static,
195 S: Send + Sync + 'static,
196 R: aimdb_executor::Spawn + 'static,
197{
198 let input_key_clone = input_key.clone();
199 let input_keys = alloc::vec![input_key];
200
201 TransformDescriptor {
202 input_keys,
203 spawn_fn: Box::new(move |producer, db, _ctx| {
204 Box::pin(run_single_transform::<I, O, S, R>(
205 db,
206 input_key_clone,
207 producer,
208 initial_state,
209 transform_fn,
210 ))
211 }),
212 }
213}
214
/// Event delivered to a join handler when one of its inputs yields a value.
pub enum JoinTrigger {
    /// A value arrived on one of the join's inputs.
    Input {
        /// Index that identifies the input which produced the value.
        index: usize,
        /// The value itself, type-erased; recover it by downcasting.
        value: Box<dyn Any + Send>,
    },
}
230
231impl JoinTrigger {
232 pub fn as_input<T: 'static>(&self) -> Option<&T> {
234 match self {
235 JoinTrigger::Input { value, .. } => value.downcast_ref::<T>(),
236 }
237 }
238
239 pub fn index(&self) -> usize {
241 match self {
242 JoinTrigger::Input { index, .. } => *index,
243 }
244 }
245}
246
/// First stage of the multi-input join builder (available with the `std`
/// feature).
///
/// Inputs are collected with `input`; `with_state` then moves on to the
/// stage that accepts the trigger handler. `O` is the output value type and
/// `R` is the runtime type.
#[cfg(feature = "std")]
pub struct JoinBuilder<O, R>
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Registered inputs, in registration order: the record key paired with
    /// the factory that creates the forwarder task for that input.
    inputs: Vec<(String, JoinInputFactory<R>)>,
    /// Ties the otherwise unused type parameters to the builder.
    _phantom: PhantomData<(O, R)>,
}
262
/// One-shot factory for the forwarder task of a single join input.
///
/// Called with the shared database handle, the index assigned to the input,
/// and the sending half of the join's trigger channel; returns the boxed
/// future that forwards the input's values as [`JoinTrigger`]s.
#[cfg(feature = "std")]
type JoinInputFactory<R> = Box<
    dyn FnOnce(
            Arc<crate::AimDb<R>>,
            usize,
            tokio::sync::mpsc::UnboundedSender<JoinTrigger>,
        ) -> BoxFuture<'static, ()>
        + Send
        + Sync,
>;
274
#[cfg(feature = "std")]
impl<O, R> JoinBuilder<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Creates a join builder with no inputs registered.
    pub(crate) fn new() -> Self {
        Self {
            inputs: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Registers the record identified by `key`, holding values of type `I`,
    /// as an input of the join.
    ///
    /// The stored factory later creates a forwarder task that subscribes to
    /// the record and sends every received value to the join as a
    /// [`JoinTrigger::Input`] tagged with the index the factory is called
    /// with. The forwarder stops when the subscription fails, when the input
    /// reports a non-lag error, or when the receiving side of the trigger
    /// channel is gone. A lagged input only skips values: the forwarder keeps
    /// running, matching how single-input transforms treat lag.
    pub fn input<I>(mut self, key: impl crate::RecordKey) -> Self
    where
        I: Send + Sync + Clone + Debug + 'static,
    {
        let key_str = key.as_str().to_string();
        let key_for_factory = key_str.clone();

        let factory: JoinInputFactory<R> = Box::new(
            move |db: Arc<crate::AimDb<R>>,
                  index: usize,
                  tx: tokio::sync::mpsc::UnboundedSender<JoinTrigger>| {
                Box::pin(async move {
                    // The key is cloned because it is still needed for log messages below.
                    let consumer =
                        crate::typed_api::Consumer::<I, R>::new(db, key_for_factory.clone());
                    let mut reader = match consumer.subscribe() {
                        Ok(r) => r,
                        Err(e) => {
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "🔄 Join input '{}' (index {}) subscription failed: {:?}",
                                key_for_factory,
                                index,
                                e
                            );
                            #[cfg(all(feature = "std", not(feature = "tracing")))]
                            eprintln!(
                                "AIMDB TRANSFORM ERROR: Join input '{}' (index {}) subscription failed: {:?}",
                                key_for_factory, index, e
                            );
                            return;
                        }
                    };

                    loop {
                        match reader.recv().await {
                            Ok(value) => {
                                let trigger = JoinTrigger::Input {
                                    index,
                                    value: Box::new(value),
                                };
                                if tx.send(trigger).is_err() {
                                    // The join task dropped its receiver; nothing left to feed.
                                    break;
                                }
                            }
                            // Lag means values were skipped, not that the input is gone.
                            // Keep forwarding instead of silently dropping this input.
                            Err(crate::DbError::BufferLagged { .. }) => {
                                #[cfg(feature = "tracing")]
                                tracing::warn!(
                                    "🔄 Join input '{}' (index {}) lagged behind, some values skipped",
                                    key_for_factory,
                                    index
                                );
                                continue;
                            }
                            // Any other error ends this forwarder.
                            Err(_) => break,
                        }
                    }
                }) as BoxFuture<'static, ()>
            },
        );

        self.inputs.push((key_str, factory));
        self
    }

    /// Moves on to the handler stage, seeding it with `initial` as the state
    /// value later passed to the `on_trigger` handler.
    pub fn with_state<S: Send + Sync + 'static>(self, initial: S) -> JoinStateBuilder<O, S, R> {
        JoinStateBuilder {
            inputs: self.inputs,
            initial_state: initial,
            _phantom: PhantomData,
        }
    }
}
355
/// Second stage of the join builder: the registered inputs plus the initial
/// handler state of type `S`.
#[cfg(feature = "std")]
pub struct JoinStateBuilder<O, S, R>
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Registered inputs: record key paired with its forwarder factory.
    inputs: Vec<(String, JoinInputFactory<R>)>,
    /// State value the handler starts with.
    initial_state: S,
    /// Ties the otherwise unused type parameters to the builder.
    _phantom: PhantomData<(O, R)>,
}
363
#[cfg(feature = "std")]
impl<O, S, R> JoinStateBuilder<O, S, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + Sync + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Finishes the join with an async trigger handler.
    ///
    /// `handler` is called for every [`JoinTrigger`] with a mutable reference
    /// to the state and a reference to the output producer; the returned
    /// future is awaited before the next trigger is handled.
    pub fn on_trigger<F, Fut>(self, handler: F) -> JoinPipeline<O, R>
    where
        F: Fn(JoinTrigger, &mut S, &crate::Producer<O, R>) -> Fut + Send + Sync + 'static,
        Fut: core::future::Future<Output = ()> + Send + 'static,
    {
        let Self {
            inputs,
            initial_state,
            ..
        } = self;

        // One copy of the keys stays on the pipeline, the other goes into the descriptor.
        let pipeline_keys: Vec<String> = inputs.iter().map(|(key, _)| key.clone()).collect();
        let descriptor_keys = pipeline_keys.clone();

        JoinPipeline {
            _input_keys: pipeline_keys,
            spawn_factory: Box::new(move |()| TransformDescriptor {
                input_keys: descriptor_keys,
                spawn_fn: Box::new(move |producer, db, ctx| {
                    let task =
                        run_join_transform(db, inputs, producer, initial_state, handler, ctx);
                    Box::pin(task)
                }),
            }),
        }
    }
}
399
/// A fully configured multi-input join, waiting to be turned into a
/// [`TransformDescriptor`].
#[cfg(feature = "std")]
pub struct JoinPipeline<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Keys of the join's inputs, in registration order.
    pub(crate) _input_keys: Vec<String>,
    /// One-shot factory that builds the descriptor.
    pub(crate) spawn_factory: Box<dyn FnOnce(()) -> TransformDescriptor<O, R> + Send + Sync>,
}
409
#[cfg(feature = "std")]
impl<O, R> JoinPipeline<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Consumes the pipeline and builds its descriptor by invoking the stored
    /// factory.
    pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
        let Self { spawn_factory, .. } = self;
        spawn_factory(())
    }
}
421
422#[allow(unused_variables)]
431async fn run_single_transform<I, O, S, R>(
432 db: Arc<crate::AimDb<R>>,
433 input_key: String,
434 producer: crate::Producer<O, R>,
435 mut state: S,
436 transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
437) where
438 I: Send + Sync + Clone + Debug + 'static,
439 O: Send + Sync + Clone + Debug + 'static,
440 S: Send + 'static,
441 R: aimdb_executor::Spawn + 'static,
442{
443 let output_key = producer.key().to_string();
444
445 #[cfg(feature = "tracing")]
447 tracing::info!("🔄 Transform started: '{}' → '{}'", input_key, output_key);
448
449 let consumer = crate::typed_api::Consumer::<I, R>::new(db, input_key.clone());
452 let mut reader = match consumer.subscribe() {
453 Ok(r) => r,
454 Err(_e) => {
455 #[cfg(feature = "tracing")]
456 tracing::error!(
457 "🔄 Transform '{}' → '{}' FATAL: failed to subscribe to input: {:?}",
458 input_key,
459 output_key,
460 _e
461 );
462 #[cfg(all(feature = "std", not(feature = "tracing")))]
464 eprintln!(
465 "AIMDB TRANSFORM ERROR: '{}' → '{}' failed to subscribe to input: {:?}",
466 input_key, output_key, _e
467 );
468 return;
469 }
470 };
471
472 #[cfg(feature = "tracing")]
473 tracing::debug!(
474 "✅ Transform '{}' → '{}' subscribed, entering event loop",
475 input_key,
476 output_key
477 );
478
479 loop {
481 match reader.recv().await {
482 Ok(input_value) => {
483 if let Some(output_value) = transform_fn(&input_value, &mut state) {
484 let _ = producer.produce(output_value).await;
485 }
486 }
487 Err(crate::DbError::BufferLagged { .. }) => {
488 #[cfg(feature = "tracing")]
489 tracing::warn!(
490 "🔄 Transform '{}' → '{}' lagged behind, some values skipped",
491 input_key,
492 output_key
493 );
494 continue;
496 }
497 Err(_) => {
498 #[cfg(feature = "tracing")]
500 tracing::warn!(
501 "🔄 Transform '{}' → '{}' input closed, task exiting",
502 input_key,
503 output_key
504 );
505 break;
506 }
507 }
508 }
509}
510
/// Task body of a multi-input join transform (`std` feature only).
///
/// Extracts the runtime `R` from `runtime_ctx`, spawns one forwarder task per
/// entry of `inputs` (each created by its factory with the entry's position
/// as index), and then calls `handler` for every [`JoinTrigger`] the
/// forwarders send, awaiting each handler future before taking the next
/// trigger.
///
/// The task ends early if a forwarder cannot be spawned, and otherwise when
/// every forwarder has dropped its sender.
///
/// # Panics
///
/// Panics if `runtime_ctx` holds neither an `Arc<R>` nor an `R`.
#[cfg(feature = "std")]
// `output_key` is read only by logging that is compiled out without `tracing`.
#[allow(unused_variables)]
async fn run_join_transform<O, S, R, F, Fut>(
    db: Arc<crate::AimDb<R>>,
    inputs: Vec<(String, JoinInputFactory<R>)>,
    producer: crate::Producer<O, R>,
    mut state: S,
    handler: F,
    runtime_ctx: Arc<dyn Any + Send + Sync>,
) where
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + 'static,
    R: aimdb_executor::Spawn + 'static,
    F: Fn(JoinTrigger, &mut S, &crate::Producer<O, R>) -> Fut + Send + Sync + 'static,
    Fut: core::future::Future<Output = ()> + Send + 'static,
{
    let output_key = producer.key().to_string();

    // Only the startup log needs the key list, so skip the copies when it is compiled out.
    #[cfg(feature = "tracing")]
    let input_keys: Vec<String> = inputs.iter().map(|(k, _)| k.clone()).collect();

    #[cfg(feature = "tracing")]
    tracing::info!(
        "🔄 Join transform started: {:?} → '{}'",
        input_keys,
        output_key
    );

    // The context may carry the runtime either wrapped in an `Arc` or directly.
    let runtime: &R = runtime_ctx
        .downcast_ref::<Arc<R>>()
        .map(|arc| arc.as_ref())
        .or_else(|| runtime_ctx.downcast_ref::<R>())
        .expect("Failed to extract runtime from context for join transform");

    let (trigger_tx, mut trigger_rx) = tokio::sync::mpsc::unbounded_channel();

    for (index, (_key, factory)) in inputs.into_iter().enumerate() {
        let tx = trigger_tx.clone();
        let db = db.clone();

        let forwarder_future = factory(db, index, tx);
        if let Err(_e) = runtime.spawn(forwarder_future) {
            #[cfg(feature = "tracing")]
            tracing::error!(
                "🔄 Join transform '{}' FATAL: failed to spawn forwarder for input index {}",
                output_key,
                index
            );
            // Same stderr fallback as the other fatal paths, so the failure
            // is still visible when `tracing` is disabled.
            #[cfg(all(feature = "std", not(feature = "tracing")))]
            eprintln!(
                "AIMDB TRANSFORM ERROR: Join transform '{}' failed to spawn forwarder for input index {}",
                output_key, index
            );
            return;
        }
    }

    // Drop our own sender so the channel closes once every forwarder has exited.
    drop(trigger_tx);

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "✅ Join transform '{}' all forwarders spawned, entering event loop",
        output_key
    );

    while let Some(trigger) = trigger_rx.recv().await {
        handler(trigger, &mut state, &producer).await;
    }

    #[cfg(feature = "tracing")]
    tracing::warn!(
        "🔄 Join transform '{}' all inputs closed, task exiting",
        output_key
    );
}