1use crate::err::Error;
39use crate::kv::Convert;
40use crate::kv::Key;
41use crate::kv::Val;
42use crate::sp::Operation;
43use crate::sp::Savepoint;
44use rexie::Direction;
45use rexie::KeyRange;
46use rexie::Rexie;
47use rexie::Store;
48use rexie::TransactionMode;
49use std::collections::BTreeMap;
50use std::ops::Range;
51use std::rc::Rc;
52use wasm_bindgen::JsValue;
53
54#[derive(Clone, Debug)]
55pub(crate) enum Buffered {
56 Set(Val),
57 Del,
58}
59
60pub struct Transaction {
66 pub(crate) done: bool,
67 pub(crate) write: bool,
68 pub(crate) db: Rc<Rexie>,
70 pub(crate) buffer: BTreeMap<Key, Buffered>,
72 pub(crate) savepoints: Vec<Savepoint>,
73 pub(crate) operations: Vec<Operation>,
74}
75
76impl Transaction {
77 pub(crate) fn new(db: Rc<Rexie>, write: bool) -> Transaction {
78 Transaction {
79 done: false,
80 write,
81 db,
82 buffer: BTreeMap::new(),
83 savepoints: Vec::new(),
84 operations: Vec::new(),
85 }
86 }
87
88 pub fn closed(&self) -> bool {
89 self.done
90 }
91
92 fn fresh_read_store(&self) -> Result<Store, Error> {
94 let tx =
95 self.db.transaction(&["kv"], TransactionMode::ReadOnly).map_err(|_| Error::TxError)?;
96 tx.store("kv").map_err(|_| Error::TxError)
97 }
98
99 async fn buffered_get(&self, key: &Key) -> Result<Option<Val>, Error> {
101 match self.buffer.get(key) {
102 Some(Buffered::Set(v)) => Ok(Some(v.clone())),
103 Some(Buffered::Del) => Ok(None),
104 None => {
105 let store = self.fresh_read_store()?;
106 let res = store.get(key.clone().convert()).await?;
107 match res {
108 Some(v) => Ok(Some(v.convert())),
109 None => Ok(None),
110 }
111 }
112 }
113 }
114
115 pub async fn cancel(&mut self) -> Result<(), Error> {
120 if self.done {
121 return Err(Error::TxClosed);
122 }
123 self.done = true;
124 self.buffer.clear();
125 Ok(())
126 }
127
128 pub async fn commit(&mut self) -> Result<(), Error> {
136 if self.done {
137 return Err(Error::TxClosed);
138 }
139 if !self.write {
140 return Err(Error::TxNotWritable);
141 }
142 self.done = true;
143
144 if self.buffer.is_empty() {
145 return Ok(());
146 }
147
148 let flush_tx =
149 self.db.transaction(&["kv"], TransactionMode::ReadWrite).map_err(|_| Error::TxError)?;
150 let flush_store = flush_tx.store("kv").map_err(|_| Error::TxError)?;
151
152 let buffer = std::mem::take(&mut self.buffer);
155
156 let mut puts: Vec<(JsValue, Option<JsValue>)> = Vec::new();
157 let mut deletes: Vec<JsValue> = Vec::new();
158
159 for (key, op) in buffer {
160 let js_key: JsValue = key.convert();
161 match op {
162 Buffered::Set(val) => {
163 let js_val: JsValue = val.convert();
164 puts.push((js_val, Some(js_key)));
165 }
166 Buffered::Del => {
167 deletes.push(js_key);
168 }
169 }
170 }
171
172 if !puts.is_empty() {
175 flush_store.put_all(puts.into_iter()).await?;
176 }
177
178 for js_key in deletes {
186 flush_store.delete(js_key).await?;
187 }
188
189 flush_tx.done().await?;
191
192 Ok(())
193 }
194
195 pub async fn exists(&self, key: Key) -> Result<bool, Error> {
200 if self.done {
201 return Err(Error::TxClosed);
202 }
203 match self.buffer.get(&key) {
204 Some(Buffered::Set(_)) => Ok(true),
205 Some(Buffered::Del) => Ok(false),
206 None => {
207 let store = self.fresh_read_store()?;
208 let res = store.key_exists(key.convert()).await?;
209 Ok(res)
210 }
211 }
212 }
213
214 pub async fn get(&self, key: Key) -> Result<Option<Val>, Error> {
215 if self.done {
216 return Err(Error::TxClosed);
217 }
218 self.buffered_get(&key).await
219 }
220
221 pub async fn set(&mut self, key: Key, val: Val) -> Result<(), Error> {
226 if self.done {
227 return Err(Error::TxClosed);
228 }
229 if !self.write {
230 return Err(Error::TxNotWritable);
231 }
232 if !self.savepoints.is_empty() || !self.operations.is_empty() {
233 match self.buffered_get(&key).await? {
234 Some(existing_val) => {
235 self.operations.push(Operation::RestoreValue(key.clone(), existing_val));
236 }
237 None => {
238 self.operations.push(Operation::DeleteKey(key.clone()));
239 }
240 }
241 }
242 self.buffer.insert(key, Buffered::Set(val));
243 Ok(())
244 }
245
246 pub async fn put(&mut self, key: Key, val: Val) -> Result<(), Error> {
247 if self.done {
248 return Err(Error::TxClosed);
249 }
250 if !self.write {
251 return Err(Error::TxNotWritable);
252 }
253 match self.buffered_get(&key).await? {
254 None => self.set(key, val).await,
255 _ => Err(Error::KeyAlreadyExists),
256 }
257 }
258
259 pub async fn putc(&mut self, key: Key, val: Val, chk: Option<Val>) -> Result<(), Error> {
260 if self.done {
261 return Err(Error::TxClosed);
262 }
263 if !self.write {
264 return Err(Error::TxNotWritable);
265 }
266 match (self.buffered_get(&key).await?, chk) {
267 (Some(v), Some(w)) if v == w => self.set(key, val).await,
268 (None, None) => self.set(key, val).await,
269 _ => Err(Error::ValNotExpectedValue),
270 }
271 }
272
273 pub async fn del(&mut self, key: Key) -> Result<(), Error> {
274 if self.done {
275 return Err(Error::TxClosed);
276 }
277 if !self.write {
278 return Err(Error::TxNotWritable);
279 }
280 if !self.savepoints.is_empty() || !self.operations.is_empty() {
281 if let Some(existing_val) = self.buffered_get(&key).await? {
282 self.operations.push(Operation::RestoreDeleted(key.clone(), existing_val));
283 }
284 }
285 self.buffer.insert(key, Buffered::Del);
286 Ok(())
287 }
288
289 pub async fn delc(&mut self, key: Key, chk: Option<Val>) -> Result<(), Error> {
290 if self.done {
291 return Err(Error::TxClosed);
292 }
293 if !self.write {
294 return Err(Error::TxNotWritable);
295 }
296 match (self.buffered_get(&key).await?, chk) {
297 (Some(v), Some(w)) if v == w => self.del(key).await,
298 (None, None) => self.del(key).await,
299 _ => Err(Error::ValNotExpectedValue),
300 }
301 }
302
303 pub async fn keys(&self, rng: Range<Key>, limit: u32) -> Result<Vec<Key>, Error> {
308 if self.done {
309 return Err(Error::TxClosed);
310 }
311 let Range {
312 start,
313 end,
314 } = rng;
315 let dir = Some(Direction::Next);
316 let kr =
317 KeyRange::bound(&start.clone().convert(), &end.clone().convert(), None, Some(true));
318 let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
319
320 let store = self.fresh_read_store()?;
321 let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
322
323 let mut merged: BTreeMap<Key, ()> = BTreeMap::new();
324 for (k, _) in idb_results {
325 let key: Key = k.convert();
326 match self.buffer.get(&key) {
327 Some(Buffered::Del) => {}
328 _ => {
329 merged.insert(key, ());
330 }
331 }
332 }
333 for (key, op) in self.buffer.range(start..end) {
334 match op {
335 Buffered::Set(_) => {
336 merged.insert(key.clone(), ());
337 }
338 Buffered::Del => {
339 merged.remove(key);
340 }
341 }
342 }
343
344 let res: Vec<Key> = merged.into_keys().take(limit as usize).collect();
345 Ok(res)
346 }
347
348 pub async fn keysr(&self, rng: Range<Key>, limit: u32) -> Result<Vec<Key>, Error> {
349 if self.done {
350 return Err(Error::TxClosed);
351 }
352 let Range {
353 start,
354 end,
355 } = rng;
356 let dir = Some(Direction::Prev);
357 let kr =
358 KeyRange::bound(&end.clone().convert(), &start.clone().convert(), None, Some(true));
359 let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
360
361 let store = self.fresh_read_store()?;
362 let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
363
364 let mut merged: BTreeMap<Key, ()> = BTreeMap::new();
365 for (k, _) in idb_results {
366 let key: Key = k.convert();
367 match self.buffer.get(&key) {
368 Some(Buffered::Del) => {}
369 _ => {
370 merged.insert(key, ());
371 }
372 }
373 }
374 for (key, op) in self.buffer.range(start..end) {
375 match op {
376 Buffered::Set(_) => {
377 merged.insert(key.clone(), ());
378 }
379 Buffered::Del => {
380 merged.remove(key);
381 }
382 }
383 }
384
385 let res: Vec<Key> = merged.into_keys().rev().take(limit as usize).collect();
386 Ok(res)
387 }
388
389 pub async fn scan(&self, rng: Range<Key>, limit: u32) -> Result<Vec<(Key, Val)>, Error> {
390 if self.done {
391 return Err(Error::TxClosed);
392 }
393 let Range {
394 start,
395 end,
396 } = rng;
397 let dir = Some(Direction::Next);
398 let kr =
399 KeyRange::bound(&start.clone().convert(), &end.clone().convert(), None, Some(true));
400 let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
401
402 let store = self.fresh_read_store()?;
403 let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
404
405 let mut merged: BTreeMap<Key, Val> = BTreeMap::new();
406 for (k, v) in idb_results {
407 let key: Key = k.convert();
408 let val: Val = v.convert();
409 match self.buffer.get(&key) {
410 Some(Buffered::Del) => {}
411 Some(Buffered::Set(bv)) => {
412 merged.insert(key, bv.clone());
413 }
414 None => {
415 merged.insert(key, val);
416 }
417 }
418 }
419 for (key, op) in self.buffer.range(start..end) {
420 match op {
421 Buffered::Set(v) => {
422 merged.insert(key.clone(), v.clone());
423 }
424 Buffered::Del => {
425 merged.remove(key);
426 }
427 }
428 }
429
430 let res: Vec<(Key, Val)> = merged.into_iter().take(limit as usize).collect();
431 Ok(res)
432 }
433
434 pub async fn scanr(&self, rng: Range<Key>, limit: u32) -> Result<Vec<(Key, Val)>, Error> {
435 if self.done {
436 return Err(Error::TxClosed);
437 }
438 let Range {
439 start,
440 end,
441 } = rng;
442 let dir = Some(Direction::Prev);
443 let kr =
444 KeyRange::bound(&end.clone().convert(), &start.clone().convert(), None, Some(true));
445 let kr = kr.map_err(|e| Error::IndexedDbError(e.to_string()))?;
446
447 let store = self.fresh_read_store()?;
448 let idb_results = store.scan(Some(kr), Some(limit), None, dir).await?;
449
450 let mut merged: BTreeMap<Key, Val> = BTreeMap::new();
451 for (k, v) in idb_results {
452 let key: Key = k.convert();
453 let val: Val = v.convert();
454 match self.buffer.get(&key) {
455 Some(Buffered::Del) => {}
456 Some(Buffered::Set(bv)) => {
457 merged.insert(key, bv.clone());
458 }
459 None => {
460 merged.insert(key, val);
461 }
462 }
463 }
464 for (key, op) in self.buffer.range(start..end) {
465 match op {
466 Buffered::Set(v) => {
467 merged.insert(key.clone(), v.clone());
468 }
469 Buffered::Del => {
470 merged.remove(key);
471 }
472 }
473 }
474
475 let res: Vec<(Key, Val)> = merged.into_iter().rev().take(limit as usize).collect();
476 Ok(res)
477 }
478
479 pub async fn set_savepoint(&mut self) -> Result<(), Error> {
484 if self.done {
485 return Err(Error::TxClosed);
486 }
487 if !self.write {
488 return Err(Error::TxNotWritable);
489 }
490 self.savepoints.push(Savepoint {
491 operations: std::mem::take(&mut self.operations),
492 });
493 Ok(())
494 }
495
496 pub async fn rollback_to_savepoint(&mut self) -> Result<(), Error> {
499 if self.done {
500 return Err(Error::TxClosed);
501 }
502 if !self.write {
503 return Err(Error::TxNotWritable);
504 }
505 if self.savepoints.is_empty() {
506 return Err(Error::NoSavepoint);
507 }
508 let savepoint = self.savepoints.pop().unwrap();
509 for op in self.operations.iter().rev() {
510 match op {
511 Operation::DeleteKey(key) => {
512 self.buffer.remove(key);
513 }
514 Operation::RestoreValue(key, val) => {
515 self.buffer.insert(key.clone(), Buffered::Set(val.clone()));
516 }
517 Operation::RestoreDeleted(key, val) => {
518 self.buffer.insert(key.clone(), Buffered::Set(val.clone()));
519 }
520 }
521 }
522 self.operations = savepoint.operations;
523 Ok(())
524 }
525}