Skip to main content

mace/index/
txn.rs

1use super::{ValRef, tree::LatestValMeta};
2use crate::{
3    OpCode,
4    cc::{
5        cc::ConcurrencyControl,
6        context::{CCNode, Context, TxOutcome},
7        group::TxnState,
8        wal::{WalDel, WalPut, WalReplace},
9    },
10    index::tree::{Iter, Tree},
11    map::flow::ForegroundWritePermit,
12    types::data::{Key, Record, Ver},
13    utils::{
14        Handle, NULL_CMD,
15        observe::{
16            CounterMetric, EventKind, HistogramMetric, LATENCY_SAMPLE_SHIFT, ObserveEvent,
17            observe_elapsed, sampled_instant,
18        },
19    },
20};
21use crossbeam_epoch::Guard;
22use std::cell::{Cell, UnsafeCell};
23use std::ops::RangeBounds;
24use std::sync::atomic::Ordering::Relaxed;
25
26fn get_impl<K: AsRef<[u8]>>(
27    ctx: &Context,
28    cc: &ConcurrencyControl,
29    tree: &Tree,
30    group_id: u8,
31    start_ts: u64,
32    k: K,
33) -> Result<ValRef, OpCode> {
34    #[cfg(feature = "extra_check")]
35    assert!(!k.as_ref().is_empty(), "key must be non-empty");
36
37    let g = crossbeam_epoch::pin();
38    let key = Key::new(k.as_ref(), Ver::new(start_ts, NULL_CMD));
39    let r = tree.traverse(&g, key, |txid, record_gid| {
40        cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
41    })?;
42
43    Ok(r)
44}
45
46fn seek_impl<'a, K>(
47    cc: &'a ConcurrencyControl,
48    tree: &'a Tree,
49    group_id: u8,
50    start_ts: u64,
51    prefix: K,
52) -> Iter<'a>
53where
54    K: AsRef<[u8]>,
55{
56    let b = prefix.as_ref();
57    #[cfg(feature = "extra_check")]
58    assert!(!b.is_empty(), "prefix can't be empty");
59
60    let upper = prefix_upper_exclusive(b);
61    if let Some(ref upper) = upper {
62        tree.range(b..upper.as_slice(), move |ctx, txid, record_gid| {
63            cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
64        })
65    } else {
66        tree.range(b.., move |ctx, txid, record_gid| {
67            cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
68        })
69    }
70}
71
72fn range_impl<'a, K, R>(
73    cc: &'a ConcurrencyControl,
74    tree: &'a Tree,
75    group_id: u8,
76    start_ts: u64,
77    range: R,
78) -> Iter<'a>
79where
80    K: AsRef<[u8]>,
81    R: RangeBounds<K>,
82{
83    tree.range(range, move |ctx, txid, record_gid| {
84        cc.is_visible_to(ctx, group_id, record_gid, start_ts, txid)
85    })
86}
87
/// Returns the smallest byte string that is strictly greater than every string starting
/// with `prefix`, or `None` when no such string exists (empty or all-`0xff` prefix).
///
/// Trailing `0xff` bytes are dropped and the last remaining byte is incremented, e.g.
/// `[0x61, 0xff, 0xff]` becomes `[0x62]`.
fn prefix_upper_exclusive(prefix: &[u8]) -> Option<Vec<u8>> {
    // Last byte that can absorb a +1 without carrying.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;
    let mut bound = prefix[..=pivot].to_vec();
    bound[pivot] += 1;
    Some(bound)
}
99
/// A read-write transaction.
///
/// Writes are applied to `tree` and logged to the WAL of the transaction's group.
/// Committing consumes the transaction; dropping it without committing aborts it.
pub struct TxnKV<'a> {
    /// Engine context: timestamp oracle, groups and abort bookkeeping.
    ctx: &'a Context,
    /// Per-transaction state, mutated through `&self` via the `UnsafeCell`.
    state: UnsafeCell<TxnState>,
    /// Index the transaction reads from and writes to.
    tree: &'a Tree,
    /// Id of `tree`'s bucket, copied into WAL records and observer events.
    bucket_id: u64,
    /// Set once the transaction has committed or been rolled back; later writes fail.
    is_end: Cell<bool>,
    /// Writes and commit fail with `AbortTx` once the group's checkpoint counter has
    /// advanced by at least this much since the transaction started.
    limit: usize,
}
109
/// Why a write could not use the latest version of a key.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FailCause {
    /// The latest version was written by a transaction recorded as aborted; the writer
    /// cleans that version up and retries.
    Aborted,
    /// The latest version is not visible to this transaction (write-write conflict).
    Conflict,
}
115
116impl<'a> TxnKV<'a> {
117    pub(crate) fn new(ctx: &'a Context, tree: &'a Tree) -> Result<Self, OpCode> {
118        let start_ts = ctx.alloc_oracle();
119        let gid = ctx.next_group_id();
120        let g = ctx.group(gid);
121        let start_ckpt = g.ckpt_cnt.load(Relaxed);
122        let mut state = TxnState::new(gid, start_ts, start_ckpt);
123        let bucket_id = tree.bucket_id();
124        let max_ckpt_per_txn = tree.store.opt.max_ckpt_per_txn;
125
126        tree.bucket.state.inc_txn_ref();
127
128        let begin_res = {
129            let mut log = g.logging.lock();
130            log.record_begin(start_ts).map(|lsn| {
131                state.begin_lsn = lsn;
132                state.prev_lsn = lsn;
133                g.active_txns.insert(start_ts, state.prev_lsn);
134            })
135        };
136        if let Err(e) = begin_res {
137            g.leave_inflight();
138            tree.bucket.state.dec_txn_ref();
139            return Err(e);
140        }
141        ctx.opt.observer.counter(CounterMetric::TxnBegin, 1);
142
143        g.cc.commit_tree.compact(ctx);
144        Ok(Self {
145            ctx,
146            state: UnsafeCell::new(state),
147            tree,
148            bucket_id,
149            is_end: Cell::new(false),
150            limit: max_ckpt_per_txn,
151        })
152    }
153
154    fn should_abort(&self) -> Result<(), OpCode> {
155        let state = self.state_ref();
156        let g = self.ctx.group(state.group_id);
157
158        if self.is_end.get() || g.ckpt_cnt.load(Relaxed) - state.start_ckpt >= self.limit {
159            return Err(OpCode::AbortTx);
160        }
161        Ok(())
162    }
163
164    #[inline]
165    fn state_ref(&self) -> &TxnState {
166        unsafe { &*self.state.get() }
167    }
168
169    #[inline]
170    #[allow(clippy::mut_from_ref)]
171    fn state_mut(&self) -> &mut TxnState {
172        unsafe { &mut *self.state.get() }
173    }
174
175    #[inline]
176    fn observe_counter(&self, metric: CounterMetric, delta: u64) {
177        self.ctx.opt.observer.counter(metric, delta);
178    }
179
180    #[inline]
181    fn observe_event(&self, event: ObserveEvent) {
182        self.ctx.opt.observer.event(event);
183    }
184
185    #[inline]
186    fn before_write_budget(&self, estimated_bytes: usize) -> ForegroundWritePermit {
187        self.tree
188            .bucket
189            .before_foreground_write(estimated_bytes as u64)
190    }
191
192    #[inline]
193    fn conflict_abort(&self, txid: u64) -> OpCode {
194        self.observe_counter(CounterMetric::TxnConflictAbort, 1);
195        self.observe_event(ObserveEvent {
196            kind: EventKind::TxnConflictAbort,
197            bucket_id: self.bucket_id,
198            txid,
199            file_id: 0,
200            value: 0,
201        });
202        OpCode::AbortTx
203    }
204
205    #[inline]
206    fn write_abort(&self, start_ts: u64, cause: FailCause) -> OpCode {
207        match cause {
208            FailCause::Aborted => OpCode::AbortTx,
209            FailCause::Conflict => self.conflict_abort(start_ts),
210        }
211    }
212
213    #[inline]
214    fn is_visible_for_write(
215        &self,
216        group_id: usize,
217        start_ts: u64,
218        txid: u64,
219        record_gid: u8,
220    ) -> bool {
221        self.ctx.group(group_id).cc.is_visible_to(
222            self.ctx,
223            group_id as u8,
224            record_gid,
225            start_ts,
226            txid,
227        )
228    }
229
230    fn resolve_latest_meta_for_write(
231        &self,
232        opt: &Option<LatestValMeta>,
233        state: &TxnState,
234    ) -> Result<Option<LatestValMeta>, FailCause> {
235        let Some(rv) = opt else {
236            return Ok(None);
237        };
238        let gid = state.group_id;
239        let start_ts = state.start_ts;
240        if self.is_visible_for_write(gid, start_ts, rv.ver.txid, rv.group_id) {
241            return Ok(Some(*rv));
242        }
243        if self.ctx.is_aborted(rv.ver.txid)
244            && self.ctx.get_aborted(rv.ver.txid) == Some(TxOutcome::Aborted)
245        {
246            return Err(FailCause::Aborted);
247        }
248        Err(FailCause::Conflict)
249    }
250
251    fn clean_aborted(&self, g: &Guard, raw: &[u8]) -> Result<bool, OpCode> {
252        let latest = match self
253            .tree
254            .get(g, Key::new(raw, Ver::new(u64::MAX, u32::MAX)))
255        {
256            Ok((k, _)) => Some(k),
257            Err(OpCode::NotFound | OpCode::Again) => None,
258            Err(e) => return Err(e),
259        };
260        let Some(k) = latest else {
261            return Ok(false);
262        };
263        if self.ctx.get_aborted(k.ver().txid) != Some(TxOutcome::Aborted) {
264            return Ok(false);
265        }
266
267        match self.tree.remove_aborted_head(g, raw, k.ver().txid) {
268            Ok(true) => {
269                g.flush();
270                Ok(true)
271            }
272            Ok(false) => Ok(false),
273            Err(OpCode::Again) => {
274                g.flush();
275                Ok(false)
276            }
277            Err(e) => Err(e),
278        }
279    }
280
281    fn put_impl(&self, k: &[u8], v: &[u8], logged: &mut bool) -> Result<(), OpCode> {
282        let estimated = k.len().saturating_add(v.len());
283        loop {
284            self.should_abort()?;
285            let g = crossbeam_epoch::pin();
286            let state = self.state_mut();
287            let start_ts = state.start_ts;
288            let gid = state.group_id;
289
290            let cmd_id_val = state.cmd_id;
291            state.cmd_id += 1;
292            let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
293            let val = Record::normal(gid as u8, v);
294            let _write_permit = self.before_write_budget(estimated);
295            let mut abort_cause = FailCause::Conflict;
296
297            let res = self.tree.update(&g, key, val, |opt| {
298                let gid = state.group_id;
299                let g = self.ctx.group(gid);
300
301                let current = match self.resolve_latest_meta_for_write(opt, state) {
302                    Ok(current) => current,
303                    Err(cause) => {
304                        abort_cause = cause;
305                        return Err(self.write_abort(state.start_ts, cause));
306                    }
307                };
308                let r = match current {
309                    None => Ok(()),
310                    Some(current) => {
311                        if !current.is_del {
312                            Err(OpCode::Exist)
313                        } else {
314                            Ok(())
315                        }
316                    }
317                };
318
319                if r.is_ok() && !*logged {
320                    *logged = true;
321                    state.modified = true;
322                    let mut log = g.logging.lock();
323                    let new_pos = log.record_update(
324                        &Key::new(k, key.ver().to_owned()),
325                        WalPut::new(v.len()),
326                        v,
327                        state.prev_lsn,
328                        self.bucket_id,
329                    )?;
330                    state.prev_lsn = new_pos;
331                }
332                r.map(|_| (gid as u8, state.prev_lsn))
333            });
334
335            match res {
336                Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
337                    let _ = self.clean_aborted(&g, k)?;
338                    continue;
339                }
340                Ok(_) => return Ok(()),
341                Err(e) => return Err(e),
342            }
343        }
344    }
345
346    fn update_impl(&self, k: &[u8], v: &[u8], logged: &mut bool) -> Result<(), OpCode> {
347        let estimated = k.len().saturating_add(v.len().saturating_mul(2));
348        #[cfg(feature = "extra_check")]
349        assert!(!k.is_empty(), "key must be non-empty");
350
351        loop {
352            self.should_abort()?;
353            let g = crossbeam_epoch::pin();
354            let state = self.state_mut();
355            let start_ts = state.start_ts;
356            let gid = state.group_id;
357
358            let cmd_id_val = state.cmd_id;
359            state.cmd_id += 1;
360            let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
361            let val = Record::normal(gid as u8, v);
362            let _write_permit = self.before_write_budget(estimated);
363            let mut abort_cause = FailCause::Conflict;
364
365            let res = self.tree.update(&g, key, val, |opt| {
366                let gid = state.group_id;
367                let g = self.ctx.group(gid);
368                let current = match self.resolve_latest_meta_for_write(opt, state) {
369                    Ok(current) => current,
370                    Err(cause) => {
371                        abort_cause = cause;
372                        return Err(self.write_abort(state.start_ts, cause));
373                    }
374                };
375                let Some(current) = current else {
376                    return Err(OpCode::NotFound);
377                };
378                if current.is_del {
379                    return Err(OpCode::NotFound);
380                }
381
382                if !*logged {
383                    state.modified = true;
384                    *logged = true;
385                    let mut log = g.logging.lock();
386                    let new_pos = log.record_update(
387                        &Key::new(k, key.ver().to_owned()),
388                        WalReplace::new(v.len()),
389                        v,
390                        state.prev_lsn,
391                        self.bucket_id,
392                    )?;
393                    state.prev_lsn = new_pos;
394                }
395                Ok((gid as u8, state.prev_lsn))
396            });
397
398            match res {
399                Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
400                    let _ = self.clean_aborted(&g, k)?;
401                    continue;
402                }
403                Ok(_) => return Ok(()),
404                Err(e) => return Err(e),
405            }
406        }
407    }
408
409    /// Puts a key-value pair into the bucket.
410    pub fn put<K, V>(&self, k: K, v: V) -> Result<(), OpCode>
411    where
412        K: AsRef<[u8]>,
413        V: AsRef<[u8]>,
414    {
415        let mut logged = false;
416        self.put_impl(k.as_ref(), v.as_ref(), &mut logged)
417    }
418
419    /// Updates existing key-value pair in the bucket.
420    pub fn update<K, V>(&self, k: K, v: V) -> Result<(), OpCode>
421    where
422        K: AsRef<[u8]>,
423        V: AsRef<[u8]>,
424    {
425        let mut logged = false;
426        self.update_impl(k.as_ref(), v.as_ref(), &mut logged)
427    }
428
429    /// Upserts a key-value pair into the bucket.
430    pub fn upsert<K, V>(&self, k: K, v: V) -> Result<(), OpCode>
431    where
432        K: AsRef<[u8]>,
433        V: AsRef<[u8]>,
434    {
435        let mut logged = false;
436        let (k, v) = (k.as_ref(), v.as_ref());
437        let estimated = k.len().saturating_add(v.len().saturating_mul(2));
438        #[cfg(feature = "extra_check")]
439        assert!(!k.is_empty(), "key must be non-empty");
440
441        loop {
442            self.should_abort()?;
443            let g = crossbeam_epoch::pin();
444            let state = self.state_mut();
445            let start_ts = state.start_ts;
446            let gid = state.group_id;
447
448            let cmd_id_val = state.cmd_id;
449            state.cmd_id += 1;
450            let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
451            let val = Record::normal(gid as u8, v);
452            let _write_permit = self.before_write_budget(estimated);
453            let mut abort_cause = FailCause::Conflict;
454
455            let res = self.tree.update(&g, key, val, |opt| {
456                let gid = state.group_id;
457                let g = self.ctx.group(gid);
458
459                let current = match self.resolve_latest_meta_for_write(opt, state) {
460                    Ok(current) => current,
461                    Err(cause) => {
462                        abort_cause = cause;
463                        return Err(self.write_abort(state.start_ts, cause));
464                    }
465                };
466
467                if !logged {
468                    logged = true;
469                    state.modified = true;
470                    let mut log = g.logging.lock();
471                    let new_pos = match current {
472                        None => log.record_update(
473                            &Key::new(k, key.ver().to_owned()),
474                            WalPut::new(v.len()),
475                            v,
476                            state.prev_lsn,
477                            self.bucket_id,
478                        )?,
479                        Some(_) => log.record_update(
480                            &Key::new(k, key.ver().to_owned()),
481                            WalReplace::new(v.len()),
482                            v,
483                            state.prev_lsn,
484                            self.bucket_id,
485                        )?,
486                    };
487                    state.prev_lsn = new_pos;
488                }
489                Ok((gid as u8, state.prev_lsn))
490            });
491
492            match res {
493                Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
494                    let _ = self.clean_aborted(&g, k)?;
495                    continue;
496                }
497                Ok(_) => return Ok(()),
498                Err(e) => return Err(e),
499            }
500        }
501    }
502
503    /// Deletes a key-value pair from the bucket.
504    pub fn del<T>(&self, k: T) -> Result<(), OpCode>
505    where
506        T: AsRef<[u8]>,
507    {
508        let mut logged = false;
509        let k = k.as_ref();
510        #[cfg(feature = "extra_check")]
511        assert!(!k.is_empty(), "key must be non-empty");
512
513        loop {
514            self.should_abort()?;
515            let g = crossbeam_epoch::pin();
516            let state = self.state_mut();
517            let start_ts = state.start_ts;
518            let gid = state.group_id;
519            let cmd_id_val = state.cmd_id;
520            state.cmd_id += 1;
521
522            let key = Key::new(k, Ver::new(start_ts, cmd_id_val));
523            let val = Record::remove(gid as u8);
524            let _write_permit = self.before_write_budget(key.raw.len());
525            let mut abort_cause = FailCause::Conflict;
526
527            let res = self.tree.update(&g, key, val, |opt| {
528                let gid = state.group_id;
529                let g = self.ctx.group(gid);
530                let current = match self.resolve_latest_meta_for_write(opt, state) {
531                    Ok(current) => current,
532                    Err(cause) => {
533                        abort_cause = cause;
534                        return Err(self.write_abort(state.start_ts, cause));
535                    }
536                };
537                let Some(current) = current else {
538                    return Err(OpCode::NotFound);
539                };
540                if current.is_del {
541                    return Err(OpCode::NotFound);
542                }
543
544                if !logged {
545                    logged = true;
546                    state.modified = true;
547                    let mut log = g.logging.lock();
548                    let new_pos = log.record_update(
549                        &key,
550                        WalDel::new(),
551                        [].as_slice(),
552                        state.prev_lsn,
553                        self.bucket_id,
554                    )?;
555                    state.prev_lsn = new_pos;
556                }
557                Ok((gid as u8, state.prev_lsn))
558            });
559
560            match res {
561                Err(OpCode::AbortTx) if abort_cause == FailCause::Aborted => {
562                    let _ = self.clean_aborted(&g, k)?;
563                    continue;
564                }
565                Ok(_) => return Ok(()),
566                Err(e) => return Err(e),
567            }
568        }
569    }
570
571    /// Commits the transaction.
572    pub fn commit(self) -> Result<(), OpCode> {
573        self.should_abort()?;
574        let state = self.state_ref();
575        let commit_started = sampled_instant(state.start_ts, LATENCY_SAMPLE_SHIFT);
576        let g = self.ctx.group(state.group_id);
577
578        #[cfg(feature = "failpoints")]
579        crate::utils::failpoint::check("mace_txn_commit_begin")?;
580
581        if !state.modified {
582            {
583                let mut log = g.logging.lock();
584                log.record_commit(state.start_ts)?;
585            }
586            g.active_txns.remove(&state.start_ts);
587            self.is_end.set(true);
588            self.observe_counter(CounterMetric::TxnCommit, 1);
589            observe_elapsed(
590                self.ctx.opt.observer.as_ref(),
591                HistogramMetric::TxnCommitMicros,
592                commit_started,
593            );
594            return Ok(());
595        }
596
597        let commit_ts = self.ctx.alloc_oracle();
598
599        {
600            let mut log = g.logging.lock();
601            log.record_commit(state.start_ts)?;
602            #[cfg(feature = "failpoints")]
603            crate::utils::failpoint::check("mace_txn_commit_after_record_commit")?;
604            log.sync(false)?;
605            #[cfg(feature = "failpoints")]
606            crate::utils::failpoint::check("mace_txn_commit_after_wal_sync")?;
607        }
608
609        g.cc.commit_tree.append(state.start_ts, commit_ts);
610        g.cc.latest_cts.store(commit_ts, Relaxed);
611        g.cc.collect_wmk(self.ctx);
612
613        g.active_txns.remove(&state.start_ts);
614        self.is_end.set(true);
615        self.observe_counter(CounterMetric::TxnCommit, 1);
616        observe_elapsed(
617            self.ctx.opt.observer.as_ref(),
618            HistogramMetric::TxnCommitMicros,
619            commit_started,
620        );
621        Ok(())
622    }
623
624    /// Gets the value associated with a key.
625    #[inline]
626    pub fn get<K>(&self, k: K) -> Result<ValRef, OpCode>
627    where
628        K: AsRef<[u8]>,
629    {
630        let state = self.state_ref();
631        let group_id = state.group_id;
632        get_impl(
633            self.ctx,
634            &self.ctx.group(group_id).cc,
635            self.tree,
636            group_id as u8,
637            state.start_ts,
638            k,
639        )
640    }
641
642    /// Seeks an iterator to a key prefix.
643    /// prefix can't be empty and the [`Iter::Item`] is only valid in current iteration.
644    ///
645    /// **NOTE:** [`Iter`] will save a clone of the resource, so do not save [`Iter`] to avoid
646    /// resource shortage.
647    #[inline]
648    pub fn seek<K>(&self, prefix: K) -> Iter<'_>
649    where
650        K: AsRef<[u8]>,
651    {
652        let state = self.state_ref();
653        let group_id = state.group_id;
654        seek_impl(
655            &self.ctx.group(group_id).cc,
656            self.tree,
657            group_id as u8,
658            state.start_ts,
659            prefix,
660        )
661    }
662
663    #[inline]
664    pub fn range<K, R>(&self, range: R) -> Iter<'_>
665    where
666        K: AsRef<[u8]>,
667        R: RangeBounds<K>,
668    {
669        let state = self.state_ref();
670        let group_id = state.group_id;
671        range_impl(
672            &self.ctx.group(group_id).cc,
673            self.tree,
674            group_id as u8,
675            state.start_ts,
676            range,
677        )
678    }
679}
680
681impl Drop for TxnKV<'_> {
682    fn drop(&mut self) {
683        let group_id = self.state_ref().group_id;
684        if !self.is_end.get() {
685            let state = self.state_ref();
686            let grp = self.ctx.group(state.group_id);
687            let modified = state.modified;
688
689            let mut log = grp.logging.lock();
690            log.record_abort(state.start_ts)
691                .inspect_err(|e| {
692                    log::error!("can't record abort, {:?}", e);
693                })
694                .expect("can't fail");
695            if modified {
696                log.sync(false)
697                    .inspect_err(|e| {
698                        log::error!("can't sync abort chain before enqueue, {:?}", e);
699                    })
700                    .expect("can't fail");
701            }
702            drop(log);
703
704            if modified {
705                self.ctx.add_aborted(state.start_ts);
706                self.ctx.enqueue_abort_clean(
707                    state.start_ts,
708                    self.bucket_id,
709                    state.group_id as u8,
710                    state.prev_lsn,
711                    state.begin_lsn.file_id,
712                );
713            } else {
714                // no tx-outcome mutation on clean readonly/empty-write abort path
715            }
716            self.observe_counter(CounterMetric::TxnAbort, 1);
717            grp.active_txns.remove(&state.start_ts);
718            self.is_end.set(true);
719        }
720        self.ctx.group(group_id).leave_inflight();
721        self.tree.bucket.state.dec_txn_ref();
722    }
723}
724
/// A read-only transaction (consistent view).
///
/// Reads see the snapshot given by the `start_ts` of `cc`; the node is handed back to `ctx`
/// when the view is dropped.
pub struct TxnView<'a> {
    /// Context `cc` was allocated from and is returned to on drop.
    ctx: &'a Context,
    /// Concurrency-control node; its `start_ts` is the snapshot timestamp for every read.
    cc: Handle<CCNode>,
    /// Reader group id passed to visibility checks; always `u8::MAX` for views.
    group_id: u8,
    /// Index the view reads from.
    tree: &'a Tree,
}
732
733impl<'a> TxnView<'a> {
734    pub(crate) fn new(ctx: &'a Context, tree: &'a Tree) -> Result<Self, OpCode> {
735        let cc = ctx.alloc_cc();
736        Ok(Self {
737            ctx,
738            cc,
739            group_id: u8::MAX,
740            tree,
741        })
742    }
743
744    /// Gets the value associated with a key in this view.
745    #[inline]
746    pub fn get<K: AsRef<[u8]>>(&self, k: K) -> Result<ValRef, OpCode> {
747        get_impl(
748            self.ctx,
749            &self.cc,
750            self.tree,
751            self.group_id,
752            self.cc.start_ts,
753            k,
754        )
755    }
756
757    /// Seeks an iterator to a key prefix in this view.
758    /// prefix can't be empty and the [`Iter::Item`] is only valid in current iteration.
759    ///
760    /// **NOTE:** [`Iter`] will save a clone of the resource, so do not save [`Iter`] to avoid
761    /// resource shortage.
762    #[inline]
763    pub fn seek<K>(&self, prefix: K) -> Iter<'_>
764    where
765        K: AsRef<[u8]>,
766    {
767        seek_impl(&self.cc, self.tree, self.group_id, self.cc.start_ts, prefix)
768    }
769
770    #[inline]
771    pub fn range<K, R>(&self, range: R) -> Iter<'_>
772    where
773        K: AsRef<[u8]>,
774        R: RangeBounds<K>,
775    {
776        range_impl(&self.cc, self.tree, self.group_id, self.cc.start_ts, range)
777    }
778}
779
780impl Drop for TxnView<'_> {
781    fn drop(&mut self) {
782        self.ctx.free_cc(self.cc);
783    }
784}
785
#[cfg(test)]
mod test {
    use super::prefix_upper_exclusive;
    use crate::{BucketOptions, Mace, OpCode, Options, RandomPath};

