melange_db/
lib.rs

// melange_db - 高性能嵌入式数据库
// 在sled架构基础上进行了深度性能优化,目标超越RocksDB性能

//! `melange_db` 是一个高性能嵌入式数据库,
//! 在sled架构基础上进行了深度性能优化,目标超越RocksDB性能。
//!
//! 主要优化包括:
//! - 增量序列化减少IO开销
//! - 改进的缓存策略
//! - 优化的flush机制
//! - 更高效的内存管理
12
13pub mod block_cache;
14pub mod bloom_filter;
15pub mod smart_flush;
16mod config;
17mod db;
18mod flush_epoch;
19mod heap;
20mod id_allocator;
21mod leaf;
22mod logging;
23mod metadata_store;
24mod object_cache;
25mod object_location_mapper;
26pub mod platform_utils;
27pub mod simd_optimized;
28pub mod atomic_worker;
29pub mod atomic_operations_manager;
30pub mod database_worker;
31mod tree;
32
33#[cfg(any(
34    feature = "testing-shred-allocator",
35    feature = "testing-count-allocator",
36    feature = "mimalloc"
37))]
38pub mod alloc;
39
40
/// Occasionally yields the current thread in debug builds to perturb thread
/// interleavings; the body is compiled out when `debug_assertions` is off.
///
/// The wall-clock nanosecond count since the Unix epoch is used as a cheap
/// source of variation, so no random number generator is required.
#[inline]
fn debug_delay() {
    #[cfg(debug_assertions)]
    {
        // `elapsed` fails when the system clock is set before the Unix epoch.
        // Treat that as "no delay" (0) rather than panicking from a helper
        // whose only purpose is to shake up scheduling.
        let rand = std::time::SystemTime::UNIX_EPOCH
            .elapsed()
            .map_or(0, |since_epoch| since_epoch.as_nanos());

        // Only residues 101..=127 (of 128) trigger a delay, and then at most
        // 15 yields are performed.
        if rand % 128 > 100 {
            for _ in 0..rand % 16 {
                std::thread::yield_now();
            }
        }
    }
}
55
56pub use crate::config::{Config, CacheWarmupStrategy, CompressionAlgorithm};
57pub use crate::db::Db;
58pub use crate::tree::{Batch, Iter, Tree};
59
60// 内部优化实现细节,不应暴露给用户
61#[doc(hidden)]
62pub use crate::block_cache::{CacheManager, CacheConfig, AccessPattern};
63#[doc(hidden)]
64pub use crate::bloom_filter::{BloomFilter, ConcurrentBloomFilter, TieredBloomFilter, FilterTier};
65#[doc(hidden)]
66pub use crate::simd_optimized::{SimdComparator, KeyComparator};
67pub use inline_array::InlineArray;
68
69const NAME_MAPPING_COLLECTION_ID: CollectionId = CollectionId(0);
70const DEFAULT_COLLECTION_ID: CollectionId = CollectionId(1);
71const INDEX_FANOUT: usize = 64;
72const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128;
73
74use std::collections::BTreeMap;
75use std::num::NonZeroU64;
76use std::ops::Bound;
77use std::sync::Arc;
78
79use parking_lot::RwLock;
80
81use crate::flush_epoch::{
82    FlushEpoch, FlushEpochGuard, FlushEpochTracker, FlushInvariants,
83};
84use crate::heap::{
85    HeapStats, ObjectRecovery, SlabAddress, Update, WriteBatchStats,
86};
87use crate::id_allocator::{Allocator, DeferredFree};
88use crate::leaf::Leaf;
89
90// 这些是公开的,以便在外部二进制文件中进行崩溃测试
91// 它们被隐藏是因为没有关于其API稳定性或功能的保证
92#[doc(hidden)]
93pub use crate::heap::{Heap, HeapRecovery};
94#[doc(hidden)]
95pub use crate::metadata_store::MetadataStore;
96#[doc(hidden)]
97pub use crate::object_cache::{CacheStats, Dirty, FlushStats, ObjectCache};
98
99/// 使用默认配置在指定路径打开一个 `Db`
100/// 这将在指定路径创建一个新的存储目录(如果它不存在)
101/// 您可以使用 `Db::was_recovered` 方法来确定数据库是否从之前的实例中恢复
102pub fn open<P: AsRef<std::path::Path>>(path: P) -> std::io::Result<Db> {
103    Config::new().path(path).open()
104}
105
/// Removes the database lock files located under `path`.
///
/// Two lock files are handled:
/// - `.lock` - the main database lock file
/// - `.meta_lock` - the metadata store lock file
///
/// Cleanup is best-effort: a lock file that does not exist is skipped, and a
/// failure to remove one is reported on stderr without preventing the attempt
/// on the remaining file.
///
/// # Arguments
/// * `path` - the database directory
///
/// # Returns
/// `Ok(n)`, where `n` is the number of lock files that were actually removed.
/// Individual removal failures are only logged, so this function currently
/// never returns `Err`; the `Result` return type is kept so the signature
/// stays stable for callers.
///
/// # Examples
/// ```no_run
/// match melange_db::cleanup_lock_files("./my_database") {
///     Ok(count) => println!("removed {} lock file(s)", count),
///     Err(e) => eprintln!("failed to clean up lock files: {}", e),
/// }
/// ```
pub fn cleanup_lock_files<P: AsRef<std::path::Path>>(path: P) -> std::io::Result<usize> {
    use std::fs;
    use std::io::ErrorKind;

    // Names of the lock files, relative to the database directory.
    const LOCK_FILES: [&str; 2] = [".lock", ".meta_lock"];

    let dir = path.as_ref();
    let mut cleaned_count = 0;

    for lock_file in &LOCK_FILES {
        let lock_path = dir.join(lock_file);

        // Remove directly instead of probing with `exists()` first: the probe
        // is racy (the file can vanish in between) and it reports `false` for
        // errors other than "not found", hiding them from the log.
        match fs::remove_file(&lock_path) {
            Ok(()) => {
                cleaned_count += 1;
                eprintln!("已清理锁文件: {:?}", lock_path);
            }
            // No such lock file: nothing to clean up for this entry.
            Err(e) if e.kind() == ErrorKind::NotFound => {}
            Err(e) => {
                eprintln!("清理锁文件失败 {:?}: {}", lock_path, e);
                // Keep going so the other lock file still gets its attempt.
            }
        }
    }

    Ok(cleaned_count)
}
157
158#[derive(Debug, Copy, Clone)]
159pub struct Stats {
160    pub cache: CacheStats,
161}
162
163/// 比较并交换结果
164///
165/// 它返回 `Ok(Ok(()))` 如果操作成功完成
166///     - `Ok(Err(CompareAndSwapError(current, proposed)))` 如果操作失败
167///       无法设置新值。`CompareAndSwapError` 包含当前和提议的值。
168///     - `Err(Error::Unsupported)` 如果数据库以只读模式打开。
169pub type CompareAndSwapResult = std::io::Result<
170    std::result::Result<CompareAndSwapSuccess, CompareAndSwapError>,
171>;
172
173type Index<const LEAF_FANOUT: usize> = concurrent_map::ConcurrentMap<
174    InlineArray,
175    Object<LEAF_FANOUT>,
176    INDEX_FANOUT,
177    EBR_LOCAL_GC_BUFFER_SIZE,
178>;
179
180/// 比较并交换错误
181#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
182pub struct CompareAndSwapError {
183    /// 导致您的CAS失败的当前值
184    pub current: Option<InlineArray>,
185    /// 返回的未成功提出的值
186    pub proposed: Option<InlineArray>,
187}
188
189/// 比较并交换成功
190#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
191pub struct CompareAndSwapSuccess {
192    /// 成功安装的当前值
193    pub new_value: Option<InlineArray>,
194    /// 之前存储的返回值
195    pub previous_value: Option<InlineArray>,
196}
197
198impl std::fmt::Display for CompareAndSwapError {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        write!(f, "Compare and swap conflict")
201    }
202}
203
204impl std::error::Error for CompareAndSwapError {}
205
206#[derive(
207    Debug,
208    Clone,
209    Copy,
210    serde::Serialize,
211    serde::Deserialize,
212    PartialOrd,
213    Ord,
214    PartialEq,
215    Eq,
216    Hash,
217)]
218pub struct ObjectId(NonZeroU64);
219
220impl ObjectId {
221    fn new(from: u64) -> Option<ObjectId> {
222        NonZeroU64::new(from).map(ObjectId)
223    }
224}
225
226impl std::ops::Deref for ObjectId {
227    type Target = u64;
228
229    fn deref(&self) -> &u64 {
230        let self_ref: &NonZeroU64 = &self.0;
231
232        // NonZeroU64 是 repr(transparent) 包装了一个 u64
233        // 所以它保证匹配二进制布局。这使得
234        // 可以安全地将一个引用转换为另一个引用
235        let self_ptr: *const NonZeroU64 = self_ref as *const _;
236        let reference: *const u64 = self_ptr as *const u64;
237
238        unsafe { &*reference }
239    }
240}
241
242impl concurrent_map::Minimum for ObjectId {
243    const MIN: ObjectId = ObjectId(NonZeroU64::MIN);
244}
245
246#[derive(
247    Debug,
248    Clone,
249    Copy,
250    serde::Serialize,
251    serde::Deserialize,
252    PartialOrd,
253    Ord,
254    PartialEq,
255    Eq,
256    Hash,
257)]
258pub struct CollectionId(u64);
259
260impl concurrent_map::Minimum for CollectionId {
261    const MIN: CollectionId = CollectionId(u64::MIN);
262}
263
264#[derive(Debug, Clone)]
265struct CacheBox<const LEAF_FANOUT: usize> {
266    leaf: Option<Box<Leaf<LEAF_FANOUT>>>,
267    #[allow(unused)]
268    logged_index: BTreeMap<InlineArray, LogValue>,
269}
270
271#[allow(unused)]
272#[derive(Debug, Clone)]
273struct LogValue {
274    location: SlabAddress,
275    value: Option<InlineArray>,
276}
277
278#[derive(Debug, Clone)]
279pub struct Object<const LEAF_FANOUT: usize> {
280    object_id: ObjectId,
281    collection_id: CollectionId,
282    low_key: InlineArray,
283    inner: Arc<RwLock<CacheBox<LEAF_FANOUT>>>,
284}
285
286impl<const LEAF_FANOUT: usize> PartialEq for Object<LEAF_FANOUT> {
287    fn eq(&self, other: &Self) -> bool {
288        self.object_id == other.object_id
289    }
290}
291
292/// 存储在 `Db` 和 `Tree` 的 Arc 中,
293/// 所以当最后一个"高级"结构被删除时,
294/// flusher 线程被清理
295struct ShutdownDropper<const LEAF_FANOUT: usize> {
296    shutdown_sender: parking_lot::Mutex<
297        std::sync::mpsc::Sender<std::sync::mpsc::Sender<()>>,
298    >,
299    cache: parking_lot::Mutex<object_cache::ObjectCache<LEAF_FANOUT>>,
300}
301
302impl<const LEAF_FANOUT: usize> Drop for ShutdownDropper<LEAF_FANOUT> {
303    fn drop(&mut self) {
304        let (tx, rx) = std::sync::mpsc::channel();
305        debug_log!("sending shutdown signal to flusher");
306        if self.shutdown_sender.lock().send(tx).is_ok() {
307            if let Err(e) = rx.recv() {
308                error_log!("failed to shut down flusher thread: {:?}", e);
309            } else {
310                debug_log!("flush thread successfully terminated");
311            }
312        } else {
313            debug_log!(
314                "failed to shut down flusher, manually flushing ObjectCache"
315            );
316            let cache = self.cache.lock();
317            if let Err(e) = cache.flush() {
318                error_log!(
319                    "Db flusher encountered error while flushing: {:?}",
320                    e
321                );
322                cache.set_error(&e);
323            }
324        }
325    }
326}
327
/// Applies `f` to the endpoint carried by `bound`, keeping the bound's kind.
///
/// `Included(x)` becomes `Included(f(x))` and `Excluded(x)` becomes
/// `Excluded(f(x))`. `Unbounded` is returned as-is and `f` is never called.
fn map_bound<T, U, F>(bound: Bound<T>, f: F) -> Bound<U>
where
    F: FnOnce(T) -> U,
{
    let (endpoint, inclusive) = match bound {
        Bound::Unbounded => return Bound::Unbounded,
        Bound::Included(endpoint) => (endpoint, true),
        Bound::Excluded(endpoint) => (endpoint, false),
    };

    let mapped = f(endpoint);
    if inclusive {
        Bound::Included(mapped)
    } else {
        Bound::Excluded(mapped)
    }
}
335
336const fn _assert_public_types_send_sync() {
337    use std::fmt::Debug;
338
339    const fn _assert_send<S: Send + Clone + Debug>() {}
340
341    const fn _assert_send_sync<S: Send + Sync + Clone + Debug>() {}
342
343    _assert_send::<Db>();
344
345    _assert_send_sync::<Batch>();
346    _assert_send_sync::<InlineArray>();
347    _assert_send_sync::<Config>();
348    _assert_send_sync::<CompareAndSwapSuccess>();
349    _assert_send_sync::<CompareAndSwapError>();
350}