use core::any::Any;
use core::fmt::Debug;
use core::marker::PhantomData;
extern crate alloc;
use alloc::{
boxed::Box,
string::{String, ToString},
vec::Vec,
};
use alloc::sync::Arc;
use crate::typed_record::BoxFuture;
/// Crate-internal description of a transform task that is ready to be spawned.
///
/// Pairs the keys of the records the transform reads with a one-shot closure
/// that builds the task future.
pub(crate) struct TransformDescriptor<T, R>
where
    T: Clone + Debug + Send + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Keys of the input records this transform reads from.
    pub input_keys: Vec<String>,

    /// One-shot factory for the transform's task future.
    ///
    /// Receives, in order: the producer for the output record, the shared
    /// database handle, and a type-erased context value (the join transform
    /// downcasts it to the runtime `R`; the single-input transform ignores it).
    // The boxed closure signature is long by nature; the lint is silenced here
    // rather than hiding the signature behind an alias.
    #[allow(clippy::type_complexity)]
    pub spawn_fn: Box<
        dyn FnOnce(
                crate::Producer<T, R>,
                Arc<crate::AimDb<R>>,
                Arc<dyn Any + Send + Sync>,
            ) -> BoxFuture<'static, ()>
            + Send
            + Sync,
    >,
}
/// First stage of the builder for a single-input transform from `I` to `O`.
///
/// Stores only the key of the input record. Finish with
/// [`TransformBuilder::map`] for a stateless transform, or continue with
/// [`TransformBuilder::with_state`] to attach state first.
pub struct TransformBuilder<I, O, R>
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Key of the record the transform reads from.
    input_key: String,
    /// Ties the otherwise unused type parameters to the builder.
    _phantom: PhantomData<(I, O, R)>,
}
impl<I, O, R> TransformBuilder<I, O, R>
where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Creates a builder for a transform that reads from the record `input_key`.
    pub(crate) fn new(input_key: String) -> Self {
        let _phantom = PhantomData;
        Self {
            input_key,
            _phantom,
        }
    }

    /// Finishes the builder with a stateless transform function.
    ///
    /// `f` is called with a reference to each input value. A `Some(output)`
    /// result is produced to the output record; `None` produces nothing for
    /// that input.
    pub fn map<F>(self, f: F) -> TransformPipeline<I, O, R>
    where
        F: Fn(&I) -> Option<O> + Send + Sync + 'static,
    {
        // Adapt the stateless function to the stateful signature by threading
        // a unit state through it.
        let stateless = move |value: &I, _unit: &mut ()| f(value);
        let Self { input_key, .. } = self;

        TransformPipeline {
            input_key,
            spawn_factory: Box::new(move |key| {
                create_single_transform_descriptor::<I, O, (), R>(key, (), stateless)
            }),
            _phantom_i: PhantomData,
        }
    }

    /// Attaches an initial state value and moves on to the stateful builder
    /// stage, keeping the same input key.
    pub fn with_state<S: Send + Sync + 'static>(
        self,
        initial: S,
    ) -> StatefulTransformBuilder<I, O, S, R> {
        let Self { input_key, .. } = self;

        StatefulTransformBuilder {
            input_key,
            initial_state: initial,
            _phantom: PhantomData,
        }
    }
}
/// Second builder stage for a single-input transform that carries state of
/// type `S` between invocations.
///
/// Finish with [`StatefulTransformBuilder::on_value`].
pub struct StatefulTransformBuilder<I, O, S, R>
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Key of the record the transform reads from.
    input_key: String,
    /// State value the transform task starts with.
    initial_state: S,
    /// Ties the otherwise unused type parameters to the builder.
    _phantom: PhantomData<(I, O, R)>,
}
impl<I, O, S, R> StatefulTransformBuilder<I, O, S, R>
where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + Sync + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Finishes the builder with a stateful transform function.
    ///
    /// `f` is called with a reference to each input value and a mutable
    /// reference to the state. A `Some(output)` result is produced to the
    /// output record; `None` produces nothing for that input.
    pub fn on_value<F>(self, f: F) -> TransformPipeline<I, O, R>
    where
        F: Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
    {
        let Self {
            input_key,
            initial_state,
            ..
        } = self;

        TransformPipeline {
            input_key,
            // State and function move into the factory, and from there into
            // the descriptor once the pipeline is converted.
            spawn_factory: Box::new(move |key| {
                create_single_transform_descriptor::<I, O, S, R>(key, initial_state, f)
            }),
            _phantom_i: PhantomData,
        }
    }
}
/// A fully configured single-input transform, ready to be turned into a
/// [`TransformDescriptor`].
pub struct TransformPipeline<I, O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Key of the record the transform reads from.
    pub(crate) input_key: String,
    /// One-shot factory that builds the descriptor from the input key.
    pub(crate) spawn_factory: Box<dyn FnOnce(String) -> TransformDescriptor<O, R> + Send + Sync>,
    /// Ties the input type `I` to the pipeline.
    _phantom_i: PhantomData<I>,
}
impl<I, O, R> TransformPipeline<I, O, R>
where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Consumes the pipeline and builds its descriptor by calling the stored
    /// factory with the stored input key.
    pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
        let Self {
            input_key,
            spawn_factory,
            ..
        } = self;
        spawn_factory(input_key)
    }
}
/// Builds the descriptor for a single-input transform.
///
/// The returned descriptor lists `input_key` as its only input. Its spawn
/// closure runs [`run_single_transform`] with the given initial state and
/// transform function, and ignores the context argument.
fn create_single_transform_descriptor<I, O, S, R>(
    input_key: String,
    initial_state: S,
    transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
) -> TransformDescriptor<O, R>
where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + Sync + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    // One copy of the key is advertised in `input_keys`; the original moves
    // into the task closure below.
    let advertised_keys = alloc::vec![input_key.clone()];

    TransformDescriptor {
        input_keys: advertised_keys,
        spawn_fn: Box::new(move |producer, db, _ctx| {
            let task = run_single_transform::<I, O, S, R>(
                db,
                input_key,
                producer,
                initial_state,
                transform_fn,
            );
            Box::pin(task)
        }),
    }
}
/// Event handed to a join handler.
pub enum JoinTrigger {
    /// A value was received on one of the join's inputs.
    Input {
        /// Position of the input among the join's inputs, in the order they
        /// were registered.
        index: usize,
        /// The received value, type-erased.
        value: Box<dyn Any + Send>,
    },
}

