reifydb-sdk 0.5.0

SDK for building ReifyDB operators, procedures, transforms and more
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

//! Tumbling-aggregator authoring surface.
//!
//! Most stateful operators share an
//! identical event-handling shape. They have repeatedly been written from
//! scratch on top of [`crate::operator::FFIOperator`], and have repeatedly
//! shipped the same families of bugs:
//!
//! 1. **Missing per-slot replacement on Update.** `Update` is treated like another `Insert`, so the slot's contribution
//!    is added twice instead of being replaced.
//! 2. **`DiffType::Remove` silently dropped.** No match arm exists for Remove, so the state retains rows the source
//!    already deleted.
//! 3. **Per-slot extrema** (open/close/high/low) computed as a running min/max instead of as min/max over the *current*
//!    per-slot map, so a same-slot Update with a lower price never lowers the high.
//! 4. **Window boundary off-by-one.** Inconsistent `[start, end)` vs `[start, end]` semantics across operators.
//!
//! This module exists to make those bugs *unrepresentable in user code*.
//! An operator implements [`TumblingOperator`] -- a small set of pure
//! functions over a typed slot input and contribution. The shared driver
//! (slice 2 of the rollout) handles diff routing, per-slot replacement,
//! Remove, and window boundary math, exhaustively and in one place.
//!
//! For overlapping-window operators (rolling buffers of the last N
//! tumbling outputs) see the sibling [`RollingOperator`] trait; both
//! traits live under the `windowed` namespace because they share the
//! `Slot` / `WindowSpan` machinery in [`span`].
//!
//! # Contract
//!
//! - The slot map for a window is keyed by [`TumblingOperator::SlotKey`]. Insert and Update of an input row whose
//!   extracted slot key is `k` *replace* the contribution at `k`, never add to it. This is the per-slot replacement
//!   contract documented on `chaindex::testing::oracle::naive_ohlcv` and matches the way the existing chaindex chaos
//!   oracles model events.
//! - [`TumblingOperator::fold_into_slot`] is a pure function of `(prev, input)`. It is called with `prev = None` on
//!   first observation of a slot and `prev = Some(existing)` whenever the slot already holds a contribution -- either
//!   because a row at the same slot was folded earlier in the same input batch (e.g. multiple rows in a single
//!   `BorrowedChange` map to the same slot), or because the slot exists in the persisted map re-loaded from a previous
//!   batch. Implementations that simply want last-write-wins should ignore `prev`.
//! - [`TumblingOperator::combine`] reduces the entire per-slot map of a window into the operator's public output. **It
//!   must be a pure function of its arguments (the map's contents, the group, the span and the carry-forward value),
//!   independent of iteration order.** This is the invariant that prevents bug class 3 above: extrema must be computed
//!   by folding `slots.values()`, never by maintaining a running max/min that mutates as rows arrive.
//! - [`TumblingOperator::window_for`] returns a [`WindowSpan`] with `[start, end)` semantics over the operator's chosen
//!   slot coordinate (`u64` timestamps, Solana slot numbers, `DateTime` newtypes, etc.). See [`span`] for the canonical
//!   helper and [`span::Slot`] for the coordinate trait.

use std::{
	collections::{BTreeMap, HashMap},
	fmt::Debug,
	hash::Hash,
};

use reifydb_core::{
	encoded::key::{EncodedKey, IntoEncodedKey},
	interface::catalog::flow::FlowNodeId,
};
use reifydb_type::value::Value;
use serde::{Serialize, de::DeserializeOwned};

use crate::{
	error::Result,
	operator::{
		change::BorrowedColumns,
		column::{operator::OperatorColumn, row::Row},
		windowed::span::{Slot, WindowSpan},
	},
};

pub mod multi_rolling;
pub mod rolling;
pub mod span;
pub mod tumbling;

/// Typed projection of a single input row, carrying exactly the fields the
/// aggregation needs. Usually a small `Copy` struct, e.g.
/// `{ price: f64, size: f64 }` for a VWAP operator.
///
/// Blanket-implemented for every `Clone + Debug` type, so it never has to be
/// implemented by hand.
pub trait SlotInput: Clone + Debug {}
impl<T> SlotInput for T where T: Clone + Debug {}

