1use std::{collections::VecDeque, time::Duration};
2
3use crate::{
4 repr::TxId,
5 shim::{
6 parking_lot::{Mutex, RwLock},
7 sync::{mpsc, Arc},
8 },
9 wal::WalIdx,
10};
11
12#[derive(Debug, Clone, Copy, PartialEq)]
13pub enum CheckpointReason {
14 User(TxId),
15 WalSize(WalIdx),
16 Periodic,
17 OnDrop,
18 WritesSpilled,
19 TargetSize,
20 MemoryPressure,
21}
22
23impl CheckpointReason {
24 fn mutually_exclusive(&self, other: &mut CheckpointReason) -> bool {
25 match (self, other) {
26 (CheckpointReason::User(a), CheckpointReason::User(b)) => {
27 *b = *a.max(b);
28 true
29 }
30 (CheckpointReason::User(_), _) => false,
31 (CheckpointReason::WalSize(_), _) => true,
32 (CheckpointReason::Periodic, _) => unreachable!("Periodic isn't queued"),
33 (CheckpointReason::OnDrop, _) => false,
34 (CheckpointReason::WritesSpilled, CheckpointReason::WritesSpilled) => false,
35 (CheckpointReason::WritesSpilled, _) => false,
36 (CheckpointReason::TargetSize, CheckpointReason::TargetSize) => true,
37 (CheckpointReason::TargetSize, _) => false,
38 (CheckpointReason::MemoryPressure, CheckpointReason::MemoryPressure) => true,
39 (CheckpointReason::MemoryPressure, _) => false,
40 }
41 }
42}
43
44#[derive(Debug, Default)]
45struct Queue {
46 closed: bool,
47 queue: VecDeque<CheckpointReason>,
48}
49
50#[derive(Debug)]
51pub struct CheckpointQueue {
52 tx: mpsc::SyncSender<()>,
53 rx: Arc<Mutex<mpsc::Receiver<()>>>,
54 queue: Arc<RwLock<Queue>>,
55}
56
57impl CheckpointQueue {
58 pub fn new() -> CheckpointQueue {
59 let (tx, rx) = mpsc::sync_channel(1);
60 let queue: Arc<RwLock<Queue>> = Default::default();
61 CheckpointQueue {
62 queue: queue.clone(),
63 rx: Arc::new(Mutex::new(rx)),
64 tx,
65 }
66 }
67
68 pub fn request(&self, r: CheckpointReason) -> bool {
69 let mut queue = self.queue.write();
70 if queue.closed {
71 return false;
72 }
73 let mut mut_exclusive = false;
74 for reason in &mut queue.queue {
75 if r.mutually_exclusive(reason) {
76 mut_exclusive = true;
77 break;
78 }
79 }
80 if !mut_exclusive {
81 queue.queue.push_back(r);
82 }
83 drop(queue);
84
85 let _ = self.tx.try_send(());
86 true
87 }
88
89 pub fn is_empty(&self) -> bool {
90 self.queue.read().queue.is_empty()
91 }
92
93 pub fn peek(&self, timeout: Duration) -> Result<CheckpointReason, mpsc::RecvTimeoutError> {
94 loop {
95 let queue = self.queue.write();
96 if queue.closed {
97 return Err(mpsc::RecvTimeoutError::Disconnected);
98 }
99 if let Some(msg) = queue.queue.front().copied() {
100 return Ok(msg);
101 }
102 drop(queue);
103 self.rx.lock().recv_timeout(timeout)?;
104 }
105 }
106
107 pub fn pop(&self, reason: &CheckpointReason) {
108 let mut queue = self.queue.write();
109 if queue.queue.front() == Some(reason) {
110 queue.queue.pop_front();
111 }
112 }
113
114 pub fn set_closed(&self, closed: bool) {
115 self.queue.write().closed = closed;
116 if closed {
117 let _ = self.tx.try_send(());
118 }
119 }
120}