canopydb/
checkpoint.rs

1use std::{collections::VecDeque, time::Duration};
2
3use crate::{
4    repr::TxId,
5    shim::{
6        parking_lot::{Mutex, RwLock},
7        sync::{mpsc, Arc},
8    },
9    wal::WalIdx,
10};
11
12#[derive(Debug, Clone, Copy, PartialEq)]
13pub enum CheckpointReason {
14    User(TxId),
15    WalSize(WalIdx),
16    Periodic,
17    OnDrop,
18    WritesSpilled,
19    TargetSize,
20    MemoryPressure,
21}
22
23impl CheckpointReason {
24    fn mutually_exclusive(&self, other: &mut CheckpointReason) -> bool {
25        match (self, other) {
26            (CheckpointReason::User(a), CheckpointReason::User(b)) => {
27                *b = *a.max(b);
28                true
29            }
30            (CheckpointReason::User(_), _) => false,
31            (CheckpointReason::WalSize(_), _) => true,
32            (CheckpointReason::Periodic, _) => unreachable!("Periodic isn't queued"),
33            (CheckpointReason::OnDrop, _) => false,
34            (CheckpointReason::WritesSpilled, CheckpointReason::WritesSpilled) => false,
35            (CheckpointReason::WritesSpilled, _) => false,
36            (CheckpointReason::TargetSize, CheckpointReason::TargetSize) => true,
37            (CheckpointReason::TargetSize, _) => false,
38            (CheckpointReason::MemoryPressure, CheckpointReason::MemoryPressure) => true,
39            (CheckpointReason::MemoryPressure, _) => false,
40        }
41    }
42}
43
44#[derive(Debug, Default)]
45struct Queue {
46    closed: bool,
47    queue: VecDeque<CheckpointReason>,
48}
49
50#[derive(Debug)]
51pub struct CheckpointQueue {
52    tx: mpsc::SyncSender<()>,
53    rx: Arc<Mutex<mpsc::Receiver<()>>>,
54    queue: Arc<RwLock<Queue>>,
55}
56
57impl CheckpointQueue {
58    pub fn new() -> CheckpointQueue {
59        let (tx, rx) = mpsc::sync_channel(1);
60        let queue: Arc<RwLock<Queue>> = Default::default();
61        CheckpointQueue {
62            queue: queue.clone(),
63            rx: Arc::new(Mutex::new(rx)),
64            tx,
65        }
66    }
67
68    pub fn request(&self, r: CheckpointReason) -> bool {
69        let mut queue = self.queue.write();
70        if queue.closed {
71            return false;
72        }
73        let mut mut_exclusive = false;
74        for reason in &mut queue.queue {
75            if r.mutually_exclusive(reason) {
76                mut_exclusive = true;
77                break;
78            }
79        }
80        if !mut_exclusive {
81            queue.queue.push_back(r);
82        }
83        drop(queue);
84
85        let _ = self.tx.try_send(());
86        true
87    }
88
89    pub fn is_empty(&self) -> bool {
90        self.queue.read().queue.is_empty()
91    }
92
93    pub fn peek(&self, timeout: Duration) -> Result<CheckpointReason, mpsc::RecvTimeoutError> {
94        loop {
95            let queue = self.queue.write();
96            if queue.closed {
97                return Err(mpsc::RecvTimeoutError::Disconnected);
98            }
99            if let Some(msg) = queue.queue.front().copied() {
100                return Ok(msg);
101            }
102            drop(queue);
103            self.rx.lock().recv_timeout(timeout)?;
104        }
105    }
106
107    pub fn pop(&self, reason: &CheckpointReason) {
108        let mut queue = self.queue.write();
109        if queue.queue.front() == Some(reason) {
110            queue.queue.pop_front();
111        }
112    }
113
114    pub fn set_closed(&self, closed: bool) {
115        self.queue.write().closed = closed;
116        if closed {
117            let _ = self.tx.try_send(());
118        }
119    }
120}