1pub mod block_cache;
14pub mod bloom_filter;
15pub mod smart_flush;
16mod config;
17mod db;
18mod flush_epoch;
19mod heap;
20mod id_allocator;
21mod leaf;
22mod logging;
23mod metadata_store;
24mod object_cache;
25mod object_location_mapper;
26pub mod platform_utils;
27pub mod simd_optimized;
28pub mod atomic_worker;
29pub mod atomic_operations_manager;
30pub mod database_worker;
31mod tree;
32
33#[cfg(any(
34 feature = "testing-shred-allocator",
35 feature = "testing-count-allocator",
36 feature = "mimalloc"
37))]
38pub mod alloc;
39
40
#[inline]
fn debug_delay() {
    // In debug builds, occasionally yield the current thread a small,
    // pseudo-random number of times to shake out concurrency bugs by
    // perturbing thread interleavings. No-op in release builds.
    #[cfg(debug_assertions)]
    {
        // Nanoseconds since the Unix epoch serve as a cheap entropy
        // source. `elapsed()` fails if the system clock is set before
        // the epoch; fall back to 0 rather than panicking in a
        // debug-only helper.
        let rand = std::time::SystemTime::UNIX_EPOCH
            .elapsed()
            .map(|d| d.as_nanos())
            .unwrap_or(0);

        // Delay on roughly 27/128 of the calls, for 0..=15 yields.
        if rand % 128 > 100 {
            for _ in 0..rand % 16 {
                std::thread::yield_now();
            }
        }
    }
}
55
56pub use crate::config::{Config, CacheWarmupStrategy, CompressionAlgorithm};
57pub use crate::db::Db;
58pub use crate::tree::{Batch, Iter, Tree};
59
60#[doc(hidden)]
62pub use crate::block_cache::{CacheManager, CacheConfig, AccessPattern};
63#[doc(hidden)]
64pub use crate::bloom_filter::{BloomFilter, ConcurrentBloomFilter, TieredBloomFilter, FilterTier};
65#[doc(hidden)]
66pub use crate::simd_optimized::{SimdComparator, KeyComparator};
67pub use inline_array::InlineArray;
68
// Reserved collection id — per its name, holds the tree-name to
// collection-id mapping (TODO confirm against metadata handling).
const NAME_MAPPING_COLLECTION_ID: CollectionId = CollectionId(0);
// Collection id used for the default (unnamed) tree.
const DEFAULT_COLLECTION_ID: CollectionId = CollectionId(1);
// Branching factor of the in-memory `Index` concurrent map.
const INDEX_FANOUT: usize = 64;
// Per-thread garbage buffer size for the `Index` map's epoch-based
// reclamation.
const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128;
73
74use std::collections::BTreeMap;
75use std::num::NonZeroU64;
76use std::ops::Bound;
77use std::sync::Arc;
78
79use parking_lot::RwLock;
80
81use crate::flush_epoch::{
82 FlushEpoch, FlushEpochGuard, FlushEpochTracker, FlushInvariants,
83};
84use crate::heap::{
85 HeapStats, ObjectRecovery, SlabAddress, Update, WriteBatchStats,
86};
87use crate::id_allocator::{Allocator, DeferredFree};
88use crate::leaf::Leaf;
89
90#[doc(hidden)]
93pub use crate::heap::{Heap, HeapRecovery};
94#[doc(hidden)]
95pub use crate::metadata_store::MetadataStore;
96#[doc(hidden)]
97pub use crate::object_cache::{CacheStats, Dirty, FlushStats, ObjectCache};
98
99pub fn open<P: AsRef<std::path::Path>>(path: P) -> std::io::Result<Db> {
103 Config::new().path(path).open()
104}
105
/// Best-effort removal of stale lock files (`.lock`, `.meta_lock`) left in
/// a database directory, e.g. after an unclean shutdown.
///
/// Returns the number of lock files actually removed. A failure to remove
/// an individual file is reported on stderr but does not abort the scan.
pub fn cleanup_lock_files<P: AsRef<std::path::Path>>(path: P) -> std::io::Result<usize> {
    use std::fs;
    use std::io::ErrorKind;

    let path = path.as_ref();
    let mut cleaned_count = 0;

    // Lock file names used by the database and its metadata store.
    let lock_files = [".lock", ".meta_lock"];

    for lock_file in &lock_files {
        let lock_path = path.join(lock_file);

        // Attempt removal directly instead of checking `exists()` first:
        // the file could vanish between the check and the removal (TOCTOU),
        // which would surface as a spurious error. A `NotFound` result
        // simply means there was nothing to clean up.
        match fs::remove_file(&lock_path) {
            Ok(_) => {
                cleaned_count += 1;
                eprintln!("已清理锁文件: {:?}", lock_path);
            }
            Err(e) if e.kind() == ErrorKind::NotFound => {
                // No lock file present for this entry — not an error.
            }
            Err(e) => {
                eprintln!("清理锁文件失败 {:?}: {}", lock_path, e);
            }
        }
    }

    Ok(cleaned_count)
}
157
/// A snapshot of database runtime statistics.
#[derive(Debug, Copy, Clone)]
pub struct Stats {
    /// Statistics reported by the in-memory object cache.
    pub cache: CacheStats,
}
162
/// Result of a compare-and-swap: the outer `io::Result` carries storage
/// errors, the inner `Result` reports whether the swap itself succeeded
/// ([`CompareAndSwapSuccess`]) or conflicted ([`CompareAndSwapError`]).
pub type CompareAndSwapResult = std::io::Result<
    std::result::Result<CompareAndSwapSuccess, CompareAndSwapError>,
