reifydb-transaction 0.4.10

Transaction management and concurrency control for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::mem::{take, transmute};

use indexmap::IndexMap;
use reifydb_core::interface::store::{SingleVersionCommit, SingleVersionContains, SingleVersionGet, SingleVersionRow};
use reifydb_runtime::sync::rwlock::{RwLock, RwLockWriteGuard};
use reifydb_type::{
	Result,
	util::{cowvec::CowVec, hex},
};

use super::*;
use crate::error::TransactionError;

/// A held write guard bundled with the `Arc` that owns the lock it guards.
///
/// The guard's lifetime is spelled `'static`, but it actually borrows from the
/// `RwLock` stored inside `_arc`; keeping both values in the same struct is
/// what keeps that borrow valid.
///
/// IMPORTANT: `_guard` must stay declared before `_arc`. Struct fields are
/// dropped in declaration order, so this ordering guarantees the guard is
/// released before the `Arc` (and possibly the `RwLock` it owns) goes away.
pub struct KeyWriteLock {
	/// Write guard on the `RwLock` owned by `_arc`; declared first so it is dropped first.
	pub(super) _guard: RwLockWriteGuard<'static, ()>,
	/// Keeps the `RwLock` alive for as long as `_guard` exists; dropped after the guard.
	pub(super) _arc: Arc<RwLock<()>>,
}

impl KeyWriteLock {
	/// Acquires the write lock behind `arc` and returns the guard packaged
	/// together with the `Arc` itself.
	///
	/// # Safety
	/// The function is safe to call. Internally it uses `unsafe` to widen the
	/// guard's borrow lifetime to `'static`, which is justified because:
	/// 1. The guard borrows from the `RwLock` inside the `Arc`.
	/// 2. The `Arc` is stored in the returned struct, keeping that `RwLock` alive.
	/// 3. The struct declares the guard before the `Arc`, so the guard is dropped first.
	/// 4. Therefore the `RwLock` exists for as long as the guard does.
	pub(super) fn new(arc: Arc<RwLock<()>>) -> Self {
		// Lock while `arc` is still a local we can borrow from.
		let scoped = arc.write();

		// SAFETY: only the lifetime parameter changes. The `Arc` is moved into
		// the same struct as the guard, so the `RwLock` the guard refers to
		// stays alive for at least as long as the guard.
		let held = unsafe { transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(scoped) };

		// Literal order mirrors the declaration order; drop order itself is
		// fixed by the struct definition, not by this expression.
		Self {
			_guard: held,
			_arc: arc,
		}
	}
}

/// A write transaction that may only touch a fixed set of keys.
///
/// Writes are buffered in `pending` and reach the store only when `commit`
/// is called; `rollback` discards them instead.
pub struct SingleWriteTransaction<'a> {
	/// Shared transaction state; its `store` is read for lookups and written on commit.
	pub(super) inner: &'a SingleTransactionInner,
	/// Keys this transaction is allowed to access; any other key is rejected
	/// with `TransactionError::KeyOutOfScope`.
	pub(super) keys: Vec<EncodedKey>,
	/// Write locks kept alive for the lifetime of the transaction; never read in this file.
	/// NOTE(review): presumably one lock per entry in `keys` — confirm against the caller.
	pub(super) _key_locks: Vec<KeyWriteLock>,
	/// Buffered changes, at most one `Delta` per key (a later write to the
	/// same key replaces the earlier one).
	pub(super) pending: IndexMap<EncodedKey, Delta>,
	/// Set to `true` by both `commit` (on success) and `rollback`.
	pub(super) completed: bool,
}

impl<'a> SingleWriteTransaction<'a> {
	/// Builds a transaction over `inner` that is scoped to `keys`.
	///
	/// `key_locks` are stored only so they stay alive as long as the
	/// transaction does. The transaction starts with no buffered writes and
	/// is not yet completed.
	pub(super) fn new(
		inner: &'a SingleTransactionInner,
		keys: Vec<EncodedKey>,
		key_locks: Vec<KeyWriteLock>,
	) -> Self {
		let pending = IndexMap::new();
		Self {
			inner,
			keys,
			_key_locks: key_locks,
			pending,
			completed: false,
		}
	}

	/// Verifies that `key` belongs to this transaction's key set.
	///
	/// # Errors
	/// Returns `TransactionError::KeyOutOfScope` (carrying the hex-encoded
	/// key) when `key` is not one of `self.keys`.
	#[inline]
	fn check_key_allowed(&self, key: &EncodedKey) -> Result<()> {
		if !self.keys.contains(key) {
			return Err(TransactionError::KeyOutOfScope {
				key: hex::encode(key),
			}
			.into());
		}
		Ok(())
	}