/// Reduced per-slot value stored inside a window's slot map.
///
/// Cloning should be cheap, and the type has to round-trip through serde,
/// because the driver persists each window's complete slot map via
/// [`crate::state::cache::StateCache`].
///
/// Blanket-implemented for every type that satisfies the bounds.
pub trait SlotContribution: Clone + Debug + Serialize + DeserializeOwned {}
impl<C: Clone + Debug + Serialize + DeserializeOwned> SlotContribution for C {}

/// Value emitted for a window whenever its slot map holds at least one
/// contribution.
///
/// In practice this is usually the typed row struct produced by chaindex's
/// `row!()` macro. The contract suite asks only for `PartialEq + Debug` (on
/// top of `Clone`), which is enough to check order-independence and
/// round-trip equality.
pub trait WindowOutput: Clone + Debug + PartialEq {}
impl<O: Clone + Debug + PartialEq> WindowOutput for O {}

/// The core authoring trait.
///
/// Implementors describe their aggregation as four required pure functions
/// ([`Self::extract`], [`Self::fold_into_slot`], [`Self::combine`] and
/// [`Self::window_for`]) plus an overridable [`Self::carry_forward`]. The
/// driver handles all FFI plumbing, diff routing and state persistence.
pub trait TumblingOperator {
	/// The grouping key for a window's identity (e.g. `(base_mint,
	/// quote_mint)`). The pair `(GroupKey, WindowSpan<SlotKey>)`
	/// uniquely identifies one slot map.
	///
	/// `Ord` is required because the driver and contract suite bucket
	/// windows in deterministic order through a `BTreeMap`; relying on
	/// `HashMap` iteration would make the order-independence check
	/// itself non-deterministic.
	type GroupKey: Clone + Eq + Ord + Hash + Debug + Serialize + DeserializeOwned;

	/// The within-window slot identity *and* the coordinate space the
	/// window itself lives in. Two input rows with the same `SlotKey`
	/// map to the same per-slot contribution, and the second one
	/// replaces the first (last-write-wins, modulo
	/// [`Self::fold_into_slot`]).
	///
	/// Typical choices: `u64` (unix timestamp, Solana slot, block
	/// height), or a newtype around one of those.
	type SlotKey: Slot + Hash + Serialize + DeserializeOwned;

	/// Typed view of an input row, produced by [`Self::extract`].
	type SlotInput: SlotInput;

	/// Per-slot reduced value, persisted inside the window's slot map.
	type SlotContribution: SlotContribution;

	/// Public output emitted to downstream operators.
	type Output: WindowOutput;

	/// Project one row from a `BorrowedColumns` batch into a routed,
	/// typed input.
	///
	/// Returning `None` skips the row (e.g. for missing/invalid values).
	/// The returned `SlotKey` must lie inside the [`WindowSpan`] that
	/// [`Self::window_for`] returns for it; in other words,
	/// `window_for(k)` must contain `k`.
	fn extract(
		&self,
		cols: &BorrowedColumns<'_>,
		row_index: usize,
	) -> Option<(Self::GroupKey, Self::SlotKey, Self::SlotInput)>;

	/// Reduce one input row into the slot's contribution.
	///
	/// `prev` is the contribution currently held at this `SlotKey`, if any:
	///
	/// - `Some(existing)` when a row for the same `SlotKey` was already folded earlier in the *same* input batch, or
	///   when the slot already exists in the persisted map the driver re-loaded from a previous batch;
	/// - `None` when the slot is observed for the first time.
	///
	/// Implementations that want pure last-write-wins (the OHLCV case)
	/// can simply ignore `prev` and derive the contribution from `input`.
	fn fold_into_slot(
		&self,
		prev: Option<&Self::SlotContribution>,
		input: &Self::SlotInput,
	) -> Self::SlotContribution;