>;

// In-memory index from a node's low key to its cached `Object`, using the
// fan-out and EBR buffer size constants declared above.
type Index<const LEAF_FANOUT: usize> = concurrent_map::ConcurrentMap<
    InlineArray,
    Object<LEAF_FANOUT>,
    INDEX_FANOUT,
    EBR_LOCAL_GC_BUFFER_SIZE,
>;
179
/// Returned from a compare-and-swap when the stored value did not match
/// the caller's expectation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapError {
    /// The value actually present at the time of the operation.
    pub current: Option<InlineArray>,
    /// The value the caller proposed to install.
    pub proposed: Option<InlineArray>,
}
188
/// Returned from a successful compare-and-swap.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapSuccess {
    /// The value that was installed by the operation.
    pub new_value: Option<InlineArray>,
    /// The value that was present before the swap.
    pub previous_value: Option<InlineArray>,
}
197
198impl std::fmt::Display for CompareAndSwapError {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 write!(f, "Compare and swap conflict")
201 }
202}
203
204impl std::error::Error for CompareAndSwapError {}
205
/// Unique identifier of an object (cached tree node).
///
/// Backed by `NonZeroU64` so `Option<ObjectId>` benefits from the niche
/// optimization and stays 8 bytes.
#[derive(
    Debug,
    Clone,
    Copy,
    serde::Serialize,
    serde::Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
)]
pub struct ObjectId(NonZeroU64);
219
220impl ObjectId {
221 fn new(from: u64) -> Option<ObjectId> {
222 NonZeroU64::new(from).map(ObjectId)
223 }
224}
225
// Allows treating an `ObjectId` as its raw `u64` without copying.
impl std::ops::Deref for ObjectId {
    type Target = u64;

