Skip to main content

reifydb_transaction/single/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem::{take, transmute};
5
6use indexmap::IndexMap;
7use reifydb_core::interface::store::{SingleVersionCommit, SingleVersionContains, SingleVersionGet, SingleVersionRow};
8use reifydb_runtime::sync::rwlock::{RwLock, RwLockWriteGuard};
9use reifydb_type::{
10	Result,
11	util::{cowvec::CowVec, hex},
12};
13
14use super::*;
15use crate::error::TransactionError;
16
17pub struct KeyWriteLock {
18	pub(super) _guard: RwLockWriteGuard<'static, ()>,
19	pub(super) _arc: Arc<RwLock<()>>,
20}
21
22impl KeyWriteLock {
23	pub(super) fn new(arc: Arc<RwLock<()>>) -> Self {
24		let guard = arc.write();
25
26		// SAFETY: We're extending the guard's lifetime to 'static.
27
28		let guard = unsafe { transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(guard) };
29
30		Self {
31			_arc: arc,
32			_guard: guard,
33		}
34	}
35}
36
37pub struct SingleWriteTransaction<'a> {
38	pub(super) inner: &'a SingleTransactionInner,
39	pub(super) keys: Vec<EncodedKey>,
40	pub(super) _key_locks: Vec<KeyWriteLock>,
41	pub(super) pending: IndexMap<EncodedKey, Delta>,
42	pub(super) completed: bool,
43}
44
45impl<'a> SingleWriteTransaction<'a> {
46	pub(super) fn new(
47		inner: &'a SingleTransactionInner,
48		keys: Vec<EncodedKey>,
49		key_locks: Vec<KeyWriteLock>,
50	) -> Self {
51		Self {
52			inner,
53			keys,
54			_key_locks: key_locks,
55			pending: IndexMap::new(),
56			completed: false,
57		}
58	}
59
60	#[inline]
61	fn check_key_allowed(&self, key: &EncodedKey) -> Result<()> {
62		if self.keys.iter().any(|k| k == key) {
63			Ok(())
64		} else {
65			Err(TransactionError::KeyOutOfScope {
66				key: hex::encode(key),
67			}
68			.into())
69		}
70	}
71
72	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<SingleVersionRow>> {
73		self.check_key_allowed(key)?;
74
75		if let Some(delta) = self.pending.get(key) {
76			return match delta {
77				Delta::Set {
78					row,
79					..
80				} => Ok(Some(SingleVersionRow {
81					key: key.clone(),
82					row: row.clone(),
83				})),
84				Delta::Unset {
85					..
86				}
87				| Delta::Remove {
88					..
89				}
90				| Delta::Drop {
91					key: _,
92				} => Ok(None),
93			};
94		}
95
96		let store = self.inner.store.read().clone();
97		SingleVersionGet::get(&store, key)
98	}
99
100	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
101		self.check_key_allowed(key)?;
102
103		if let Some(delta) = self.pending.get(key) {
104			return match delta {
105				Delta::Set {
106					..
107				} => Ok(true),
108				Delta::Unset {
109					..
110				}
111				| Delta::Remove {
112					..
113				}
114				| Delta::Drop {
115					key: _,
116				} => Ok(false),
117			};
118		}
119
120		let store = self.inner.store.read().clone();
121		SingleVersionContains::contains(&store, key)
122	}
123
124	pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
125		self.check_key_allowed(key)?;
126
127		let delta = Delta::Set {
128			key: key.clone(),
129			row,
130		};
131		self.pending.insert(key.clone(), delta);
132		Ok(())
133	}
134
135	pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
136		self.check_key_allowed(key)?;
137
138		self.pending.insert(
139			key.clone(),
140			Delta::Unset {
141				key: key.clone(),
142				row,
143			},
144		);
145		Ok(())
146	}
147
148	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
149		self.check_key_allowed(key)?;
150
151		self.pending.insert(
152			key.clone(),
153			Delta::Remove {
154				key: key.clone(),
155			},
156		);
157		Ok(())
158	}
159
160	pub fn commit(&mut self) -> Result<()> {
161		let deltas: Vec<Delta> = take(&mut self.pending).into_iter().map(|(_, delta)| delta).collect();
162
163		if !deltas.is_empty() {
164			let mut store = self.inner.store.write();
165			SingleVersionCommit::commit(&mut *store, CowVec::new(deltas))?;
166		}
167
168		self.completed = true;
169		Ok(())
170	}
171
172	pub fn rollback(&mut self) -> Result<()> {
173		self.pending.clear();
174		self.completed = true;
175		Ok(())
176	}
177}