	/// Combine all slot contributions in the window into the public
	/// output.
	///
	/// **Must be order-independent w.r.t. `slots`.** The result may
	/// depend only on the set of `(SlotKey, SlotContribution)` pairs in
	/// `slots` (keys are unique) and on the other arguments, never on
	/// iteration order.
	///
	/// `prev_window_close` is the carry-forward value from the
	/// immediately-preceding non-empty emit for the same group, supplied
	/// by the driver (see [`Self::carry_forward`]). It is `None` for the
	/// first window per group, or when no carry has been recorded.
	/// Operators that don't need cross-window state ignore this argument;
	/// operators like TWAP, which weight a leading gap by the prior
	/// closing price, use it.
	///
	/// Returning `None` means "this window has no rows to emit" (e.g.
	/// the slot map is empty after a Remove cleared it).
	fn combine(
		&self,
		group: &Self::GroupKey,
		span: WindowSpan<Self::SlotKey>,
		slots: &BTreeMap<Self::SlotKey, Self::SlotContribution>,
		prev_window_close: Option<&Self::SlotContribution>,
	) -> Option<Self::Output>;

	/// The window a given slot key belongs to. The returned span must
	/// contain `slot`. Implementations on a fixed grid should use
	/// [`WindowSpan::for_slot`].
	fn window_for(&self, slot: Self::SlotKey) -> WindowSpan<Self::SlotKey>;

	/// Derive the value the driver carries forward, as
	/// `prev_window_close`, into the *next* window's [`Self::combine`]
	/// for the same group.
	///
	/// Default: the contribution of the largest-key slot (the latest
	/// event in this window), or `None` if `slots` is empty.
	///
	/// Operators that want a different carry-forward (e.g. an aggregated
	/// summary rather than the latest slot) override this.
	fn carry_forward(
		&self,
		slots: &BTreeMap<Self::SlotKey, Self::SlotContribution>,
	) -> Option<Self::SlotContribution> {
		slots.last_key_value().map(|(_, v)| v.clone())
	}
}

/// Persisted per-window state: the window's slot map, ordered by slot key,
/// holding each slot's reduced contribution. It is always round-tripped
/// through `StateCache` with serde.
pub type WindowSlots<A> = BTreeMap<
	<A as TumblingOperator>::SlotKey,
	<A as TumblingOperator>::SlotContribution,
>;

/// Runtime-registration layer on top of a [`TumblingOperator`].
///
/// [`TumblingOperator`] captures only the *pure* aggregation (the four
/// functions the contract harness exercises). This trait adds what the FFI
/// driver needs before it can register the operator with the runtime:
///
/// - per-operator metadata constants,
/// - a constructor driven by FFI config, and
/// - a row-key encoder, so the driver can allocate one `RowNumber` per
///   `(group, window_start)`.
///
/// Implement both traits, in neighbouring `impl` blocks. The contract
/// harness in `crate::testing::windowed` depends only on the pure
/// aggregator, so test fixtures can skip the FFI metadata entirely.
/// Operators registered in production through [`TumblingDriver`] need
/// both impls.
pub trait FFITumblingOperator: TumblingOperator + Sized
where
	Self::Output: Row,
	for<'a> &'a Self::GroupKey: IntoEncodedKey,
{
	// Registration metadata.

	/// Name under which the operator is registered with the runtime.
	const NAME: &'static str;
	/// Version string of the operator.
	const VERSION: &'static str;
	/// Description of the operator, intended for humans.
	const DESCRIPTION: &'static str;
	/// Capability bitmask (see [`reifydb_abi::operator::capabilities`]).
	const CAPABILITIES: u32;

	// Column schemas.

	/// Input columns the operator expects to find in `BorrowedChange`.
	const INPUT_COLUMNS: &'static [OperatorColumn];
	/// Output columns the operator emits.
	const OUTPUT_COLUMNS: &'static [OperatorColumn];

	/// Build the aggregator from its FFI config.
	///
	/// Invoked once per operator instance, from
	/// [`crate::operator::FFIOperator::new`].
	///
	/// # Errors
	///
	/// Implementations report a failure to build the aggregator from
	/// `operator_id` / `config` through the returned `Result`.
	fn from_config(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self>;

	/// Build the row key that identifies the output row for
	/// `(group, window_start)`.
	///
	/// The driver hands this key to
	/// `OperatorContext::get_or_create_row_number`, which allocates the
	/// window's output row identity.
	///
	/// Construct it with [`EncodedKey::builder`]. Append the fields that
	/// downstream consumers look rows up by, following the canonical
	/// chaindex pattern (e.g. `.str(base).str(quote).u64(window_start)`).
	fn encode_row_key(&self, group: &Self::GroupKey, window_start: Self::SlotKey) -> EncodedKey;
}