    fn deref(&self) -> &u64 {
        let self_ref: &NonZeroU64 = &self.0;

        let self_ptr: *const NonZeroU64 = self_ref as *const _;
        let reference: *const u64 = self_ptr as *const u64;

        // SAFETY: `NonZeroU64` is `#[repr(transparent)]` over `u64`, so a
        // `*const NonZeroU64` can be reinterpreted as `*const u64`. The
        // pointer derives from a live `&self.0` and stays valid for the
        // lifetime of the returned borrow.
        unsafe { &*reference }
    }
}
241
impl concurrent_map::Minimum for ObjectId {
    // `NonZeroU64::MIN` is 1, the smallest representable id.
    const MIN: ObjectId = ObjectId(NonZeroU64::MIN);
}
245
/// Identifier of a collection (tree) within the database. Ids 0 and 1 are
/// reserved (see `NAME_MAPPING_COLLECTION_ID` / `DEFAULT_COLLECTION_ID`).
#[derive(
    Debug,
    Clone,
    Copy,
    serde::Serialize,
    serde::Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
)]
pub struct CollectionId(u64);
259
impl concurrent_map::Minimum for CollectionId {
    // Plain `u64`, so the minimum is simply 0.
    const MIN: CollectionId = CollectionId(u64::MIN);
}
263
// Mutable interior of an `Object`: the cached leaf plus an index of
// log-recovered values.
#[derive(Debug, Clone)]
struct CacheBox<const LEAF_FANOUT: usize> {
    // `None` presumably means the leaf is not resident in memory
    // (paged out) — confirm against the object cache's eviction path.
    leaf: Option<Box<Leaf<LEAF_FANOUT>>>,
    #[allow(unused)]
    logged_index: BTreeMap<InlineArray, LogValue>,
}
270
// A value as recorded in the log: its on-disk location plus the value
// itself (a `None` value presumably marks a deletion — confirm against
// the recovery path).
#[allow(unused)]
#[derive(Debug, Clone)]
struct LogValue {
    // On-disk slab address where this entry lives.
    location: SlabAddress,
    value: Option<InlineArray>,
}
277
/// Shared handle to a cached tree node. Cloning is cheap (the cache slot
/// is behind an `Arc`).
#[derive(Debug, Clone)]
pub struct Object<const LEAF_FANOUT: usize> {
    // Stable identity; handle equality is defined by this field alone
    // (see the `PartialEq` impl below).
    object_id: ObjectId,
    // The collection (tree) this node belongs to.
    collection_id: CollectionId,
    // Low key used to position this node in the `Index` map.
    low_key: InlineArray,
    // Lock-protected cache slot holding the leaf data itself.
    inner: Arc<RwLock<CacheBox<LEAF_FANOUT>>>,
}
285
// Identity comparison: two handles are equal iff they refer to the same
// object id; cached contents are deliberately not compared.
impl<const LEAF_FANOUT: usize> PartialEq for Object<LEAF_FANOUT> {
    fn eq(&self, other: &Self) -> bool {
        self.object_id == other.object_id
    }
}
291
// On drop, asks the background flusher thread to shut down, falling back
// to a direct synchronous flush of the cache if the flusher is already
// gone (see the `Drop` impl below).
struct ShutdownDropper<const LEAF_FANOUT: usize> {
    // Channel to the flusher: we send it a reply-sender and then wait on
    // the corresponding receiver for an acknowledgement.
    shutdown_sender: parking_lot::Mutex<
        std::sync::mpsc::Sender<std::sync::mpsc::Sender<()>>,
    >,
    // Cache handle used for the manual-flush fallback.
    cache: parking_lot::Mutex<object_cache::ObjectCache<LEAF_FANOUT>>,
}
301
impl<const LEAF_FANOUT: usize> Drop for ShutdownDropper<LEAF_FANOUT> {
    fn drop(&mut self) {
        // Shutdown handshake: send the flusher a reply channel, then block
        // until it responds (or until the reply sender is dropped, which
        // surfaces as `Err` from `recv`).
        let (tx, rx) = std::sync::mpsc::channel();
        debug_log!("sending shutdown signal to flusher");
        if self.shutdown_sender.lock().send(tx).is_ok() {
            if let Err(e) = rx.recv() {
                error_log!("failed to shut down flusher thread: {:?}", e);
            } else {
                debug_log!("flush thread successfully terminated");
            }
        } else {
            // The flusher's receiving end is gone (thread already exited),
            // so perform a final synchronous flush ourselves.
            debug_log!(
                "failed to shut down flusher, manually flushing ObjectCache"
            );
            let cache = self.cache.lock();
            if let Err(e) = cache.flush() {
                error_log!(
                    "Db flusher encountered error while flushing: {:?}",
                    e
                );
                // Record the failure so later operations can observe it.
                cache.set_error(&e);
            }
        }
    }
}
327
/// Applies `f` to the value inside `bound`, preserving its inclusivity;
/// `Unbounded` maps to `Unbounded`.
fn map_bound<T, U, F: FnOnce(T) -> U>(bound: Bound<T>, f: F) -> Bound<U> {
    match bound {
        Bound::Included(inner) => Bound::Included(f(inner)),
        Bound::Excluded(inner) => Bound::Excluded(f(inner)),
        Bound::Unbounded => Bound::Unbounded,
    }
}
335
// Compile-time assertions that the crate's public types satisfy the
// thread-safety and utility traits callers rely on. Never called at
// runtime; a regression shows up as a compile error.
const fn _assert_public_types_send_sync() {
    use std::fmt::Debug;

    const fn _assert_send<S: Send + Clone + Debug>() {}

    const fn _assert_send_sync<S: Send + Sync + Clone + Debug>() {}

    // `Db` is only asserted `Send` here, not `Sync`.
    _assert_send::<Db>();

    _assert_send_sync::<Batch>();
    _assert_send_sync::<InlineArray>();
    _assert_send_sync::<Config>();
    _assert_send_sync::<CompareAndSwapSuccess>();
    _assert_send_sync::<CompareAndSwapError>();
}