use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
};
use cleanup::cleanup_old_windows;
use reifydb_core::{
common::CommitVersion,
encoded::key::EncodedKey,
interface::catalog::config::{ConfigKey, GetConfig},
util::bloom::BloomFilter,
};
use reifydb_runtime::{
actor::system::ActorSystem,
context::{clock::Clock, rng::Rng},
sync::rwlock::RwLock,
};
use reifydb_type::Result;
use tracing::{Span, field, instrument};
use crate::multi::{conflict::ConflictManager, transaction::version::VersionProvider, watermark::watermark::WaterMark};
pub mod cleanup;
pub(crate) struct CommittedWindow {
transactions: Vec<CommittedTxn>,
modified_keys: HashSet<EncodedKey>,
bloom: BloomFilter,
max_version: CommitVersion,
lock: RwLock<()>,
}
impl CommittedWindow {
fn new(min_version: CommitVersion) -> Self {
Self {
transactions: Vec::with_capacity(200),
modified_keys: HashSet::with_capacity(500),
bloom: BloomFilter::new(500),
max_version: min_version,
lock: RwLock::new(()),
}
}
fn add_transaction(&mut self, txn: CommittedTxn) {
self.max_version = self.max_version.max(txn.version);
if let Some(ref conflicts) = txn.conflict_manager {
for key in conflicts.get_write_keys() {
self.modified_keys.insert(key.clone());
self.bloom.add(&key);
}
}
self.transactions.push(txn);
}
fn might_have_key(&self, key: &EncodedKey) -> bool {
if !self.bloom.might_contain(key) {
return false;
}
self.modified_keys.contains(key)
}
pub(crate) fn get_modified_keys(&self) -> &HashSet<EncodedKey> {
&self.modified_keys
}
pub(super) fn max_version(&self) -> CommitVersion {
self.max_version
}
}
/// Conflict-tracking state shared by all commits; guarded by `Oracle::inner`.
pub(crate) struct OracleInner {
/// In this file it is only written by `add_committed_transaction`, which stores the
/// highest commit version recorded so far. NOTE(review): the name suggests cleanup
/// bookkeeping; confirm intent against the `cleanup` module.
pub last_cleanup: CommitVersion,
/// Committed transactions bucketed by window start version (ordered by start).
pub time_windows: BTreeMap<CommitVersion, CommittedWindow>,
/// Index from each committed write key to the start versions of windows that wrote it.
pub key_to_windows: HashMap<EncodedKey, BTreeSet<CommitVersion>>,
/// Commits reading below this version are answered with `CreateCommitResult::TooOld`.
/// It is passed mutably to `cleanup_old_windows`, which presumably advances it when
/// windows are evicted.
pub evicted_up_through: CommitVersion,
}
/// A transaction that committed successfully, retained for conflict detection.
#[derive(Debug)]
pub(crate) struct CommittedTxn {
/// Commit version assigned by the oracle's clock.
version: CommitVersion,
/// Read/write sets of the transaction, compared against later committers.
conflict_manager: Option<ConflictManager>,
}
/// Outcome of `Oracle::new_commit`.
pub(crate) enum CreateCommitResult {
/// The transaction was accepted and assigned this commit version.
Success(CommitVersion),
/// A transaction committed after the read version conflicts; the caller gets
/// its conflict manager back.
Conflict(ConflictManager),
/// The read version is below `OracleInner::evicted_up_through`, so the conflict
/// history needed to validate it may already have been discarded.
TooOld,
}
/// Assigns commit versions and detects conflicts between optimistic transactions.
pub(crate) struct Oracle<L>
where
L: VersionProvider,
{
/// Source of commit versions (`next`), the current version (`current`) and replica advancement.
pub(crate) clock: L,
/// Committed-transaction history used for conflict detection.
pub(crate) inner: RwLock<OracleInner>,
/// Watermark over read versions; marked done in `new_commit` / `done_query`.
pub(crate) query: WaterMark,
/// Watermark over commit versions; begun in `new_commit`, finished in `done_commit`.
pub(crate) command: WaterMark,
/// Set to `true` by `stop`; not read anywhere in this file.
shutdown_signal: Arc<RwLock<bool>>,
/// Hosts the watermarks; shut down by `stop`.
actor_system: ActorSystem,
/// Clock used to time the tracing spans in `new_commit`.
metrics_clock: Clock,
rng: Rng,
/// Supplies `OracleWindowSize` and `OracleWaterMark` at commit time.
config: Arc<dyn GetConfig>,
}
impl<L> Oracle<L>
where
L: VersionProvider,
{
pub fn new(
clock: L,
actor_system: ActorSystem,
metrics_clock: Clock,
rng: Rng,
config: Arc<dyn GetConfig>,
) -> Self {
let shutdown_signal = Arc::new(RwLock::new(false));
Self {
clock,
inner: RwLock::new(OracleInner {
last_cleanup: CommitVersion(0),
time_windows: BTreeMap::new(),
key_to_windows: HashMap::with_capacity(10000),
evicted_up_through: CommitVersion(0),
}),
query: WaterMark::new("txn-mark-query".into(), &actor_system),
command: WaterMark::new("txn-mark-cmd".into(), &actor_system),
shutdown_signal,
actor_system,
metrics_clock,
rng,
config,
}
}
pub fn config(&self) -> Arc<dyn GetConfig> {
self.config.clone()
}
pub fn actor_system(&self) -> ActorSystem {
self.actor_system.clone()
}
pub fn metrics_clock(&self) -> &Clock {
&self.metrics_clock
}
pub fn rng(&self) -> &Rng {
&self.rng
}
#[instrument(name = "transaction::oracle::new_commit", level = "debug", skip(self, done_read, conflicts), fields(
%version,
read_keys = field::Empty,
write_keys = field::Empty,
relevant_windows = field::Empty,
windows_checked = field::Empty,
txns_checked = field::Empty,
inner_read_lock_us = field::Empty,
find_windows_us = field::Empty,
conflict_check_us = field::Empty,
clock_next_us = field::Empty,
inner_write_lock_us = field::Empty,
add_txn_us = field::Empty,
cleanup_us = field::Empty,
has_conflict = field::Empty
))]
pub(crate) fn new_commit(
&self,
done_read: &mut bool,
version: CommitVersion,
conflicts: ConflictManager,
) -> Result<CreateCommitResult> {
let lock_start = self.metrics_clock.instant();
let inner = self.inner.read();
Span::current().record("inner_read_lock_us", lock_start.elapsed().as_micros() as u64);
if version < inner.evicted_up_through {
return Ok(CreateCommitResult::TooOld);
}
let read_keys = conflicts.get_read_keys();
let write_keys = conflicts.get_write_keys();
Span::current().record("read_keys", read_keys.len());
Span::current().record("write_keys", write_keys.len());
let has_keys = !read_keys.is_empty() || !write_keys.is_empty();
let find_start = self.metrics_clock.instant();
let relevant_windows: Vec<CommitVersion> = if !has_keys {
inner.time_windows.range(version..).take(5).map(|(&v, _)| v).collect()
} else {
let mut windows = HashSet::new();
for key in read_keys.iter().chain(write_keys.iter()) {
if let Some(window_versions) = inner.key_to_windows.get(key) {
windows.extend(window_versions.iter().copied());
}
}
if windows.is_empty() {
if conflicts.has_range_operations() {
inner.time_windows.keys().copied().collect()
} else {
Vec::new()
}
} else {
windows.into_iter().collect()
}
};
Span::current().record("find_windows_us", find_start.elapsed().as_micros() as u64);
Span::current().record("relevant_windows", relevant_windows.len());
let conflict_start = self.metrics_clock.instant();
let mut windows_checked = 0u64;
let mut txns_checked = 0u64;
for window_version in &relevant_windows {
if let Some(window) = inner.time_windows.get(window_version) {
windows_checked += 1;
if window.max_version <= version {
continue;
}
if !conflicts.has_range_operations() {
let needs_detailed_check = read_keys
.iter()
.chain(write_keys.iter())
.any(|key| window.might_have_key(key));
if !needs_detailed_check {
continue;
}
}
let _window_lock = window.lock.read();
for committed_txn in &window.transactions {
txns_checked += 1;
if committed_txn.version <= version {
continue;
}
if let Some(old_conflicts) = &committed_txn.conflict_manager
&& conflicts.has_conflict(old_conflicts)
{
Span::current().record(
"conflict_check_us",
conflict_start.elapsed().as_micros() as u64,
);
Span::current().record("windows_checked", windows_checked);
Span::current().record("txns_checked", txns_checked);
Span::current().record("has_conflict", true);
return Ok(CreateCommitResult::Conflict(conflicts));
}
}
}
}
Span::current().record("conflict_check_us", conflict_start.elapsed().as_micros() as u64);
Span::current().record("windows_checked", windows_checked);
Span::current().record("txns_checked", txns_checked);
drop(inner);
if !*done_read {
self.query.done(version);
*done_read = true;
}
let commit_version = {
let clock = self.clock.clone();
let clock_start = self.metrics_clock.instant();
let version = clock.next()?;
Span::current().record("clock_next_us", clock_start.elapsed().as_micros() as u64);
self.command.begin(version);
version
};
let needs_cleanup = {
let write_lock_start = self.metrics_clock.instant();
let mut inner = self.inner.write();
Span::current().record("inner_write_lock_us", write_lock_start.elapsed().as_micros() as u64);
let add_start = self.metrics_clock.instant();
let window_size = self.config.get_config_uint8(ConfigKey::OracleWindowSize);
inner.add_committed_transaction(commit_version, conflicts, window_size);
Span::current().record("add_txn_us", add_start.elapsed().as_micros() as u64);
let water_mark = self.config.get_config_uint8(ConfigKey::OracleWaterMark) as usize;
inner.time_windows.len() > water_mark
};
if needs_cleanup {
let cleanup_start = self.metrics_clock.instant();
let mut inner = self.inner.write();
let inner = &mut *inner;
cleanup_old_windows(
&mut inner.time_windows,
&mut inner.key_to_windows,
&mut inner.evicted_up_through,
);
Span::current().record("cleanup_us", cleanup_start.elapsed().as_micros() as u64);
}
Ok(CreateCommitResult::Success(commit_version))
}
pub(crate) fn bootstrapping_completed(&self) {
let mut inner = self.inner.write();
inner.time_windows.clear();
inner.key_to_windows.clear();
}
pub(crate) fn version(&self) -> Result<CommitVersion> {
self.clock.current()
}
pub fn stop(&mut self) {
{
let mut shutdown = self.shutdown_signal.write();
*shutdown = true;
}
{
let mut inner = self.inner.write();
inner.time_windows.clear();
inner.key_to_windows.clear();
}
self.actor_system.shutdown();
}
pub(crate) fn done_query(&self, version: CommitVersion) {
self.query.done(version);
}
pub(crate) fn done_commit(&self, version: CommitVersion) {
self.command.done(version);
}
pub(crate) fn advance_version_for_replica(&self, version: CommitVersion) {
self.clock.advance_to(version);
}
}
impl OracleInner {
    /// Records a committed transaction in the window that owns `version` and
    /// indexes its write keys in `key_to_windows`.
    ///
    /// The window start is `version` rounded down to a multiple of
    /// `window_size`. `window_size` comes from configuration; a value of 0 would
    /// divide by zero and panic on every commit, so it is clamped to 1 (one
    /// window per version).
    fn add_committed_transaction(&mut self, version: CommitVersion, conflicts: ConflictManager, window_size: u64) {
        let window_size = window_size.max(1);
        let window_start = CommitVersion((version.0 / window_size) * window_size);

        // Index every written key before `conflicts` is moved into the window.
        for key in conflicts.get_write_keys() {
            self.key_to_windows.entry(key.clone()).or_default().insert(window_start);
        }

        let txn = CommittedTxn {
            version,
            conflict_manager: Some(conflicts),
        };
        self.time_windows
            .entry(window_start)
            .or_insert_with(|| CommittedWindow::new(window_start))
            .add_transaction(txn);

        self.last_cleanup = self.last_cleanup.max(version);
    }
}
impl<L> Drop for Oracle<L>
where
L: VersionProvider,
{
fn drop(&mut self) {
self.stop();
}
}
#[cfg(test)]
pub mod tests {
    use std::{
        sync::{
            Arc, Barrier,
            atomic::{AtomicU64, Ordering},
        },
        thread,
        thread::sleep,
        time::Duration,
    };