    /// End-to-end check of `TxnKV` writes, commit, and drop-without-commit (abort).
    #[test]
    fn txnkv() {
        txnkv_impl().unwrap();
    }

    /// The exclusive upper bound drops trailing `0xff` bytes and increments the last
    /// remaining byte; an all-`0xff` prefix has no finite bound.
    #[test]
    fn prefix_upper_exclusive_handles_carry() {
        assert_eq!(
            prefix_upper_exclusive(&[0x61, 0x62, 0x63]),
            Some(vec![0x61, 0x62, 0x64])
        );
        assert_eq!(
            prefix_upper_exclusive(&[0x61, 0xff, 0xff]),
            Some(vec![0x62])
        );
        assert_eq!(prefix_upper_exclusive(&[0xff]), None);
        assert_eq!(prefix_upper_exclusive(&[0xff, 0xff]), None);
    }

    fn txnkv_impl() -> Result<(), OpCode> {
        let path = RandomPath::tmp();
        let _ = std::fs::remove_dir_all(&*path);
        let opt = Options::new(&*path).validate().unwrap();
        let mace = Mace::new(opt)?;
        let (k1, k2) = ("beast".as_bytes(), "senpai".as_bytes());
        let (v1, v2) = ("114514".as_bytes(), "1919810".as_bytes());
        let db = mace.new_bucket("default", BucketOptions::default())?;

        // Put two keys, delete one of them in the same transaction, and commit.
        let kv = db.begin()?;
        kv.put(k1, v1).expect("can't put");
        kv.put(k2, v2).expect("can't put");

        kv.del(k1).expect("can't del");
        kv.commit()?;

        let kv = db.begin()?;
        let r = kv.get(k1);
        assert!(r.is_err());

        let r = kv.get(k2).expect("can't get");
        assert_eq!(r.slice(), v2);

        // Dropping without commit aborts, so this delete of k2 must not stick.
        kv.del(k2).expect("can't del");
        drop(kv);

        // k2 is still live after the abort; deleting it twice in one txn fails the 2nd time.
        let kv = db.begin()?;
        let r = kv.get(k1);
        assert!(r.is_err());
        kv.del(k2).expect("can't del");
        let r = kv.del(k2);
        assert!(r.is_err());

        kv.commit()?;

        let kv = db.begin()?;
        let r = kv.get(k1);
        assert!(r.is_err());
        let r = kv.get(k2);
        assert!(r.is_err());

        kv.commit()?;

        // An aborted update leaves the committed value visible.
        {
            let kv = db.begin()?;
            kv.put("1", "10")?;
            kv.commit()?;

            let kv = db.begin()?;
            kv.update("1", "11").expect("can't replace");
            drop(kv);

            let view = db.view()?;
            let x = view.get("1").expect("can't get");
            assert_eq!(x.slice(), "10".as_bytes());
        }

        // A transaction sees its own put/update; an aborted put leaves nothing behind.
        {
            let kv = db.begin()?;
            kv.put("2", "20")?;
            kv.update("2", "21")?;
            let r = kv.get("2").unwrap();
            assert_eq!(r.slice(), "21".as_bytes());
            kv.del("2")?;
            drop(kv);

            let view = db.view()?;
            let x = view.get("2");
            assert!(x.is_err());
        }

        // Same as above, for upsert over a committed key.
        {
            let kv = db.begin()?;
            kv.put("11", "10")?;
            kv.commit()?;

            let kv = db.begin()?;
            kv.upsert("11", "11").expect("can't replace");
            drop(kv);

            let view = db.view()?;
            let x = view.get("11").expect("can't get");
            assert_eq!(x.slice(), "10".as_bytes());
        }

        // Same as above, for upsert over the transaction's own put.
        {
            let kv = db.begin()?;
            kv.put("22", "20")?;
            kv.upsert("22", "21")?;
            let r = kv.get("22").unwrap();
            assert_eq!(r.slice(), "21".as_bytes());
            kv.del("22")?;
            drop(kv);

            let view = db.view()?;
            let x = view.get("22");
            assert!(x.is_err());
        }

        {
            let kv = db.begin()?;
            kv.put("elder", "+1s")?;
            kv.del("elder")?;
            kv.commit()?;
            let kv = db.begin()?;
            let r = kv.update("elder", "mo");
            // a removed key can't be updated again
            assert!(r.is_err());
            // but it can be upserted
            kv.upsert("elder", "mo")?;
            kv.commit()?;
            let view = db.view()?;
            assert_eq!(view.get("elder").unwrap().slice(), "mo".as_bytes());
        }

        // A committed update is visible to a later view.
        {
            let kv = db.begin()?;
            kv.put("fast", "v0")?;
            kv.commit()?;

            let kv = db.begin()?;
            kv.update("fast", "v1")?;
            kv.commit()?;

            let view = db.view()?;
            assert_eq!(view.get("fast")?.slice(), b"v1");
        }

        // Updating a key that never existed reports NotFound.
        {
            let kv = db.begin()?;
            let r = kv.update("missing", "v1");
            assert!(matches!(r, Err(OpCode::NotFound)));
        }
        drop(db);
        drop(mace);
        Ok(())
    }

