reifydb_transaction/single/
write.rs1use std::mem::{take, transmute};
5
6use indexmap::IndexMap;
7use reifydb_core::interface::store::{SingleVersionCommit, SingleVersionContains, SingleVersionGet, SingleVersionRow};
8use reifydb_runtime::sync::rwlock::{RwLock, RwLockWriteGuard};
9use reifydb_type::{
10 Result,
11 util::{cowvec::CowVec, hex},
12};
13
14use super::*;
15use crate::error::TransactionError;
16
17pub struct KeyWriteLock {
18 pub(super) _guard: RwLockWriteGuard<'static, ()>,
19 pub(super) _arc: Arc<RwLock<()>>,
20}
21
22impl KeyWriteLock {
23 pub(super) fn new(arc: Arc<RwLock<()>>) -> Self {
24 let guard = arc.write();
25
26 let guard = unsafe { transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(guard) };
29
30 Self {
31 _arc: arc,
32 _guard: guard,
33 }
34 }
35}
36
37pub struct SingleWriteTransaction<'a> {
38 pub(super) inner: &'a SingleTransactionInner,
39 pub(super) keys: Vec<EncodedKey>,
40 pub(super) _key_locks: Vec<KeyWriteLock>,
41 pub(super) pending: IndexMap<EncodedKey, Delta>,
42 pub(super) completed: bool,
43}
44
45impl<'a> SingleWriteTransaction<'a> {
46 pub(super) fn new(
47 inner: &'a SingleTransactionInner,
48 keys: Vec<EncodedKey>,
49 key_locks: Vec<KeyWriteLock>,
50 ) -> Self {
51 Self {
52 inner,
53 keys,
54 _key_locks: key_locks,
55 pending: IndexMap::new(),
56 completed: false,
57 }
58 }
59
60 #[inline]
61 fn check_key_allowed(&self, key: &EncodedKey) -> Result<()> {
62 if self.keys.iter().any(|k| k == key) {
63 Ok(())
64 } else {
65 Err(TransactionError::KeyOutOfScope {
66 key: hex::encode(key),
67 }
68 .into())
69 }
70 }
71
72 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<SingleVersionRow>> {
73 self.check_key_allowed(key)?;
74
75 if let Some(delta) = self.pending.get(key) {
76 return match delta {
77 Delta::Set {
78 row,
79 ..
80 } => Ok(Some(SingleVersionRow {
81 key: key.clone(),
82 row: row.clone(),
83 })),
84 Delta::Unset {
85 ..
86 }
87 | Delta::Remove {
88 ..
89 }
90 | Delta::Drop {
91 key: _,
92 } => Ok(None),
93 };
94 }
95
96 let store = self.inner.store.read().clone();
97 SingleVersionGet::get(&store, key)
98 }
99
100 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
101 self.check_key_allowed(key)?;
102
103 if let Some(delta) = self.pending.get(key) {
104 return match delta {
105 Delta::Set {
106 ..
107 } => Ok(true),
108 Delta::Unset {
109 ..
110 }
111 | Delta::Remove {
112 ..
113 }
114 | Delta::Drop {
115 key: _,
116 } => Ok(false),
117 };
118 }
119
120 let store = self.inner.store.read().clone();
121 SingleVersionContains::contains(&store, key)
122 }
123
124 pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
125 self.check_key_allowed(key)?;
126
127 let delta = Delta::Set {
128 key: key.clone(),
129 row,
130 };
131 self.pending.insert(key.clone(), delta);
132 Ok(())
133 }
134
135 pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
136 self.check_key_allowed(key)?;
137
138 self.pending.insert(
139 key.clone(),
140 Delta::Unset {
141 key: key.clone(),
142 row,
143 },
144 );
145 Ok(())
146 }
147
148 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
149 self.check_key_allowed(key)?;
150
151 self.pending.insert(
152 key.clone(),
153 Delta::Remove {
154 key: key.clone(),
155 },
156 );
157 Ok(())
158 }
159
160 pub fn commit(&mut self) -> Result<()> {
161 let deltas: Vec<Delta> = take(&mut self.pending).into_iter().map(|(_, delta)| delta).collect();
162
163 if !deltas.is_empty() {
164 let mut store = self.inner.store.write();
165 SingleVersionCommit::commit(&mut *store, CowVec::new(deltas))?;
166 }
167
168 self.completed = true;
169 Ok(())
170 }
171
172 pub fn rollback(&mut self) -> Result<()> {
173 self.pending.clear();
174 self.completed = true;
175 Ok(())
176 }
177}