// uni_store/runtime/l0_manager.rs

use crate::runtime::l0::L0Buffer;
use crate::runtime::wal::WriteAheadLog;
use parking_lot::RwLock;
use std::sync::Arc;
8
/// Manages the write-ahead (L0) buffers of the store: one buffer currently
/// accepting writes, plus any rotated-out buffers still waiting to be flushed.
pub struct L0Manager {
    // Buffer currently accepting writes. The outer `RwLock` guards the *slot*
    // (swapped on rotation); the inner `Arc<RwLock<_>>` is the shared handle
    // readers and writers hold onto across a rotation.
    current: RwLock<Arc<RwLock<L0Buffer>>>,
    // Buffers rotated out of `current` whose flush has begun but not yet
    // completed; still visible to readers via `get_all_readable`.
    pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
}
19
20impl L0Manager {
21 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
22 let l0 = L0Buffer::new(start_version, wal);
23 Self {
24 current: RwLock::new(Arc::new(RwLock::new(l0))),
25 pending_flush: RwLock::new(Vec::new()),
26 }
27 }
28
29 pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
31 self.current.read().clone()
32 }
33
34 pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
37 let current = self.get_current();
38 let pending = self.pending_flush.read().clone();
39 let mut all = vec![current];
40 all.extend(pending);
41 all
42 }
43
44 pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
46 self.pending_flush.read().clone()
47 }
48
49 pub fn rotate(
52 &self,
53 next_version: u64,
54 new_wal: Option<Arc<WriteAheadLog>>,
55 ) -> Arc<RwLock<L0Buffer>> {
56 let mut guard = self.current.write();
57 let old_l0 = guard.clone();
58
59 let new_l0 = L0Buffer::new(next_version, new_wal);
60 *guard = Arc::new(RwLock::new(new_l0));
61
62 old_l0
63 }
64
65 pub fn begin_flush(
69 &self,
70 next_version: u64,
71 new_wal: Option<Arc<WriteAheadLog>>,
72 ) -> Arc<RwLock<L0Buffer>> {
73 let old_l0 = self.rotate(next_version, new_wal);
74 self.pending_flush.write().push(old_l0.clone());
75 old_l0
76 }
77
78 pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
81 let mut pending = self.pending_flush.write();
82 pending.retain(|x| !Arc::ptr_eq(x, l0));
83 }
84
85 pub fn min_pending_wal_lsn(&self) -> Option<u64> {
89 let pending = self.pending_flush.read();
90 if pending.is_empty() {
91 return None;
92 }
93 pending
94 .iter()
95 .map(|l0_arc| {
96 let l0 = l0_arc.read();
97 l0.wal_lsn_at_flush
98 })
99 .min()
100 }
101}