    use reifydb_core::encoded::key::EncodedKeyRange;
    use reifydb_runtime::{context::clock::MockClock, pool::Pools};
    use reifydb_type::value::Value;

    use super::*;
    use crate::multi::transaction::version::VersionProvider;

    /// Version provider backed by an atomic counter; `next` returns counter + 1.
    #[derive(Debug, Clone)]
    struct MockVersionProvider {
        current: Arc<AtomicU64>,
    }

    impl MockVersionProvider {
        fn new(start: impl Into<CommitVersion>) -> Self {
            Self {
                current: Arc::new(AtomicU64::new(start.into().0)),
            }
        }
    }

    impl VersionProvider for MockVersionProvider {
        fn next(&self) -> Result<CommitVersion> {
            Ok(CommitVersion(self.current.fetch_add(1, Ordering::Relaxed) + 1))
        }

        fn current(&self) -> Result<CommitVersion> {
            Ok(CommitVersion(self.current.load(Ordering::Relaxed)))
        }

        fn advance_to(&self, version: CommitVersion) {
            self.current.fetch_max(version.0, Ordering::Relaxed);
        }
    }

    fn create_test_key(s: &str) -> EncodedKey {
        EncodedKey::new(s.as_bytes().to_vec())
    }

    /// Builds an oracle whose configuration always returns each key's default value.
    fn create_test_oracle(start: impl Into<CommitVersion>) -> Oracle<MockVersionProvider> {
        let clock = MockVersionProvider::new(start);
        let actor_system = ActorSystem::new(Pools::default(), Clock::Real);
        struct DummyConfig;
        impl GetConfig for DummyConfig {
            fn get_config(&self, key: ConfigKey) -> Value {
                key.default_value()
            }
            fn get_config_at(&self, key: ConfigKey, _version: CommitVersion) -> Value {
                key.default_value()
            }
        }
        let config = Arc::new(DummyConfig);
        Oracle::new(clock, actor_system, Clock::Mock(MockClock::from_millis(1000)), Rng::seeded(42), config)
    }

