Skip to main content

reifydb_sdk/operator/windowed/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Tumbling-aggregator authoring surface.
5//!
6//! Most stateful operators share an
7//! identical event-handling shape. They have repeatedly been written from
8//! scratch on top of [`crate::operator::FFIOperator`], and have repeatedly
9//! shipped the same families of bugs:
10//!
//! 1. **Missing per-slot replacement on Update.** `Update` is treated like another `Insert`, so the slot's
//!    contribution is added twice instead of replacing the earlier one.
13//! 2. **`DiffType::Remove` silently dropped.** No match arm exists for Remove, so the state retains rows the source
14//!    already deleted.
15//! 3. **Per-slot extrema** (open/close/high/low) computed as a running min/max instead of as min/max over the *current*
16//!    per-slot map, so a same-slot Update with a lower price never lowers the high.
17//! 4. **Window boundary off-by-one.** Inconsistent `[start, end)` vs `[start, end]` semantics across operators.
18//!
19//! This module exists to make those bugs *unrepresentable in user code*.
20//! An operator implements [`TumblingOperator`] -- a small set of pure
21//! functions over a typed slot input and contribution. The shared driver
22//! (slice 2 of the rollout) handles diff routing, per-slot replacement,
23//! Remove, and window boundary math, exhaustively and in one place.
24//!
25//! For overlapping-window operators (rolling buffers of the last N
26//! tumbling outputs) see the sibling [`RollingOperator`] trait; both
27//! traits live under the `windowed` namespace because they share the
28//! `Slot` / `WindowSpan` machinery in [`span`].
29//!
30//! # Contract
31//!
32//! - The slot map for a window is keyed by [`TumblingOperator::SlotKey`]. Insert and Update of an input row whose
33//!   extracted slot key is `k` *replace* the contribution at `k`, never add to it. This is the per-slot replacement
34//!   contract documented on `chaindex::testing::oracle::naive_ohlcv` and matches the way the existing chaindex chaos
35//!   oracles model events.
//! - [`TumblingOperator::fold_into_slot`] is a pure function of `(prev, input)`. It is called with `prev = None` on
//!   first observation of a slot and `prev = Some(existing)` if the slot already holds a contribution, either from a
//!   row seen earlier in the same input batch (e.g. multiple rows in a single `BorrowedChange` map to the same slot)
//!   or from the persisted map that the driver re-loads across batches. Implementations that simply want
//!   last-write-wins should ignore `prev`.
40//! - [`TumblingOperator::combine`] reduces the entire per-slot map of a window into the operator's public output. **It
41//!   must be a pure function of the map's contents, independent of iteration order.** This is the invariant that
42//!   prevents bug class 3 above: extrema must be computed by folding `slots.values()`, never by maintaining a running
43//!   max/min that mutates as rows arrive.
44//! - [`TumblingOperator::window_for`] returns a [`WindowSpan`] with `[start, end)` semantics over the operator's chosen
45//!   slot coordinate (`u64` timestamps, Solana slot numbers, `DateTime` newtypes, etc.). See [`span`] for the canonical
46//!   helper and [`span::Slot`] for the coordinate trait.
47
48use std::{
49	collections::{BTreeMap, HashMap},
50	fmt::Debug,
51	hash::Hash,
52};
53
54use reifydb_core::{
55	encoded::key::{EncodedKey, IntoEncodedKey},
56	interface::catalog::flow::FlowNodeId,
57};
58use reifydb_type::value::Value;
59use serde::{Serialize, de::DeserializeOwned};
60
61use crate::{
62	error::Result,
63	operator::{
64		change::BorrowedColumns,
65		column::{operator::OperatorColumn, row::Row},
66		windowed::span::{Slot, WindowSpan},
67	},
68};
69
70pub mod multi_rolling;
71pub mod rolling;
72pub mod span;
73pub mod tumbling;
74
/// Typed projection of a single input row, carrying everything the
/// aggregation needs from that row. Usually a small `Copy` struct, such as
/// `{ price: f64, size: f64 }` for VWAP.
///
/// This is a marker trait: the blanket impl below covers every
/// `Clone + Debug` type, so authors never implement it by hand.
pub trait SlotInput: Clone + Debug {}
impl<T> SlotInput for T where T: Clone + Debug {}
80
81/// The per-slot reduced value persisted inside a window's slot map.
82/// Must be cheap to clone and serde-roundtrippable so the driver can
83/// persist the full per-window map through [`crate::state::cache::StateCache`].
84pub trait SlotContribution: Clone + Debug + Serialize + DeserializeOwned {}
85impl<T> SlotContribution for T where T: Clone + Debug + Serialize + DeserializeOwned {}
86
/// Value emitted for a window as soon as its slot map holds at least one
/// contribution. In practice this is usually the typed row struct produced
/// by chaindex's `row!()` macro. The contract suite only asks for
/// `PartialEq + Debug`, which is enough to check order-independence and
/// roundtrip equality.
///
/// This is a marker trait: the blanket impl below covers every
/// `Clone + Debug + PartialEq` type.
pub trait WindowOutput: Clone + Debug + PartialEq {}
impl<T> WindowOutput for T
where
	T: Clone + Debug + PartialEq,
{
}
93
94/// The core authoring trait. Implementors describe their aggregation as four
95/// pure functions; the driver handles all FFI plumbing, diff routing, and
96/// state persistence.
97pub trait TumblingOperator {
98	/// The grouping key for a window's identity (e.g. `(base_mint,
99	/// quote_mint)`). The pair `(GroupKey, WindowSpan<SlotKey>)`
100	/// uniquely identifies one slot map.
101	///
102	/// `Ord` is required because the driver and contract suite bucket
103	/// windows in deterministic order through a `BTreeMap`; relying on
104	/// `HashMap` iteration would make the order-independence check
105	/// itself non-deterministic.
106	type GroupKey: Clone + Eq + Ord + Hash + Debug + Serialize + DeserializeOwned;
107
108	/// The within-window slot identity *and* the coordinate space the
109	/// window itself lives in. Two input rows with the same `SlotKey`
110	/// map to the same per-slot contribution and the second one
111	/// replaces the first (last-write-wins, modulo `fold_into_slot`).
112	///
113	/// Typical choices: `u64` (unix timestamp, Solana slot, block
114	/// height), or a newtype around one of those.
115	type SlotKey: Slot + Hash + Serialize + DeserializeOwned;
116
117	/// Typed view of an input row, produced by [`Self::extract`].
118	type SlotInput: SlotInput;
119
120	/// Per-slot reduced value, persisted inside the window's slot map.
121	type SlotContribution: SlotContribution;
122
123	/// Public output emitted to downstream operators.
124	type Output: WindowOutput;
125
126	/// Project one row from a `BorrowedColumns` batch into a routed,
127	/// typed input. Returning `None` skips the row (e.g. for
128	/// missing/invalid values). The returned `SlotKey` must lie inside
129	/// the [`WindowSpan`] that [`Self::window_for`] would return for it.
130	fn extract(
131		&self,
132		cols: &BorrowedColumns<'_>,
133		row_index: usize,
134	) -> Option<(Self::GroupKey, Self::SlotKey, Self::SlotInput)>;
135
136	/// Reduce one input row into the slot's contribution. `prev` is
137	/// `Some` if a row for this same `SlotKey` was already folded earlier
138	/// in the *same* input batch; it is `None` on first observation.
139	///
140	/// Across batches, the driver re-loads the persisted map and calls
141	/// this with `prev = Some(persisted)` if a new row arrives at an
142	/// existing slot, or `prev = None` if the slot is new.
143	///
144	/// Implementations that want pure last-write-wins (the OHLCV case)
145	/// can simply ignore `prev` and derive the contribution from `input`.
146	fn fold_into_slot(
147		&self,
148		prev: Option<&Self::SlotContribution>,
149		input: &Self::SlotInput,
150	) -> Self::SlotContribution;
151
152	/// Combine all slot contributions in the window into the public
153	/// output. **Must be order-independent w.r.t. `slots`**: the result
154	/// must depend only on the multiset of `(SlotKey, SlotContribution)`
155	/// pairs in `slots`, never on iteration order.
156	///
157	/// `prev_window_close` is the carry-forward value from the
158	/// immediately-preceding non-empty emit for the same group, supplied
159	/// by the driver. It is `None` for the first window per group, or
160	/// when no carry has been recorded. Operators that don't need
161	/// cross-window state ignore this argument; operators like TWAP that
162	/// weight a leading gap by the prior closing price use it.
163	///
164	/// Returning `None` means "this window has no rows to emit" (e.g.
165	/// the slot map is empty after a Remove cleared it).
166	fn combine(
167		&self,
168		group: &Self::GroupKey,
169		span: WindowSpan<Self::SlotKey>,
170		slots: &BTreeMap<Self::SlotKey, Self::SlotContribution>,
171		prev_window_close: Option<&Self::SlotContribution>,
172	) -> Option<Self::Output>;
173
174	/// The window a given slot key belongs to. Implementations on a
175	/// fixed grid should use [`WindowSpan::for_slot`].
176	fn window_for(&self, slot: Self::SlotKey) -> WindowSpan<Self::SlotKey>;
177
178	/// Derive the value the driver should carry forward to the *next*
179	/// window's `combine` for the same group. Default: the largest-key
180	/// slot's contribution (the latest event in this window).
181	///
182	/// `carried_forward` is the carry value that was just consumed by
183	/// `combine` for this window. Operators whose carry depends on a
184	/// derived quantity (e.g. Heikin-Ashi's recursive ha_open/ha_close)
185	/// need access to the prior carry to compute the next one.
186	fn carry_forward(
187		&self,
188		slots: &BTreeMap<Self::SlotKey, Self::SlotContribution>,
189		_carried_forward: Option<&Self::SlotContribution>,
190	) -> Option<Self::SlotContribution> {
191		slots.last_key_value().map(|(_, v)| v.clone())
192	}
193}
194
195/// The persisted state for one window: a sorted map from slot key to its
196/// reduced contribution. Always serde-roundtripped through `StateCache`.
197pub type WindowSlots<A> = BTreeMap<<A as TumblingOperator>::SlotKey, <A as TumblingOperator>::SlotContribution>;
198
199/// FFI registration surface for a [`TumblingOperator`].
200///
201/// `TumblingOperator` is the *pure* aggregation surface (the four functions
202/// the contract harness checks). `FFITumblingOperator` adds the bits the FFI
203/// driver needs to register the operator with the runtime: per-operator
204/// metadata constants, a config-driven constructor, and a row-key encoder
205/// so the driver can allocate `RowNumber` per `(group, window_start)`.
206///
207/// Authors implement BOTH traits in adjacent `impl` blocks. The contract
208/// harness in `crate::testing::windowed` only sees the pure aggregator, so
209/// test fixtures don't need FFI metadata; production operators registered
210/// through [`TumblingDriver`] must impl both.
211pub trait FFITumblingOperator: TumblingOperator + Sized
212where
213	Self::Output: Row,
214	for<'a> &'a Self::GroupKey: IntoEncodedKey,
215{
216	/// Operator name as registered with the runtime.
217	const NAME: &'static str;
218	/// Operator version string.
219	const VERSION: &'static str;
220	/// Human-readable operator description.
221	const DESCRIPTION: &'static str;
222	/// Schema for input columns the operator expects in `BorrowedChange`.
223	const INPUT_COLUMNS: &'static [OperatorColumn];
224	/// Schema for output columns the operator emits.
225	const OUTPUT_COLUMNS: &'static [OperatorColumn];
226	/// Capability bitmask (see [`reifydb_abi::operator::capabilities`]).
227	const CAPABILITIES: u32;
228
229	/// Construct an aggregator from FFI config. Called once per operator
230	/// instance from [`crate::operator::FFIOperator::new`].
231	fn from_config(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self>;
232
233	/// Encode the row key for `(group, window_start)`. The driver passes
234	/// the resulting key to `OperatorContext::get_or_create_row_number`
235	/// to allocate the output row identity for this window.
236	///
237	/// Use [`EncodedKey::builder`] and append the same fields the
238	/// operator's downstream consumers expect to look up by, in the
239	/// canonical chaindex pattern (e.g. `.str(base).str(quote).u64(window_start)`).
240	fn encode_row_key(&self, group: &Self::GroupKey, window_start: Self::SlotKey) -> EncodedKey;
241}