reifydb_transaction/single/
write.rs1use std::mem::{take, transmute};
5
6use indexmap::IndexMap;
7use reifydb_core::interface::store::{SingleVersionCommit, SingleVersionContains, SingleVersionGet, SingleVersionRow};
8use reifydb_runtime::sync::rwlock::{RwLock, RwLockWriteGuard};
9use reifydb_type::{
10 Result,
11 util::{cowvec::CowVec, hex},
12};
13
14use super::*;
15use crate::error::TransactionError;
16
17pub struct KeyWriteLock {
21 pub(super) _guard: RwLockWriteGuard<'static, ()>,
22 pub(super) _arc: Arc<RwLock<()>>,
23}
24
25impl KeyWriteLock {
26 pub(super) fn new(arc: Arc<RwLock<()>>) -> Self {
36 let guard = arc.write();
38
39 let guard = unsafe { transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(guard) };
43
44 Self {
45 _arc: arc,
46 _guard: guard,
47 }
48 }
49}
50
51pub struct SingleWriteTransaction<'a> {
52 pub(super) inner: &'a SingleTransactionInner,
53 pub(super) keys: Vec<EncodedKey>,
54 pub(super) _key_locks: Vec<KeyWriteLock>,
55 pub(super) pending: IndexMap<EncodedKey, Delta>,
56 pub(super) completed: bool,
57}
58
59impl<'a> SingleWriteTransaction<'a> {
60 pub(super) fn new(
61 inner: &'a SingleTransactionInner,
62 keys: Vec<EncodedKey>,
63 key_locks: Vec<KeyWriteLock>,
64 ) -> Self {
65 Self {
66 inner,
67 keys,
68 _key_locks: key_locks,
69 pending: IndexMap::new(),
70 completed: false,
71 }
72 }
73
74 #[inline]
75 fn check_key_allowed(&self, key: &EncodedKey) -> Result<()> {
76 if self.keys.iter().any(|k| k == key) {
77 Ok(())
78 } else {
79 Err(TransactionError::KeyOutOfScope {
80 key: hex::encode(key),
81 }
82 .into())
83 }
84 }
85
86 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<SingleVersionRow>> {
87 self.check_key_allowed(key)?;
88
89 if let Some(delta) = self.pending.get(key) {
90 return match delta {
91 Delta::Set {
92 row,
93 ..
94 } => Ok(Some(SingleVersionRow {
95 key: key.clone(),
96 row: row.clone(),
97 })),
98 Delta::Unset {
99 ..
100 }
101 | Delta::Remove {
102 ..
103 }
104 | Delta::Drop {
105 key: _,
106 } => Ok(None),
107 };
108 }
109
110 let store = self.inner.store.read().clone();
113 SingleVersionGet::get(&store, key)
114 }
115
116 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
117 self.check_key_allowed(key)?;
118
119 if let Some(delta) = self.pending.get(key) {
120 return match delta {
121 Delta::Set {
122 ..
123 } => Ok(true),
124 Delta::Unset {
125 ..
126 }
127 | Delta::Remove {
128 ..
129 }
130 | Delta::Drop {
131 key: _,
132 } => Ok(false),
133 };
134 }
135
136 let store = self.inner.store.read().clone();
138 SingleVersionContains::contains(&store, key)
139 }
140
141 pub fn set(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
142 self.check_key_allowed(key)?;
143
144 let delta = Delta::Set {
145 key: key.clone(),
146 row,
147 };
148 self.pending.insert(key.clone(), delta);
149 Ok(())
150 }
151
152 pub fn unset(&mut self, key: &EncodedKey, row: EncodedRow) -> Result<()> {
153 self.check_key_allowed(key)?;
154
155 self.pending.insert(
156 key.clone(),
157 Delta::Unset {
158 key: key.clone(),
159 row,
160 },
161 );
162 Ok(())
163 }
164
165 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
166 self.check_key_allowed(key)?;
167
168 self.pending.insert(
169 key.clone(),
170 Delta::Remove {
171 key: key.clone(),
172 },
173 );
174 Ok(())
175 }
176
177 pub fn commit(&mut self) -> Result<()> {
178 let deltas: Vec<Delta> = take(&mut self.pending).into_iter().map(|(_, delta)| delta).collect();
179
180 if !deltas.is_empty() {
181 let mut store = self.inner.store.write();
182 SingleVersionCommit::commit(&mut *store, CowVec::new(deltas))?;
183 }
184
185 self.completed = true;
186 Ok(())
187 }
188
189 pub fn rollback(&mut self) -> Result<()> {
190 self.pending.clear();
191 self.completed = true;
192 Ok(())
193 }
194}