1use crate::{
2 heap::GHeap, nd, Arc, BTreeMap, Data, HashMap, HashSet, Mutex, PageStorage, PageStorageInfo,
3 RwLock, SaveOp, Storage,
4};
5
6type HX = u32; type Heap = GHeap<u64, u64, HX>;
8
9pub type PageInfoPtr = Arc<Mutex<PageInfo>>;
11
12pub struct PageInfo {
14 pub current: Option<Data>,
16 pub history: BTreeMap<u64, Data>,
19 pub usage: u64,
21 pub hx: HX,
23}
24
25impl PageInfo {
26 fn new() -> PageInfoPtr {
27 Arc::new(Mutex::new(PageInfo {
28 current: None,
29 history: BTreeMap::new(),
30 usage: 0,
31 hx: HX::MAX,
32 }))
33 }
34
35 fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
37 self.usage += 1;
38 if self.hx == HX::MAX {
39 self.hx = ah.insert(lpnum, self.usage);
40 } else {
41 ah.modify(self.hx, self.usage);
42 }
43 }
44
45 fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
49 if !a.writer {
50 if let Some((_k, v)) = self.history.range(a.time..).next() {
51 return (v.clone(), 0);
52 }
53 }
54
55 if let Some(p) = &self.current {
56 return (p.clone(), 0);
57 }
58
59 let ps = a.spd.ps.read().unwrap();
61 let data = ps.get_page(lpnum);
62 self.current = Some(data.clone());
63 let len = data.len();
64 (data, len)
65 }
66
67 fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
70 if do_history {
71 self.history.insert(time, old);
72 }
73 let old = if let Some(x) = &self.current {
74 x.len()
75 } else {
76 0
77 };
78 let new = data.len();
79 self.current = if new == 0 { None } else { Some(data) };
80 (old, new)
81 }
82
83 fn trim(&mut self, t: u64, start: u64) -> bool {
86 let first = self.history_start(t);
87 if first >= start {
88 self.history.remove(&t);
90 false
91 } else {
92 true
93 }
94 }
95
96 fn history_start(&self, t: u64) -> u64 {
98 if let Some((k, _)) = self.history.range(..t).next_back() {
99 *k + 1
100 } else {
101 0
102 }
103 }
104}
105
106#[derive(Default)]
108pub struct Stash {
109 pub time: u64,
111 pub pages: HashMap<u64, PageInfoPtr>,
113 pub rdrs: BTreeMap<u64, usize>,
115 pub vers: BTreeMap<u64, HashSet<u64>>,
117 pub total: i64, pub mem_limit: usize,
121 pub min: Heap,
123 pub read: u64,
125 pub miss: u64,
127}
128
129impl Stash {
130 fn set(&mut self, lpnum: u64, old: Data, data: Data) {
132 let time = self.time;
133 let u = self.vers.entry(time).or_default();
134 let do_history = u.insert(lpnum);
135 let p = self.get_pinfo(lpnum);
136 let diff = p.lock().unwrap().set_data(time, old, data, do_history);
137 self.delta(diff, false, false);
138 }
139
140 fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
142 let p = self
143 .pages
144 .entry(lpnum)
145 .or_insert_with(PageInfo::new)
146 .clone();
147 p.lock().unwrap().inc_usage(lpnum, &mut self.min);
148 self.read += 1;
149 p
150 }
151
152 fn begin_read(&mut self) -> u64 {
154 let time = self.time;
155 let n = self.rdrs.entry(time).or_insert(0);
156 *n += 1;
157 time
158 }
159
160 fn end_read(&mut self, time: u64) {
162 let n = self.rdrs.get_mut(&time).unwrap();
163 *n -= 1;
164 if *n == 0 {
165 self.rdrs.remove(&time);
166 self.trim(time);
167 }
168 }
169
170 fn end_write(&mut self) -> usize {
173 let result = if let Some(u) = self.vers.get(&self.time) {
174 u.len()
175 } else {
176 0
177 };
178 let t = self.time;
179 self.time = t + 1;
180 self.trim(t);
181 result
182 }
183
184 fn trim(&mut self, time: u64) {
186 let (s, r) = (self.start(time), self.retain(time));
187 if s != r {
188 let mut empty = Vec::<u64>::new();
189 for (t, pl) in self.vers.range_mut(s..r) {
190 pl.retain(|pnum| {
191 let p = self.pages.get(pnum).unwrap();
192 p.lock().unwrap().trim(*t, s)
193 });
194 if pl.is_empty() {
195 empty.push(*t);
196 }
197 }
198 for t in empty {
199 self.vers.remove(&t);
200 }
201 }
202 }
203
204 fn start(&self, time: u64) -> u64 {
206 if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
207 1 + *t
208 } else {
209 0
210 }
211 }
212
213 fn retain(&self, time: u64) -> u64 {
215 if let Some((t, _n)) = self.rdrs.range(time..).next() {
216 *t
217 } else {
218 self.time
219 }
220 }
221
222 fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
224 if miss {
225 self.miss += 1;
226 }
227 self.total += d.1 as i64 - d.0 as i64;
228 if trim {
229 self.trim_cache();
230 }
231 }
232
233 fn trim_cache(&mut self) {
235 while self.total > self.mem_limit as i64 && self.min.len() > 0 {
236 let lpnum = self.min.pop();
237 let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
238 p.hx = HX::MAX;
239 if let Some(data) = &p.current {
240 self.total -= data.len() as i64;
241 p.current = None;
242 }
243 }
244 }
245
246 pub fn cached(&self) -> usize {
248 self.min.len() as usize
249 }
250}
251
252pub struct SharedPagedData {
254 pub ps: RwLock<Box<dyn PageStorage>>,
256 pub stash: Mutex<Stash>,
258 pub psi: Box<dyn PageStorageInfo>,
260}
261
262impl SharedPagedData {
263 #[cfg(feature = "compact")]
264 pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
266 const EP_SIZE: usize = 1024; const EP_MAX: usize = 16; const SP_SIZE: usize = (EP_MAX + 1) * 8; Self::new_from_ps(Box::new(crate::compact::CompactFile::new(
271 stg, SP_SIZE, EP_SIZE,
272 )))
273 }
274
275 #[cfg(not(feature = "compact"))]
276 pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
278 let limits = crate::Limits::default();
279 Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
280 }
281
282 pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
284 let stash = Stash {
286 mem_limit: 10 * 1024 * 1024,
287 ..Default::default()
288 };
289 let psi = ps.info();
290 Arc::new(Self {
291 stash: Mutex::new(stash),
292 ps: RwLock::new(ps),
293 psi,
294 })
295 }
296
297 pub fn wait_complete(&self) {
299 self.ps.read().unwrap().wait_complete();
300 }
301}
302
303pub struct AccessPagedData {
305 writer: bool,
306 time: u64,
307 pub spd: Arc<SharedPagedData>,
309}
310
311impl AccessPagedData {
312 pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
314 let time = spd.stash.lock().unwrap().begin_read();
315 AccessPagedData {
316 writer: false,
317 time,
318 spd,
319 }
320 }
321
322 pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
324 #[cfg(feature = "log")]
325 {
326 let psi = &spd.psi;
327 println!(
328 "max page size={} half={}",
329 psi.max_size_page(),
330 psi.half_size_page()
331 );
332 }
333
334 AccessPagedData {
335 writer: true,
336 time: 0,
337 spd,
338 }
339 }
340
341 pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
343 self.spd.stash.lock().unwrap()
344 }
345
346 pub fn get_data(&self, lpnum: u64) -> Data {
348 let pinfo = self.stash().get_pinfo(lpnum);
350
351 let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
353
354 if loaded > 0 {
355 self.stash().delta((0, loaded), true, true);
356 }
357 data
358 }
359
360 pub fn set_data(&self, lpnum: u64, data: Data) {
362 debug_assert!(self.writer);
363
364 let pinfo = self.stash().get_pinfo(lpnum);
366
367 let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
369
370 {
372 let s = &mut *self.stash();
373 if loaded > 0 {
374 s.delta((0, loaded), true, false);
375 }
376 s.set(lpnum, old, data.clone());
377 s.trim_cache();
378 }
379
380 if data.len() > 0 {
382 self.spd.ps.write().unwrap().set_page(lpnum, data);
383 } else {
384 self.spd.ps.write().unwrap().drop_page(lpnum);
385 }
386 }
387
388 pub fn alloc_page(&self) -> u64 {
390 debug_assert!(self.writer);
391 self.spd.ps.write().unwrap().new_page()
392 }
393
394 pub fn free_page(&self, lpnum: u64) {
396 self.set_data(lpnum, nd());
397 }
398
399 pub fn is_new(&self) -> bool {
401 self.writer && self.spd.ps.read().unwrap().is_new()
402 }
403
404 pub fn compress(&self, size: usize, saving: usize) -> bool {
406 debug_assert!(self.writer);
407 self.spd.psi.compress(size, saving)
408 }
409
410 pub fn save(&self, op: SaveOp) -> usize {
412 debug_assert!(self.writer);
413 match op {
414 SaveOp::Save => {
415 self.spd.ps.write().unwrap().save();
416 self.stash().end_write()
417 }
418 SaveOp::RollBack => {
419 self.spd.ps.write().unwrap().rollback();
422 0
423 }
424 }
425 }
426
427 #[cfg(feature = "renumber")]
429 pub fn renumber_page(&self, lpnum: u64) -> u64 {
430 assert!(self.writer);
431 let data = self.get_data(lpnum);
432 self.stash().set(lpnum, data.clone(), nd());
433 let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
434 debug_assert!(self
435 .stash()
436 .get_pinfo(lpnum2)
437 .lock()
438 .unwrap()
439 .current
440 .is_none());
441 let old2 = self.get_data(lpnum2);
442 self.stash().set(lpnum2, old2, data);
443 lpnum2
444 }
445}
446
447impl Drop for AccessPagedData {
448 fn drop(&mut self) {
449 if !self.writer {
450 self.stash().end_read(self.time);
451 }
452 }
453}