small_db/btree/page_cache.rs

use std::{
    fs::File,
    io::{self, prelude::*, SeekFrom},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, RwLock,
    },
};

use super::page::{
    BTreeHeaderPage, BTreeInternalPage, BTreeLeafPage, BTreePage,
    BTreePageID, BTreeRootPointerPage, PageCategory,
};
use crate::{
    concurrent_status::Permission,
    error::SmallError,
    transaction::Transaction,
    tx_log::LogManager,
    types::{ConcurrentHashMap, ResultPod},
    utils::HandyRwLock,
    BTreeTable, Unique,
};

pub const DEFAULT_PAGE_SIZE: usize = 4096;
static PAGE_SIZE: AtomicUsize = AtomicUsize::new(DEFAULT_PAGE_SIZE);

pub struct PageCache {
    pub root_pointer_buffer: ConcurrentHashMap<
        BTreePageID,
        Arc<RwLock<BTreeRootPointerPage>>,
    >,
    pub internal_buffer: ConcurrentHashMap<
        BTreePageID,
        Arc<RwLock<BTreeInternalPage>>,
    >,
    pub leaf_buffer:
        ConcurrentHashMap<BTreePageID, Arc<RwLock<BTreeLeafPage>>>,
    pub header_buffer:
        ConcurrentHashMap<BTreePageID, Arc<RwLock<BTreeHeaderPage>>>,
}

type Key = BTreePageID;

impl PageCache {
    pub fn new() -> Self {
        Self {
            root_pointer_buffer: ConcurrentHashMap::new(),
            header_buffer: ConcurrentHashMap::new(),
            internal_buffer: ConcurrentHashMap::new(),
            leaf_buffer: ConcurrentHashMap::new(),
        }
    }

    pub fn clear(&self) {
        self.root_pointer_buffer.clear();
        self.header_buffer.clear();
        self.internal_buffer.clear();
        self.leaf_buffer.clear();
    }

    /// Load the specified page from disk. This is the shared miss
    /// path of the typed getters below; those getters acquire the
    /// page lock first and may block if it is held by another
    /// transaction.
    ///
    /// The overall contract: the requested page is first looked up in
    /// the buffer pool. If present, it is returned directly; if not,
    /// it is read from disk, added to the buffer pool, and returned.
    /// If there is insufficient space in the buffer pool, a page
    /// should be evicted to make room for the new one.
    ///
    /// reference:
    /// - https://sourcegraph.com/github.com/XiaochenCui/small-db-hw@87607789b677d6afee00a223eacb4f441bd4ae87/-/blob/src/java/smalldb/BufferPool.java?L88:17&subtree=true
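    ///
    /// The typed getters below pair this miss path with
    /// `get_or_insert`, so the disk read only happens when the page
    /// is absent from the cache. A sketch of that pattern (it is
    /// exactly what the getters do, not extra API):
    ///
    /// ```ignore
    /// self.leaf_buffer.get_or_insert(key, |key| self.load_page(key))
    /// ```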
    fn load_page<PAGE>(&self, pid: &Key) -> ResultPod<PAGE>
    where
        PAGE: BTreePage,
    {
        // stage 1: look up the table that owns this page
        let catalog = Unique::catalog();
        let table_pod = catalog.get_table(&pid.get_table_id()).expect(
            &format!("table {} not found", pid.get_table_id()),
        );
        let table = table_pod.read().unwrap();

        // stage 2: read the raw page content from disk
        let buf = self
            .read_page(&mut table.get_file(), pid)
            .map_err(|_| SmallError::new("read page content failed"))?;

        // stage 3: instantiate the page from the raw bytes
        let page = PAGE::new(
            pid,
            &buf,
            &table.tuple_scheme,
            table.key_field,
        );

        // stage 4: wrap it for shared ownership and return
        Ok(Arc::new(RwLock::new(page)))
    }

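    /// Read the raw content of one page from `file`.
    ///
    /// Pages are fixed-size and stored back to back, so the page with
    /// index `i` starts at byte offset `i * page_size`. E.g. with the
    /// default 4096-byte pages, page 3 occupies bytes 12288..16384.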
    fn read_page(
        &self,
        file: &mut File,
        key: &Key,
    ) -> io::Result<Vec<u8>> {
        let page_size = Self::get_page_size();
        let start_pos = key.page_index as usize * page_size;
        file.seek(SeekFrom::Start(start_pos as u64))?;

        let mut buf: Vec<u8> = vec![0; page_size];
        file.read_exact(&mut buf)?;
        Ok(buf)
    }

    pub fn get_root_ptr_page(
        &self,
        tx: &Transaction,
        perm: Permission,
        key: &Key,
    ) -> ResultPod<BTreeRootPointerPage> {
        Unique::concurrent_status().request_lock(
            tx,
            &perm.to_lock(),
            key,
        )?;
        self.root_pointer_buffer
            .get_or_insert(key, |key| self.load_page(key))
    }

    pub fn get_header_page(
        &self,
        tx: &Transaction,
        perm: Permission,
        key: &Key,
    ) -> ResultPod<BTreeHeaderPage> {
        Unique::concurrent_status().request_lock(
            tx,
            &perm.to_lock(),
            key,
        )?;
        self.header_buffer
            .get_or_insert(key, |key| self.load_page(key))
    }

    pub fn get_internal_page(
        &self,
        tx: &Transaction,
        perm: Permission,
        key: &Key,
    ) -> ResultPod<BTreeInternalPage> {
        Unique::concurrent_status().request_lock(
            tx,
            &perm.to_lock(),
            key,
        )?;
        self.internal_buffer
            .get_or_insert(key, |key| self.load_page(key))
    }

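    /// Usage sketch (illustrative; `tx`, `pid`, and the
    /// `Permission::ReadOnly` variant are assumptions, not taken from
    /// this file):
    ///
    /// ```ignore
    /// let leaf_pod =
    ///     cache.get_leaf_page(&tx, Permission::ReadOnly, &pid)?;
    /// let leaf = leaf_pod.read().unwrap();
    /// ```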
    pub fn get_leaf_page(
        &self,
        tx: &Transaction,
        perm: Permission,
        key: &Key,
    ) -> ResultPod<BTreeLeafPage> {
        Unique::concurrent_status().request_lock(
            tx,
            &perm.to_lock(),
            key,
        )?;
        self.leaf_buffer
            .get_or_insert(key, |key| self.load_page(key))
    }

    /// Remove the specified page id from the buffer pool. Needed by
    /// the recovery manager to ensure that the buffer pool doesn't
    /// keep a rolled-back page in its cache.
    ///
    /// Also used by B+ tree files to ensure that deleted pages are
    /// removed from the cache so they can be reused safely.
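    ///
    /// E.g. after recovery rolls a page back on disk, the stale
    /// cached copy must go (sketch):
    ///
    /// ```ignore
    /// cache.discard_page(&pid);
    /// ```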
    pub fn discard_page(&self, pid: &BTreePageID) {
        match pid.category {
            PageCategory::Internal => {
                self.internal_buffer.remove(pid);
            }
            PageCategory::Leaf => {
                self.leaf_buffer.remove(pid);
            }
            PageCategory::RootPointer => {
                self.root_pointer_buffer.remove(pid);
            }
            PageCategory::Header => {
                self.header_buffer.remove(pid);
            }
        }
    }

    pub fn set_page_size(page_size: usize) {
        PAGE_SIZE.store(page_size, Ordering::Relaxed);
    }

    pub fn get_page_size() -> usize {
        PAGE_SIZE.load(Ordering::Relaxed)
    }

    /// Flush all dirty pages to disk.
    ///
    /// NB: Be careful using this routine -- it writes dirty data to
    /// disk, which will break small-db if it is running in NO STEAL
    /// mode.
    ///
    /// TODO: do these pages belong to a single table?
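    ///
    /// Call sketch (the log manager accessor is the same one used in
    /// `tx_complete` below):
    ///
    /// ```ignore
    /// let mut log_manager = Unique::mut_log_manager();
    /// cache.flush_all_pages(&mut log_manager);
    /// ```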
    pub fn flush_all_pages(&self, log_manager: &mut LogManager) {
        for pid in self.all_keys() {
            self.flush_page(&pid, log_manager);
        }
    }

    /// Write all pages of the specified transaction to disk.
    ///
    /// TODO: protect this function (mut self / or a global lock)
    pub fn flush_pages(
        &self,
        tx: &Transaction,
        log_manager: &mut LogManager,
    ) {
        for pid in self.all_keys() {
            if Unique::concurrent_status().holds_lock(tx, &pid) {
                self.flush_page(&pid, log_manager);
            }
        }
    }

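    /// Finish a transaction. On abort (`commit == false`), every
    /// cached page the transaction holds a lock on is discarded, so a
    /// later read sees the rolled-back on-disk state. On commit, the
    /// transaction's pages are flushed and the before-images of
    /// cached pages are refreshed for future rollbacks.
    ///
    /// Call sketch (illustrative):
    ///
    /// ```ignore
    /// cache.tx_complete(&tx, true);  // commit
    /// cache.tx_complete(&tx, false); // abort
    /// ```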
    pub fn tx_complete(&self, tx: &Transaction, commit: bool) {
        let mut log_manager = Unique::mut_log_manager();

        if !commit {
            for pid in self.all_keys() {
                if Unique::concurrent_status().holds_lock(tx, &pid) {
                    self.discard_page(&pid);
                }
            }

            // log_manager.log_abort(tx, self).unwrap();

            return;
        }

        self.flush_pages(tx, &mut log_manager);

        for pid in self.all_keys() {
            match pid.category {
                PageCategory::Internal => {
                    self.set_before_image(
                        &pid,
                        &self.internal_buffer,
                    );
                }
                PageCategory::Leaf => {
                    self.set_before_image(&pid, &self.leaf_buffer);
                }
                PageCategory::RootPointer => {
                    self.set_before_image(
                        &pid,
                        &self.root_pointer_buffer,
                    );
                }
                PageCategory::Header => {
                    self.set_before_image(&pid, &self.header_buffer);
                }
            }
        }

        // if commit {
        //     log_manager.log_commit(tx).unwrap();
        // }
    }

    fn set_before_image<PAGE: BTreePage>(
        &self,
        pid: &BTreePageID,
        buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
    ) {
        let b = buffer.get_inner_wl();
        let page_pod = b.get(pid).unwrap();
        page_pod.wl().set_before_image();
    }

    /// Write the content of a specific page to disk.
    fn flush_page(
        &self,
        pid: &BTreePageID,
        log_manager: &mut LogManager,
    ) {
        // stage 1: get table
        let catalog = Unique::catalog();
        let table_pod =
            catalog.get_table(&pid.get_table_id()).unwrap();
        let table = table_pod.read().unwrap();

        // stage 2: write the page, dispatched on its category
        match pid.category {
            PageCategory::RootPointer => {
                self.write(
                    &table,
                    pid,
                    &self.root_pointer_buffer,
                    log_manager,
                );
            }
            PageCategory::Header => {
                self.write(
                    &table,
                    pid,
                    &self.header_buffer,
                    log_manager,
                );
            }
            PageCategory::Internal => {
                self.write(
                    &table,
                    pid,
                    &self.internal_buffer,
                    log_manager,
                );
            }
            PageCategory::Leaf => {
                self.write(
                    &table,
                    pid,
                    &self.leaf_buffer,
                    log_manager,
                );
            }
        }
    }

    fn write<PAGE: BTreePage>(
        &self,
        table: &BTreeTable,
        pid: &BTreePageID,
        buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
        log_manager: &mut LogManager,
    ) {
        let b = buffer.get_inner_wl();
        let page_pod = b.get(pid).unwrap().clone();

        // TODO: what's the purpose of this block?
        {
            // TODO: get tx from somewhere
            if let Some(tx) =
                Unique::concurrent_status().get_page_tx(pid)
            {
                log_manager
                    .log_update(&tx, page_pod.clone())
                    .unwrap();
            } else {
                // error!("no tx found for page {:?}", pid);
                // panic!();
            }
        }
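        // Write-ahead rule: the update record above must reach the
        // log before the page bytes below reach disk. The ordering of
        // these two statements is what provides that here, assuming
        // `log_update` persists the record synchronously (an
        // assumption; this file does not show its implementation).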

        // debug!("flushing page {:?}", pid);
        table.write_page_to_disk(pid, &page_pod.rl().get_page_data());
    }

    pub fn recover_page<PAGE: BTreePage>(
        &self,
        pid: &BTreePageID,
        page: PAGE,
        buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
    ) {
        // step 1: get table
        let catalog = Unique::catalog();
        let table_pod =
            catalog.get_table(&pid.get_table_id()).unwrap();
        let table = table_pod.read().unwrap();

        let page_pod = Arc::new(RwLock::new(page));

        // step 2: install the page in the cache, then force it to disk
        self.insert_page_dispatch(pid, &page_pod, buffer);
        self.force_flush_dispatch(pid, &table, buffer, page_pod);
    }

    /// Write a page to disk without writing to the WAL.
    fn force_flush_dispatch<PAGE: BTreePage>(
        &self,
        pid: &BTreePageID,
        table: &BTreeTable,
        buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
        _page_pod: Arc<RwLock<PAGE>>,
    ) {
        let b = buffer.get_inner_wl();
        let page_pod = b.get(pid).unwrap().clone();

        // debug!("force flushing page {:?}", pid);
        table.write_page_to_disk(pid, &page_pod.rl().get_page_data());
    }

    fn insert_page_dispatch<PAGE: BTreePage + ?Sized>(
        &self,
        pid: &BTreePageID,
        page: &Arc<RwLock<PAGE>>,
        buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
    ) {
        let mut b = buffer.get_inner_wl();
        b.insert(pid.clone(), page.clone());
    }

    fn all_keys(&self) -> Vec<Key> {
        let mut keys = vec![];
        keys.append(&mut self.root_pointer_buffer.keys());
        keys.append(&mut self.header_buffer.keys());
        keys.append(&mut self.leaf_buffer.keys());
        keys.append(&mut self.internal_buffer.keys());
        keys
    }
}
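
// A minimal sketch exercising the page-size knobs above, written as an
// in-crate test; everything it touches (`PageCache::{set,get}_page_size`,
// `DEFAULT_PAGE_SIZE`) is defined in this file. Note the setting is
// process-global, so the test restores the default when done.
#[cfg(test)]
mod page_size_sketch {
    use super::*;

    #[test]
    fn page_size_round_trip() {
        // The global starts at the default ...
        assert_eq!(PageCache::get_page_size(), DEFAULT_PAGE_SIZE);

        // ... can be overridden ...
        PageCache::set_page_size(8192);
        assert_eq!(PageCache::get_page_size(), 8192);

        // ... and is restored here since it is shared by all tests.
        PageCache::set_page_size(DEFAULT_PAGE_SIZE);
    }
}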