	/// Reads the row for `key`, preferring this transaction's own buffered write.
	///
	/// A buffered `Set` yields its row; a buffered `Unset`, `Remove` or `Drop`
	/// yields `None` without consulting the store. With nothing buffered for
	/// `key`, the lookup goes to the store.
	///
	/// # Errors
	/// Fails with `KeyOutOfScope` for a key outside the transaction's key set,
	/// or with whatever error the store lookup returns.
	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<SingleVersionRow>> {
		self.check_key_allowed(key)?;

		match self.pending.get(key) {
			Some(Delta::Set { row, .. }) => {
				let buffered = SingleVersionRow {
					key: key.clone(),
					row: row.clone(),
				};
				Ok(Some(buffered))
			}
			Some(Delta::Unset { .. } | Delta::Remove { .. } | Delta::Drop { .. }) => Ok(None),
			None => {
				// Work on a clone so the read lock is released before the lookup;
				// the original notes the store is Arc-based, so the clone is cheap.
				let snapshot = self.inner.store.read().clone();
				SingleVersionGet::get(&snapshot, key)
			}
		}
	}

	/// Reports whether `key` currently resolves to a row, as seen by this transaction.
	///
	/// A buffered `Set` answers `true`; a buffered `Unset`, `Remove` or `Drop`
	/// answers `false`. With nothing buffered for `key`, the store is asked.
	///
	/// # Errors
	/// Fails with `KeyOutOfScope` for a key outside the transaction's key set,
	/// or with whatever error the store lookup returns.
	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
		self.check_key_allowed(key)?;

		match self.pending.get(key) {
			Some(Delta::Set { .. }) => Ok(true),
			Some(Delta::Unset { .. } | Delta::Remove { .. } | Delta::Drop { .. }) => Ok(false),
			None => {
				// Work on a clone so the read lock is released before the lookup.
				let snapshot = self.inner.store.read().clone();
				SingleVersionContains::contains(&snapshot, key)
			}
		}
	}

	/// Buffers a `Delta::Set` of `row` under `key`, replacing any earlier
	/// buffered change for the same key.
	///
	/// # Errors
	/// Fails with `KeyOutOfScope` for a key outside the transaction's key set.
	pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
		self.check_key_allowed(key)?;

		self.pending.insert(
			key.clone(),
			Delta::Set {
				key: key.clone(),
				row,
			},
		);
		Ok(())
	}

	/// Buffers a `Delta::Unset` carrying `row` under `key`, replacing any
	/// earlier buffered change for the same key.
	///
	/// # Errors
	/// Fails with `KeyOutOfScope` for a key outside the transaction's key set.
	pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
		self.check_key_allowed(key)?;

		let change = Delta::Unset {
			key: key.clone(),
			row,
		};
		self.pending.insert(key.clone(), change);
		Ok(())
	}

	/// Buffers a `Delta::Remove` under `key`, replacing any earlier buffered
	/// change for the same key.
	///
	/// # Errors
	/// Fails with `KeyOutOfScope` for a key outside the transaction's key set.
	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
		self.check_key_allowed(key)?;

		let change = Delta::Remove {
			key: key.clone(),
		};
		self.pending.insert(key.clone(), change);
		Ok(())
	}

	/// Applies all buffered changes to the store and marks the transaction completed.
	///
	/// The buffer is drained before the store is touched. With no buffered
	/// changes the store's write lock is never taken.
	///
	/// # Errors
	/// Propagates the store's commit error. In that case the buffered changes
	/// have already been drained and `completed` is left `false`.
	pub fn commit(&mut self) -> Result<()> {
		let drained = take(&mut self.pending);

		if !drained.is_empty() {
			// Collect before locking so the write lock covers only the commit call.
			let deltas: Vec<Delta> = drained.into_values().collect();
			let mut store = self.inner.store.write();
			SingleVersionCommit::commit(&mut *store, CowVec::new(deltas))?;
		}

		self.completed = true;
		Ok(())
	}

	/// Discards all buffered changes and marks the transaction completed.
	///
	/// Never fails; the `Result` return type parallels `commit`.
	pub fn rollback(&mut self) -> Result<()> {
		self.completed = true;
		self.pending.clear();
		Ok(())
	}
}