echodb/
tx.rs

1// Copyright © SurrealDB Ltd
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module stores the database transaction logic.
16
17use crate::err::Error;
18use crate::Database;
19use imbl::ordmap::Entry;
20use imbl::OrdMap;
21use std::borrow::Borrow;
22use std::fmt::Debug;
23use std::mem::drop;
24use std::ops::Range;
25use std::sync::Arc;
26use tokio::sync::OwnedMutexGuard;
27
/// A serializable snapshot isolated database transaction
///
/// A transaction works on its own copy of the datastore contents, taken
/// when the transaction is created. Reads and writes only touch that copy;
/// [`Transaction::commit`] stores it back into the parent database, while
/// [`Transaction::cancel`] closes the transaction without storing anything.
pub struct Transaction<K, V>
where
	K: Ord + Clone + Debug + Sync + Send + 'static,
	V: Eq + Clone + Debug + Sync + Send + 'static,
{
	/// Is the transaction complete? Set by `cancel` and `commit`, after
	/// which the data operations fail with `Error::TxClosed`.
	done: bool,
	/// Is the transaction writeable? When `false`, the mutating
	/// operations and `commit` fail with `Error::TxNotWritable`.
	write: bool,
	/// The current snapshot for this transaction, cloned from the
	/// datastore on creation and modified in place by writes
	snapshot: OrdMap<K, V>,
	/// The parent database for this transaction, whose datastore is
	/// replaced with the snapshot on `commit`
	database: Database<K, V>,
	/// The parent datastore transaction write lock, if one was supplied.
	/// It is taken out and dropped by `cancel` and `commit`.
	writelock: Option<OwnedMutexGuard<()>>,
}
45
46impl<K, V> Transaction<K, V>
47where
48	K: Ord + Clone + Debug + Sync + Send + 'static,
49	V: Eq + Clone + Debug + Sync + Send + 'static,
50{
51	/// Create a new read-only transaction
52	pub(crate) fn read(db: Database<K, V>, lock: Option<OwnedMutexGuard<()>>) -> Transaction<K, V> {
53		Transaction {
54			done: false,
55			write: false,
56			snapshot: (*(*db.datastore.load())).clone(),
57			database: db,
58			writelock: lock,
59		}
60	}
61	/// Create a new writeable transaction
62	pub(crate) fn write(
63		db: Database<K, V>,
64		lock: Option<OwnedMutexGuard<()>>,
65	) -> Transaction<K, V> {
66		Transaction {
67			done: false,
68			write: true,
69			snapshot: (*(*db.datastore.load())).clone(),
70			database: db,
71			writelock: lock,
72		}
73	}
74
75	/// Check if the transaction is closed
76	pub fn closed(&self) -> bool {
77		self.done
78	}
79
80	/// Cancel the transaction and rollback any changes
81	pub fn cancel(&mut self) -> Result<(), Error> {
82		// Check to see if transaction is closed
83		if self.done == true {
84			return Err(Error::TxClosed);
85		}
86		// Mark this transaction as done
87		self.done = true;
88		// Release the commit lock
89		if let Some(lock) = self.writelock.take() {
90			drop(lock);
91		}
92		// Continue
93		Ok(())
94	}
95
96	/// Commit the transaction and store all changes
97	pub fn commit(&mut self) -> Result<(), Error> {
98		// Check to see if transaction is closed
99		if self.done == true {
100			return Err(Error::TxClosed);
101		}
102		// Check to see if transaction is writable
103		if self.write == false {
104			return Err(Error::TxNotWritable);
105		}
106		// Mark this transaction as done
107		self.done = true;
108		// Atomically update the datastore using ArcSwap
109		self.database.datastore.store(Arc::new(self.snapshot.clone()));
110		// Release the commit lock
111		if let Some(lock) = self.writelock.take() {
112			drop(lock);
113		}
114		// Continue
115		Ok(())
116	}
117
118	/// Check if a key exists in the database
119	pub fn exists<Q>(&self, key: Q) -> Result<bool, Error>
120	where
121		Q: Borrow<K>,
122	{
123		// Check to see if transaction is closed
124		if self.done == true {
125			return Err(Error::TxClosed);
126		}
127		// Check the key
128		let res = self.snapshot.contains_key(key.borrow());
129		// Return result
130		Ok(res)
131	}
132
133	/// Fetch a key from the database
134	pub fn get<Q>(&self, key: Q) -> Result<Option<V>, Error>
135	where
136		Q: Borrow<K>,
137	{
138		// Check to see if transaction is closed
139		if self.done == true {
140			return Err(Error::TxClosed);
141		}
142		// Get the key
143		let res = self.snapshot.get(key.borrow()).cloned();
144		// Return result
145		Ok(res)
146	}
147
148	/// Insert or update a key in the database
149	pub fn set<Q>(&mut self, key: Q, val: V) -> Result<(), Error>
150	where
151		Q: Into<K>,
152	{
153		// Check to see if transaction is closed
154		if self.done == true {
155			return Err(Error::TxClosed);
156		}
157		// Check to see if transaction is writable
158		if self.write == false {
159			return Err(Error::TxNotWritable);
160		}
161		// Set the key
162		self.snapshot.insert(key.into(), val);
163		// Return result
164		Ok(())
165	}
166
167	/// Insert a key if it doesn't exist in the database
168	pub fn put<Q>(&mut self, key: Q, val: V) -> Result<(), Error>
169	where
170		Q: Borrow<K> + Into<K>,
171	{
172		// Check to see if transaction is closed
173		if self.done == true {
174			return Err(Error::TxClosed);
175		}
176		// Check to see if transaction is writable
177		if self.write == false {
178			return Err(Error::TxNotWritable);
179		}
180		// Set the key
181		match self.snapshot.entry(key.into()) {
182			Entry::Vacant(v) => {
183				v.insert(val);
184			}
185			_ => return Err(Error::KeyAlreadyExists),
186		};
187		// Return result
188		Ok(())
189	}
190
191	/// Insert a key if it matches a value
192	pub fn putc<Q>(&mut self, key: Q, val: V, chk: Option<V>) -> Result<(), Error>
193	where
194		Q: Borrow<K> + Into<K>,
195	{
196		// Check to see if transaction is closed
197		if self.done == true {
198			return Err(Error::TxClosed);
199		}
200		// Check to see if transaction is writable
201		if self.write == false {
202			return Err(Error::TxNotWritable);
203		}
204		// Set the key
205		match (self.snapshot.entry(key.into()), &chk) {
206			(Entry::Occupied(mut v), Some(w)) if v.get() == w => {
207				v.insert(val);
208			}
209			(Entry::Vacant(v), None) => {
210				v.insert(val);
211			}
212			_ => return Err(Error::ValNotExpectedValue),
213		};
214		// Return result
215		Ok(())
216	}
217
218	/// Delete a key from the database
219	pub fn del<Q>(&mut self, key: Q) -> Result<(), Error>
220	where
221		Q: Borrow<K>,
222	{
223		// Check to see if transaction is closed
224		if self.done == true {
225			return Err(Error::TxClosed);
226		}
227		// Check to see if transaction is writable
228		if self.write == false {
229			return Err(Error::TxNotWritable);
230		}
231		// Remove the key
232		self.snapshot.remove(key.borrow());
233		// Return result
234		Ok(())
235	}
236
237	/// Delete a key if it matches a value
238	pub fn delc<Q>(&mut self, key: Q, chk: Option<V>) -> Result<(), Error>
239	where
240		Q: Borrow<K> + Into<K>,
241	{
242		// Check to see if transaction is closed
243		if self.done == true {
244			return Err(Error::TxClosed);
245		}
246		// Check to see if transaction is writable
247		if self.write == false {
248			return Err(Error::TxNotWritable);
249		}
250		// Remove the key
251		match (self.snapshot.entry(key.into()), &chk) {
252			(Entry::Occupied(v), Some(w)) if v.get() == w => {
253				v.remove();
254			}
255			(Entry::Vacant(_), None) => {
256				// Nothing to delete
257			}
258			_ => return Err(Error::ValNotExpectedValue),
259		};
260		// Return result
261		Ok(())
262	}
263
264	/// Retrieve a range of keys from the databases
265	pub fn keys<Q>(&self, rng: Range<Q>, limit: usize) -> Result<Vec<K>, Error>
266	where
267		Q: Into<K>,
268	{
269		// Check to see if transaction is closed
270		if self.done == true {
271			return Err(Error::TxClosed);
272		}
273		// Compute the range
274		let beg = rng.start.into();
275		let end = rng.end.into();
276		// Scan the keys
277		let res = self.snapshot.range(beg..end).take(limit).map(|(k, _)| k.clone()).collect();
278		// Return result
279		Ok(res)
280	}
281
282	/// Retrieve a range of key-value pairs from the databases
283	pub fn scan<Q>(&self, rng: Range<Q>, limit: usize) -> Result<Vec<(K, V)>, Error>
284	where
285		Q: Into<K>,
286	{
287		// Check to see if transaction is closed
288		if self.done == true {
289			return Err(Error::TxClosed);
290		}
291		// Compute the range
292		let beg = rng.start.into();
293		let end = rng.end.into();
294		// Scan the keys
295		let res = self
296			.snapshot
297			.range(beg..end)
298			.take(limit)
299			.map(|(k, v)| (k.clone(), v.clone()))
300			.collect();
301		// Return result
302		Ok(res)
303	}
304}