Skip to main content

uni_store/runtime/
l0_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::l0::L0Buffer;
5use crate::runtime::wal::WriteAheadLog;
6use parking_lot::RwLock;
7use std::sync::Arc;
8
9pub struct L0Manager {
10    // The current active L0 buffer.
11    // Outer RwLock protects the Arc (swapping L0s).
12    // Inner RwLock protects the L0Buffer content (concurrent reads/writes).
13    current: RwLock<Arc<RwLock<L0Buffer>>>,
14    // L0 buffers currently being flushed to L1.
15    // These remain visible to reads until flush completes successfully.
16    // This prevents data loss if L1 writes fail after rotation.
17    pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
18}
19
20impl L0Manager {
21    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
22        let l0 = L0Buffer::new(start_version, wal);
23        Self {
24            current: RwLock::new(Arc::new(RwLock::new(l0))),
25            pending_flush: RwLock::new(Vec::new()),
26        }
27    }
28
29    /// Create a read-only snapshot L0Manager from existing buffers.
30    ///
31    /// Used by the algorithm execution path to provide L0 visibility
32    /// without owning the actual L0 lifecycle (rotation, flush, WAL).
33    pub fn from_snapshot(
34        current: Arc<RwLock<L0Buffer>>,
35        pending_flush: Vec<Arc<RwLock<L0Buffer>>>,
36    ) -> Self {
37        Self {
38            current: RwLock::new(current),
39            pending_flush: RwLock::new(pending_flush),
40        }
41    }
42
43    /// Get the current L0 buffer.
44    pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
45        self.current.read().clone()
46    }
47
48    /// Get all L0 buffers that should be visible to reads.
49    /// This includes the current L0 plus any L0s being flushed.
50    pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
51        let current = self.get_current();
52        let pending = self.pending_flush.read().clone();
53        let mut all = vec![current];
54        all.extend(pending);
55        all
56    }
57
58    /// Get L0 buffers currently being flushed (for QueryContext).
59    pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
60        self.pending_flush.read().clone()
61    }
62
63    /// Rotate L0. Returns the OLD L0 buffer.
64    /// The new L0 is initialized with `next_version` and `new_wal`.
65    pub fn rotate(
66        &self,
67        next_version: u64,
68        new_wal: Option<Arc<WriteAheadLog>>,
69    ) -> Arc<RwLock<L0Buffer>> {
70        let mut guard = self.current.write();
71        let old_l0 = guard.clone();
72
73        let new_l0 = L0Buffer::new(next_version, new_wal);
74        *guard = Arc::new(RwLock::new(new_l0));
75
76        old_l0
77    }
78
79    /// Begin flush: rotate L0 and add old L0 to pending flush list.
80    /// The old L0 remains visible to reads until `complete_flush` is called.
81    /// Returns the old L0 buffer to be flushed.
82    pub fn begin_flush(
83        &self,
84        next_version: u64,
85        new_wal: Option<Arc<WriteAheadLog>>,
86    ) -> Arc<RwLock<L0Buffer>> {
87        let old_l0 = self.rotate(next_version, new_wal);
88        self.pending_flush.write().push(old_l0.clone());
89        old_l0
90    }
91
92    /// Complete flush: remove the flushed L0 from pending list.
93    /// Call this only after L1 writes have succeeded.
94    pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
95        let mut pending = self.pending_flush.write();
96        pending.retain(|x| !Arc::ptr_eq(x, l0));
97    }
98
99    /// Get the minimum WAL LSN across all pending flush L0s.
100    /// WAL truncation should not go past this LSN to preserve data for pending flushes.
101    /// Returns None if no pending flushes exist.
102    pub fn min_pending_wal_lsn(&self) -> Option<u64> {
103        let pending = self.pending_flush.read();
104        if pending.is_empty() {
105            return None;
106        }
107        pending
108            .iter()
109            .map(|l0_arc| {
110                let l0 = l0_arc.read();
111                l0.wal_lsn_at_flush
112            })
113            .min()
114    }
115}