Skip to main content

indxdb/
tx.rs

1// Copyright © SurrealDB Ltd
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Buffered transaction layer for IndexedDB.
16//!
17//! IndexedDB auto-commits a transaction whenever the event loop is idle and
18//! there are no pending requests. Rust async/await yields to the JS event loop
19//! on every `.await`, so multiple IDB operations within a single Rexie
20//! transaction will cause `TransactionInactiveError` as soon as the second
21//! request fires after an idle microtask checkpoint.
22//!
23//! To work around this, we split the transaction into two phases:
24//!
25//! 1. **Read phase** – each read opens a fresh, short-lived read-only IDB
26//!    transaction. This is safe because reads are idempotent and a potential
27//!    auto-commit between reads is harmless. Reads also consult a `BTreeMap`
28//!    write-buffer so that read-your-own-writes works correctly.
29//!
30//! 2. **Flush phase** (`commit`) – a *new* read-write IDB transaction is
31//!    opened and every buffered mutation is dispatched. Puts use `put_all`
32//!    which fires all IDB requests synchronously without any `.await` between
33//!    them. Deletes are issued sequentially (each awaited), which is safe
34//!    because the Rust executor polls the next delete in the same microtask
35//!    as the previous completion callback. Finally, `transaction.done()` is
36//!    awaited, which resolves when IDB has durably committed everything.
37
38use crate::err::Error;
39use crate::kv::Convert;
40use crate::kv::Key;
41use crate::kv::Val;
42use crate::sp::Operation;
43use crate::sp::Savepoint;
44use rexie::Direction;
45use rexie::KeyRange;
46use rexie::Rexie;
47use rexie::Store;
48use rexie::TransactionMode;
49use std::collections::BTreeMap;
50use std::ops::Range;
51use std::rc::Rc;
52use wasm_bindgen::JsValue;
53
54#[derive(Clone, Debug)]
55pub(crate) enum Buffered {
56	Set(Val),
57	Del,
58}
59
60/// A serializable snapshot isolated database transaction.
61///
62/// All mutations are buffered in-memory. On `commit()` they are flushed to
63/// IndexedDB in a single synchronous batch so that the IDB transaction never
64/// goes idle between requests.
65pub struct Transaction {
66	pub(crate) done: bool,
67	pub(crate) write: bool,
68	/// Shared reference to the Rexie database for opening new IDB transactions.
69	pub(crate) db: Rc<Rexie>,
70	/// Buffered mutations: key -> Set(val) | Del
71	pub(crate) buffer: BTreeMap<Key, Buffered>,
72	pub(crate) savepoints: Vec<Savepoint>,
73	pub(crate) operations: Vec<Operation>,
74}
75
76impl Transaction {
77	pub(crate) fn new(db: Rc<Rexie>, write: bool) -> Transaction {
78		Transaction {
79			done: false,
80			write,
81			db,
82			buffer: BTreeMap::new(),
83			savepoints: Vec::new(),
84			operations: Vec::new(),
85		}
86	}
87
88	pub fn closed(&self) -> bool {
89		self.done
90	}
91
92	/// Open a fresh read-only IDB store for a single read request.
93	fn fresh_read_store(&self) -> Result<Store, Error> {
94		let tx =
95			self.db.transaction(&["kv"], TransactionMode::ReadOnly).map_err(|_| Error::TxError)?;
96		tx.store("kv").map_err(|_| Error::TxError)
97	}
98
99	/// Read a key, checking the write buffer first.
100	async fn buffered_get(&self, key: &Key) -> Result<Option<Val>, Error> {
101		match self.buffer.get(key) {
102			Some(Buffered::Set(v)) => Ok(Some(v.clone())),
103			Some(Buffered::Del) => Ok(None),
104			None => {
105				let store = self.fresh_read_store()?;
106				let res = store.get(key.clone().convert()).await?;
107				match res {
108					Some(v) => Ok(Some(v.convert())),
109					None => Ok(None),
110				}
111			}
112		}
113	}
114
115	// ------------------------------------------------------------------
116	// Transaction lifecycle
117	// ------------------------------------------------------------------
118
119	pub async fn cancel(&mut self) -> Result<(), Error> {
120		if self.done {
121			return Err(Error::TxClosed);
122		}
123		self.done = true;
124		self.buffer.clear();
125		Ok(())
126	}
127
128	/// Commit: flush all buffered writes to IndexedDB in one atomic batch.
129	///
130	/// Opens a fresh read-write IDB transaction. Puts are batched via
131	/// `put_all` (all IDB requests fired synchronously, only the last
132	/// awaited). Deletes are issued sequentially -- each `await` is safe
133	/// because the next `delete()` call is queued in the same microtask
134	/// as the previous request's completion, keeping the transaction alive.
135	pub async fn commit(&mut self) -> Result<(), Error> {
136		if self.done {
137			return Err(Error::TxClosed);
138		}
139		if !self.write {
140			return Err(Error::TxNotWritable);
141		}
142		self.done = true;
143
144		if self.buffer.is_empty() {
145			return Ok(());
146		}
147
148		let flush_tx =
149			self.db.transaction(&["kv"], TransactionMode::ReadWrite).map_err(|_| Error::TxError)?;
150		let flush_store = flush_tx.store("kv").map_err(|_| Error::TxError)?;
151
152		// Build an iterator of (JsValue, Option<JsValue>) for put_all, and
153		// collect deletes separately.
154		let buffer = std::mem::take(&mut self.buffer);
155
156		let mut puts: Vec<(JsValue, Option<JsValue>)> = Vec::new();
157		let mut deletes: Vec<JsValue> = Vec::new();
158
159		for (key, op) in buffer {
160			let js_key: JsValue = key.convert();
161			match op {
162				Buffered::Set(val) => {
163					let js_val: JsValue = val.convert();
164					puts.push((js_val, Some(js_key)));
165				}
166				Buffered::Del => {
167					deletes.push(js_key);
168				}
169			}
170		}
171
172		// Use put_all which fires all IDB requests synchronously (no .await
173		// between them) and only awaits the last request's result.
174		if !puts.is_empty() {
175			flush_store.put_all(puts.into_iter()).await?;
176		}
177
178		// Delete all keys. Each `Store::delete` awaits one IDB request,
179		// but this is safe: completing request N immediately queues
180		// request N+1 within the same microtask (wasm_bindgen_futures
181		// polls continuations synchronously in the IDB callback), so
182		// the transaction always has a pending request and never
183		// auto-commits. This is the same pattern rexie's `scan` uses
184		// internally when iterating a cursor.
185		for js_key in deletes {
186			flush_store.delete(js_key).await?;
187		}
188
189		// Wait for the IDB transaction to durably commit everything.
190		flush_tx.done().await?;
191
192		Ok(())
193	}
194
195	// ------------------------------------------------------------------
196	// Reads
197	// ------------------------------------------------------------------
198
199	pub async fn exists(&self, key: Key) -> Result<bool, Error> {
200		if self.done {
201			return Err(Error::TxClosed);
202		}
203		match self.buffer.get(&key) {
204			Some(Buffered::Set(_)) => Ok(true),
205			Some(Buffered::Del) => Ok(false),
206			None => {
207				let store = self.fresh_read_store()?;
208				let res = store.key_exists(key.convert()).await?;
209				Ok(res)
210			}
211		}
212	}
213
214	pub async fn get(&self, key: Key) -> Result<Option<Val>, Error> {
215		if self.done {
216			return Err(Error::TxClosed);
217		}
218		self.buffered_get(&key).await
219	}
220
221	// ------------------------------------------------------------------
222	// Writes (buffered)
223	// ------------------------------------------------------------------
224
225	pub async fn set(&mut self, key: Key, val: Val) -> Result<(), Error> {
226		if self.done {
227			return Err(Error::TxClosed);
228		}
229		if !self.write {
230			return Err(Error::TxNotWritable);
231		}
232		if !self.savepoints.is_empty() || !self.operations.is_empty() {
233			match self.buffered_get(&key).await? {
234				Some(existing_val) => {
235					self.operations.push(Operation::RestoreValue(key.clone(), existing_val));
236				}
237				None => {
238					self.operations.push(Operation::DeleteKey(key.clone()));
239				}
240			}
241		}
242		self.buffer.insert(key, Buffered::Set(val));
243		Ok(())
244	}
245
246	pub async fn put(&mut self, key: Key, val: Val) -> Result<(), Error> {
247		if self.done {
248			return Err(Error::TxClosed);
249		}
250		if !self.write {
251			return Err(Error::TxNotWritable);
252		}
253		match self.buffered_get(&key).await? {
254			None => self.set(key, val).await,
255			_ => Err(Error::KeyAlreadyExists),
256		}
257	}
258
259	pub async fn putc(&mut self, key: Key, val: Val, chk: Option<Val>) -> Result<(), Error> {
260		if self.done {
261			return Err(Error::TxClosed);
262		}
263		if !self.write {
264			return Err(Error::TxNotWritable);
265		}
266		match (self.buffered_get(&key).await?, chk) {
267			(Some(v), Some(w)) if v == w => self.set(key, val).await,
268			(None, None) => self.set(key, val).await,
269			_ => Err(Error::ValNotExpectedValue),
270		}
271	}
272
273	pub async fn del(&mut self, key: Key) -> Result<(), Error> {
274		if self.done {
275			return Err(Error::TxClosed);
276		}
277		if !self.write {
278			return Err(Error::TxNotWritable);
279		}
280		if !self.savepoints.is_empty() || !self.operations.is_empty() {
281			if let Some(existing_val) = self.buffered_get(&key).await? {
282				self.operations.push(Operation::RestoreDeleted(key.clone(), existing_val));
283			}
284		}
285		self.buffer.insert(key, Buffered::Del);
286		Ok(())
287	}
288
289	pub async fn delc(&mut self, key: Key, chk: Option<Val>) -> Result<(), Error> {
290		if self.done {
291			return Err(Error::TxClosed);
292		}
293		if !self.write {
294			return Err(Error::TxNotWritable);
295		}
296		match (self.buffered_get(&key).await?, chk) {
297			(Some(v), Some(w)) if v == w => self.del(key).await,
298			(None, None) => self.del(key).await,
299			_ => Err(Error::ValNotExpectedValue),
300		}
301	}
302
303	// ------------------------------------------------------------------
304	// Range operations – merge IDB results with the write buffer
305	// ------------------------------------------------------------------
306
307	pub async fn keys(&self, rng: Range<Key>, limit: u32) -> Result<Vec<Key>, Error> {
308		if self.done {
309			return Err(Error::TxClosed);
310		}
311		let Range {
312			start,
313			end,
314		} = rng;
315		let dir = Some(Direction::Next);
316		let kr =
317			KeyRange::bound(&start.clone().convert(), &end.clone().convert(), None, Some(true));
318		let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
319
320		let store = self.fresh_read_store()?;
321		let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
322
323		let mut merged: BTreeMap<Key, ()> = BTreeMap::new();
324		for (k, _) in idb_results {
325			let key: Key = k.convert();
326			match self.buffer.get(&key) {
327				Some(Buffered::Del) => {}
328				_ => {
329					merged.insert(key, ());
330				}
331			}
332		}
333		for (key, op) in self.buffer.range(start..end) {
334			match op {
335				Buffered::Set(_) => {
336					merged.insert(key.clone(), ());
337				}
338				Buffered::Del => {
339					merged.remove(key);
340				}
341			}
342		}
343
344		let res: Vec<Key> = merged.into_keys().take(limit as usize).collect();
345		Ok(res)
346	}
347
348	pub async fn keysr(&self, rng: Range<Key>, limit: u32) -> Result<Vec<Key>, Error> {
349		if self.done {
350			return Err(Error::TxClosed);
351		}
352		let Range {
353			start,
354			end,
355		} = rng;
356		let dir = Some(Direction::Prev);
357		let kr =
358			KeyRange::bound(&end.clone().convert(), &start.clone().convert(), None, Some(true));
359		let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
360
361		let store = self.fresh_read_store()?;
362		let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
363
364		let mut merged: BTreeMap<Key, ()> = BTreeMap::new();
365		for (k, _) in idb_results {
366			let key: Key = k.convert();
367			match self.buffer.get(&key) {
368				Some(Buffered::Del) => {}
369				_ => {
370					merged.insert(key, ());
371				}
372			}
373		}
374		for (key, op) in self.buffer.range(start..end) {
375			match op {
376				Buffered::Set(_) => {
377					merged.insert(key.clone(), ());
378				}
379				Buffered::Del => {
380					merged.remove(key);
381				}
382			}
383		}
384
385		let res: Vec<Key> = merged.into_keys().rev().take(limit as usize).collect();
386		Ok(res)
387	}
388
389	pub async fn scan(&self, rng: Range<Key>, limit: u32) -> Result<Vec<(Key, Val)>, Error> {
390		if self.done {
391			return Err(Error::TxClosed);
392		}
393		let Range {
394			start,
395			end,
396		} = rng;
397		let dir = Some(Direction::Next);
398		let kr =
399			KeyRange::bound(&start.clone().convert(), &end.clone().convert(), None, Some(true));
400		let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
401
402		let store = self.fresh_read_store()?;
403		let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
404
405		let mut merged: BTreeMap<Key, Val> = BTreeMap::new();
406		for (k, v) in idb_results {
407			let key: Key = k.convert();
408			let val: Val = v.convert();
409			match self.buffer.get(&key) {
410				Some(Buffered::Del) => {}
411				Some(Buffered::Set(bv)) => {
412					merged.insert(key, bv.clone());
413				}
414				None => {
415					merged.insert(key, val);
416				}
417			}
418		}
419		for (key, op) in self.buffer.range(start..end) {
420			match op {
421				Buffered::Set(v) => {
422					merged.insert(key.clone(), v.clone());
423				}
424				Buffered::Del => {
425					merged.remove(key);
426				}
427			}
428		}
429
430		let res: Vec<(Key, Val)> = merged.into_iter().take(limit as usize).collect();
431		Ok(res)
432	}
433
434	pub async fn scanr(&self, rng: Range<Key>, limit: u32) -> Result<Vec<(Key, Val)>, Error> {
435		if self.done {
436			return Err(Error::TxClosed);
437		}
438		let Range {
439			start,
440			end,
441		} = rng;
442		let dir = Some(Direction::Prev);
443		let kr =
444			KeyRange::bound(&end.clone().convert(), &start.clone().convert(), None, Some(true));
445		let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
446
447		let store = self.fresh_read_store()?;
448		let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
449
450		let mut merged: BTreeMap<Key, Val> = BTreeMap::new();
451		for (k, v) in idb_results {
452			let key: Key = k.convert();
453			let val: Val = v.convert();
454			match self.buffer.get(&key) {
455				Some(Buffered::Del) => {}
456				Some(Buffered::Set(bv)) => {
457					merged.insert(key, bv.clone());
458				}
459				None => {
460					merged.insert(key, val);
461				}
462			}
463		}
464		for (key, op) in self.buffer.range(start..end) {
465			match op {
466				Buffered::Set(v) => {
467					merged.insert(key.clone(), v.clone());
468				}
469				Buffered::Del => {
470					merged.remove(key);
471				}
472			}
473		}
474
475		let res: Vec<(Key, Val)> = merged.into_iter().rev().take(limit as usize).collect();
476		Ok(res)
477	}
478
479	// ------------------------------------------------------------------
480	// Savepoints
481	// ------------------------------------------------------------------
482
483	pub async fn set_savepoint(&mut self) -> Result<(), Error> {
484		if self.done {
485			return Err(Error::TxClosed);
486		}
487		if !self.write {
488			return Err(Error::TxNotWritable);
489		}
490		self.savepoints.push(Savepoint {
491			operations: std::mem::take(&mut self.operations),
492		});
493		Ok(())
494	}
495
496	/// Rollback to the most recent savepoint by replaying undo operations
497	/// against the in-memory buffer. No IDB calls needed.
498	pub async fn rollback_to_savepoint(&mut self) -> Result<(), Error> {
499		if self.done {
500			return Err(Error::TxClosed);
501		}
502		if !self.write {
503			return Err(Error::TxNotWritable);
504		}
505		if self.savepoints.is_empty() {
506			return Err(Error::NoSavepoint);
507		}
508		let savepoint = self.savepoints.pop().unwrap();
509		for op in self.operations.iter().rev() {
510			match op {
511				Operation::DeleteKey(key) => {
512					self.buffer.remove(key);
513				}
514				Operation::RestoreValue(key, val) => {
515					self.buffer.insert(key.clone(), Buffered::Set(val.clone()));
516				}
517				Operation::RestoreDeleted(key, val) => {
518					self.buffer.insert(key.clone(), Buffered::Set(val.clone()));
519				}
520			}
521		}
522		self.operations = savepoint.operations;
523		Ok(())
524	}
525}