1use core::fmt::Debug;
2use core::marker::PhantomData;
3
4use alloc::{
5 boxed::Box,
6 string::{String, ToString},
7 sync::Arc,
8 vec,
9};
10
11use crate::transform::TransformDescriptor;
12
13pub struct TransformBuilder<I, O, R: aimdb_executor::Spawn + 'static> {
22 input_key: String,
23 _phantom: PhantomData<(I, O, R)>,
24}
25
26impl<I, O, R> TransformBuilder<I, O, R>
27where
28 I: Send + Sync + Clone + Debug + 'static,
29 O: Send + Sync + Clone + Debug + 'static,
30 R: aimdb_executor::Spawn + 'static,
31{
32 pub(crate) fn new(input_key: String) -> Self {
33 Self {
34 input_key,
35 _phantom: PhantomData,
36 }
37 }
38
39 pub fn map<F>(self, f: F) -> TransformPipeline<I, O, R>
41 where
42 F: Fn(&I) -> Option<O> + Send + Sync + 'static,
43 {
44 TransformPipeline {
45 input_key: self.input_key,
46 spawn_factory: Box::new(move |input_key| {
47 let transform_fn = move |val: &I, _state: &mut ()| f(val);
48 create_single_transform_descriptor::<I, O, (), R>(input_key, (), transform_fn)
49 }),
50 _phantom_i: PhantomData,
51 }
52 }
53
54 pub fn with_state<S: Send + Sync + 'static>(
56 self,
57 initial: S,
58 ) -> StatefulTransformBuilder<I, O, S, R> {
59 StatefulTransformBuilder {
60 input_key: self.input_key,
61 initial_state: initial,
62 _phantom: PhantomData,
63 }
64 }
65}
66
67pub struct StatefulTransformBuilder<I, O, S, R: aimdb_executor::Spawn + 'static> {
69 input_key: String,
70 initial_state: S,
71 _phantom: PhantomData<(I, O, R)>,
72}
73
74impl<I, O, S, R> StatefulTransformBuilder<I, O, S, R>
75where
76 I: Send + Sync + Clone + Debug + 'static,
77 O: Send + Sync + Clone + Debug + 'static,
78 S: Send + Sync + 'static,
79 R: aimdb_executor::Spawn + 'static,
80{
81 pub fn on_value<F>(self, f: F) -> TransformPipeline<I, O, R>
83 where
84 F: Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
85 {
86 let initial = self.initial_state;
87 TransformPipeline {
88 input_key: self.input_key,
89 spawn_factory: Box::new(move |input_key| {
90 create_single_transform_descriptor::<I, O, S, R>(input_key, initial, f)
91 }),
92 _phantom_i: PhantomData,
93 }
94 }
95}
96
97pub struct TransformPipeline<
99 I,
100 O: Send + Sync + Clone + Debug + 'static,
101 R: aimdb_executor::Spawn + 'static,
102> {
103 pub(crate) input_key: String,
104 pub(crate) spawn_factory: Box<dyn FnOnce(String) -> TransformDescriptor<O, R> + Send + Sync>,
105 pub(crate) _phantom_i: PhantomData<I>,
106}
107
108impl<I, O, R> TransformPipeline<I, O, R>
109where
110 I: Send + Sync + Clone + Debug + 'static,
111 O: Send + Sync + Clone + Debug + 'static,
112 R: aimdb_executor::Spawn + 'static,
113{
114 pub(crate) fn into_descriptor(self) -> TransformDescriptor<O, R> {
115 (self.spawn_factory)(self.input_key)
116 }
117}
118
119fn create_single_transform_descriptor<I, O, S, R>(
120 input_key: String,
121 initial_state: S,
122 transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
123) -> TransformDescriptor<O, R>
124where
125 I: Send + Sync + Clone + Debug + 'static,
126 O: Send + Sync + Clone + Debug + 'static,
127 S: Send + Sync + 'static,
128 R: aimdb_executor::Spawn + 'static,
129{
130 let input_key_clone = input_key.clone();
131 let input_keys = vec![input_key];
132
133 TransformDescriptor {
134 input_keys,
135 spawn_fn: Box::new(move |producer, db, _ctx| {
136 Box::pin(run_single_transform::<I, O, S, R>(
137 db,
138 input_key_clone,
139 producer,
140 initial_state,
141 transform_fn,
142 ))
143 }),
144 }
145}
146
147#[allow(unused_variables)]
152pub(crate) async fn run_single_transform<I, O, S, R>(
153 db: Arc<crate::AimDb<R>>,
154 input_key: String,
155 producer: crate::Producer<O, R>,
156 mut state: S,
157 transform_fn: impl Fn(&I, &mut S) -> Option<O> + Send + Sync + 'static,
158) where
159 I: Send + Sync + Clone + Debug + 'static,
160 O: Send + Sync + Clone + Debug + 'static,
161 S: Send + 'static,
162 R: aimdb_executor::Spawn + 'static,
163{
164 let output_key = producer.key().to_string();
165
166 #[cfg(feature = "tracing")]
167 tracing::info!("🔄 Transform started: '{}' → '{}'", input_key, output_key);
168
169 let consumer = crate::typed_api::Consumer::<I, R>::new(db, input_key.clone());
170 let mut reader = match consumer.subscribe() {
171 Ok(r) => r,
172 Err(_e) => {
173 #[cfg(feature = "tracing")]
174 tracing::error!(
175 "🔄 Transform '{}' → '{}' FATAL: failed to subscribe to input: {:?}",
176 input_key,
177 output_key,
178 _e
179 );
180 #[cfg(all(feature = "std", not(feature = "tracing")))]
181 eprintln!(
182 "AIMDB TRANSFORM ERROR: '{}' → '{}' failed to subscribe to input: {:?}",
183 input_key, output_key, _e
184 );
185 return;
186 }
187 };
188
189 #[cfg(feature = "tracing")]
190 tracing::debug!(
191 "✅ Transform '{}' → '{}' subscribed, entering event loop",
192 input_key,
193 output_key
194 );
195
196 loop {
197 match reader.recv().await {
198 Ok(input_value) => {
199 if let Some(output_value) = transform_fn(&input_value, &mut state) {
200 let _ = producer.produce(output_value).await;
201 }
202 }
203 Err(crate::DbError::BufferLagged { .. }) => {
204 #[cfg(feature = "tracing")]
205 tracing::warn!(
206 "🔄 Transform '{}' → '{}' lagged behind, some values skipped",
207 input_key,
208 output_key
209 );
210 continue;
211 }
212 Err(_) => {
213 #[cfg(feature = "tracing")]
214 tracing::warn!(
215 "🔄 Transform '{}' → '{}' input closed, task exiting",
216 input_key,
217 output_key
218 );
219 break;
220 }
221 }
222 }
223}