// page_store/lib.rs
1//! [`SharedPagedData`] manages numbered [`Data`] pages, which can be shared by multiple processes.
2//!
3//! [`PageStorageInfo`] has information about the page sizes available. The default maximum page size is 4612.
4//!
//! # Test example
6//!
7//! ```
8//!     use page_store::{SharedPagedData,SaveOp};
9//!     use atom_file::{MemFile,Data};
10//!     use std::sync::Arc;
11//!
12//!     let spd = SharedPagedData::new(MemFile::new());
13//!     println!( "Number of page sizes={}", spd.psi.sizes() );
14//!     println!( "Max page size={}", spd.psi.max_size_page() );
15//!     let w = spd.new_writer();
16//!     let pnum : u64 = w.alloc_page();
17//!     w.set_data( pnum, Arc::new( vec![1,2,3,4] ) );
18//!     w.save( SaveOp::Save );
19//!     let r = spd.new_reader();
20//!     let mut d : Data = w.get_data( pnum );
21//!     assert!( *d == vec![1,2,3,4] );
22//!     let md = Arc::make_mut(&mut d);
23//!     md[0] = 2;
24//!     w.set_data( pnum, d );
25//!     w.save( SaveOp::Save );
26//!     let d : Data = w.get_data( pnum );
27//!     assert!( *d == vec![2,2,3,4] );
28//!     let d : Data = r.get_data( pnum );
29//!     assert!( *d == vec![1,2,3,4] ); // Reader still sees "old" data.
30//! ```
31
32#![deny(missing_docs)]
33
34use atom_file::{Data, Storage};
35use heap::GHeap;
36pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
37use std::sync::{Arc, Mutex, RwLock};
38
39mod block;
40mod blockpagestg;
41mod dividedstg;
42mod heap;
43mod util;
44
45pub use blockpagestg::BlockPageStg;
46
47#[cfg(feature = "pstd")]
48use pstd::collections::BTreeMap;
49
50#[cfg(not(feature = "pstd"))]
51use std::collections::BTreeMap;
52
/// Save or Rollback.
///
/// Passed to [`AccessPagedData::save`] to either commit changes to the
/// underlying file or roll back page allocations.
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)]
pub enum SaveOp {
    /// Save changes.
    Save,
    /// Roll changes back.
    RollBack,
}
61
/// Interface for page storage.
///
/// Implemented by [`BlockPageStg`]. Pages are identified by `u64` page
/// numbers allocated via [`PageStorage::new_page`].
pub trait PageStorage: Send + Sync {
    /// Is the underlying storage new?
    fn is_new(&self) -> bool;
    /// Information about page sizes.
    fn info(&self) -> Box<dyn PageStorageInfo>;
    /// Make a new page, result is page number.
    fn new_page(&mut self) -> u64;
    /// Drop page number.
    fn drop_page(&mut self, pn: u64);
    /// Set contents of page.
    fn set_page(&mut self, pn: u64, data: Data);
    /// Get contents of page.
    fn get_page(&self, pn: u64) -> Data;
    /// Get page size (for repacking).
    fn size(&self, pn: u64) -> usize;
    /// Save pages to underlying storage.
    fn save(&mut self);
    /// Undo changes since last save ( but set_page/renumber cannot be undone, only new_page and drop_page can be undone ).
    fn rollback(&mut self);
    /// Wait until save is complete.
    fn wait_complete(&self);
    #[cfg(feature = "verify")]
    /// Get set of free pages and number of pages ever allocated ( for VERIFY builtin function ).
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
    #[cfg(feature = "renumber")]
    /// Renumber page.
    fn renumber(&mut self, pn: u64) -> u64;
    #[cfg(feature = "renumber")]
    /// Load free pages in preparation for page renumbering. Returns number of used pages or None if there are no free pages.
    fn load_free_pages(&mut self) -> Option<u64>;
    #[cfg(feature = "renumber")]
    /// Final part of page renumber operation.
    fn set_alloc_pn(&mut self, target: u64);
}
97
/// Information about page sizes.
pub trait PageStorageInfo: Send + Sync {
    /// Number of different page sizes.
    fn sizes(&self) -> usize;
    /// Size index for given page size.
    fn index(&self, size: usize) -> usize;
    /// Page size for ix ( 1-based ix must be <= sizes() ).
    fn size(&self, ix: usize) -> usize;
    /// Maximum size page ( the size of the largest, last index ).
    fn max_size_page(&self) -> usize {
        self.size(self.sizes())
    }
    /// Half size page.
    // NOTE(review): the "- 50" appears to be headroom subtracted before
    // rounding down to a size class — confirm against implementors.
    fn half_size_page(&self) -> usize {
        self.size(self.index(self.max_size_page() / 2 - 50))
    }
    /// Is it worth compressing a page of given size by saving.
    /// True when the reduced size falls into a smaller size class.
    fn compress(&self, size: usize, saving: usize) -> bool {
        self.index(size - saving) < self.index(size)
    }
}
119
/// Heap index. A typical 8M cache has ~1K x 8KB pages ( 10 bits used ), so 32 bits is plenty.
type HX = u32;
/// Heap keyed by usage count; tracks the loaded page with the smallest usage.
type Heap = GHeap<u64, u64, HX>;

