pub struct JoinBuilder<O, R: JoinFanInRuntime + 'static> { /* private fields */ }Expand description
Configures a multi-input join transform.
Available on any runtime that implements aimdb_executor::JoinFanInRuntime.
The fan-in queue (bounded channel between input forwarders and the trigger
loop) is created by the runtime adapter at database startup — capacity is an
internal constant chosen per adapter (Tokio: 64, Embassy: 8, WASM: 64).
Obtain via [RecordRegistrar::transform_join].
Implementations§
Source§impl<O, R> JoinBuilder<O, R>
impl<O, R> JoinBuilder<O, R>
Sourcepub fn on_triggers<F, Fut>(self, handler: F) -> JoinPipeline<O, R>
pub fn on_triggers<F, Fut>(self, handler: F) -> JoinPipeline<O, R>
Complete the pipeline by providing an async task that owns the event loop and state.
The closure receives a JoinEventRx to read trigger events and a crate::Producer
to emit output values. Both are owned — moved into the async move block — so the
closure can freely hold borrows across .await points and maintain any state it needs.
The task runs until all input forwarders close (i.e., all upstream records stop producing).
.on_triggers(|mut rx, producer| async move {
let mut last_a: Option<f32> = None;
let mut last_b: Option<f32> = None;
while let Ok(trigger) = rx.recv().await {
match trigger.index() {
0 => last_a = trigger.as_input::<InputA>().copied(),
1 => last_b = trigger.as_input::<InputB>().copied(),
_ => {}
}
if let (Some(a), Some(b)) = (last_a, last_b) {
producer.produce(compute(a, b)).await.ok();
}
}
})