use std::iter::once;
use reifydb_core::{
encoded::{
key::{EncodedKey, EncodedKeyRange},
row::EncodedRow,
},
interface::catalog::flow::FlowNodeId,
key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
util::encoding::keycode::serializer::KeySerializer,
};
use reifydb_type::{Result, util::cowvec::CowVec, value::row_number::RowNumber};
use crate::{
operator::stateful::{
counter::{Counter, CounterDirection},
utils::{internal_state_get, internal_state_set},
},
transaction::FlowTransaction,
};
/// Assigns row numbers to encoded keys on behalf of a single flow node.
///
/// The provider keeps two mappings in the node's internal state: a forward
/// mapping from key to row number and a reverse mapping from row number back
/// to the key. New row numbers are drawn from `counter`.
pub struct RowNumberProvider {
// Flow node whose internal state holds the forward and reverse mappings.
node: FlowNodeId,
// Counter that hands out the next row number when an unseen key arrives.
counter: Counter,
}
impl RowNumberProvider {
    /// Creates a provider for `node`.
    ///
    /// Row numbers are drawn from an ascending [`Counter`] created for the same
    /// node with the `b'C'` prefix.
    pub fn new(node: FlowNodeId) -> Self {
        Self {
            node,
            counter: Counter::with_prefix(node, b'C', CounterDirection::Ascending),
        }
    }

