Skip to main content

reifydb_transaction/single/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem::{take, transmute};
5
6use indexmap::IndexMap;
7use reifydb_core::interface::store::{
8	SingleVersionCommit, SingleVersionContains, SingleVersionGet, SingleVersionValues,
9};
10use reifydb_runtime::sync::rwlock::{RwLock, RwLockWriteGuard};
11use reifydb_type::{
12	Result,
13	util::{cowvec::CowVec, hex},
14};
15
16use super::*;
17use crate::error::TransactionError;
18
19/// Holds both the Arc and the guard to keep the lock alive
20#[allow(dead_code)]
21pub struct KeyWriteLock {
22	pub(super) _arc: Arc<RwLock<()>>,
23	pub(super) _guard: RwLockWriteGuard<'static, ()>,
24}
25
26impl KeyWriteLock {
27	/// Creates a new KeyWriteLock by taking a write guard and storing it with the Arc.
28	///
29	/// # Safety
30	/// This function uses unsafe code to extend the lifetime of the guard to 'static.
31	/// This is safe because:
32	/// 1. The guard borrows from the RwLock inside the Arc
33	/// 2. We store the Arc in this struct, keeping the RwLock alive
34	/// 3. The guard will be dropped before or with the Arc (due to field order)
35	/// 4. As long as this struct exists, the Arc exists, so the RwLock exists
36	pub(super) fn new(arc: Arc<RwLock<()>>) -> Self {
37		// Take the guard while we still have a reference to arc
38		let guard = arc.write();
39
40		// SAFETY: We're extending the guard's lifetime to 'static.
41		// This is sound because we're also storing the Arc, which keeps
42		// the underlying RwLock alive for as long as this struct exists.
43		let guard = unsafe { transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(guard) };
44
45		Self {
46			_arc: arc,
47			_guard: guard,
48		}
49	}
50}
51
52pub struct SingleWriteTransaction<'a> {
53	pub(super) inner: &'a SingleTransactionInner,
54	pub(super) keys: Vec<EncodedKey>,
55	pub(super) _key_locks: Vec<KeyWriteLock>,
56	pub(super) pending: IndexMap<EncodedKey, Delta>,
57	pub(super) completed: bool,
58}
59
60impl<'a> SingleWriteTransaction<'a> {
61	pub(super) fn new(
62		inner: &'a SingleTransactionInner,
63		keys: Vec<EncodedKey>,
64		key_locks: Vec<KeyWriteLock>,
65	) -> Self {
66		Self {
67			inner,
68			keys,
69			_key_locks: key_locks,
70			pending: IndexMap::new(),
71			completed: false,
72		}
73	}
74
75	#[inline]
76	fn check_key_allowed(&self, key: &EncodedKey) -> Result<()> {
77		if self.keys.iter().any(|k| k == key) {
78			Ok(())
79		} else {
80			Err(TransactionError::KeyOutOfScope {
81				key: hex::encode(&key),
82			}
83			.into())
84		}
85	}
86
87	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<SingleVersionValues>> {
88		self.check_key_allowed(key)?;
89
90		if let Some(delta) = self.pending.get(key) {
91			return match delta {
92				Delta::Set {
93					values,
94					..
95				} => Ok(Some(SingleVersionValues {
96					key: key.clone(),
97					values: values.clone(),
98				})),
99				Delta::Unset {
100					..
101				}
102				| Delta::Remove {
103					..
104				}
105				| Delta::Drop {
106					..
107				} => Ok(None),
108			};
109		}
110
111		// Clone the store to avoid holding the lock
112		// TransactionStore is Arc-based, so clone is cheap
113		let store = self.inner.store.read().clone();
114		SingleVersionGet::get(&store, key)
115	}
116
117	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
118		self.check_key_allowed(key)?;
119
120		if let Some(delta) = self.pending.get(key) {
121			return match delta {
122				Delta::Set {
123					..
124				} => Ok(true),
125				Delta::Unset {
126					..
127				}
128				| Delta::Remove {
129					..
130				}
131				| Delta::Drop {
132					..
133				} => Ok(false),
134			};
135		}
136
137		// Clone the store to avoid holding the lock
138		let store = self.inner.store.read().clone();
139		SingleVersionContains::contains(&store, key)
140	}
141
142	pub fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
143		self.check_key_allowed(key)?;
144
145		let delta = Delta::Set {
146			key: key.clone(),
147			values,
148		};
149		self.pending.insert(key.clone(), delta);
150		Ok(())
151	}
152
153	pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
154		self.check_key_allowed(key)?;
155
156		self.pending.insert(
157			key.clone(),
158			Delta::Unset {
159				key: key.clone(),
160				values,
161			},
162		);
163		Ok(())
164	}
165
166	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
167		self.check_key_allowed(key)?;
168
169		self.pending.insert(
170			key.clone(),
171			Delta::Remove {
172				key: key.clone(),
173			},
174		);
175		Ok(())
176	}
177
178	pub fn commit(&mut self) -> Result<()> {
179		let deltas: Vec<Delta> = take(&mut self.pending).into_iter().map(|(_, delta)| delta).collect();
180
181		if !deltas.is_empty() {
182			let mut store = self.inner.store.write();
183			SingleVersionCommit::commit(&mut *store, CowVec::new(deltas))?;
184		}
185
186		self.completed = true;
187		Ok(())
188	}
189
190	pub fn rollback(&mut self) -> Result<()> {
191		self.pending.clear();
192		self.completed = true;
193		Ok(())
194	}
195}