//! pstore ( rustdb ): shared paged data with a page cache ( Stash ) and
//! multi-version page history, allowing concurrent readers at a fixed time
//! alongside a single writer.
1use crate::{
2    heap::GHeap, nd, Arc, BTreeMap, Data, HashMap, HashSet, Mutex, PageStorage, PageStorageInfo,
3    RwLock, SaveOp, Storage,
4};
5
/// Heap index type. Typical 8M cache will have 1K x 8KB pages, so 10 bits is typical, 32 should be plenty.
type HX = u32;
/// Heap tracking page usage: entries are ( logical page number, usage count ),
/// indexed by HX ( see Stash::min and PageInfo::inc_usage ).
type Heap = GHeap<u64, u64, HX>;

/// ```Arc<Mutex<PageInfo>>```
pub type PageInfoPtr = Arc<Mutex<PageInfo>>;
11
/// Information for a logical page, including historic data.
pub struct PageInfo {
    /// Current data for the page ( None implies it is stored in underlying file ).
    pub current: Option<Data>,
    /// Historic data for the page. Has data for page at specified time.
    /// A copy is made prior to an update, so get looks forward from access time.
    pub history: BTreeMap<u64, Data>,
    /// How many times has the page been used ( incremented by inc_usage ).
    pub usage: u64,
    /// Heap index into Stash::min; HX::MAX means the page is not in the heap.
    pub hx: HX,
}
24
25impl PageInfo {
26    fn new() -> PageInfoPtr {
27        Arc::new(Mutex::new(PageInfo {
28            current: None,
29            history: BTreeMap::new(),
30            usage: 0,
31            hx: HX::MAX,
32        }))
33    }
34
35    /// Increase usage.
36    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
37        self.usage += 1;
38        if self.hx == HX::MAX {
39            self.hx = ah.insert(lpnum, self.usage);
40        } else {
41            ah.modify(self.hx, self.usage);
42        }
43    }
44
45    /// Get the Data for the page, checking history if not a writer.
46    /// Reads Data from file if necessary.
47    /// Result is Data and size of loaded data ( cache delta ).
48    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
49        if !a.writer {
50            if let Some((_k, v)) = self.history.range(a.time..).next() {
51                return (v.clone(), 0);
52            }
53        }
54
55        if let Some(p) = &self.current {
56            return (p.clone(), 0);
57        }
58
59        // Get data from page storage.
60        let ps = a.spd.ps.read().unwrap();
61        let data = ps.get_page(lpnum);
62        self.current = Some(data.clone());
63        let len = data.len();
64        (data, len)
65    }
66
67    /// Set the page data, updating the history using the specified time and old data.
68    /// Result is delta of length (old size, new size)
69    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
70        if do_history {
71            self.history.insert(time, old);
72        }
73        let old = if let Some(x) = &self.current {
74            x.len()
75        } else {
76            0
77        };
78        let new = data.len();
79        self.current = if new == 0 { None } else { Some(data) };
80        (old, new)
81    }
82
83    /// Trim entry for time t that no longer need to be retained, returning whether entry was retained.
84    /// start is start of range for which no readers exist.
85    fn trim(&mut self, t: u64, start: u64) -> bool {
86        let first = self.history_start(t);
87        if first >= start {
88            // There is no reader that can read copy for time t, so copy can be removed.
89            self.history.remove(&t);
90            false
91        } else {
92            true
93        }
94    }
95
96    /// Returns the earliest time that would return the page for the specified time.
97    fn history_start(&self, t: u64) -> u64 {
98        if let Some((k, _)) = self.history.range(..t).next_back() {
99            *k + 1
100        } else {
101            0
102        }
103    }
104}
105
/// Central store of data.
#[derive(Default)]
pub struct Stash {
    /// Write time - number of writes. Incremented by end_write.
    pub time: u64,
    /// Page number -> page info.
    pub pages: HashMap<u64, PageInfoPtr>,
    /// Time -> reader count. Number of readers for given time.
    pub rdrs: BTreeMap<u64, usize>,
    /// Time -> set of page numbers. Page copies held for given time.
    pub vers: BTreeMap<u64, HashSet<u64>>,
    /// Total size of current pages.
    pub total: i64, // Use i64 to avoid problems with overflow.
    /// trim_cache reduces total to mem_limit (or below).
    pub mem_limit: usize,
    /// Tracks loaded page with smallest usage ( eviction order for trim_cache ).
    pub min: Heap,
    /// Total number of page accesses.
    pub read: u64,
    /// Total number of misses ( data was not already loaded ).
    pub miss: u64,
}
128
129impl Stash {
130    /// Set the value of the specified page for the current time.
131    fn set(&mut self, lpnum: u64, old: Data, data: Data) {
132        let time = self.time;
133        let u = self.vers.entry(time).or_default();
134        let do_history = u.insert(lpnum);
135        let p = self.get_pinfo(lpnum);
136        let diff = p.lock().unwrap().set_data(time, old, data, do_history);
137        self.delta(diff, false, false);
138    }
139
140    /// Get the PageInfoPtr for the specified page and note the page as used.
141    fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
142        let p = self
143            .pages
144            .entry(lpnum)
145            .or_insert_with(PageInfo::new)
146            .clone();
147        p.lock().unwrap().inc_usage(lpnum, &mut self.min);
148        self.read += 1;
149        p
150    }
151
152    /// Register that there is a client reading the database. The result is the current time.
153    fn begin_read(&mut self) -> u64 {
154        let time = self.time;
155        let n = self.rdrs.entry(time).or_insert(0);
156        *n += 1;
157        time
158    }
159
160    /// Register that the read at the specified time has ended. Stashed pages may be freed.
161    fn end_read(&mut self, time: u64) {
162        let n = self.rdrs.get_mut(&time).unwrap();
163        *n -= 1;
164        if *n == 0 {
165            self.rdrs.remove(&time);
166            self.trim(time);
167        }
168    }
169
170    /// Register that an update operation has completed. Time is incremented.
171    /// Stashed pages may be freed. Returns number of pages updated.
172    fn end_write(&mut self) -> usize {
173        let result = if let Some(u) = self.vers.get(&self.time) {
174            u.len()
175        } else {
176            0
177        };
178        let t = self.time;
179        self.time = t + 1;
180        self.trim(t);
181        result
182    }
183
184    /// Trim historic data that is no longer required.
185    fn trim(&mut self, time: u64) {
186        let (s, r) = (self.start(time), self.retain(time));
187        if s != r {
188            let mut empty = Vec::<u64>::new();
189            for (t, pl) in self.vers.range_mut(s..r) {
190                pl.retain(|pnum| {
191                    let p = self.pages.get(pnum).unwrap();
192                    p.lock().unwrap().trim(*t, s)
193                });
194                if pl.is_empty() {
195                    empty.push(*t);
196                }
197            }
198            for t in empty {
199                self.vers.remove(&t);
200            }
201        }
202    }
203
204    /// Calculate the start of the range of times for which there are no readers.
205    fn start(&self, time: u64) -> u64 {
206        if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
207            1 + *t
208        } else {
209            0
210        }
211    }
212
213    /// Calculate the end of the range of times for which there are no readers.
214    fn retain(&self, time: u64) -> u64 {
215        if let Some((t, _n)) = self.rdrs.range(time..).next() {
216            *t
217        } else {
218            self.time
219        }
220    }
221
222    /// Adjust total.
223    fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
224        if miss {
225            self.miss += 1;
226        }
227        self.total += d.1 as i64 - d.0 as i64;
228        if trim {
229            self.trim_cache();
230        }
231    }
232
233    /// Trim cached data to configured limit.
234    fn trim_cache(&mut self) {
235        while self.total > self.mem_limit as i64 && self.min.len() > 0 {
236            let lpnum = self.min.pop();
237            let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
238            p.hx = HX::MAX;
239            if let Some(data) = &p.current {
240                self.total -= data.len() as i64;
241                p.current = None;
242            }
243        }
244    }
245
246    /// Return the number of pages currently cached.
247    pub fn cached(&self) -> usize {
248        self.min.len() as usize
249    }
250}
251
/// Allows logical database pages to be shared to allow concurrent readers.
pub struct SharedPagedData {
    /// Permanent storage of pages.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Stash of pages ( cache and multi-version history ).
    pub stash: Mutex<Stash>,
    /// Info on page sizes ( obtained from ps at construction ).
    pub psi: Box<dyn PageStorageInfo>,
}
261
262impl SharedPagedData {
263    #[cfg(feature = "compact")]
264    /// Construct default SharedPageData ( default depends on compact feature ).
265    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
266        const EP_SIZE: usize = 1024; // Size of an extension page.
267        const EP_MAX: usize = 16; // Maximum number of extension pages.
268        const SP_SIZE: usize = (EP_MAX + 1) * 8; // =136. Starter page size.
269
270        Self::new_from_ps(Box::new(crate::compact::CompactFile::new(
271            stg, SP_SIZE, EP_SIZE,
272        )))
273    }
274
275    #[cfg(not(feature = "compact"))]
276    /// Construct default SharedPageData ( default depends on compact feature ).
277    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
278        let limits = crate::Limits::default();
279        Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
280    }
281
282    /// Construct SharedPageData based on specified PageStorage ( e.g. BlockPageStg )
283    pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
284        // Set a default stash memory limit of 10 MB.
285        let stash = Stash {
286            mem_limit: 10 * 1024 * 1024,
287            ..Default::default()
288        };
289        let psi = ps.info();
290        Arc::new(Self {
291            stash: Mutex::new(stash),
292            ps: RwLock::new(ps),
293            psi,
294        })
295    }
296
297    /// Wait until current commits have been written.
298    pub fn wait_complete(&self) {
299        self.ps.read().unwrap().wait_complete();
300    }
301}
302
/// Access to shared paged data.
pub struct AccessPagedData {
    /// True for writer access; readers consult page history instead of current data.
    writer: bool,
    /// For readers: the stash time registered by Stash::begin_read ( released in Drop ).
    time: u64,
    /// Shared Page Data.
    pub spd: Arc<SharedPagedData>,
}
310
impl AccessPagedData {
    /// Construct access to a virtual read-only copy of the database logical pages.
    /// Registers a reader with the stash at the current time; the registration
    /// is released when this access is dropped.
    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
        let time = spd.stash.lock().unwrap().begin_read();
        AccessPagedData {
            writer: false,
            time,
            spd,
        }
    }

    /// Construct access to the database logical pages.
    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
        #[cfg(feature = "log")]
        {
            let psi = &spd.psi;
            println!(
                "max page size={} half={}",
                psi.max_size_page(),
                psi.half_size_page()
            );
        }

        AccessPagedData {
            writer: true,
            // Writers do not use an access time ( they read current data ).
            time: 0,
            spd,
        }
    }

    /// Get locked guard of stash.
    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
        self.spd.stash.lock().unwrap()
    }

    /// Get the Data for the specified page.
    pub fn get_data(&self, lpnum: u64) -> Data {
        // Get page info. The stash guard ( a temporary ) is released at the end
        // of this statement, before the page mutex is locked below.
        let pinfo = self.stash().get_pinfo(lpnum);

        // Read the page data.
        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // If data had to be loaded from file, record the miss and the extra
        // memory used, trimming the cache if it is now over the limit.
        if loaded > 0 {
            self.stash().delta((0, loaded), true, true);
        }
        data
    }

    /// Set the data of the specified page.
    pub fn set_data(&self, lpnum: u64, data: Data) {
        debug_assert!(self.writer);

        // Get copy of current data.
        let pinfo = self.stash().get_pinfo(lpnum);

        // Read the page data.
        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // Update the stash ( ensures any readers will not attempt to read the file ).
        {
            let s = &mut *self.stash();
            if loaded > 0 {
                // Account for the loaded data; cache trimming is deferred until
                // after the new data is set.
                s.delta((0, loaded), true, false);
            }
            s.set(lpnum, old, data.clone());
            s.trim_cache();
        }

        // Write data to underlying file.
        // An empty page is dropped rather than written ( see free_page ).
        if data.len() > 0 {
            self.spd.ps.write().unwrap().set_page(lpnum, data);
        } else {
            self.spd.ps.write().unwrap().drop_page(lpnum);
        }
    }

    /// Allocate a logical page.
    pub fn alloc_page(&self) -> u64 {
        debug_assert!(self.writer);
        self.spd.ps.write().unwrap().new_page()
    }

    /// Free a logical page.
    /// Implemented by setting the page to empty data, which causes set_data
    /// to drop the page from the underlying file.
    pub fn free_page(&self, lpnum: u64) {
        self.set_data(lpnum, nd());
    }

    /// Is the underlying file new (so needs to be initialised ).
    pub fn is_new(&self) -> bool {
        self.writer && self.spd.ps.read().unwrap().is_new()
    }

    /// Check whether compressing a page is worthwhile.
    pub fn compress(&self, size: usize, saving: usize) -> bool {
        debug_assert!(self.writer);
        self.spd.psi.compress(size, saving)
    }

    /// Commit changes to underlying file ( or rollback logical page allocations ).
    /// On Save, returns the number of pages updated ( Stash::end_write, which
    /// also increments the stash time ).
    pub fn save(&self, op: SaveOp) -> usize {
        debug_assert!(self.writer);
        match op {
            SaveOp::Save => {
                self.spd.ps.write().unwrap().save();
                self.stash().end_write()
            }
            SaveOp::RollBack => {
                // Note: rollback happens before any pages are updated.
                // However logical page allocations need to be rolled back.
                self.spd.ps.write().unwrap().rollback();
                0
            }
        }
    }

    /// Renumber a page.
    #[cfg(feature = "renumber")]
    pub fn renumber_page(&self, lpnum: u64) -> u64 {
        assert!(self.writer);
        let data = self.get_data(lpnum);
        // Clear the old page in the stash, then move the data to the new number.
        self.stash().set(lpnum, data.clone(), nd());
        let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
        // The renumbered page should not already be cached.
        debug_assert!(self
            .stash()
            .get_pinfo(lpnum2)
            .lock()
            .unwrap()
            .current
            .is_none());
        let old2 = self.get_data(lpnum2);
        self.stash().set(lpnum2, old2, data);
        lpnum2
    }
}
446
447impl Drop for AccessPagedData {
448    fn drop(&mut self) {
449        if !self.writer {
450            self.stash().end_read(self.time);
451        }
452    }
453}