reifydb_transaction/single/
write.rs1use std::mem::{take, transmute};
5
6use indexmap::IndexMap;
7use reifydb_core::interface::store::{
8 SingleVersionCommit, SingleVersionContains, SingleVersionGet, SingleVersionValues,
9};
10use reifydb_runtime::sync::rwlock::{RwLock, RwLockWriteGuard};
11use reifydb_type::{
12 Result,
13 util::{cowvec::CowVec, hex},
14};
15
16use super::*;
17use crate::error::TransactionError;
18
19#[allow(dead_code)]
21pub struct KeyWriteLock {
22 pub(super) _arc: Arc<RwLock<()>>,
23 pub(super) _guard: RwLockWriteGuard<'static, ()>,
24}
25
26impl KeyWriteLock {
27 pub(super) fn new(arc: Arc<RwLock<()>>) -> Self {
37 let guard = arc.write();
39
40 let guard = unsafe { transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(guard) };
44
45 Self {
46 _arc: arc,
47 _guard: guard,
48 }
49 }
50}
51
52pub struct SingleWriteTransaction<'a> {
53 pub(super) inner: &'a SingleTransactionInner,
54 pub(super) keys: Vec<EncodedKey>,
55 pub(super) _key_locks: Vec<KeyWriteLock>,
56 pub(super) pending: IndexMap<EncodedKey, Delta>,
57 pub(super) completed: bool,
58}
59
60impl<'a> SingleWriteTransaction<'a> {
61 pub(super) fn new(
62 inner: &'a SingleTransactionInner,
63 keys: Vec<EncodedKey>,
64 key_locks: Vec<KeyWriteLock>,
65 ) -> Self {
66 Self {
67 inner,
68 keys,
69 _key_locks: key_locks,
70 pending: IndexMap::new(),
71 completed: false,
72 }
73 }
74
75 #[inline]
76 fn check_key_allowed(&self, key: &EncodedKey) -> Result<()> {
77 if self.keys.iter().any(|k| k == key) {
78 Ok(())
79 } else {
80 Err(TransactionError::KeyOutOfScope {
81 key: hex::encode(&key),
82 }
83 .into())
84 }
85 }
86
87 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<SingleVersionValues>> {
88 self.check_key_allowed(key)?;
89
90 if let Some(delta) = self.pending.get(key) {
91 return match delta {
92 Delta::Set {
93 values,
94 ..
95 } => Ok(Some(SingleVersionValues {
96 key: key.clone(),
97 values: values.clone(),
98 })),
99 Delta::Unset {
100 ..
101 }
102 | Delta::Remove {
103 ..
104 }
105 | Delta::Drop {
106 ..
107 } => Ok(None),
108 };
109 }
110
111 let store = self.inner.store.read().clone();
114 SingleVersionGet::get(&store, key)
115 }
116
117 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
118 self.check_key_allowed(key)?;
119
120 if let Some(delta) = self.pending.get(key) {
121 return match delta {
122 Delta::Set {
123 ..
124 } => Ok(true),
125 Delta::Unset {
126 ..
127 }
128 | Delta::Remove {
129 ..
130 }
131 | Delta::Drop {
132 ..
133 } => Ok(false),
134 };
135 }
136
137 let store = self.inner.store.read().clone();
139 SingleVersionContains::contains(&store, key)
140 }
141
142 pub fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
143 self.check_key_allowed(key)?;
144
145 let delta = Delta::Set {
146 key: key.clone(),
147 values,
148 };
149 self.pending.insert(key.clone(), delta);
150 Ok(())
151 }
152
153 pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
154 self.check_key_allowed(key)?;
155
156 self.pending.insert(
157 key.clone(),
158 Delta::Unset {
159 key: key.clone(),
160 values,
161 },
162 );
163 Ok(())
164 }
165
166 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
167 self.check_key_allowed(key)?;
168
169 self.pending.insert(
170 key.clone(),
171 Delta::Remove {
172 key: key.clone(),
173 },
174 );
175 Ok(())
176 }
177
178 pub fn commit(&mut self) -> Result<()> {
179 let deltas: Vec<Delta> = take(&mut self.pending).into_iter().map(|(_, delta)| delta).collect();
180
181 if !deltas.is_empty() {
182 let mut store = self.inner.store.write();
183 SingleVersionCommit::commit(&mut *store, CowVec::new(deltas))?;
184 }
185
186 self.completed = true;
187 Ok(())
188 }
189
190 pub fn rollback(&mut self) -> Result<()> {
191 self.pending.clear();
192 self.completed = true;
193 Ok(())
194 }
195}