reifydb_sdk/operator/windowed/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Tumbling-aggregator authoring surface.
5//!
6//! Most stateful operators share an
7//! identical event-handling shape. They have repeatedly been written from
8//! scratch on top of [`crate::operator::FFIOperator`], and have repeatedly
9//! shipped the same families of bugs:
10//!
//! 1. **Missing per-slot replacement on Update.** `Update` is treated like another `Insert`, so the slot's
//! contribution is counted twice instead of being replaced.
13//! 2. **`DiffType::Remove` silently dropped.** No match arm exists for Remove, so the state retains rows the source
14//! already deleted.
15//! 3. **Per-slot extrema** (open/close/high/low) computed as a running min/max instead of as min/max over the *current*
16//! per-slot map, so a same-slot Update with a lower price never lowers the high.
17//! 4. **Window boundary off-by-one.** Inconsistent `[start, end)` vs `[start, end]` semantics across operators.
18//!
19//! This module exists to make those bugs *unrepresentable in user code*.
20//! An operator implements [`TumblingOperator`] -- a small set of pure
21//! functions over a typed slot input and contribution. The shared driver
22//! (slice 2 of the rollout) handles diff routing, per-slot replacement,
23//! Remove, and window boundary math, exhaustively and in one place.
24//!
25//! For overlapping-window operators (rolling buffers of the last N
26//! tumbling outputs) see the sibling [`RollingOperator`] trait; both
27//! traits live under the `windowed` namespace because they share the
28//! `Slot` / `WindowSpan` machinery in [`span`].
29//!
30//! # Contract
31//!
32//! - The slot map for a window is keyed by [`TumblingOperator::SlotKey`]. Insert and Update of an input row whose
33//! extracted slot key is `k` *replace* the contribution at `k`, never add to it. This is the per-slot replacement
34//! contract documented on `chaindex::testing::oracle::naive_ohlcv` and matches the way the existing chaindex chaos
35//! oracles model events.
36//! - [`TumblingOperator::fold_into_slot`] is a pure function of `(prev, input)`. It is called with `prev = None` on
37//! first observation of a slot and `prev = Some(existing)` if a row at the same slot was seen earlier in the same
38//! input batch (e.g. multiple rows in a single `BorrowedChange` map to the same slot). Implementations that simply
39//! want last-write-wins should ignore `prev`.
40//! - [`TumblingOperator::combine`] reduces the entire per-slot map of a window into the operator's public output. **It
41//! must be a pure function of the map's contents, independent of iteration order.** This is the invariant that
42//! prevents bug class 3 above: extrema must be computed by folding `slots.values()`, never by maintaining a running
43//! max/min that mutates as rows arrive.
44//! - [`TumblingOperator::window_for`] returns a [`WindowSpan`] with `[start, end)` semantics over the operator's chosen
45//! slot coordinate (`u64` timestamps, Solana slot numbers, `DateTime` newtypes, etc.). See [`span`] for the canonical
46//! helper and [`span::Slot`] for the coordinate trait.
47
48use std::{
49 collections::{BTreeMap, HashMap},
50 fmt::Debug,
51 hash::Hash,
52};
53
54use reifydb_core::{
55 encoded::key::{EncodedKey, IntoEncodedKey},
56 interface::catalog::flow::FlowNodeId,
57};
58use reifydb_type::value::Value;
59use serde::{Serialize, de::DeserializeOwned};
60
61use crate::{
62 error::Result,
63 operator::{
64 change::BorrowedColumns,
65 column::{operator::OperatorColumn, row::Row},
66 windowed::span::{Slot, WindowSpan},
67 },
68};
69
70pub mod multi_rolling;
71pub mod rolling;
72pub mod span;
73pub mod tumbling;
74
75/// A typed view extracted from one input row, sufficient to drive the
76/// aggregation. Implementations are typically small `Copy` structs
77/// (e.g. `{ price: f64, size: f64 }` for VWAP).
78pub trait SlotInput: Clone + Debug {}
79impl<T: Clone + Debug> SlotInput for T {}
80
81/// The per-slot reduced value persisted inside a window's slot map.
82/// Must be cheap to clone and serde-roundtrippable so the driver can
83/// persist the full per-window map through [`crate::state::cache::StateCache`].
84pub trait SlotContribution: Clone + Debug + Serialize + DeserializeOwned {}
85impl<T> SlotContribution for T where T: Clone + Debug + Serialize + DeserializeOwned {}
86
87/// The output emitted for a window once its slot map has any contributions.
88/// Implementations are typically the typed row struct generated by chaindex's
89/// `row!()` macro; the contract suite here only requires `PartialEq + Debug`
90/// so order-independence and roundtrip equality can be checked.
91pub trait WindowOutput: Clone + Debug + PartialEq {}
92impl<T> WindowOutput for T where T: Clone + Debug + PartialEq {}
93
94/// The core authoring trait. Implementors describe their aggregation as four
95/// pure functions; the driver handles all FFI plumbing, diff routing, and
96/// state persistence.
97pub trait TumblingOperator {
98 /// The grouping key for a window's identity (e.g. `(base_mint,
99 /// quote_mint)`). The pair `(GroupKey, WindowSpan<SlotKey>)`
100 /// uniquely identifies one slot map.
101 ///
102 /// `Ord` is required because the driver and contract suite bucket
103 /// windows in deterministic order through a `BTreeMap`; relying on
104 /// `HashMap` iteration would make the order-independence check
105 /// itself non-deterministic.
106 type GroupKey: Clone + Eq + Ord + Hash + Debug + Serialize + DeserializeOwned;
107
108 /// The within-window slot identity *and* the coordinate space the
109 /// window itself lives in. Two input rows with the same `SlotKey`
110 /// map to the same per-slot contribution and the second one
111 /// replaces the first (last-write-wins, modulo `fold_into_slot`).
112 ///
113 /// Typical choices: `u64` (unix timestamp, Solana slot, block
114 /// height), or a newtype around one of those.
115 type SlotKey: Slot + Hash + Serialize + DeserializeOwned;
116
117 /// Typed view of an input row, produced by [`Self::extract`].
118 type SlotInput: SlotInput;
119
120 /// Per-slot reduced value, persisted inside the window's slot map.
121 type SlotContribution: SlotContribution;
122
123 /// Public output emitted to downstream operators.
124 type Output: WindowOutput;
125
126 /// Project one row from a `BorrowedColumns` batch into a routed,
127 /// typed input. Returning `None` skips the row (e.g. for
128 /// missing/invalid values). The returned `SlotKey` must lie inside
129 /// the [`WindowSpan`] that [`Self::window_for`] would return for it.
130 fn extract(
131 &self,
132 cols: &BorrowedColumns<'_>,
133 row_index: usize,
134 ) -> Option<(Self::GroupKey, Self::SlotKey, Self::SlotInput)>;
135
136 /// Reduce one input row into the slot's contribution. `prev` is
137 /// `Some` if a row for this same `SlotKey` was already folded earlier
138 /// in the *same* input batch; it is `None` on first observation.
139 ///
140 /// Across batches, the driver re-loads the persisted map and calls
141 /// this with `prev = Some(persisted)` if a new row arrives at an
142 /// existing slot, or `prev = None` if the slot is new.
143 ///
144 /// Implementations that want pure last-write-wins (the OHLCV case)
145 /// can simply ignore `prev` and derive the contribution from `input`.
146 fn fold_into_slot(
147 &self,
148 prev: Option<&Self::SlotContribution>,
149 input: &Self::SlotInput,
150 ) -> Self::SlotContribution;
151
152 /// Combine all slot contributions in the window into the public
153 /// output. **Must be order-independent w.r.t. `slots`**: the result
154 /// must depend only on the multiset of `(SlotKey, SlotContribution)`
155 /// pairs in `slots`, never on iteration order.
156 ///
157 /// `prev_window_close` is the carry-forward value from the
158 /// immediately-preceding non-empty emit for the same group, supplied
159 /// by the driver. It is `None` for the first window per group, or
160 /// when no carry has been recorded. Operators that don't need
161 /// cross-window state ignore this argument; operators like TWAP that
162 /// weight a leading gap by the prior closing price use it.
163 ///
164 /// Returning `None` means "this window has no rows to emit" (e.g.
165 /// the slot map is empty after a Remove cleared it).
166 fn combine(
167 &self,
168 group: &Self::GroupKey,
169 span: WindowSpan<Self::SlotKey>,
170 slots: &BTreeMap<Self::SlotKey, Self::SlotContribution>,
171 prev_window_close: Option<&Self::SlotContribution>,
172 ) -> Option<Self::Output>;
173
174 /// The window a given slot key belongs to. Implementations on a
175 /// fixed grid should use [`WindowSpan::for_slot`].
176 fn window_for(&self, slot: Self::SlotKey) -> WindowSpan<Self::SlotKey>;
177
178 /// Derive the value the driver should carry forward to the *next*
179 /// window's `combine` for the same group. Default: the largest-key
180 /// slot's contribution (the latest event in this window).
181 ///
182 /// `carried_forward` is the carry value that was just consumed by
183 /// `combine` for this window. Operators whose carry depends on a
184 /// derived quantity (e.g. Heikin-Ashi's recursive ha_open/ha_close)
185 /// need access to the prior carry to compute the next one.
186 fn carry_forward(
187 &self,
188 slots: &BTreeMap<Self::SlotKey, Self::SlotContribution>,
189 _carried_forward: Option<&Self::SlotContribution>,
190 ) -> Option<Self::SlotContribution> {
191 slots.last_key_value().map(|(_, v)| v.clone())
192 }
193}
194
195/// The persisted state for one window: a sorted map from slot key to its
196/// reduced contribution. Always serde-roundtripped through `StateCache`.
197pub type WindowSlots<A> = BTreeMap<<A as TumblingOperator>::SlotKey, <A as TumblingOperator>::SlotContribution>;
198
199/// FFI registration surface for a [`TumblingOperator`].
200///
201/// `TumblingOperator` is the *pure* aggregation surface (the four functions
202/// the contract harness checks). `FFITumblingOperator` adds the bits the FFI
203/// driver needs to register the operator with the runtime: per-operator
204/// metadata constants, a config-driven constructor, and a row-key encoder
205/// so the driver can allocate `RowNumber` per `(group, window_start)`.
206///
207/// Authors implement BOTH traits in adjacent `impl` blocks. The contract
208/// harness in `crate::testing::windowed` only sees the pure aggregator, so
209/// test fixtures don't need FFI metadata; production operators registered
210/// through [`TumblingDriver`] must impl both.
211pub trait FFITumblingOperator: TumblingOperator + Sized
212where
213 Self::Output: Row,
214 for<'a> &'a Self::GroupKey: IntoEncodedKey,
215{
216 /// Operator name as registered with the runtime.
217 const NAME: &'static str;
218 /// Operator version string.
219 const VERSION: &'static str;
220 /// Human-readable operator description.
221 const DESCRIPTION: &'static str;
222 /// Schema for input columns the operator expects in `BorrowedChange`.
223 const INPUT_COLUMNS: &'static [OperatorColumn];
224 /// Schema for output columns the operator emits.
225 const OUTPUT_COLUMNS: &'static [OperatorColumn];
226 /// Capability bitmask (see [`reifydb_abi::operator::capabilities`]).
227 const CAPABILITIES: u32;
228
229 /// Construct an aggregator from FFI config. Called once per operator
230 /// instance from [`crate::operator::FFIOperator::new`].
231 fn from_config(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self>;
232
233 /// Encode the row key for `(group, window_start)`. The driver passes
234 /// the resulting key to `OperatorContext::get_or_create_row_number`
235 /// to allocate the output row identity for this window.
236 ///
237 /// Use [`EncodedKey::builder`] and append the same fields the
238 /// operator's downstream consumers expect to look up by, in the
239 /// canonical chaindex pattern (e.g. `.str(base).str(quote).u64(window_start)`).
240 fn encode_row_key(&self, group: &Self::GroupKey, window_start: Self::SlotKey) -> EncodedKey;
241}