1use super::{ValRef, tree::LatestValMeta};
2use crate::{
3 OpCode,
4 cc::{
5 cc::ConcurrencyControl,
6 context::{CCNode, Context, TxOutcome},
7 group::TxnState,
8 wal::{WalDel, WalPut, WalReplace},
9 },
10 index::tree::{Iter, Tree},
11 map::flow::ForegroundWritePermit,
12 types::data::{Key, Record, Ver},
13 utils::{
14 Handle, NULL_CMD,
15 observe::{
16 CounterMetric, EventKind, HistogramMetric, LATENCY_SAMPLE_SHIFT, ObserveEvent,
17 observe_elapsed, sampled_instant,
18 },
19 },
20};
21use crossbeam_epoch::Guard;
22use std::cell::{Cell, UnsafeCell};
23use std::ops::RangeBounds;
24use std::sync::atomic::Ordering::Relaxed;
25
26fn get_impl<K: AsRef<[u8]>>(
27 ctx: &Context,
28 cc: &ConcurrencyControl,
29 tree: &Tree,
30 group_id: u8,
31 start_ts: u64,
32 k: K,
33) -> Result<ValRef, OpCode> {
34 #[cfg(feature = "extra_check")]
35 assert!(!k.as_ref().is_empty(), "key must be non-empty");
36
37 let g = crossbeam_epoch::pin();
38 let key = Key::new(k.as_ref(), Ver::new(start_ts, NULL_CMD));
39 let r = tree.traverse(&g, key, |txid, record_gid| {
40 cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
41 })?;
42
43 Ok(r)
44}
45
46fn seek_impl<'a, K>(
47 cc: &'a ConcurrencyControl,
48 tree: &'a Tree,
49 group_id: u8,
50 start_ts: u64,
51 prefix: K,
52) -> Iter<'a>
53where
54 K: AsRef<[u8]>,
55{
56 let b = prefix.as_ref();
57 #[cfg(feature = "extra_check")]
58 assert!(!b.is_empty(), "prefix can't be empty");
59
60 let upper = prefix_upper_exclusive(b);
61 if let Some(ref upper) = upper {
62 tree.range(b..upper.as_slice(), move |ctx, txid, record_gid| {
63 cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
64 })
65 } else {
66 tree.range(b.., move |ctx, txid, record_gid| {
67 cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
68 })
69 }
70}
71
72fn range_impl<'a, K, R>(
73 cc: &'a ConcurrencyControl,
74 tree: &'a Tree,
75 group_id: u8,
76 start_ts: u64,
77 range: R,
78) -> Iter<'a>
79where
80 K: AsRef<[u8]>,
81 R: RangeBounds<K>,
82{
83 tree.range(range, move |ctx, txid, record_gid| {
84 cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
85 })
86}
87
88fn prefix_upper_exclusive(prefix: &[u8]) -> Option<Vec<u8>> {
89 let mut upper = prefix.to_vec();
90 for i in (0..upper.len()).rev() {
91 if upper[i] != u8::MAX {
92 upper[i] += 1;
93 upper.truncate(i + 1);
94 return Some(upper);
95 }
96 }
97 None
98}
99
100pub struct TxnKV<'a> {
102 ctx: &'a Context,
103 state: UnsafeCell<TxnState>,
104 tree: &'a Tree,
105 bucket_id: u64,
106 is_end: Cell<bool>,
107 limit: usize,
108}
109
110#[derive(Clone, Copy, Debug, PartialEq, Eq)]
111enum FailCause {
112 Aborted,
113 Conflict,
114}
115
116impl<'a> TxnKV<'a> {
117 pub(crate) fn new(ctx: &'a Context, tree: &'a Tree) -> Result<Self, OpCode> {
118 let start_ts = ctx.alloc_oracle();
119 let gid = ctx.next_group_id();
120 let g = ctx.group(gid);
121 let start_ckpt = g.ckpt_cnt.load(Relaxed);
122 let mut state = TxnState::new(gid, start_ts, start_ckpt);
123 let bucket_id = tree.bucket_id();
124 let max_ckpt_per_txn = tree.store.opt.max_ckpt_per_txn;
125
126 tree.bucket.state.inc_txn_ref();
127
128 let begin_res = {
129 let mut log = g.logging.lock();
130 log.record_begin(start_ts).map(|lsn| {
131 state.begin_lsn = lsn;
132 state.prev_lsn = lsn;
133 g.active_txns.insert(start_ts, state.prev_lsn);
134 })
135 };
136 if let Err(e) = begin_res {
137 g.leave_inflight();
138 tree.bucket.state.dec_txn_ref();
139 return Err(e);
140 }
141 ctx.opt.observer.counter(CounterMetric::TxnBegin, 1);
142
143 g.cc.commit_tree.compact(ctx);
144 Ok(Self {
145 ctx,
146 state: UnsafeCell::new(state),
147 tree,
148 bucket_id,
149 is_end: Cell::new(false),
150 limit: max_ckpt_per_txn,
151 })
152 }
153
154 fn should_abort(&self) -> Result<(), OpCode> {
155 let state = self.state_ref();
156 let g = self.ctx.group(state.group_id);
157
158 if self.is_end.get() || g.ckpt_cnt.load(Relaxed) - state.start_ckpt >= self.limit {
159 return Err(OpCode::AbortTx);
160 }
161 Ok(())
162 }
163
164 #[inline]
165 fn state_ref(&self) -> &TxnState {
166 unsafe { &*self.state.get() }
167 }
168
169 #[inline]
170 #[allow(clippy::mut_from_ref)]
171 fn state_mut(&self) -> &mut TxnState {
172 unsafe { &mut *self.state.get() }
173 }
174
175 #[inline]
176 fn observe_counter(&self, metric: CounterMetric, delta: u64) {
177 self.ctx.opt.observer.counter(metric, delta);
178 }
179
180 #[inline]
181 fn observe_event(&self, event: ObserveEvent) {
182 self.ctx.opt.observer.event(event);
183 }
184
185 #[inline]
186 fn before_write_budget(&self, estimated_bytes: usize) -> ForegroundWritePermit {
187 self.tree
188 .bucket
189 .before_foreground_write(estimated_bytes as u64)
190 }
191
192 #[inline]
193 fn conflict_abort(&self, txid: u64) -> OpCode {
194 self.observe_counter(CounterMetric::TxnConflictAbort, 1);
195 self.observe_event(ObserveEvent {
196 kind: EventKind::TxnConflictAbort,
197 bucket_id: self.bucket_id,
198 txid,
199 file_id: 0,
200 value: 0,
201 });
202 OpCode::AbortTx
203 }
204
205 #[inline]
206 fn write_abort(&self, start_ts: u64, cause: FailCause) -> OpCode {
207 match cause {
208 FailCause::Aborted => OpCode::AbortTx,
209 FailCause::Conflict => self.conflict_abort(start_ts),
210 }
211 }
212
213 #[inline]
214 fn is_visible_for_write(
215 &self,
216 group_id: usize,
217 start_ts: u64,
218 txid: u64,
219 record_gid: u8,
220 ) -> bool {
221 self.ctx.group(group_id).cc.is_visible_to(
222 self.ctx,
223 group_id as u8,
224 record_gid,
225 start_ts,
226 txid,
227 )
228 }
229
230 fn resolve_latest_meta_for_write(
231 &self,
232 opt: &Option<LatestValMeta>,
233 state: &TxnState,
234 ) -> Result<Option<LatestValMeta>, FailCause> {
235 let Some(rv) = opt else {
236 return Ok(None);
237 };
238 let gid = state.group_id;
239 let start_ts = state.start_ts;
240 if self.is_visible_for_write(gid, start_ts, rv.ver.txid, rv.group_id) {
241 return Ok(Some(*rv));
242 }
243 if self.ctx.is_aborted(rv.ver.txid)
244 && self.ctx.get_aborted(rv.ver.txid) == Some(TxOutcome::Aborted)
245 {
246 return Err(FailCause::Aborted);
247 }
248 Err(FailCause::Conflict)
249 }
250
251 fn clean_aborted(&self, g: &Guard, raw: &[u8]) -> Result<bool, OpCode> {
252 let latest = match self
253 .tree
254 .get(g, Key::new(raw, Ver::new(u64::MAX, u32::MAX)))
255 {
256 Ok((k, _)) => Some(k),
257 Err(OpCode::NotFound | OpCode::Again) => None,
258 Err(e) => return Err(e),
259 };
260 let Some(k) = latest else {
261 return Ok(false);
262 };
263 if self.ctx.get_aborted(k.ver().txid) != Some(TxOutcome::Aborted) {
264 return Ok(false);
265 }
266
267 match self.tree.remove_aborted_head(g, raw, k.ver().txid) {
268 Ok(true) => {
269 g.flush();
270 Ok(true)
271 }
272 Ok(false) => Ok(false),
273 Err(OpCode::Again) => {
274 g.flush();
275 Ok(false)
276 }
277 Err(e) => Err(e),
278 }
279 }
280
281 fn put_impl(&self, k: &[u8], v: &[u8], logged: &mut bool) -> Result<(), OpCode> {
282 let estimated = k.len().saturating_add(v.len());
283 loop {
284 self.should_abort()?;
285 let g = crossbeam_epoch::pin();
286 let state = self.state_mut();
287 let start_ts = state.start_ts;
288 let gid = state.group_id;
289
290 let cmd_id_val = state.cmd_id;
291 state.cmd_id += 1;
292 let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
293 let val = Record::normal(gid as u8, v);
294 let _write_permit = self.before_write_budget(estimated);
295 let mut abort_cause = FailCause::Conflict;
296
297 let res = self.tree.update(&g, key, val, |opt| {
298 let gid = state.group_id;
299 let g = self.ctx.group(gid);
300
301 let current = match self.resolve_latest_meta_for_write(opt, state) {
302 Ok(current) => current,
303 Err(cause) => {
304 abort_cause = cause;
305 return Err(self.write_abort(state.start_ts, cause));
306 }
307 };
308 let r = match current {
309 None => Ok(()),
310 Some(current) => {
311 if !current.is_del {
312 Err(OpCode::Exist)
313 } else {
314 Ok(())
315 }
316 }
317 };
318
319 if r.is_ok() && !*logged {
320 *logged = true;
321 state.modified = true;
322 let mut log = g.logging.lock();
323 let new_pos = log.record_update(
324 &Key::new(k, key.ver().to_owned()),
325 WalPut::new(v.len()),
326 v,
327 state.prev_lsn,
328 self.bucket_id,
329 )?;
330 state.prev_lsn = new_pos;
331 }
332 r.map(|_| (gid as u8, state.prev_lsn))
333 });
334
335 match res {
336 Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
337 let _ = self.clean_aborted(&g, k)?;
338 continue;
339 }
340 Ok(_) => return Ok(()),
341 Err(e) => return Err(e),
342 }
343 }
344 }
345
346 fn update_impl(&self, k: &[u8], v: &[u8], logged: &mut bool) -> Result<(), OpCode> {
347 let estimated = k.len().saturating_add(v.len().saturating_mul(2));
348 #[cfg(feature = "extra_check")]
349 assert!(!k.is_empty(), "key must be non-empty");
350
351 loop {
352 self.should_abort()?;
353 let g = crossbeam_epoch::pin();
354 let state = self.state_mut();
355 let start_ts = state.start_ts;
356 let gid = state.group_id;
357
358 let cmd_id_val = state.cmd_id;
359 state.cmd_id += 1;
360 let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
361 let val = Record::normal(gid as u8, v);
362 let _write_permit = self.before_write_budget(estimated);
363 let mut abort_cause = FailCause::Conflict;
364
365 let res = self.tree.update(&g, key, val, |opt| {
366 let gid = state.group_id;
367 let g = self.ctx.group(gid);
368 let current = match self.resolve_latest_meta_for_write(opt, state) {
369 Ok(current) => current,
370 Err(cause) => {
371 abort_cause = cause;
372 return Err(self.write_abort(state.start_ts, cause));
373 }
374 };
375 let Some(current) = current else {
376 return Err(OpCode::NotFound);
377 };
378 if current.is_del {
379 return Err(OpCode::NotFound);
380 }
381
382 if !*logged {
383 state.modified = true;
384 *logged = true;
385 let mut log = g.logging.lock();
386 let new_pos = log.record_update(
387 &Key::new(k, key.ver().to_owned()),
388 WalReplace::new(v.len()),
389 v,
390 state.prev_lsn,
391 self.bucket_id,
392 )?;
393 state.prev_lsn = new_pos;
394 }
395 Ok((gid as u8, state.prev_lsn))
396 });
397
398 match res {
399 Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
400 let _ = self.clean_aborted(&g, k)?;
401 continue;
402 }
403 Ok(_) => return Ok(()),
404 Err(e) => return Err(e),
405 }
406 }
407 }
408
409 pub fn put<K, V>(&self, k: K, v: V) -> Result<(), OpCode>
411 where
412 K: AsRef<[u8]>,
413 V: AsRef<[u8]>,
414 {
415 let mut logged = false;
416 self.put_impl(k.as_ref(), v.as_ref(), &mut logged)
417 }
418
419 pub fn update<K, V>(&self, k: K, v: V) -> Result<(), OpCode>
421 where
422 K: AsRef<[u8]>,
423 V: AsRef<[u8]>,
424 {
425 let mut logged = false;
426 self.update_impl(k.as_ref(), v.as_ref(), &mut logged)
427 }
428
429 pub fn upsert<K, V>(&self, k: K, v: V) -> Result<(), OpCode>
431 where
432 K: AsRef<[u8]>,
433 V: AsRef<[u8]>,
434 {
435 let mut logged = false;
436 let (k, v) = (k.as_ref(), v.as_ref());
437 let estimated = k.len().saturating_add(v.len().saturating_mul(2));
438 #[cfg(feature = "extra_check")]
439 assert!(!k.is_empty(), "key must be non-empty");
440
441 loop {
442 self.should_abort()?;
443 let g = crossbeam_epoch::pin();
444 let state = self.state_mut();
445 let start_ts = state.start_ts;
446 let gid = state.group_id;
447
448 let cmd_id_val = state.cmd_id;
449 state.cmd_id += 1;
450 let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
451 let val = Record::normal(gid as u8, v);
452 let _write_permit = self.before_write_budget(estimated);
453 let mut abort_cause = FailCause::Conflict;
454
455 let res = self.tree.update(&g, key, val, |opt| {
456 let gid = state.group_id;
457 let g = self.ctx.group(gid);
458
459 let current = match self.resolve_latest_meta_for_write(opt, state) {
460 Ok(current) => current,
461 Err(cause) => {
462 abort_cause = cause;
463 return Err(self.write_abort(state.start_ts, cause));
464 }
465 };
466
467 if !logged {
468 logged = true;
469 state.modified = true;
470 let mut log = g.logging.lock();
471 let new_pos = match current {
472 None => log.record_update(
473 &Key::new(k, key.ver().to_owned()),
474 WalPut::new(v.len()),
475 v,
476 state.prev_lsn,
477 self.bucket_id,
478 )?,
479 Some(_) => log.record_update(
480 &Key::new(k, key.ver().to_owned()),
481 WalReplace::new(v.len()),
482 v,
483 state.prev_lsn,
484 self.bucket_id,
485 )?,
486 };
487 state.prev_lsn = new_pos;
488 }
489 Ok((gid as u8, state.prev_lsn))
490 });
491
492 match res {
493 Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
494 let _ = self.clean_aborted(&g, k)?;
495 continue;
496 }
497 Ok(_) => return Ok(()),
498 Err(e) => return Err(e),
499 }
500 }
501 }
502
503 pub fn del<T>(&self, k: T) -> Result<(), OpCode>
505 where
506 T: AsRef<[u8]>,
507 {
508 let mut logged = false;
509 let k = k.as_ref();
510 #[cfg(feature = "extra_check")]
511 assert!(!k.is_empty(), "key must be non-empty");
512
513 loop {
514 self.should_abort()?;
515 let g = crossbeam_epoch::pin();
516 let state = self.state_mut();
517 let start_ts = state.start_ts;
518 let gid = state.group_id;
519 let cmd_id_val = state.cmd_id;
520 state.cmd_id += 1;
521
522 let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
523 let val = Record::remove(gid as u8);
524 let _write_permit = self.before_write_budget(key.raw.len());
525 let mut abort_cause = FailCause::Conflict;
526
527 let res = self.tree.update(&g, key, val, |opt| {
528 let gid = state.group_id;
529 let g = self.ctx.group(gid);
530 let current = match self.resolve_latest_meta_for_write(opt, state) {
531 Ok(current) => current,
532 Err(cause) => {
533 abort_cause = cause;
534 return Err(self.write_abort(state.start_ts, cause));
535 }
536 };
537 let Some(current) = current else {
538 return Err(OpCode::NotFound);
539 };
540 if current.is_del {
541 return Err(OpCode::NotFound);
542 }
543
544 if !logged {
545 logged = true;
546 state.modified = true;
547 let mut log = g.logging.lock();
548 let new_pos = log.record_update(
549 &key,
550 WalDel::new(),
551 [].as_slice(),
552 state.prev_lsn,
553 self.bucket_id,
554 )?;
555 state.prev_lsn = new_pos;
556 }
557 Ok((gid as u8, state.prev_lsn))
558 });
559
560 match res {
561 Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
562 let _ = self.clean_aborted(&g, k)?;
563 continue;
564 }
565 Ok(_) => return Ok(()),
566 Err(e) => return Err(e),
567 }
568 }
569 }
570
571 pub fn commit(self) -> Result<(), OpCode> {
573 self.should_abort()?;
574 let state = self.state_ref();
575 let commit_started = sampled_instant(state.start_ts, LATENCY_SAMPLE_SHIFT);
576 let g = self.ctx.group(state.group_id);
577
578 #[cfg(feature = "failpoints")]
579 crate::utils::failpoint::check("mace_txn_commit_begin")?;
580
581 if !state.modified {
582 {
583 let mut log = g.logging.lock();
584 log.record_commit(state.start_ts)?;
585 }
586 g.active_txns.remove(&state.start_ts);
587 self.is_end.set(true);
588 self.observe_counter(CounterMetric::TxnCommit, 1);
589 observe_elapsed(
590 self.ctx.opt.observer.as_ref(),
591 HistogramMetric::TxnCommitMicros,
592 commit_started,
593 );
594 return Ok(());
595 }
596
597 let commit_ts = self.ctx.alloc_oracle();
598
599 {
600 let mut log = g.logging.lock();
601 log.record_commit(state.start_ts)?;
602 #[cfg(feature = "failpoints")]
603 crate::utils::failpoint::check("mace_txn_commit_after_record_commit")?;
604 log.sync(false)?;
605 #[cfg(feature = "failpoints")]
606 crate::utils::failpoint::check("mace_txn_commit_after_wal_sync")?;
607 }
608
609 g.cc.commit_tree.append(state.start_ts, commit_ts);
610 g.cc.latest_cts.store(commit_ts, Relaxed);
611 g.cc.collect_wmk(self.ctx);
612
613 g.active_txns.remove(&state.start_ts);
614 self.is_end.set(true);
615 self.observe_counter(CounterMetric::TxnCommit, 1);
616 observe_elapsed(
617 self.ctx.opt.observer.as_ref(),
618 HistogramMetric::TxnCommitMicros,
619 commit_started,
620 );
621 Ok(())
622 }
623
624 #[inline]
626 pub fn get<K>(&self, k: K) -> Result<ValRef, OpCode>
627 where
628 K: AsRef<[u8]>,
629 {
630 let state = self.state_ref();
631 let group_id = state.group_id;
632 get_impl(
633 self.ctx,
634 &self.ctx.group(group_id).cc,
635 self.tree,
636 group_id as u8,
637 state.start_ts,
638 k,
639 )
640 }
641
642 #[inline]
648 pub fn seek<K>(&self, prefix: K) -> Iter<'_>
649 where
650 K: AsRef<[u8]>,
651 {
652 let state = self.state_ref();
653 let group_id = state.group_id;
654 seek_impl(
655 &self.ctx.group(group_id).cc,
656 self.tree,
657 group_id as u8,
658 state.start_ts,
659 prefix,
660 )
661 }
662
663 #[inline]
664 pub fn range<K, R>(&self, range: R) -> Iter<'_>
665 where
666 K: AsRef<[u8]>,
667 R: RangeBounds<K>,
668 {
669 let state = self.state_ref();
670 let group_id = state.group_id;
671 range_impl(
672 &self.ctx.group(group_id).cc,
673 self.tree,
674 group_id as u8,
675 state.start_ts,
676 range,
677 )
678 }
679}
680
681impl Drop for TxnKV<'_> {
682 fn drop(&mut self) {
683 let group_id = self.state_ref().group_id;
684 if !self.is_end.get() {
685 let state = self.state_ref();
686 let grp = self.ctx.group(state.group_id);
687 let modified = state.modified;
688
689 let mut log = grp.logging.lock();
690 log.record_abort(state.start_ts)
691 .inspect_err(|e| {
692 log::error!("can't record abort, {:?}", e);
693 })
694 .expect("can't fail");
695 if modified {
696 log.sync(false)
697 .inspect_err(|e| {
698 log::error!("can't sync abort chain before enqueue, {:?}", e);
699 })
700 .expect("can't fail");
701 }
702 drop(log);
703
704 if modified {
705 self.ctx.add_aborted(state.start_ts);
706 self.ctx.enqueue_abort_clean(
707 state.start_ts,
708 self.bucket_id,
709 state.group_id as u8,
710 state.prev_lsn,
711 state.begin_lsn.file_id,
712 );
713 } else {
714 }
716 self.observe_counter(CounterMetric::TxnAbort, 1);
717 grp.active_txns.remove(&state.start_ts);
718 self.is_end.set(true);
719 }
720 self.ctx.group(group_id).leave_inflight();
721 self.tree.bucket.state.dec_txn_ref();
722 }
723}
724
725pub struct TxnView<'a> {
727 ctx: &'a Context,
728 cc: Handle<CCNode>,
729 group_id: u8,
730 tree: &'a Tree,
731}
732
733impl<'a> TxnView<'a> {
734 pub(crate) fn new(ctx: &'a Context, tree: &'a Tree) -> Result<Self, OpCode> {
735 let cc = ctx.alloc_cc();
736 Ok(Self {
737 ctx,
738 cc,
739 group_id: u8::MAX,
740 tree,
741 })
742 }
743
744 #[inline]
746 pub fn get<K: AsRef<[u8]>>(&self, k: K) -> Result<ValRef, OpCode> {
747 get_impl(
748 self.ctx,
749 &self.cc,
750 self.tree,
751 self.group_id,
752 self.cc.start_ts,
753 k,
754 )
755 }
756
757 #[inline]
763 pub fn seek<K>(&self, prefix: K) -> Iter<'_>
764 where
765 K: AsRef<[u8]>,
766 {
767 seek_impl(&self.cc, self.tree, self.group_id, self.cc.start_ts, prefix)
768 }
769
770 #[inline]
771 pub fn range<K, R>(&self, range: R) -> Iter<'_>
772 where
773 K: AsRef<[u8]>,
774 R: RangeBounds<K>,
775 {
776 range_impl(&self.cc, self.tree, self.group_id, self.cc.start_ts, range)
777 }
778}
779
780impl Drop for TxnView<'_> {
781 fn drop(&mut self) {
782 self.ctx.free_cc(self.cc);
783 }
784}
785
786#[cfg(test)]
787mod test {
788 use super::prefix_upper_exclusive;
789 use crate::{BucketOptions, Mace, OpCode, Options, RandomPath};
790
791 #[test]
792 fn txnkv() {
793 txnkv_impl().unwrap();
794 }
795
796 #[test]
797 fn prefix_upper_exclusive_handles_carry() {
798 assert_eq!(
799 prefix_upper_exclusive(&[0x61, 0x62, 0x63]),
800 Some(vec![0x61, 0x62, 0x64])
801 );
802 assert_eq!(
803 prefix_upper_exclusive(&[0x61, 0xff, 0xff]),
804 Some(vec![0x62])
805 );
806 assert_eq!(prefix_upper_exclusive(&[0xff]), None);
807 assert_eq!(prefix_upper_exclusive(&[0xff, 0xff]), None);
808 }
809
810 fn txnkv_impl() -> Result<(), OpCode> {
811 let path = RandomPath::tmp();
812 let _ = std::fs::remove_dir_all(&*path);
813 let opt = Options::new(&*path).validate().unwrap();
814 let mace = Mace::new(opt)?;
815 let (k1, k2) = ("beast".as_bytes(), "senpai".as_bytes());
816 let (v1, v2) = ("114514".as_bytes(), "1919810".as_bytes());
817 let db = mace.new_bucket("default", BucketOptions::default())?;
818
819 let kv = db.begin()?;
820 kv.put(k1, v1).expect("can't put");
821 kv.put(k2, v2).expect("can't put");
822
823 kv.del(k1).expect("can't del");
824 kv.commit()?;
825
826 let kv = db.begin()?;
827 let r = kv.get(k1);
828 assert!(r.is_err());
829
830 let r = kv.get(k2).expect("can't get");
831 assert_eq!(r.slice(), v2);
832
833 kv.del(k2).expect("can't del");
834 drop(kv);
835
836 let kv = db.begin()?;
837 let r = kv.get(k1);
838 assert!(r.is_err());
839 kv.del(k2).expect("can't del");
840 let r = kv.del(k2);
841 assert!(r.is_err());
842
843 kv.commit()?;
844
845 let kv = db.begin()?;
846 let r = kv.get(k1);
847 assert!(r.is_err());
848 let r = kv.get(k2);
849 assert!(r.is_err());
850
851 kv.commit()?;
852
853 {
854 let kv = db.begin()?;
855 kv.put("1", "10")?;
856 kv.commit()?;
857
858 let kv = db.begin()?;
859 kv.update("1", "11").expect("can't replace");
860 drop(kv);
861
862 let view = db.view()?;
863 let x = view.get("1").expect("can't get");
864 assert_eq!(x.slice(), "10".as_bytes());
865 }
866
867 {
868 let kv = db.begin()?;
869 kv.put("2", "20")?;
870 kv.update("2", "21")?;
871 let r = kv.get("2").unwrap();
872 assert_eq!(r.slice(), "21".as_bytes());
873 kv.del("2")?;
874 drop(kv);
875
876 let view = db.view()?;
877 let x = view.get("2");
878 assert!(x.is_err());
879 }
880
881 {
882 let kv = db.begin()?;
883 kv.put("11", "10")?;
884 kv.commit()?;
885
886 let kv = db.begin()?;
887 kv.upsert("11", "11").expect("can't replace");
888 drop(kv);
889
890 let view = db.view()?;
891 let x = view.get("11").expect("can't get");
892 assert_eq!(x.slice(), "10".as_bytes());
893 }
894
895 {
896 let kv = db.begin()?;
897 kv.put("22", "20")?;
898 kv.upsert("22", "21")?;
899 let r = kv.get("22").unwrap();
900 assert_eq!(r.slice(), "21".as_bytes());
901 kv.del("22")?;
902 drop(kv);
903
904 let view = db.view()?;
905 let x = view.get("22");
906 assert!(x.is_err());
907 }
908
909 {
910 let kv = db.begin()?;
911 kv.put("elder", "+1s")?;
912 kv.del("elder")?;
913 kv.commit()?;
914 let kv = db.begin()?;
915 let r = kv.update("elder", "mo");
916 assert!(r.is_err());
918 kv.upsert("elder", "mo")?;
920 kv.commit()?;
921 let view = db.view()?;
922 assert_eq!(view.get("elder").unwrap().slice(), "mo".as_bytes());
923 }
924
925 {
926 let kv = db.begin()?;
927 kv.put("fast", "v0")?;
928 kv.commit()?;
929
930 let kv = db.begin()?;
931 kv.update("fast", "v1")?;
932 kv.commit()?;
933
934 let view = db.view()?;
935 assert_eq!(view.get("fast")?.slice(), b"v1");
936 }
937
938 {
939 let kv = db.begin()?;
940 let r = kv.update("missing", "v1");
941 assert!(matches!(r, Err(OpCode::NotFound)));
942 }
943 drop(db);
944 drop(mace);
945 Ok(())
946 }
947
948 #[test]
949 fn cross_long_txn() {
950 cross_long_txn_impl().unwrap();
951 }
952
953 fn cross_long_txn_impl() -> Result<(), OpCode> {
954 let path = RandomPath::new();
955 let mut opt = Options::new(&*path);
956 let consolidate_threshold = 256;
957 opt.tmp_store = true;
958 let mace = Mace::new(opt.validate().unwrap())?;
959 let db = mace.new_bucket(
960 "default",
961 BucketOptions {
962 split_elems: consolidate_threshold * 2,
963 consolidate_threshold,
964 ..BucketOptions::default()
965 },
966 )?;
967
968 let kv = db.begin()?;
969 kv.put("foo", "bar")?;
970 kv.commit()?;
971
972 let view = db.view()?;
973 let kv = db.begin()?;
974
975 kv.update("foo", "bar1")?;
976 kv.update("foo", "bar2")?;
977
978 for i in 0..consolidate_threshold {
980 let x = format!("key_{i}");
981 kv.put(&x, &x)?;
982 }
983
984 let r = kv.get("foo")?;
985 assert_eq!(r.slice(), "bar2".as_bytes());
986 kv.commit()?;
987
988 let v = view.get("foo")?;
989 assert_eq!(v.slice(), "bar".as_bytes());
990
991 drop(view);
992 drop(db);
993 drop(mace);
994 Ok(())
995 }
996}