    /// Looks up, or allocates, a row number for every key in `keys`.
    ///
    /// Returns one `(row_number, is_new)` pair per input key, in input order.
    /// `is_new` is `true` when the row number was allocated by this call and
    /// `false` when it was read back from the node's internal state.
    ///
    /// For a new key both the forward entry (key -> row number) and the reverse
    /// entry (row number -> key) are written. A stored forward entry shorter
    /// than 8 bytes is treated as absent: a fresh row number is allocated and
    /// the entry is overwritten.
    ///
    /// # Errors
    ///
    /// Propagates any error from the state reads/writes or from the counter.
    pub fn get_or_create_row_numbers<'a, I>(
        &self,
        txn: &mut FlowTransaction,
        keys: I,
    ) -> Result<Vec<(RowNumber, bool)>>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
    {
        let keys = keys.into_iter();
        // Reserve up front from the iterator's lower bound so the common
        // single-key and slice-backed cases never reallocate.
        let mut results = Vec::with_capacity(keys.size_hint().0);

        for key in keys {
            let map_key = self.make_map_key(key);

            let existing = internal_state_get(self.node, txn, &map_key)?
                .and_then(|row| Self::decode_row_number(row.as_slice()));
            if let Some(row_number) = existing {
                results.push((row_number, false));
                continue;
            }

            let new_row_number = self.counter.next(txn)?;

            // Forward entry: key -> big-endian row number.
            let row_num_bytes = new_row_number.0.to_be_bytes().to_vec();
            internal_state_set(self.node, txn, &map_key, EncodedRow(CowVec::new(row_num_bytes)))?;

            // Reverse entry: row number -> original key bytes.
            let reverse_key = self.make_reverse_map_key(new_row_number);
            internal_state_set(
                self.node,
                txn,
                &reverse_key,
                EncodedRow(CowVec::new(key.as_ref().to_vec())),
            )?;

            results.push((new_row_number, true));
        }

        Ok(results)
    }

    /// Looks up, or allocates, the row number for a single `key`.
    ///
    /// Convenience wrapper around [`Self::get_or_create_row_numbers`]; see that
    /// method for the meaning of the returned `(row_number, is_new)` pair.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::get_or_create_row_numbers`].
    pub fn get_or_create_row_number(
        &self,
        txn: &mut FlowTransaction,
        key: &EncodedKey,
    ) -> Result<(RowNumber, bool)> {
        let entry = self
            .get_or_create_row_numbers(txn, once(key))?
            .into_iter()
            .next()
            .expect("a single input key always yields exactly one result");
        Ok(entry)
    }

    /// Returns the key that was assigned `row_number`, if a reverse entry exists.
    ///
    /// # Errors
    ///
    /// Propagates any error from the state read.
    pub fn get_key_for_row_number(
        &self,
        txn: &mut FlowTransaction,
        row_number: RowNumber,
    ) -> Result<Option<EncodedKey>> {
        let reverse_key = self.make_reverse_map_key(row_number);
        let stored = internal_state_get(self.node, txn, &reverse_key)?;
        Ok(stored.map(|key_bytes| EncodedKey::new(key_bytes.to_vec())))
    }

    /// Decodes a row number from the first 8 bytes of `bytes` (big-endian).
    ///
    /// Returns `None` when fewer than 8 bytes are available; trailing bytes
    /// beyond the first 8 are ignored.
    fn decode_row_number(bytes: &[u8]) -> Option<RowNumber> {
        let mut head = [0u8; 8];
        // `get(..8)` yields exactly 8 bytes, so `copy_from_slice` cannot panic.
        head.copy_from_slice(bytes.get(..8)?);
        Some(RowNumber(u64::from_be_bytes(head)))
    }

    /// Builds the forward-map state key: the `b'M'` tag followed by `key`,
    /// both written through [`KeySerializer`].
    fn make_map_key(&self, key: &EncodedKey) -> EncodedKey {
        let mut serializer = KeySerializer::new();
        serializer.extend_u8(b'M');
        serializer.extend_bytes(key.as_ref());
        serializer.finish()
    }

    /// Builds the reverse-map state key: the `b'R'` tag followed by the row
    /// number, both written through [`KeySerializer`].
    fn make_reverse_map_key(&self, row_number: RowNumber) -> EncodedKey {
        let mut serializer = KeySerializer::new();
        serializer.extend_u8(b'R');
        serializer.extend_u64(row_number.0);
        serializer.finish()
    }

    /// Removes every forward-map entry whose state key starts with the
    /// serialized `b'M'` tag followed by `key_prefix`.
    ///
    /// Only forward entries are removed here; the reverse (`b'R'`) entries
    /// written by [`Self::get_or_create_row_numbers`] are left in place.
    ///
    /// NOTE(review): forward keys are built with `KeySerializer::extend_bytes`,
    /// whereas `key_prefix` is appended raw. Confirm the two encodings line up
    /// for the prefixes callers pass in.
    ///
    /// # Errors
    ///
    /// Propagates any error from scanning the range or removing a key.
    pub fn remove_by_prefix(&self, txn: &mut FlowTransaction, key_prefix: &[u8]) -> Result<()> {
        let mut serializer = KeySerializer::new();
        serializer.extend_u8(b'M');

        let mut prefix = Vec::new();
        prefix.extend_from_slice(&serializer.finish());
        prefix.extend_from_slice(key_prefix);

        // `prefix` is not needed afterwards, so it is moved rather than cloned.
        let state_prefix = FlowNodeInternalStateKey::new(self.node, prefix);
        let full_range = EncodedKeyRange::prefix(&state_prefix.encode());

        // Collect the matching keys first and remove them once the scan is
        // finished, keeping the scan and the removals as separate phases.
        let keys_to_remove = {
            // 1024 is forwarded to `txn.range`; presumably a batch size — confirm.
            let stream = txn.range(full_range, 1024);
            let mut keys = Vec::new();
            for result in stream {
                let multi = result?;
                keys.push(multi.key);
            }
            keys
        };

        for key in keys_to_remove {
            txn.remove(&key)?;
        }
        Ok(())
    }
}
#[cfg(test)]
pub mod tests {
    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::common::CommitVersion;
    use reifydb_runtime::context::clock::{Clock, MockClock};
    use reifydb_transaction::interceptor::interceptors::Interceptors;

    use super::*;
    use crate::operator::stateful::test_utils::test::*;

    /// Runs `body` against a fresh deferred [`FlowTransaction`] built on top of
    /// a new test transaction (commit version 1, mock clock at 1000 ms).
    ///
    /// Every test in this module needs the same setup; keeping it in one place
    /// avoids repeating the construction in each test.
    fn with_flow_txn(body: impl FnOnce(&mut FlowTransaction)) {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        body(&mut txn);
    }

