uni_store/runtime/
l0_manager.rs1use crate::runtime::l0::L0Buffer;
5use crate::runtime::wal::WriteAheadLog;
6use parking_lot::RwLock;
7use std::sync::Arc;
8
9pub struct L0Manager {
10 current: RwLock<Arc<RwLock<L0Buffer>>>,
14 pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
18}
19
20impl L0Manager {
21 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
22 let l0 = L0Buffer::new(start_version, wal);
23 Self {
24 current: RwLock::new(Arc::new(RwLock::new(l0))),
25 pending_flush: RwLock::new(Vec::new()),
26 }
27 }
28
29 pub fn from_snapshot(
34 current: Arc<RwLock<L0Buffer>>,
35 pending_flush: Vec<Arc<RwLock<L0Buffer>>>,
36 ) -> Self {
37 Self {
38 current: RwLock::new(current),
39 pending_flush: RwLock::new(pending_flush),
40 }
41 }
42
43 pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
45 self.current.read().clone()
46 }
47
48 pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
51 let current = self.get_current();
52 let pending = self.pending_flush.read().clone();
53 let mut all = vec![current];
54 all.extend(pending);
55 all
56 }
57
58 pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
60 self.pending_flush.read().clone()
61 }
62
63 pub fn rotate(
66 &self,
67 next_version: u64,
68 new_wal: Option<Arc<WriteAheadLog>>,
69 ) -> Arc<RwLock<L0Buffer>> {
70 let mut guard = self.current.write();
71 let old_l0 = guard.clone();
72
73 let new_l0 = L0Buffer::new(next_version, new_wal);
74 *guard = Arc::new(RwLock::new(new_l0));
75
76 old_l0
77 }
78
79 pub fn begin_flush(
83 &self,
84 next_version: u64,
85 new_wal: Option<Arc<WriteAheadLog>>,
86 ) -> Arc<RwLock<L0Buffer>> {
87 let old_l0 = self.rotate(next_version, new_wal);
88 self.pending_flush.write().push(old_l0.clone());
89 old_l0
90 }
91
92 pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
95 let mut pending = self.pending_flush.write();
96 pending.retain(|x| !Arc::ptr_eq(x, l0));
97 }
98
99 pub fn min_pending_wal_lsn(&self) -> Option<u64> {
103 let pending = self.pending_flush.read();
104 if pending.is_empty() {
105 return None;
106 }
107 pending
108 .iter()
109 .map(|l0_arc| {
110 let l0 = l0_arc.read();
111 l0.wal_lsn_at_flush
112 })
113 .min()
114 }
115}