use reifydb_core::{
encoded::{key::EncodedKey, row::EncodedRow},
interface::catalog::flow::FlowNodeId,
util::encoding::keycode::serializer::KeySerializer,
};
use reifydb_type::{Result, util::cowvec::CowVec, value::row_number::RowNumber};
use crate::{
operator::stateful::utils::{internal_state_get, internal_state_set},
transaction::FlowTransaction,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CounterDirection {
#[default]
Ascending,
Descending,
}
pub struct Counter {
node: FlowNodeId,
key: EncodedKey,
direction: CounterDirection,
}
impl Counter {
pub fn with_prefix(node: FlowNodeId, prefix: u8, direction: CounterDirection) -> Self {
let mut serializer = KeySerializer::new();
serializer.extend_u8(prefix);
let key = EncodedKey::new(serializer.finish());
Self {
node,
key,
direction,
}
}
pub fn with_key(node: FlowNodeId, key: EncodedKey, direction: CounterDirection) -> Self {
Self {
node,
key,
direction,
}
}
pub fn next(&self, txn: &mut FlowTransaction) -> Result<RowNumber> {
let current = self.load(txn)?;
let next_value = self.compute_next(current);
self.save(txn, next_value)?;
Ok(RowNumber(current))
}
pub fn current(&self, txn: &mut FlowTransaction) -> Result<u64> {
self.load(txn)
}
pub fn set(&self, txn: &mut FlowTransaction, value: u64) -> Result<()> {
self.save(txn, value)
}
fn load(&self, txn: &mut FlowTransaction) -> Result<u64> {
match internal_state_get(self.node, txn, &self.key)? {
None => Ok(self.default_value()),
Some(encoded) => {
let bytes = encoded.as_slice();
if bytes.len() >= 8 {
Ok(u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
bytes[7],
]))
} else {
Ok(self.default_value())
}
}
}
}
fn save(&self, txn: &mut FlowTransaction, value: u64) -> Result<()> {
let bytes = value.to_be_bytes().to_vec();
internal_state_set(self.node, txn, &self.key, EncodedRow(CowVec::new(bytes)))?;
Ok(())
}
fn default_value(&self) -> u64 {
match self.direction {
CounterDirection::Ascending => 1,
CounterDirection::Descending => u64::MAX,
}
}
fn compute_next(&self, current: u64) -> u64 {
match self.direction {
CounterDirection::Ascending => current.wrapping_add(1),
CounterDirection::Descending => current.wrapping_sub(1),
}
}
}
#[cfg(test)]
mod tests {
use reifydb_catalog::catalog::Catalog;
use reifydb_core::common::CommitVersion;
use reifydb_runtime::context::clock::{Clock, MockClock};
use reifydb_transaction::interceptor::interceptors::Interceptors;
use super::*;
use crate::operator::stateful::test_utils::test::*;
#[test]
fn test_counter_starts_at_one() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'T', CounterDirection::Ascending);
let value = counter.next(&mut txn).unwrap();
assert_eq!(value.0, 1);
}
#[test]
fn test_counter_increments() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'T', CounterDirection::Ascending);
let v1 = counter.next(&mut txn).unwrap();
let v2 = counter.next(&mut txn).unwrap();
let v3 = counter.next(&mut txn).unwrap();
assert_eq!(v1.0, 1);
assert_eq!(v2.0, 2);
assert_eq!(v3.0, 3);
}
#[test]
fn test_counter_persistence() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node = FlowNodeId(1);
{
let counter = Counter::with_prefix(node, b'P', CounterDirection::Ascending);
counter.next(&mut txn).unwrap();
counter.next(&mut txn).unwrap();
}
{
let counter = Counter::with_prefix(node, b'P', CounterDirection::Ascending);
let value = counter.next(&mut txn).unwrap();
assert_eq!(value.0, 3);
}
}
#[test]
fn test_counter_current() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'T', CounterDirection::Ascending);
let current = counter.current(&mut txn).unwrap();
assert_eq!(current, 1);
counter.next(&mut txn).unwrap();
let current = counter.current(&mut txn).unwrap();
assert_eq!(current, 2);
let current_again = counter.current(&mut txn).unwrap();
assert_eq!(current_again, 2);
}
#[test]
fn test_counter_set() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'T', CounterDirection::Ascending);
counter.set(&mut txn, 100).unwrap();
let value = counter.next(&mut txn).unwrap();
assert_eq!(value.0, 100);
let value = counter.next(&mut txn).unwrap();
assert_eq!(value.0, 101);
}
#[test]
fn test_counter_with_custom_key() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let custom_key = {
let mut serializer = KeySerializer::new();
serializer.extend_bytes(b"subscription-id-123");
EncodedKey::new(serializer.finish())
};
let counter = Counter::with_key(FlowNodeId(1), custom_key, CounterDirection::Ascending);
let v1 = counter.next(&mut txn).unwrap();
let v2 = counter.next(&mut txn).unwrap();
assert_eq!(v1.0, 1);
assert_eq!(v2.0, 2);
}
#[test]
fn test_multiple_counters_isolated() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node = FlowNodeId(1);
let counter1 = Counter::with_prefix(node, b'A', CounterDirection::Ascending);
let counter2 = Counter::with_prefix(node, b'B', CounterDirection::Ascending);
let v1a = counter1.next(&mut txn).unwrap();
let v2a = counter2.next(&mut txn).unwrap();
let v1b = counter1.next(&mut txn).unwrap();
let v2b = counter2.next(&mut txn).unwrap();
assert_eq!(v1a.0, 1);
assert_eq!(v2a.0, 1);
assert_eq!(v1b.0, 2);
assert_eq!(v2b.0, 2);
}
#[test]
fn test_different_nodes_isolated() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter1 = Counter::with_prefix(FlowNodeId(1), b'X', CounterDirection::Ascending);
let counter2 = Counter::with_prefix(FlowNodeId(2), b'X', CounterDirection::Ascending);
let v1 = counter1.next(&mut txn).unwrap();
let v2 = counter2.next(&mut txn).unwrap();
assert_eq!(v1.0, 1);
assert_eq!(v2.0, 1);
}
#[test]
fn test_wrapping_behavior() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'W', CounterDirection::Ascending);
counter.set(&mut txn, u64::MAX).unwrap();
let v1 = counter.next(&mut txn).unwrap();
let v2 = counter.next(&mut txn).unwrap();
assert_eq!(v1.0, u64::MAX);
assert_eq!(v2.0, 0); }
#[test]
fn test_encoded_keys_sort_descending() {
let mut serializer1 = KeySerializer::new();
serializer1.extend_u64(1u64);
let key1 = serializer1.finish();
let mut serializer2 = KeySerializer::new();
serializer2.extend_u64(2u64);
let key2 = serializer2.finish();
assert!(key1 > key2, "encode(1) > encode(2) for descending order");
}
#[test]
fn test_counter_descending_starts_at_max() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'T', CounterDirection::Descending);
let value = counter.next(&mut txn).unwrap();
assert_eq!(value.0, u64::MAX);
}
#[test]
fn test_counter_descending_decrements() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'T', CounterDirection::Descending);
let v1 = counter.next(&mut txn).unwrap();
let v2 = counter.next(&mut txn).unwrap();
let v3 = counter.next(&mut txn).unwrap();
assert_eq!(v1.0, u64::MAX);
assert_eq!(v2.0, u64::MAX - 1);
assert_eq!(v3.0, u64::MAX - 2);
}
#[test]
fn test_counter_descending_wrapping() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let counter = Counter::with_prefix(FlowNodeId(1), b'W', CounterDirection::Descending);
counter.set(&mut txn, 1).unwrap();
let v1 = counter.next(&mut txn).unwrap();
let v2 = counter.next(&mut txn).unwrap();
assert_eq!(v1.0, 1);
assert_eq!(v2.0, 0);
let v3 = counter.next(&mut txn).unwrap();
assert_eq!(v3.0, u64::MAX); }
}