    /// The first key seen by a provider receives row number 1 and is new.
    #[test]
    fn test_first_row_number() {
        with_flow_txn(|txn| {
            let provider = RowNumberProvider::new(FlowNodeId(1));
            let key = test_key("first");

            let (row_num, is_new) = provider.get_or_create_row_number(txn, &key).unwrap();
            assert_eq!(row_num.0, 1);
            assert!(is_new);
        });
    }

    /// Asking twice for the same key returns the same row number, and only the
    /// first call reports it as new.
    #[test]
    fn test_duplicate_key_same_row_number() {
        with_flow_txn(|txn| {
            let provider = RowNumberProvider::new(FlowNodeId(1));
            let key = test_key("duplicate");

            let (row_num1, is_new1) = provider.get_or_create_row_number(txn, &key).unwrap();
            assert_eq!(row_num1.0, 1);
            assert!(is_new1);

            let (row_num2, is_new2) = provider.get_or_create_row_number(txn, &key).unwrap();
            assert_eq!(row_num2.0, 1);
            assert!(!is_new2);

            assert_eq!(row_num1, row_num2);
        });
    }

    /// Distinct keys receive consecutive row numbers starting at 1.
    #[test]
    fn test_sequential_row_numbers() {
        with_flow_txn(|txn| {
            let provider = RowNumberProvider::new(FlowNodeId(1));

            for i in 1..=5u64 {
                let key = test_key(&format!("key_{}", i));
                let (row_num, is_new) = provider.get_or_create_row_number(txn, &key).unwrap();
                assert_eq!(row_num.0, i);
                assert!(is_new);
            }
        });
    }

    /// Interleaving lookups of known keys with a new key keeps existing
    /// assignments stable and continues the sequence for the new key.
    #[test]
    fn test_mixed_new_and_existing() {
        with_flow_txn(|txn| {
            let provider = RowNumberProvider::new(FlowNodeId(1));
            let key1 = test_key("mixed_1");
            let key2 = test_key("mixed_2");
            let key3 = test_key("mixed_3");

            let (rn1, new1) = provider.get_or_create_row_number(txn, &key1).unwrap();
            let (rn2, new2) = provider.get_or_create_row_number(txn, &key2).unwrap();
            let (rn3, new3) = provider.get_or_create_row_number(txn, &key3).unwrap();
            assert_eq!(rn1.0, 1);
            assert!(new1);
            assert_eq!(rn2.0, 2);
            assert!(new2);
            assert_eq!(rn3.0, 3);
            assert!(new3);

            let key4 = test_key("mixed_4");
            let (rn2_again, new2_again) = provider.get_or_create_row_number(txn, &key2).unwrap();
            let (rn4, new4) = provider.get_or_create_row_number(txn, &key4).unwrap();
            let (rn1_again, new1_again) = provider.get_or_create_row_number(txn, &key1).unwrap();
            assert_eq!(rn2_again.0, 2);
            assert!(!new2_again);
            assert_eq!(rn4.0, 4);
            assert!(new4);
            assert_eq!(rn1_again.0, 1);
            assert!(!new1_again);
        });
    }

    /// Providers for different flow nodes number the same keys independently.
    #[test]
    fn test_multiple_providers_isolated() {
        with_flow_txn(|txn| {
            let provider1 = RowNumberProvider::new(FlowNodeId(1));
            let provider2 = RowNumberProvider::new(FlowNodeId(2));
            let key = test_key("shared_key");

            let (rn1, _) = provider1.get_or_create_row_number(txn, &key).unwrap();
            let (rn2, _) = provider2.get_or_create_row_number(txn, &key).unwrap();
            assert_eq!(rn1.0, 1);
            assert_eq!(rn2.0, 1);

            let key2 = test_key("key2");
            let (rn1_2, _) = provider1.get_or_create_row_number(txn, &key2).unwrap();
            assert_eq!(rn1_2.0, 2);
            let (rn2_2, _) = provider2.get_or_create_row_number(txn, &key2).unwrap();
            assert_eq!(rn2_2.0, 2);
        });
    }

