1use std::{
2 collections::VecDeque,
3 path::{Path, PathBuf},
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc,
7 },
8};
9
10use mmapcell::MmapCell;
11use parking_lot::RwLock;
12
13use crate::datapage::DataPage;
14
15#[derive(Clone)]
16pub struct DataPagesManager {
17 path: PathBuf,
18 max_datapages: Arc<AtomicUsize>,
19 datapage_count: Arc<AtomicUsize>,
20 datapage_ring: Arc<RwLock<VecDeque<Arc<MmapCell<DataPage>>>>>,
21}
22
23impl DataPagesManager {
24 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
25 let total_page_count = 0;
27
28 let mut init_pages = VecDeque::new();
30 init_pages.push_back(unsafe {
31 Arc::new(MmapCell::new_named(
32 path.as_ref().join(total_page_count.to_string()),
33 )?)
34 });
35
36 Ok(DataPagesManager {
37 path: path.as_ref().into(),
38 max_datapages: Arc::new(AtomicUsize::new(usize::MAX)),
39 datapage_count: Arc::new(AtomicUsize::new(total_page_count)),
40 datapage_ring: Arc::new(RwLock::new(init_pages)),
41 })
42 }
43
44 fn load_total_page_count<P: AsRef<Path>>(path: P) {}
45
46 pub fn set_max_datapages(&mut self, val: usize) {
47 let _dp = self.datapage_ring.write();
48 self.max_datapages.store(val, Ordering::Relaxed);
49 }
50
51 pub fn get_max_datapages(&self) -> usize {
52 self.max_datapages.load(Ordering::Relaxed)
53 }
54
55 pub fn get_last_datapage(&self) -> Result<(usize, Arc<MmapCell<DataPage>>), std::io::Error> {
56 let datapages = self.datapage_ring.read();
57 let last_datapage = datapages
58 .back()
59 .ok_or(std::io::Error::other("DataPage not found"))?;
60
61 let dp_count = self.datapage_count.load(Ordering::Relaxed);
62 Ok((dp_count, last_datapage.clone()))
63 }
64
65 pub fn get_or_create_datapage(
66 &self,
67 num: usize,
68 ) -> Result<(usize, Arc<MmapCell<DataPage>>), std::io::Error> {
69 let mut datapages = self.datapage_ring.upgradable_read();
70 let dp_count = self.datapage_count.load(Ordering::Relaxed);
71 let max_dps = self.max_datapages.load(Ordering::Relaxed);
72
73 if num > dp_count {
74 return datapages.with_upgraded(|datapages| {
75 let dp_count = self.datapage_count.fetch_add(1, Ordering::Relaxed) + 1;
76 let max_dps = self.max_datapages.load(Ordering::Relaxed);
77
78 if dp_count >= max_dps {
79 std::fs::remove_file(self.path.join(format!("{}", dp_count - max_dps)))?;
80
81 let _ = datapages.pop_front();
82 }
83
84 datapages.push_back(Arc::new(DataPage::new(
85 self.path.join(dp_count.to_string()),
86 )?));
87
88 Ok::<(usize, Arc<MmapCell<DataPage>>), std::io::Error>((
89 dp_count,
90 datapages[dp_count % max_dps].clone(),
91 ))
92 });
93 }
94
95 let dp_count = num.max(dp_count.saturating_sub(max_dps));
96
97 Ok((dp_count, datapages[dp_count % max_dps].clone()))
98 }
99}