// reifydb-sub-flow 0.4.10
//
// Flow subsystem for stream processing and data flows.
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
use reifydb_core::{
	encoded::{
		key::{EncodedKey, EncodedKeyRange},
		row::EncodedRow,
		shape::RowShape,
	},
	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
};
use reifydb_type::Result;

use super::utils;
use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};

/// Window-based state management for time- or count-based windowing.
///
/// Builds on [`RawStatefulOperator`] (whose `id()` is used to scope every key
/// handled here) and delegates row persistence to the helpers in the sibling
/// `utils` module. Range operations prefix the caller's range with the encoded
/// [`FlowNodeStateKey`] for this operator, so they only touch this operator's
/// state.
pub trait WindowStateful: RawStatefulOperator {
	/// Returns the row shape used for per-window state rows.
	fn layout(&self) -> RowShape;

	/// Allocates a fresh, not-yet-saved state row using [`WindowStateful::layout`].
	fn create_state(&self) -> EncodedRow {
		self.layout().allocate()
	}

	/// Loads the state row stored under `window_key`.
	///
	/// Delegates to `utils::load_or_create_row`; as its name suggests, a new
	/// row is expected to be returned when nothing is stored for the key yet.
	///
	/// # Errors
	/// Propagates any error raised by the underlying transaction.
	fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedRow> {
		utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
	}

	/// Stores `row` as the state for `window_key` via `utils::save_row`.
	///
	/// # Errors
	/// Propagates any error raised by the underlying transaction.
	fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow) -> Result<()> {
		utils::save_row(self.id(), txn, window_key, row)
	}

	/// Returns the keys of this operator's state entries that fall within
	/// `range`, without modifying anything.
	///
	/// `range` is interpreted relative to this operator: it is prefixed with the
	/// encoded [`FlowNodeStateKey`] for `self.id()` before scanning.
	///
	/// # Errors
	/// Returns the first error produced by the range scan.
	fn scan_keys_in_range(&self, txn: &mut FlowTransaction, range: &EncodedKeyRange) -> Result<Vec<EncodedKey>> {
		let prefixed_range = range.clone().with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
		let mut keys = Vec::new();
		// 1024 is forwarded as the second argument of `txn.range`
		// (presumably a scan batch size — NOTE(review): confirm).
		for result in txn.range(prefixed_range, 1024) {
			keys.push(EncodedKey::new(result?.key.to_vec()));
		}
		Ok(keys)
	}

	/// Removes every state entry of this operator that falls within `range`
	/// and returns how many entries were removed.
	///
	/// The caller builds `range` according to its own window-ordering
	/// semantics; it is prefixed with this operator's [`FlowNodeStateKey`]
	/// before use, exactly like [`WindowStateful::scan_keys_in_range`].
	///
	/// # Errors
	/// Returns the first error from the scan or from a removal. Entries removed
	/// before a failing removal are not restored by this method.
	fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
		let prefixed_range = range.with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());

		// Finish consuming the range stream before removing anything, so no
		// removal happens while the scan over `txn` is still in progress.
		let mut keys_to_remove = Vec::new();
		for result in txn.range(prefixed_range, 1024) {
			keys_to_remove.push(result?.key);
		}

		// Typed explicitly as `u32` to match the return type instead of relying
		// on integer-literal defaulting (`i32`) followed by a cast.
		let mut removed: u32 = 0;
		for key in keys_to_remove {
			txn.remove(&key)?;
			removed += 1;
		}
		Ok(removed)
	}
}