    #[test]
    fn test_oracle_basic_creation() {
        let oracle = create_test_oracle(0);
        assert_eq!(oracle.version().unwrap(), 0);
    }

    #[test]
    fn test_window_creation_and_indexing() {
        let oracle = create_test_oracle(0);
        let mut conflicts = ConflictManager::new();
        let key1 = create_test_key("key1");
        let key2 = create_test_key("key2");
        conflicts.mark_write(&key1);
        conflicts.mark_write(&key2);
        let mut done_read = false;
        let result = oracle.new_commit(&mut done_read, CommitVersion(1), conflicts).unwrap();
        match result {
            CreateCommitResult::Success(version) => {
                assert!(version.0 >= 1);
                let inner = oracle.inner.read();
                assert!(inner.key_to_windows.contains_key(&key1));
                assert!(inner.key_to_windows.contains_key(&key2));
                assert!(inner.time_windows.len() > 0);
            }
            CreateCommitResult::Conflict(_) => panic!("Unexpected conflict for first transaction"),
            CreateCommitResult::TooOld => panic!("Unexpected TooOld for first transaction"),
        }
    }

    #[test]
    fn test_conflict_detection_between_transactions() {
        let oracle = create_test_oracle(1);
        let shared_key = create_test_key("shared_key");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_read(&shared_key);
        conflicts1.mark_write(&shared_key);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        let _commit_v1 = match result1 {
            CreateCommitResult::Success(v) => v,
            _ => panic!("First transaction should succeed"),
        };
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_read(&shared_key);
        conflicts2.mark_write(&shared_key);
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(1), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Conflict(_)));
    }

    #[test]
    fn test_no_conflict_different_keys() {
        let oracle = create_test_oracle(0);
        let key1 = create_test_key("key1");
        let key2 = create_test_key("key2");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_read(&key1);
        conflicts1.mark_write(&key1);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        assert!(matches!(result1, CreateCommitResult::Success(_)));
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_read(&key2);
        conflicts2.mark_write(&key2);
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(1), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Success(_)));
    }

    #[test]
    fn test_key_indexing_multiple_windows() {
        let oracle = create_test_oracle(0);
        let key1 = create_test_key("key1");
        let key2 = create_test_key("key2");
        for i in 0..3 {
            let mut conflicts = ConflictManager::new();
            if i % 2 == 0 {
                conflicts.mark_write(&key1);
            } else {
                conflicts.mark_write(&key2);
            }
            let mut done_read = false;
            let version_start = CommitVersion(i as u64 * 500 + 1);
            let result = oracle.new_commit(&mut done_read, version_start, conflicts).unwrap();
            assert!(matches!(result, CreateCommitResult::Success(_)));
        }
        let inner = oracle.inner.read();
        let key1_windows = inner.key_to_windows.get(&key1).unwrap();
        assert!(key1_windows.len() >= 1);
        let key2_windows = inner.key_to_windows.get(&key2).unwrap();
        assert!(key2_windows.len() >= 1);
    }

    #[test]
    fn test_version_filtering_in_conflict_detection() {
        let oracle = create_test_oracle(2);
        let shared_key = create_test_key("shared_key");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_write(&shared_key);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(5), conflicts1).unwrap();
        let commit_v1 = match result1 {
            CreateCommitResult::Success(v) => v,
            _ => panic!("First transaction should succeed"),
        };
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_read(&shared_key);
        conflicts2.mark_write(&shared_key);
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(3), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Success(_)));
        let mut conflicts3 = ConflictManager::new();
        conflicts3.mark_read(&shared_key);
        conflicts3.mark_write(&shared_key);
        let mut done_read3 = false;
        let read_version = CommitVersion(commit_v1.0 - 1);
        let result3 = oracle.new_commit(&mut done_read3, read_version, conflicts3).unwrap();
        assert!(matches!(result3, CreateCommitResult::Conflict(_)));
    }

    #[test]
    fn test_range_operations_fallback() {
        let oracle = create_test_oracle(1);
        let key1 = create_test_key("key1");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_write(&key1);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        assert!(matches!(result1, CreateCommitResult::Success(_)));
        let mut conflicts2 = ConflictManager::new();
        let range = EncodedKeyRange::parse("a..z");
        conflicts2.mark_range(range);
        conflicts2.mark_write(&create_test_key("other_key"));
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(1), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Conflict(_)));
    }

    /// A range read whose point key is already indexed (in an older window) must
    /// still be checked against writes in other windows that fall inside the range.
    #[test]
    fn test_range_operations_not_limited_to_indexed_windows() {
        let oracle = create_test_oracle(0);

        // Older commit writes `other_key`, putting it into `key_to_windows`.
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_write(&create_test_key("other_key"));
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        let commit_v1 = match result1 {
            CreateCommitResult::Success(v) => v,
            _ => panic!("First transaction should succeed"),
        };

        // Jump the clock far ahead so the next commit (very likely) lands in a
        // different window than the first one.
        oracle.advance_version_for_replica(CommitVersion(10_000_000));
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_write(&create_test_key("key1"));
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(10_000_000), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Success(_)));

        // Reads at commit_v1, scans a range covering `key1`, writes the indexed key.
        let mut conflicts3 = ConflictManager::new();
        conflicts3.mark_range(EncodedKeyRange::parse("a..z"));
        conflicts3.mark_write(&create_test_key("other_key"));
        let mut done_read3 = false;
        let result3 = oracle.new_commit(&mut done_read3, commit_v1, conflicts3).unwrap();
        assert!(
            !matches!(result3, CreateCommitResult::Success(_)),
            "range read overlapping a later write in another window must not succeed"
        );
    }

    #[test]
    fn test_empty_conflict_manager() {
        let oracle = create_test_oracle(0);
        let conflicts = ConflictManager::new();
        let mut done_read = false;
        let result = oracle.new_commit(&mut done_read, CommitVersion(1), conflicts).unwrap();
        match result {
            CreateCommitResult::Success(_) => {
                let inner = oracle.inner.read();
                assert!(inner.key_to_windows.is_empty());
            }
            CreateCommitResult::Conflict(_) => {
                panic!("Empty conflict manager should not cause conflicts")
            }
            CreateCommitResult::TooOld => panic!("Unexpected TooOld for empty conflict manager"),
        }
    }

    #[test]
    fn test_write_write_conflict() {
        let oracle = create_test_oracle(1);
        let shared_key = create_test_key("shared_key");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_write(&shared_key);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        assert!(matches!(result1, CreateCommitResult::Success(_)));
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_write(&shared_key);
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(1), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Conflict(_)));
    }

    #[test]
    fn test_read_write_conflict() {
        let oracle = create_test_oracle(1);
        let shared_key = create_test_key("shared_key");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_write(&shared_key);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        assert!(matches!(result1, CreateCommitResult::Success(_)));
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_read(&shared_key);
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(1), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Conflict(_)));
    }

    #[test]
    fn test_sequential_transactions_no_conflict() {
        let oracle = create_test_oracle(0);
        let shared_key = create_test_key("shared_key");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_read(&shared_key);
        conflicts1.mark_write(&shared_key);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        let commit_v1 = match result1 {
            CreateCommitResult::Success(v) => v,
            _ => panic!("First transaction should succeed"),
        };
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_read(&shared_key);
        conflicts2.mark_write(&shared_key);
        let mut done_read2 = false;
        let read_version = CommitVersion(commit_v1.0 + 1);
        let result2 = oracle.new_commit(&mut done_read2, read_version, conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Success(_)));
    }

    #[test]
    fn test_complex_multi_key_scenario() {
        let oracle = create_test_oracle(1);
        let key_a = create_test_key("key_a");
        let key_b = create_test_key("key_b");
        let key_c = create_test_key("key_c");
        let mut conflicts1 = ConflictManager::new();
        conflicts1.mark_read(&key_a);
        conflicts1.mark_write(&key_b);
        let mut done_read1 = false;
        let result1 = oracle.new_commit(&mut done_read1, CommitVersion(1), conflicts1).unwrap();
        assert!(matches!(result1, CreateCommitResult::Success(_)));
        let mut conflicts2 = ConflictManager::new();
        conflicts2.mark_read(&key_b);
        conflicts2.mark_write(&key_c);
        let mut done_read2 = false;
        let result2 = oracle.new_commit(&mut done_read2, CommitVersion(1), conflicts2).unwrap();
        assert!(matches!(result2, CreateCommitResult::Conflict(_)));
        let mut conflicts3 = ConflictManager::new();
        conflicts3.mark_read(&key_c);
        conflicts3.mark_write(&key_a);
        let mut done_read3 = false;
        let result3 = oracle.new_commit(&mut done_read3, CommitVersion(1), conflicts3).unwrap();
        assert!(matches!(result3, CreateCommitResult::Success(_)));
    }

    #[test]
    fn test_concurrent_commits_dont_skip_watermark_versions() {
        const NUM_CONCURRENT: usize = 100;
        const ITERATIONS: usize = 10;
        for iteration in 0..ITERATIONS {
            let oracle = Arc::new(create_test_oracle(0));
            let mut handles = vec![];
            for i in 0..NUM_CONCURRENT {
                let oracle_clone = oracle.clone();
                let key = create_test_key(&format!("key_{}_{}", iteration, i));
                let handle = thread::spawn(move || {
                    let mut conflicts = ConflictManager::new();
                    conflicts.mark_write(&key);
                    let mut done_read = false;
                    let result = oracle_clone
                        .new_commit(&mut done_read, CommitVersion(1), conflicts)
                        .unwrap();
                    match result {
                        CreateCommitResult::Success(version) => {
                            if i % 3 == 0 {
                                sleep(Duration::from_micros(100));
                            }
                            oracle_clone.done_commit(version);
                            Some(version)
                        }
                        CreateCommitResult::Conflict(_) => None,
                        CreateCommitResult::TooOld => None,
                    }
                });
                handles.push(handle);
            }
            let mut max_version = CommitVersion(0);
            let mut success_count = 0;
            for handle in handles {
                if let Some(v) = handle.join().unwrap() {
                    max_version = max_version.max(v);
                    success_count += 1;
                }
            }
            assert_eq!(
                success_count, NUM_CONCURRENT,
                "Expected {} successful commits, got {}",
                NUM_CONCURRENT, success_count
            );
            sleep(Duration::from_millis(100));
            let done_until = oracle.command.done_until();
            assert_eq!(
                done_until, max_version,
                "Watermark race condition detected! done_until={} but max_version={}. \
                Some version was skipped.",
                done_until.0, max_version.0
            );
        }
    }

    #[test]
    fn test_version_begin_ordering() {
        let oracle = Arc::new(create_test_oracle(0));
        let barrier = Arc::new(Barrier::new(10));
        let mut handles = vec![];
        for i in 0..10 {
            let oracle_clone = oracle.clone();
            let barrier_clone = barrier.clone();
            let key = create_test_key(&format!("order_key_{}", i));
            let handle = thread::spawn(move || {
                barrier_clone.wait();
                let mut conflicts = ConflictManager::new();
                conflicts.mark_write(&key);
                let mut done_read = false;
                let result =
                    oracle_clone.new_commit(&mut done_read, CommitVersion(1), conflicts).unwrap();
                if let CreateCommitResult::Success(version) = result {
                    oracle_clone.done_commit(version);
                    version
                } else {
                    CommitVersion(0)
                }
            });
            handles.push(handle);
        }
        let mut versions: Vec<u64> = vec![];
        for handle in handles {
            let v = handle.join().unwrap();
            if v.0 > 0 {
                versions.push(v.0);
            }
        }
        sleep(Duration::from_millis(50));
        versions.sort();
        for i in 1..versions.len() {
            assert_eq!(
                versions[i],
                versions[i - 1] + 1,
                "Version gap detected: {} -> {}. Versions should be contiguous.",
                versions[i - 1],
                versions[i]
            );
        }
        let done_until = oracle.command.done_until();
        assert_eq!(
            done_until.0,
            *versions.last().unwrap_or(&0),
            "Watermark should be at highest committed version"
        );
    }
}