/// ```Arc<Mutex<PageInfo>>```
type PageInfoPtr = Arc<Mutex<PageInfo>>;
125
/// Information for a page, including historic data.
pub struct PageInfo {
    /// Current data for the page ( None implies it is stored in underlying file ).
    pub current: Option<Data>,
    /// Historic data for the page. Has data for page at specified time.
    /// A copy is made prior to an update, so get looks forward from access time.
    pub history: BTreeMap<u64, Data>,
    /// How many times has the page been used ( key for the usage heap ).
    pub usage: u64,
    /// Heap index ( HX::MAX means the page is not currently in the heap ).
    pub hx: HX,
}
138
139impl PageInfo {
140    fn new() -> PageInfoPtr {
141        Arc::new(Mutex::new(PageInfo {
142            current: None,
143            history: BTreeMap::new(),
144            usage: 0,
145            hx: HX::MAX,
146        }))
147    }
148
149    /// Increase usage.
150    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
151        self.usage += 1;
152        if self.hx == HX::MAX {
153            self.hx = ah.insert(lpnum, self.usage);
154        } else {
155            ah.modify(self.hx, self.usage);
156        }
157    }
158
159    /// Get the Data for the page, checking history if not a writer.
160    /// Reads Data from file if necessary.
161    /// Result is Data and size of loaded data ( cache delta ).
162    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
163        if !a.writer
164            && let Some((_k, v)) = self.history.range(a.time..).next()
165        {
166            return (v.clone(), 0);
167        }
168
169        if let Some(p) = &self.current {
170            return (p.clone(), 0);
171        }
172
173        // Get data from page storage.
174        let ps = a.spd.ps.read().unwrap();
175        let data = ps.get_page(lpnum);
176        self.current = Some(data.clone());
177        let len = data.len();
178        (data, len)
179    }
180
181    /// Set the page data, updating the history using the specified time and old data.
182    /// Result is delta of length (old size, new size)
183    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
184        if do_history {
185            self.history.insert(time, old);
186        }
187        let old = if let Some(x) = &self.current {
188            x.len()
189        } else {
190            0
191        };
192        let new = data.len();
193        self.current = if new == 0 { None } else { Some(data) };
194        (old, new)
195    }
196
197    /// Trim entry for time t that no longer need to be retained, returning whether entry was retained.
198    /// start is start of range for which no readers exist.
199    fn trim(&mut self, t: u64, start: u64) -> bool {
200        let first = self.history_start(t);
201        if first >= start {
202            // There is no reader that can read copy for time t, so copy can be removed.
203            self.history.remove(&t);
204            false
205        } else {
206            true
207        }
208    }
209
210    /// Returns the earliest time that would return the page for the specified time.
211    fn history_start(&self, t: u64) -> u64 {
212        if let Some((k, _)) = self.history.range(..t).next_back() {
213            *k + 1
214        } else {
215            0
216        }
217    }
218}
219
/// Central store of data.
///
/// Tracks current and historic page versions, registered readers,
/// and the total memory used by cached page data.
#[derive(Default)]
pub struct Stash {
    /// Write time - number of writes.
    time: u64,
    /// Page number -> page info.
    pub pages: HashMap<u64, PageInfoPtr>,
    /// Time -> reader count. Number of readers for given time.
    rdrs: BTreeMap<u64, usize>,
    /// Time -> set of page numbers. Page copies held for given time.
    vers: BTreeMap<u64, HashSet<u64>>,
    /// Total size of current pages.
    pub total: i64, // Use i64 to avoid problems with overflow.
    /// trim_cache reduces total to mem_limit (or below).
    pub mem_limit: usize,
    /// Tracks loaded page with smallest usage ( eviction candidate ).
    min: Heap,
    /// Total number of page accesses.
    pub read: u64,
    /// Total number of misses ( data was not already loaded ).
    pub miss: u64,
}
242
243impl Stash {
244    /// Set the value of the specified page for the current time.
245    fn set(&mut self, lpnum: u64, old: Data, data: Data) {
246        let time = self.time;
247        let u = self.vers.entry(time).or_default();
248        let do_history = u.insert(lpnum);
249        let p = self.get_pinfo(lpnum);
250        let diff = p.lock().unwrap().set_data(time, old, data, do_history);
251        self.delta(diff, false, false);
252    }
253
254    /// Get the PageInfoPtr for the specified page and note the page as used.
255    fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
256        let p = self
257            .pages
258            .entry(lpnum)
259            .or_insert_with(PageInfo::new)
260            .clone();
261        p.lock().unwrap().inc_usage(lpnum, &mut self.min);
262        self.read += 1;
263        p
264    }
265
266    /// Register that there is a client reading. The result is the current time.
267    fn begin_read(&mut self) -> u64 {
268        let time = self.time;
269        let n = self.rdrs.entry(time).or_insert(0);
270        *n += 1;
271        time
272    }
273
274    /// Register that the read at the specified time has ended. Stashed pages may be freed.
275    fn end_read(&mut self, time: u64) {
276        let n = self.rdrs.get_mut(&time).unwrap();
277        *n -= 1;
278        if *n == 0 {
279            self.rdrs.remove(&time);
280            self.trim(time);
281        }
282    }
283
284    /// Register that an update operation has completed. Time is incremented.
285    /// Stashed pages may be freed. Returns number of pages updated.
286    fn end_write(&mut self) -> usize {
287        let result = if let Some(u) = self.vers.get(&self.time) {
288            u.len()
289        } else {
290            0
291        };
292        let t = self.time;
293        self.time = t + 1;
294        self.trim(t);
295        result
296    }
297
298    /// Trim historic data that is no longer required.
299    fn trim(&mut self, time: u64) {
300        let (s, r) = (self.start(time), self.retain(time));
301        if s != r {
302            let mut empty = Vec::<u64>::new();
303            for (t, pl) in self.vers.range_mut(s..r) {
304                pl.retain(|pnum| {
305                    let p = self.pages.get(pnum).unwrap();
306                    p.lock().unwrap().trim(*t, s)
307                });
308                if pl.is_empty() {
309                    empty.push(*t);
310                }
311            }
312            for t in empty {
313                self.vers.remove(&t);
314            }
315        }
316    }
317
318    /// Calculate the start of the range of times for which there are no readers.
319    fn start(&self, time: u64) -> u64 {
320        if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
321            1 + *t
322        } else {
323            0
324        }
325    }
326
327    /// Calculate the end of the range of times for which there are no readers.
328    fn retain(&self, time: u64) -> u64 {
329        if let Some((t, _n)) = self.rdrs.range(time..).next() {
330            *t
331        } else {
332            self.time
333        }
334    }
335
336    /// Adjust total.
337    fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
338        if miss {
339            self.miss += 1;
340        }
341        self.total += d.1 as i64 - d.0 as i64;
342        if trim {
343            self.trim_cache();
344        }
345    }
346
347    /// Trim cached data to configured limit.
348    pub fn trim_cache(&mut self) {
349        while self.total > self.mem_limit as i64 && self.min.len() > 0 {
350            let lpnum = self.min.pop();
351            let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
352            p.hx = HX::MAX;
353            if let Some(data) = &p.current {
354                self.total -= data.len() as i64;
355                p.current = None;
356            }
357        }
358    }
359
360    /// Return the number of pages currently cached.
361    pub fn cached(&self) -> usize {
362        self.min.len() as usize
363    }
364}
365
/// Allows pages to be shared to allow concurrent readers.
///
/// Construct with [`SharedPagedData::new`] or [`SharedPagedData::new_from_ps`],
/// then obtain access via [`SharedPagedData::new_reader`] / [`SharedPagedData::new_writer`].
pub struct SharedPagedData {
    /// Permanent storage of pages.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Stash of pages ( cache plus historic versions for readers ).
    pub stash: Mutex<Stash>,
    /// Info on page sizes.
    pub psi: Box<dyn PageStorageInfo>,
}
375
376impl SharedPagedData {
377    /*
378    #[cfg(feature = "compact")]
379    /// Construct default SharedPageData ( default depends on compact feature ).
380    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
381        const EP_SIZE: usize = 1024; // Size of an extension page.
382        const EP_MAX: usize = 16; // Maximum number of extension pages.
383        const SP_SIZE: usize = (EP_MAX + 1) * 8; // =136. Starter page size.
384
385        Self::new_from_ps(Box::new(crate::compact::CompactFile::new(
386            stg, SP_SIZE, EP_SIZE,
387        )))
388    }
389
390    #[cfg(not(feature = "compact"))]
391    */
392
393    /// Construct default SharedPageData ( default depends on compact feature ).
394    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
395        let limits = crate::Limits::default();
396        Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
397    }
398
399    /// Construct SharedPageData based on specified PageStorage ( e.g. BlockPageStg )
400    pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
401        // Set a default stash memory limit of 10 MB.
402        let stash = Stash {
403            mem_limit: 10 * 1024 * 1024,
404            ..Default::default()
405        };
406        let psi = ps.info();
407        Arc::new(Self {
408            stash: Mutex::new(stash),
409            ps: RwLock::new(ps),
410            psi,
411     
412        })
413    }
414
415    /// Get read access to a virtual read-only copy of the pages.
416    pub fn new_reader(self: &Arc<Self>) -> AccessPagedData
417    {
418        let time = self.stash.lock().unwrap().begin_read();
419        AccessPagedData {
420            writer: false,
421            time,
422            spd: self.clone(),
423        }
424    }
425
426    /// Get write access to the pages.
427    pub fn new_writer(self: &Arc<Self>) -> AccessPagedData{
428        AccessPagedData {
429            writer: true,
430            time: 0,
431            spd: self.clone(),
432        }
433    }
434
435    /// Wait until current commits have been written.
436    pub fn wait_complete(&self) {
437        self.ps.read().unwrap().wait_complete();
438    }
439}
440
/// Access to shared paged data.
///
/// Obtained via [`SharedPagedData::new_reader`] ( read-only snapshot at a
/// fixed time ) or [`SharedPagedData::new_writer`].
pub struct AccessPagedData {
    // True for a writer; a reader sees a snapshot at `time`.
    writer: bool,
    // Stash time at which a reader registered ( 0 for writers ).
    time: u64,
    /// Shared Page Data.
    pub spd: Arc<SharedPagedData>,
}
448
impl AccessPagedData {
    /// Construct access to a virtual read-only copy of the pages.
    #[deprecated(note="use SharedPagedData::new_reader instead")]
    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
        // Register the reader so history at the current time is retained.
        let time = spd.stash.lock().unwrap().begin_read();
        AccessPagedData {
            writer: false,
            time,
            spd,
        }
    }

    /// Construct access to the pages.
    #[deprecated(note="use SharedPagedData::new_writer instead")] 
    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
        // Writers do not register a read time; `time` is unused for them.
        AccessPagedData {
            writer: true,
            time: 0,
            spd,
        }
    }

    /// Get locked guard of stash.
    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
        self.spd.stash.lock().unwrap()
    }

    /// Get the Data for the specified page.
    pub fn get_data(&self, lpnum: u64) -> Data {
        // Get page info. The stash guard is a temporary, released at the end
        // of this statement — before the page lock below is taken.
        let pinfo = self.stash().get_pinfo(lpnum);

        // Read the page data ( loaded > 0 means bytes were read from storage ).
        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        if loaded > 0 {
            // Account for newly cached bytes ( counts a miss, may trim cache ).
            self.stash().delta((0, loaded), true, true);
        }
        data
    }

    /// Set the data of the specified page.
    pub fn set_data(&self, lpnum: u64, data: Data) {
        debug_assert!(self.writer);

        // Get copy of current data ( needed to stash history for readers ).
        let pinfo = self.stash().get_pinfo(lpnum);

        // Read the page data.
        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // Update the stash ( ensures any readers will not attempt to read the file ).
        {
            let s = &mut *self.stash();
            if loaded > 0 {
                // Count the loaded bytes without trimming; trim once below.
                s.delta((0, loaded), true, false);
            }
            s.set(lpnum, old, data.clone());
            s.trim_cache();
        }

        // Write data to underlying file ( stash lock already released ).
        if !data.is_empty() {
            self.spd.ps.write().unwrap().set_page(lpnum, data);
        } else {
            // Empty data frees the page.
            self.spd.ps.write().unwrap().drop_page(lpnum);
        }
    }

    /// Allocate a page.
    pub fn alloc_page(&self) -> u64 {
        debug_assert!(self.writer);
        self.spd.ps.write().unwrap().new_page()
    }

    /// Free a page ( setting empty data frees it — see set_data ).
    pub fn free_page(&self, lpnum: u64) {
        self.set_data(lpnum, Data::default());
    }

    /// Is the underlying file new ( so needs to be initialised ).
    pub fn is_new(&self) -> bool {
        self.writer && self.spd.ps.read().unwrap().is_new()
    }

    /// Check whether compressing a page is worthwhile.
    pub fn compress(&self, size: usize, saving: usize) -> bool {
        debug_assert!(self.writer);
        self.spd.psi.compress(size, saving)
    }

    /// Commit changes to underlying file ( or rollback page allocations ).
    /// Returns the number of pages updated ( 0 for rollback ).
    pub fn save(&self, op: SaveOp) -> usize {
        debug_assert!(self.writer);
        match op {
            SaveOp::Save => {
                self.spd.ps.write().unwrap().save();
                self.stash().end_write()
            }
            SaveOp::RollBack => {
                // Note: rollback happens before any pages are updated.
                // However page allocations need to be rolled back.
                self.spd.ps.write().unwrap().rollback();
                0
            }
        }
    }

    /// Renumber a page ( moves data from lpnum to a new page number ).
    #[cfg(feature = "renumber")]
    pub fn renumber_page(&self, lpnum: u64) -> u64 {
        assert!(self.writer);
        let data = self.get_data(lpnum);
        // Clear the old page in the stash, keeping history for readers.
        self.stash().set(lpnum, data.clone(), Data::default());
        let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
        // The target page should have no cached data yet.
        debug_assert!(
            self.stash()
                .get_pinfo(lpnum2)
                .lock()
                .unwrap()
                .current
                .is_none()
        );
        let old2 = self.get_data(lpnum2);
        self.stash().set(lpnum2, old2, data);
        lpnum2
    }
}
577
578impl Drop for AccessPagedData {
579    fn drop(&mut self) {
580        if !self.writer {
581            self.stash().end_read(self.time);
582        }
583    }
584}
585
/// Memory limits.
/// 
/// Default block capacity is 27720, the least common multiple of 1..=12,
/// so it is divisible by every divisor in 6..=12.
///
/// Giving page sizes ( blk_cap / div - 8 byte page header, for div in 12..=6 )
/// of 2302, 2512, 2764, 3072, 3457, 3952, 4612.
#[non_exhaustive]
pub struct Limits {
    /// Atomic file limits
    pub af_lim: atom_file::Limits,
    /// Block capacity - default is 27720 ( LCM of 1..=12 ).
    pub blk_cap: u64,
    /// Page sizes - default is 7.
    pub page_sizes: usize,
    /// Largest division of page - default is 12.
    pub max_div: usize,
}
602
603impl Default for Limits {
604    fn default() -> Self {
605        Self {
606            af_lim: atom_file::Limits::default(),
607            blk_cap: 27720,
608            page_sizes: 7,
609            max_div: 12,
610        }
611    }
612}