#[cfg(test)]
pub mod tests {
	use std::ops::Bound::{Excluded, Unbounded};

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion, interface::catalog::flow::FlowNodeId,
		util::encoding::keycode::serializer::KeySerializer,
	};
	use reifydb_runtime::context::clock::{Clock, MockClock};
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

	/// Builds a test window key (`"w:"` followed by the serialized id).
	///
	/// `KeySerializer::extend_u64` uses an inverted encoding, so smaller ids
	/// produce larger keys (asserted by `test_window_key_encoding`).
	fn test_window_key(window_id: u64) -> EncodedKey {
		let mut serializer = KeySerializer::with_capacity(16);
		serializer.extend_bytes(b"w:");
		serializer.extend_u64(window_id);
		EncodedKey::new(serializer.finish())
	}

	/// Saves one fresh state row per window id, storing the id itself in
	/// column 0, and returns the window keys in the same order as `ids`.
	fn seed_windows(
		operator: &TestOperator,
		txn: &mut FlowTransaction,
		ids: impl IntoIterator<Item = u64>,
	) -> Vec<EncodedKey> {
		let layout = operator.layout();
		let mut keys = Vec::new();
		for id in ids {
			let key = test_window_key(id);
			let mut state = operator.create_state();
			layout.set_i64(&mut state, 0, id as i64);
			operator.save_state(txn, &key, state).unwrap();
			keys.push(key);
		}
		keys
	}

	// Extend TestOperator to implement WindowStateful
	impl WindowStateful for TestOperator {
		fn layout(&self) -> RowShape {
			self.layout.clone()
		}
	}

	#[test]
	fn test_window_key_encoding() {
		let key1 = test_window_key(1);
		let key2 = test_window_key(2);
		let key100 = test_window_key(100);

		// Keys should be different
		assert_ne!(key1.as_ref(), key2.as_ref());
		assert_ne!(key1.as_ref(), key100.as_ref());

		// Due to inverted encoding, smaller window IDs produce larger keys
		assert!(key1 > key2);
		assert!(key2 > key100);
	}

	#[test]
	fn test_create_state() {
		let operator = TestOperator::simple(FlowNodeId(1));
		let state = operator.create_state();

		// State should be allocated based on layout
		assert!(state.len() > 0);
	}

	#[test]
	fn test_load_save_window_state() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let operator = TestOperator::simple(FlowNodeId(1));
		let window_key = test_window_key(42);
		let layout = operator.layout();

		// Initially a new state is created; modify it and save it back
		let mut state = operator.load_state(&mut txn, &window_key).unwrap();
		layout.set_i64(&mut state, 0, 0xAB);
		operator.save_state(&mut txn, &window_key, state).unwrap();

		// Load should return the modified state
		let reloaded = operator.load_state(&mut txn, &window_key).unwrap();
		assert_eq!(layout.get_i64(&reloaded, 0), 0xAB);
	}

	#[test]
	fn test_multiple_windows() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create states for windows 0 through 4
		let window_keys = seed_windows(&operator, &mut txn, 0..5);

		// Verify each window has its own state
		for (id, window_key) in (0i64..).zip(&window_keys) {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), id);
		}
	}

	#[test]
	fn test_expire_before() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create windows 0 through 9
		let window_keys = seed_windows(&operator, &mut txn, 0..10);

		// Expire windows before 5 (should remove 0-4).
		// Due to inverted encoding, windows with smaller IDs have larger keys,
		// so expiring windows < 5 means the range from key(5) (exclusive) to the end.
		let range = EncodedKeyRange::new(Excluded(test_window_key(5)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		let (expired_keys, kept_keys) = window_keys.split_at(5);

		// Verify windows 0-4 are gone
		for window_key in expired_keys {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), 0); // Should be newly created (default)
		}

		// Verify windows 5-9 still exist
		for (id, window_key) in (5i64..).zip(kept_keys) {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), id);
		}
	}

	#[test]
	fn test_expire_empty_range() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create windows 5 through 9
		let window_keys = seed_windows(&operator, &mut txn, 5..10);

		// Expire before 3 (should remove nothing since all windows are >= 5)
		let range = EncodedKeyRange::new(Excluded(test_window_key(3)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 0);

		// All windows should still exist
		for (id, window_key) in (5i64..).zip(&window_keys) {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), id);
		}
	}

	#[test]
	fn test_expire_all() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create windows 0 through 4
		let window_keys = seed_windows(&operator, &mut txn, 0..5);

		// Expire before 100 (should remove all)
		let range = EncodedKeyRange::new(Excluded(test_window_key(100)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		// All windows should be gone
		for window_key in &window_keys {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), 0); // Should be newly created (default)
		}
	}

	#[test]
	fn test_sliding_window_simulation() {
		let mut txn = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&mut txn,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		let operator = TestOperator::new(FlowNodeId(1));
		let layout = operator.layout();

		// Simulate a sliding window maintaining the last 3 windows
		let window_size = 3;
		let mut all_window_keys = Vec::new();

		for current_window in 0..10u64 {
			// Add data to the current window
			all_window_keys.extend(seed_windows(&operator, &mut txn, [current_window]));

			// Expire every window older than the last `window_size` ones
			if current_window >= window_size {
				let expire_before = current_window - window_size + 1;
				let range = EncodedKeyRange::new(Excluded(test_window_key(expire_before)), Unbounded);
				operator.expire_range(&mut txn, range).unwrap();
			}
		}

		// Only windows 7, 8, 9 should exist
		let (expired_keys, live_keys) = all_window_keys.split_at(7);

		for window_key in expired_keys {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), 0); // Should be default (expired)
		}

		for (id, window_key) in (7i64..).zip(live_keys) {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), id); // Should have saved data
		}
	}
}