impl JoinTrigger {
    /// Returns the carried value as `&T`, or `None` when the value is not of
    /// type `T`.
    pub fn as_input<T: 'static>(&self) -> Option<&T> {
        // Irrefutable while `Input` is the only variant; adding a variant
        // turns this into a compile error rather than a silent fallthrough.
        let JoinTrigger::Input { value, .. } = self;
        value.downcast_ref::<T>()
    }

    /// Returns the index of the input that produced this trigger.
    pub fn index(&self) -> usize {
        let JoinTrigger::Input { index, .. } = self;
        *index
    }
}
/// First stage of the builder for a multi-input (join) transform producing `O`.
///
/// Inputs are collected with [`JoinBuilder::input`]; each one is stored as its
/// record key plus the factory for its forwarder task.
#[cfg(feature = "std")]
pub struct JoinBuilder<O, R>
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Registered inputs, in registration order.
    inputs: Vec<(String, JoinInputFactory<R>)>,
    /// Ties the otherwise unused type parameters to the builder.
    _phantom: PhantomData<(O, R)>,
}
#[cfg(feature = "std")]
/// Boxed one-shot factory for the forwarder future of a single join input.
///
/// It is called with the shared database handle, the index of the input, and
/// the sending half of the join's trigger channel, and returns the future
/// that is then spawned on the runtime.
type JoinInputFactory<R> = Box<
dyn FnOnce(
Arc<crate::AimDb<R>>,
usize,
tokio::sync::mpsc::UnboundedSender<JoinTrigger>,
) -> BoxFuture<'static, ()>
+ Send
+ Sync,
>;
#[cfg(feature = "std")]
impl<O, R> JoinBuilder<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Creates a join builder with no inputs registered.
    pub(crate) fn new() -> Self {
        Self {
            inputs: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Registers the record `key`, carrying values of type `I`, as the next
    /// input of the join.
    ///
    /// The input's index is its position in registration order. When the join
    /// runs, a forwarder task subscribes to the record and sends every
    /// received value to the join as a [`JoinTrigger::Input`].
    ///
    /// The forwarder stops when subscribing fails, when the join's receiving
    /// side is gone, or when the input reports an error other than lag. A
    /// lagged buffer only skips values; the input stays attached to the join,
    /// matching how the single-input transform treats lag.
    pub fn input<I>(mut self, key: impl crate::RecordKey) -> Self
    where
        I: Send + Sync + Clone + Debug + 'static,
    {
        let key_str = key.as_str().to_string();
        let key_for_factory = key_str.clone();
        let factory: JoinInputFactory<R> = Box::new(
            move |db: Arc<crate::AimDb<R>>,
                  index: usize,
                  tx: tokio::sync::mpsc::UnboundedSender<JoinTrigger>| {
                Box::pin(async move {
                    let consumer =
                        crate::typed_api::Consumer::<I, R>::new(db, key_for_factory.clone());
                    let mut reader = match consumer.subscribe() {
                        Ok(r) => r,
                        Err(e) => {
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "🔄 Join input '{}' (index {}) subscription failed: {:?}",
                                key_for_factory,
                                index,
                                e
                            );
                            #[cfg(all(feature = "std", not(feature = "tracing")))]
                            eprintln!(
                                "AIMDB TRANSFORM ERROR: Join input '{}' (index {}) subscription failed: {:?}",
                                key_for_factory, index, e
                            );
                            return;
                        }
                    };

                    loop {
                        let value = match reader.recv().await {
                            Ok(value) => value,
                            // Lag means values were skipped, not that the
                            // input is gone: keep forwarding instead of
                            // dropping this input from the join for good.
                            Err(crate::DbError::BufferLagged { .. }) => {
                                #[cfg(feature = "tracing")]
                                tracing::warn!(
                                    "🔄 Join input '{}' (index {}) lagged behind, some values skipped",
                                    key_for_factory,
                                    index
                                );
                                continue;
                            }
                            // Any other error ends this forwarder.
                            Err(_) => break,
                        };

                        let trigger = JoinTrigger::Input {
                            index,
                            value: Box::new(value),
                        };
                        // A failed send means the join task dropped its
                        // receiver, so there is nobody left to forward to.
                        if tx.send(trigger).is_err() {
                            break;
                        }
                    }
                }) as BoxFuture<'static, ()>
            },
        );
        self.inputs.push((key_str, factory));
        self
    }

    /// Attaches the initial handler state and moves on to the final builder
    /// stage, keeping the registered inputs.
    pub fn with_state<S: Send + Sync + 'static>(self, initial: S) -> JoinStateBuilder<O, S, R> {
        JoinStateBuilder {
            inputs: self.inputs,
            initial_state: initial,
            _phantom: PhantomData,
        }
    }
}
/// Final builder stage for a join transform: the registered inputs plus the
/// initial handler state of type `S`.
///
/// Finish with [`JoinStateBuilder::on_trigger`].
#[cfg(feature = "std")]
pub struct JoinStateBuilder<O, S, R>
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Registered inputs, in registration order.
    inputs: Vec<(String, JoinInputFactory<R>)>,
    /// State value the join task starts with.
    initial_state: S,
    /// Ties the otherwise unused type parameters to the builder.
    _phantom: PhantomData<(O, R)>,
}
#[cfg(feature = "std")]
impl<O, S, R> JoinStateBuilder<O, S, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + Sync + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Finishes the builder with the join handler.
    ///
    /// `handler` is called for every trigger with the trigger itself, a
    /// mutable reference to the state, and a reference to the output
    /// producer; the future it returns is awaited before the next trigger is
    /// handled.
    pub fn on_trigger<F, Fut>(self, handler: F) -> JoinPipeline<O, R>
    where
        F: Fn(JoinTrigger, &mut S, &crate::Producer<O, R>) -> Fut + Send + Sync + 'static,
        Fut: core::future::Future<Output = ()> + Send + 'static,
    {
        let Self {
            inputs,
            initial_state,
            ..
        } = self;

        // The key list is needed twice: once on the pipeline, once on the
        // descriptor the factory builds later.
        let pipeline_keys: Vec<String> = inputs.iter().map(|(key, _)| key.clone()).collect();
        let descriptor_keys = pipeline_keys.clone();

        JoinPipeline {
            _input_keys: pipeline_keys,
            spawn_factory: Box::new(move |()| TransformDescriptor {
                input_keys: descriptor_keys,
                spawn_fn: Box::new(move |producer, db, ctx| {
                    Box::pin(run_join_transform(
                        db,
                        inputs,
                        producer,
                        initial_state,
                        handler,
                        ctx,
                    ))
                }),
            }),
        }
    }
}
/// A fully configured join transform, ready to be turned into a
/// [`TransformDescriptor`].
#[cfg(feature = "std")]
pub struct JoinPipeline<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Keys of the join's input records, in registration order.
    pub(crate) _input_keys: Vec<String>,
    /// One-shot factory that builds the descriptor; its unit argument
    /// carries no information.
    pub(crate) spawn_factory: Box<dyn FnOnce(()) -> TransformDescriptor<O, R> + Send + Sync>,
}
#[cfg(feature = "std")]
impl<O, R> JoinPipeline<O, R>
where
    O: Send + Sync + Clone + Debug + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    /// Consumes the pipeline and builds its descriptor by calling the stored
    /// factory.
    pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
        let Self { spawn_factory, .. } = self;
        spawn_factory(())
    }
}
/// Task body of a single-input transform.
///
/// Subscribes to the record `input_key`, calls `transform_fn` with every
/// received value and the mutable `state`, and produces each `Some` result
/// through `producer`. The result of producing is ignored.
///
/// The task returns when subscribing fails or when receiving fails with any
/// error other than `DbError::BufferLagged`. A lagged buffer only skips
/// values and the loop carries on.
// `output_key` (and the error binding) are only read by the optional logging
// statements, so they are unused in some feature combinations.
#[allow(unused_variables)]
async fn run_single_transform<I, O, S, R>(
    db: Arc<crate::AimDb<R>>,
    input_key: String,
    producer: crate::Producer<O, R>,
    mut state: S,
    transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
) where
    I: Send + Sync + Clone + Debug + 'static,
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + 'static,
    R: aimdb_executor::Spawn + 'static,
{
    let output_key = producer.key().to_string();

    #[cfg(feature = "tracing")]
    tracing::info!("🔄 Transform started: '{}' → '{}'", input_key, output_key);

    let consumer = crate::typed_api::Consumer::<I, R>::new(db, input_key.clone());
    let subscription = consumer.subscribe();
    let mut reader = match subscription {
        Ok(reader) => reader,
        Err(_e) => {
            #[cfg(feature = "tracing")]
            tracing::error!(
                "🔄 Transform '{}' → '{}' FATAL: failed to subscribe to input: {:?}",
                input_key,
                output_key,
                _e
            );
            #[cfg(all(feature = "std", not(feature = "tracing")))]
            eprintln!(
                "AIMDB TRANSFORM ERROR: '{}' → '{}' failed to subscribe to input: {:?}",
                input_key, output_key, _e
            );
            return;
        }
    };

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "✅ Transform '{}' → '{}' subscribed, entering event loop",
        input_key,
        output_key
    );

    loop {
        // Resolve the receive result first; only a real value falls through
        // to the transform step.
        let received = match reader.recv().await {
            Ok(value) => value,
            Err(crate::DbError::BufferLagged { .. }) => {
                #[cfg(feature = "tracing")]
                tracing::warn!(
                    "🔄 Transform '{}' → '{}' lagged behind, some values skipped",
                    input_key,
                    output_key
                );
                continue;
            }
            Err(_) => {
                #[cfg(feature = "tracing")]
                tracing::warn!(
                    "🔄 Transform '{}' → '{}' input closed, task exiting",
                    input_key,
                    output_key
                );
                break;
            }
        };

        if let Some(produced) = transform_fn(&received, &mut state) {
            let _ = producer.produce(produced).await;
        }
    }
}
/// Task body of a join (multi-input) transform.
///
/// Extracts the runtime from `runtime_ctx`, spawns one forwarder task per
/// input on it, and then calls `handler` for every trigger the forwarders
/// send, awaiting each handler future before taking the next trigger.
///
/// The task returns early when a forwarder cannot be spawned, and otherwise
/// ends once every forwarder has dropped its sender.
///
/// # Panics
///
/// Panics if `runtime_ctx` holds neither an `Arc<R>` nor an `R`.
#[cfg(feature = "std")]
// `output_key` is only read by logging statements and by the spawn-failure
// report, so it can be unused on some paths.
#[allow(unused_variables)]
async fn run_join_transform<O, S, R, F, Fut>(
    db: Arc<crate::AimDb<R>>,
    inputs: Vec<(String, JoinInputFactory<R>)>,
    producer: crate::Producer<O, R>,
    mut state: S,
    handler: F,
    runtime_ctx: Arc<dyn Any + Send + Sync>,
) where
    O: Send + Sync + Clone + Debug + 'static,
    S: Send + 'static,
    R: aimdb_executor::Spawn + 'static,
    F: Fn(JoinTrigger, &mut S, &crate::Producer<O, R>) -> Fut + Send + Sync + 'static,
    Fut: core::future::Future<Output = ()> + Send + 'static,
{
    let output_key = producer.key().to_string();

    // The key list feeds only the start-up log line, so it is built (by
    // borrowing, without cloning) only when tracing is enabled.
    #[cfg(feature = "tracing")]
    let input_keys: Vec<&str> = inputs.iter().map(|(k, _)| k.as_str()).collect();
    #[cfg(feature = "tracing")]
    tracing::info!(
        "🔄 Join transform started: {:?} → '{}'",
        input_keys,
        output_key
    );

    // The context may hold the runtime either behind an `Arc` or directly.
    let runtime: &R = runtime_ctx
        .downcast_ref::<Arc<R>>()
        .map(|arc| arc.as_ref())
        .or_else(|| runtime_ctx.downcast_ref::<R>())
        .expect("Failed to extract runtime from context for join transform");

    let (trigger_tx, mut trigger_rx) = tokio::sync::mpsc::unbounded_channel();

    for (index, (_key, factory)) in inputs.into_iter().enumerate() {
        let forwarder_future = factory(db.clone(), index, trigger_tx.clone());
        if runtime.spawn(forwarder_future).is_err() {
            #[cfg(feature = "tracing")]
            tracing::error!(
                "🔄 Join transform '{}' FATAL: failed to spawn forwarder for input index {}",
                output_key,
                index
            );
            // Without tracing this failure used to be silent; report it the
            // same way the subscribe-failure paths do.
            #[cfg(all(feature = "std", not(feature = "tracing")))]
            eprintln!(
                "AIMDB TRANSFORM ERROR: join '{}' failed to spawn forwarder for input index {}",
                output_key, index
            );
            return;
        }
    }

    // Drop the original sender so the channel closes once every forwarder
    // has finished; otherwise the receive loop below could never end.
    drop(trigger_tx);

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "✅ Join transform '{}' all forwarders spawned, entering event loop",
        output_key
    );

    while let Some(trigger) = trigger_rx.recv().await {
        handler(trigger, &mut state, &producer).await;
    }

    #[cfg(feature = "tracing")]
    tracing::warn!(
        "🔄 Join transform '{}' all inputs closed, task exiting",
        output_key
    );
}