Skip to main content

reifydb_transaction/single/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem::{take, transmute};
5
6use indexmap::IndexMap;
7use reifydb_core::interface::store::{SingleVersionCommit, SingleVersionContains, SingleVersionGet, SingleVersionRow};
8use reifydb_runtime::sync::rwlock::{RwLock, RwLockWriteGuard};
9use reifydb_type::{
10	Result,
11	util::{cowvec::CowVec, hex},
12};
13
14use super::*;
15use crate::error::TransactionError;
16
17/// Holds both the Arc and the guard to keep the lock alive.
18/// IMPORTANT: _guard must be declared before _arc so it is dropped first -
19/// the guard borrows from the RwLock inside the Arc.
20pub struct KeyWriteLock {
21	pub(super) _guard: RwLockWriteGuard<'static, ()>,
22	pub(super) _arc: Arc<RwLock<()>>,
23}
24
25impl KeyWriteLock {
26	/// Creates a new KeyWriteLock by taking a write guard and storing it with the Arc.
27	///
28	/// # Safety
29	/// This function uses unsafe code to extend the lifetime of the guard to 'static.
30	/// This is safe because:
31	/// 1. The guard borrows from the RwLock inside the Arc
32	/// 2. We store the Arc in this struct, keeping the RwLock alive
33	/// 3. The guard will be dropped before or with the Arc (due to field order)
34	/// 4. As long as this struct exists, the Arc exists, so the RwLock exists
35	pub(super) fn new(arc: Arc<RwLock<()>>) -> Self {
36		// Take the guard while we still have a reference to arc
37		let guard = arc.write();
38
39		// SAFETY: We're extending the guard's lifetime to 'static.
40		// This is sound because we're also storing the Arc, which keeps
41		// the underlying RwLock alive for as long as this struct exists.
42		let guard = unsafe { transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(guard) };
43
44		Self {
45			_arc: arc,
46			_guard: guard,
47		}
48	}
49}
50
51pub struct SingleWriteTransaction<'a> {
52	pub(super) inner: &'a SingleTransactionInner,
53	pub(super) keys: Vec<EncodedKey>,
54	pub(super) _key_locks: Vec<KeyWriteLock>,
55	pub(super) pending: IndexMap<EncodedKey, Delta>,
56	pub(super) completed: bool,
57}
58
59impl<'a> SingleWriteTransaction<'a> {
60	pub(super) fn new(
61		inner: &'a SingleTransactionInner,
62		keys: Vec<EncodedKey>,
63		key_locks: Vec<KeyWriteLock>,
64	) -> Self {
65		Self {
66			inner,
67			keys,
68			_key_locks: key_locks,
69			pending: IndexMap::new(),
70			completed: false,
71		}
72	}
73
74	#[inline]
75	fn check_key_allowed(&self, key: &EncodedKey) -> Result<()> {
76		if self.keys.iter().any(|k| k == key) {
77			Ok(())
78		} else {
79			Err(TransactionError::KeyOutOfScope {
80				key: hex::encode(key),
81			}
82			.into())
83		}
84	}
85
86	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<SingleVersionRow>> {
87		self.check_key_allowed(key)?;
88
89		if let Some(delta) = self.pending.get(key) {
90			return match delta {
91				Delta::Set {
92					row,
93					..
94				} => Ok(Some(SingleVersionRow {
95					key: key.clone(),
96					row: row.clone(),
97				})),
98				Delta::Unset {
99					..
100				}
101				| Delta::Remove {
102					..
103				}
104				| Delta::Drop {
105					key: _,
106				} => Ok(None),
107			};
108		}
109
110		// Clone the store to avoid holding the lock
111		// TransactionStore is Arc-based, so clone is cheap
112		let store = self.inner.store.read().clone();
113		SingleVersionGet::get(&store, key)
114	}
115
116	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
117		self.check_key_allowed(key)?;
118
119		if let Some(delta) = self.pending.get(key) {
120			return match delta {
121				Delta::Set {
122					..
123				} => Ok(true),
124				Delta::Unset {
125					..
126				}
127				| Delta::Remove {
128					..
129				}
130				| Delta::Drop {
131					key: _,
132				} => Ok(false),
133			};
134		}
135
136		// Clone the store to avoid holding the lock
137		let store = self.inner.store.read().clone();
138		SingleVersionContains::contains(&store, key)
139	}
140
141	pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
142		self.check_key_allowed(key)?;
143
144		let delta = Delta::Set {
145			key: key.clone(),
146			row,
147		};
148		self.pending.insert(key.clone(), delta);
149		Ok(())
150	}
151
152	pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
153		self.check_key_allowed(key)?;
154
155		self.pending.insert(
156			key.clone(),
157			Delta::Unset {
158				key: key.clone(),
159				row,
160			},
161		);
162		Ok(())
163	}
164
165	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
166		self.check_key_allowed(key)?;
167
168		self.pending.insert(
169			key.clone(),
170			Delta::Remove {
171				key: key.clone(),
172			},
173		);
174		Ok(())
175	}
176
177	pub fn commit(&mut self) -> Result<()> {
178		let deltas: Vec<Delta> = take(&mut self.pending).into_iter().map(|(_, delta)| delta).collect();
179
180		if !deltas.is_empty() {
181			let mut store = self.inner.store.write();
182			SingleVersionCommit::commit(&mut *store, CowVec::new(deltas))?;
183		}
184
185		self.completed = true;
186		Ok(())
187	}
188
189	pub fn rollback(&mut self) -> Result<()> {
190		self.pending.clear();
191		self.completed = true;
192		Ok(())
193	}
194}