// uni_store/runtime/l0_manager.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

use crate::runtime::l0::L0Buffer;
use crate::runtime::wal::WriteAheadLog;
use parking_lot::RwLock;
use std::sync::Arc;
9pub struct L0Manager {
10    // The current active L0 buffer.
11    // Outer RwLock protects the Arc (swapping L0s).
12    // Inner RwLock protects the L0Buffer content (concurrent reads/writes).
13    current: RwLock<Arc<RwLock<L0Buffer>>>,
14    // L0 buffers currently being flushed to L1.
15    // These remain visible to reads until flush completes successfully.
16    // This prevents data loss if L1 writes fail after rotation.
17    pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
18}
19
20impl L0Manager {
21    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
22        let l0 = L0Buffer::new(start_version, wal);
23        Self {
24            current: RwLock::new(Arc::new(RwLock::new(l0))),
25            pending_flush: RwLock::new(Vec::new()),
26        }
27    }
28
29    /// Get the current L0 buffer.
30    pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
31        self.current.read().clone()
32    }
33
34    /// Get all L0 buffers that should be visible to reads.
35    /// This includes the current L0 plus any L0s being flushed.
36    pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
37        let current = self.get_current();
38        let pending = self.pending_flush.read().clone();
39        let mut all = vec![current];
40        all.extend(pending);
41        all
42    }
43
44    /// Get L0 buffers currently being flushed (for QueryContext).
45    pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
46        self.pending_flush.read().clone()
47    }
48
49    /// Rotate L0. Returns the OLD L0 buffer.
50    /// The new L0 is initialized with `next_version` and `new_wal`.
51    pub fn rotate(
52        &self,
53        next_version: u64,
54        new_wal: Option<Arc<WriteAheadLog>>,
55    ) -> Arc<RwLock<L0Buffer>> {
56        let mut guard = self.current.write();
57        let old_l0 = guard.clone();
58
59        let new_l0 = L0Buffer::new(next_version, new_wal);
60        *guard = Arc::new(RwLock::new(new_l0));
61
62        old_l0
63    }
64
65    /// Begin flush: rotate L0 and add old L0 to pending flush list.
66    /// The old L0 remains visible to reads until `complete_flush` is called.
67    /// Returns the old L0 buffer to be flushed.
68    pub fn begin_flush(
69        &self,
70        next_version: u64,
71        new_wal: Option<Arc<WriteAheadLog>>,
72    ) -> Arc<RwLock<L0Buffer>> {
73        let old_l0 = self.rotate(next_version, new_wal);
74        self.pending_flush.write().push(old_l0.clone());
75        old_l0
76    }
77
78    /// Complete flush: remove the flushed L0 from pending list.
79    /// Call this only after L1 writes have succeeded.
80    pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
81        let mut pending = self.pending_flush.write();
82        pending.retain(|x| !Arc::ptr_eq(x, l0));
83    }
84
85    /// Get the minimum WAL LSN across all pending flush L0s.
86    /// WAL truncation should not go past this LSN to preserve data for pending flushes.
87    /// Returns None if no pending flushes exist.
88    pub fn min_pending_wal_lsn(&self) -> Option<u64> {
89        let pending = self.pending_flush.read();
90        if pending.is_empty() {
91            return None;
92        }
93        pending
94            .iter()
95            .map(|l0_arc| {
96                let l0 = l0_arc.read();
97                l0.wal_lsn_at_flush
98            })
99            .min()
100    }
101}