disk_mpmc/
manager.rs

1use std::{
2    collections::VecDeque,
3    path::{Path, PathBuf},
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        Arc,
7    },
8};
9
10use mmapcell::MmapCell;
11use parking_lot::RwLock;
12
13use crate::datapage::DataPage;
14
15#[derive(Clone)]
16pub struct DataPagesManager {
17    path: PathBuf,
18    max_datapages: Arc<AtomicUsize>,
19    datapage_count: Arc<AtomicUsize>,
20    datapage_ring: Arc<RwLock<VecDeque<Arc<MmapCell<DataPage>>>>>,
21}
22
23impl DataPagesManager {
24    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, std::io::Error> {
25        // TODO: actually get the page count
26        let total_page_count = 0;
27
28        // let mut init_pages = VecDeque::with_capacity(MAX_PAGES + 1);
29        let mut init_pages = VecDeque::new();
30        init_pages.push_back(unsafe {
31            Arc::new(MmapCell::new_named(
32                path.as_ref().join(total_page_count.to_string()),
33            )?)
34        });
35
36        Ok(DataPagesManager {
37            path: path.as_ref().into(),
38            max_datapages: Arc::new(AtomicUsize::new(usize::MAX)),
39            datapage_count: Arc::new(AtomicUsize::new(total_page_count)),
40            datapage_ring: Arc::new(RwLock::new(init_pages)),
41        })
42    }
43
44    fn load_total_page_count<P: AsRef<Path>>(path: P) {}
45
46    pub fn set_max_datapages(&mut self, val: usize) {
47        let _dp = self.datapage_ring.write();
48        self.max_datapages.store(val, Ordering::Relaxed);
49    }
50
51    pub fn get_max_datapages(&self) -> usize {
52        self.max_datapages.load(Ordering::Relaxed)
53    }
54
55    pub fn get_last_datapage(&self) -> Result<(usize, Arc<MmapCell<DataPage>>), std::io::Error> {
56        let datapages = self.datapage_ring.read();
57        let last_datapage = datapages
58            .back()
59            .ok_or(std::io::Error::other("DataPage not found"))?;
60
61        let dp_count = self.datapage_count.load(Ordering::Relaxed);
62        Ok((dp_count, last_datapage.clone()))
63    }
64
65    pub fn get_or_create_datapage(
66        &self,
67        num: usize,
68    ) -> Result<(usize, Arc<MmapCell<DataPage>>), std::io::Error> {
69        let mut datapages = self.datapage_ring.upgradable_read();
70        let dp_count = self.datapage_count.load(Ordering::Relaxed);
71        let max_dps = self.max_datapages.load(Ordering::Relaxed);
72
73        if num > dp_count {
74            return datapages.with_upgraded(|datapages| {
75                let dp_count = self.datapage_count.fetch_add(1, Ordering::Relaxed) + 1;
76                let max_dps = self.max_datapages.load(Ordering::Relaxed);
77
78                if dp_count >= max_dps {
79                    std::fs::remove_file(self.path.join(format!("{}", dp_count - max_dps)))?;
80
81                    let _ = datapages.pop_front();
82                }
83
84                datapages.push_back(Arc::new(DataPage::new(
85                    self.path.join(dp_count.to_string()),
86                )?));
87
88                Ok::<(usize, Arc<MmapCell<DataPage>>), std::io::Error>((
89                    dp_count,
90                    datapages[dp_count % max_dps].clone(),
91                ))
92            });
93        }
94
95        let dp_count = num.max(dp_count.saturating_sub(max_dps));
96
97        Ok((dp_count, datapages[dp_count % max_dps].clone()))
98    }
99}