    /// After three allocations, the next unseen key in the same transaction
    /// receives row number 4.
    #[test]
    fn test_counter_persistence() {
        with_flow_txn(|txn| {
            let provider = RowNumberProvider::new(FlowNodeId(1));

            for i in 1..=3u64 {
                let key = test_key(&format!("persist_{}", i));
                let (rn, _) = provider.get_or_create_row_number(txn, &key).unwrap();
                assert_eq!(rn.0, i);
            }

            let new_key = test_key("persist_new");
            let (rn, is_new) = provider.get_or_create_row_number(txn, &new_key).unwrap();
            assert_eq!(rn.0, 4);
            assert!(is_new);
        });
    }

    /// Allocates 1000 row numbers, then checks that the first key still maps
    /// to 1 and that the next unseen key receives 1001.
    #[test]
    fn test_large_row_numbers() {
        with_flow_txn(|txn| {
            let provider = RowNumberProvider::new(FlowNodeId(1));

            for i in 1..=1000u64 {
                let key = test_key(&format!("large_{}", i));
                let (rn, is_new) = provider.get_or_create_row_number(txn, &key).unwrap();
                assert_eq!(rn.0, i);
                assert!(is_new);
            }

            let key = test_key("large_1");
            let (rn, is_new) = provider.get_or_create_row_number(txn, &key).unwrap();
            assert_eq!(rn.0, 1);
            assert!(!is_new);

            let key = test_key("large_1001");
            let (rn, is_new) = provider.get_or_create_row_number(txn, &key).unwrap();
            assert_eq!(rn.0, 1001);
            assert!(is_new);
        });
    }

    /// A batch call mixing known and unseen keys returns results in input
    /// order, allocates numbers only for the unseen keys, and writes reverse
    /// entries that map each row number back to its key.
    #[test]
    fn test_mixed_existing_and_new_keys() {
        with_flow_txn(|txn| {
            let provider = RowNumberProvider::new(FlowNodeId(1));
            let key1 = test_key("key_1");
            let key2 = test_key("key_2");
            let key3 = test_key("key_3");

            let (rn1, _) = provider.get_or_create_row_number(txn, &key1).unwrap();
            assert_eq!(rn1.0, 1);
            let (rn2, _) = provider.get_or_create_row_number(txn, &key2).unwrap();
            assert_eq!(rn2.0, 2);
            let (rn3, _) = provider.get_or_create_row_number(txn, &key3).unwrap();
            assert_eq!(rn3.0, 3);

            let key4 = test_key("key_4");
            let key5 = test_key("key_5");
            let keys = vec![&key2, &key4, &key1, &key5, &key3];
            let results = provider.get_or_create_row_numbers(txn, keys).unwrap();
            assert_eq!(results.len(), 5);
            assert_eq!(results[0].0.0, 2);
            assert!(!results[0].1);
            assert_eq!(results[1].0.0, 4);
            assert!(results[1].1);
            assert_eq!(results[2].0.0, 1);
            assert!(!results[2].1);
            assert_eq!(results[3].0.0, 5);
            assert!(results[3].1);
            assert_eq!(results[4].0.0, 3);
            assert!(!results[4].1);

            let key6 = test_key("key_6");
            let (rn6, is_new6) = provider.get_or_create_row_number(txn, &key6).unwrap();
            assert_eq!(rn6.0, 6);
            assert!(is_new6);

            let (check_rn4, is_new4) = provider.get_or_create_row_number(txn, &key4).unwrap();
            assert_eq!(check_rn4.0, 4);
            assert!(!is_new4);
            let (check_rn5, is_new5) = provider.get_or_create_row_number(txn, &key5).unwrap();
            assert_eq!(check_rn5.0, 5);
            assert!(!is_new5);

            let reverse_key4 = provider.get_key_for_row_number(txn, RowNumber(4)).unwrap();
            assert_eq!(reverse_key4, Some(key4));
            let reverse_key5 = provider.get_key_for_row_number(txn, RowNumber(5)).unwrap();
            assert_eq!(reverse_key5, Some(key5));
            let reverse_key1 = provider.get_key_for_row_number(txn, RowNumber(1)).unwrap();
            assert_eq!(reverse_key1, Some(key1));
            let reverse_key2 = provider.get_key_for_row_number(txn, RowNumber(2)).unwrap();
            assert_eq!(reverse_key2, Some(key2));
        });
    }
}