    /// A view opened before a long write transaction keeps seeing the old value.
    ///
    /// The old value must remain visible even after that transaction forces consolidation
    /// and commits.
    #[test]
    fn cross_long_txn() {
        cross_long_txn_impl().unwrap();
    }

    fn cross_long_txn_impl() -> Result<(), OpCode> {
        let path = RandomPath::new();
        let mut opt = Options::new(&*path);
        let consolidate_threshold = 256;
        opt.tmp_store = true;
        let mace = Mace::new(opt.validate().unwrap())?;
        let db = mace.new_bucket(
            "default",
            BucketOptions {
                split_elems: consolidate_threshold * 2,
                consolidate_threshold,
                ..BucketOptions::default()
            },
        )?;

        let kv = db.begin()?;
        kv.put("foo", "bar")?;
        kv.commit()?;

        // Snapshot taken before the long transaction starts writing.
        let view = db.view()?;
        let kv = db.begin()?;

        kv.update("foo", "bar1")?;
        kv.update("foo", "bar2")?;

        // trigger consolidate
        for i in 0..consolidate_threshold {
            let x = format!("key_{i}");
            kv.put(&x, &x)?;
        }

        let r = kv.get("foo")?;
        assert_eq!(r.slice(), "bar2".as_bytes());
        kv.commit()?;

        // The older snapshot still resolves to the pre-transaction value.
        let v = view.get("foo")?;
        assert_eq!(v.slice(), "bar".as_bytes());

        drop(view);
        drop(db);
        drop(mace);
        Ok(())
    }
}