1use std::{
2 fs::File,
3 io::{self, prelude::*, Seek, SeekFrom},
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc, RwLock,
7 },
8};
9
10use super::page::{
11 BTreeHeaderPage, BTreeInternalPage, BTreeLeafPage, BTreePage,
12 BTreePageID, BTreeRootPointerPage, PageCategory,
13};
14use crate::{
15 concurrent_status::Permission,
16 error::SmallError,
17 transaction::Transaction,
18 tx_log::LogManager,
19 types::{ConcurrentHashMap, ResultPod},
20 utils::HandyRwLock,
21 BTreeTable, Unique,
22};
23
/// Page size, in bytes, that is in effect until `PageCache::set_page_size`
/// stores a different value.
pub const DEFAULT_PAGE_SIZE: usize = 4 * 1024;

/// Process-wide page size currently in effect. It is initialised to
/// [`DEFAULT_PAGE_SIZE`] and accessed through the `PageCache` page-size
/// accessors.
static PAGE_SIZE: AtomicUsize = AtomicUsize::new(DEFAULT_PAGE_SIZE);
26
27pub struct PageCache {
28 pub root_pointer_buffer: ConcurrentHashMap<
29 BTreePageID,
30 Arc<RwLock<BTreeRootPointerPage>>,
31 >,
32 pub internal_buffer: ConcurrentHashMap<
33 BTreePageID,
34 Arc<RwLock<BTreeInternalPage>>,
35 >,
36 pub leaf_buffer:
37 ConcurrentHashMap<BTreePageID, Arc<RwLock<BTreeLeafPage>>>,
38 pub header_buffer:
39 ConcurrentHashMap<BTreePageID, Arc<RwLock<BTreeHeaderPage>>>,
40}
41
42type Key = BTreePageID;
43
44impl PageCache {
45 pub fn new() -> Self {
46 Self {
47 root_pointer_buffer: ConcurrentHashMap::new(),
48 header_buffer: ConcurrentHashMap::new(),
49 internal_buffer: ConcurrentHashMap::new(),
50 leaf_buffer: ConcurrentHashMap::new(),
51 }
52 }
53
54 pub fn clear(&self) {
55 self.root_pointer_buffer.clear();
56 self.header_buffer.clear();
57 self.internal_buffer.clear();
58 self.leaf_buffer.clear();
59 }
60
61 fn load_page<PAGE>(&self, pid: &Key) -> ResultPod<PAGE>
75 where
76 PAGE: BTreePage,
77 {
78 let catalog = Unique::catalog();
80 let v = catalog.get_table(&pid.get_table_id()).expect(
81 &format!("table {} not found", pid.get_table_id()),
82 );
83 let table = v.read().unwrap();
84
85 let buf = self
87 .read_page(&mut table.get_file(), pid)
88 .or(Err(SmallError::new("read page content failed")))?;
89
90 let page = PAGE::new(
92 pid,
93 &buf,
94 &table.tuple_scheme,
95 table.key_field,
96 );
97
98 return Ok(Arc::new(RwLock::new(page)));
100 }
101
102 fn read_page(
103 &self,
104 file: &mut File,
105 key: &Key,
106 ) -> io::Result<Vec<u8>> {
107 let page_size = Self::get_page_size();
108 let start_pos = key.page_index as usize * page_size;
109 file.seek(SeekFrom::Start(start_pos as u64))
110 .expect("io error");
111
112 let mut buf: Vec<u8> = vec![0; page_size];
113 file.read_exact(&mut buf).expect("io error");
114 Ok(buf)
115 }
116
117 pub fn get_root_ptr_page(
118 &self,
119 tx: &Transaction,
120 perm: Permission,
121 key: &Key,
122 ) -> ResultPod<BTreeRootPointerPage> {
123 Unique::concurrent_status().request_lock(
124 tx,
125 &perm.to_lock(),
126 key,
127 )?;
128 self.root_pointer_buffer.get_or_insert(key, |key| {
129 let page = self.load_page(key)?;
130 Ok(page.clone())
131 })
132 }
133
134 pub fn get_header_page(
135 &self,
136 tx: &Transaction,
137 perm: Permission,
138 key: &Key,
139 ) -> ResultPod<BTreeHeaderPage> {
140 Unique::concurrent_status().request_lock(
141 tx,
142 &perm.to_lock(),
143 key,
144 )?;
145 self.header_buffer.get_or_insert(key, |key| {
146 let page = self.load_page(key)?;
147 Ok(page.clone())
148 })
149 }
150
151 pub fn get_internal_page(
152 &self,
153 tx: &Transaction,
154 perm: Permission,
155 key: &Key,
156 ) -> ResultPod<BTreeInternalPage> {
157 Unique::concurrent_status().request_lock(
158 tx,
159 &perm.to_lock(),
160 key,
161 )?;
162 self.internal_buffer.get_or_insert(key, |key| {
163 let page = self.load_page(key)?;
164 Ok(page.clone())
165 })
166 }
167
168 pub fn get_leaf_page(
169 &self,
170 tx: &Transaction,
171 perm: Permission,
172 key: &Key,
173 ) -> ResultPod<BTreeLeafPage> {
174 Unique::concurrent_status().request_lock(
175 tx,
176 &perm.to_lock(),
177 key,
178 )?;
179 self.leaf_buffer.get_or_insert(key, |key| {
180 let page = self.load_page(key)?;
181 Ok(page.clone())
182 })
183 }
184
185 pub fn discard_page(&self, pid: &BTreePageID) {
193 match pid.category {
194 PageCategory::Internal => {
195 self.internal_buffer.remove(pid);
196 }
197 PageCategory::Leaf => {
198 self.leaf_buffer.remove(pid);
199 }
200 PageCategory::RootPointer => {
201 self.root_pointer_buffer.remove(pid);
202 }
203 PageCategory::Header => {
204 self.header_buffer.remove(pid);
205 }
206 }
207 }
208
209 pub fn set_page_size(page_size: usize) {
210 PAGE_SIZE.store(page_size, Ordering::Relaxed);
211 }
212
213 pub fn get_page_size() -> usize {
214 PAGE_SIZE.load(Ordering::Relaxed)
215 }
216
217 pub fn flush_all_pages(&self, log_manager: &mut LogManager) {
224 for pid in self.all_keys() {
225 self.flush_page(&pid, log_manager);
226 }
227 }
228
229 pub fn flush_pages(
233 &self,
234 tx: &Transaction,
235 log_manager: &mut LogManager,
236 ) {
237 for pid in self.all_keys() {
238 if Unique::concurrent_status().holds_lock(tx, &pid) {
239 self.flush_page(&pid, log_manager);
240 }
241 }
242 }
243
244 pub fn tx_complete(&self, tx: &Transaction, commit: bool) {
245 let mut log_manager = Unique::mut_log_manager();
246
247 if !commit {
248 for pid in self.all_keys() {
249 if Unique::concurrent_status().holds_lock(tx, &pid) {
250 self.discard_page(&pid);
251 }
252 }
253
254 return;
257 }
258
259 self.flush_pages(tx, &mut log_manager);
260
261 for pid in self.all_keys() {
262 match pid.category {
263 PageCategory::Internal => {
264 self.set_before_image(
265 &pid,
266 &self.internal_buffer,
267 );
268 }
269 PageCategory::Leaf => {
270 self.set_before_image(&pid, &self.leaf_buffer);
271 }
272 PageCategory::RootPointer => {
273 self.set_before_image(
274 &pid,
275 &self.root_pointer_buffer,
276 );
277 }
278 PageCategory::Header => {
279 self.set_before_image(&pid, &self.header_buffer);
280 }
281 }
282 }
283
284 }
288
289 fn set_before_image<PAGE: BTreePage>(
290 &self,
291 pid: &BTreePageID,
292 buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
293 ) {
294 let b = buffer.get_inner_wl();
295 let page_pod = b.get(pid).unwrap();
296 page_pod.wl().set_before_image();
297 }
298
299 fn flush_page(
301 &self,
302 pid: &BTreePageID,
303
304 log_manager: &mut LogManager,
305 ) {
306 let catalog = Unique::catalog();
308 let table_pod =
309 catalog.get_table(&pid.get_table_id()).unwrap();
310 let table = table_pod.read().unwrap();
311
312 match pid.category {
313 PageCategory::RootPointer => {
314 self.write(
315 &table,
316 pid,
317 &self.root_pointer_buffer,
318 log_manager,
319 );
320 }
321 PageCategory::Header => {
322 self.write(
323 &table,
324 pid,
325 &self.header_buffer,
326 log_manager,
327 );
328 }
329 PageCategory::Internal => {
330 self.write(
331 &table,
332 pid,
333 &self.internal_buffer,
334 log_manager,
335 );
336 }
337 PageCategory::Leaf => {
338 self.write(
339 &table,
340 pid,
341 &self.leaf_buffer,
342 log_manager,
343 );
344 }
345 }
346 }
347
348 fn write<PAGE: BTreePage>(
349 &self,
350 table: &BTreeTable,
351 pid: &BTreePageID,
352 buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
353 log_manager: &mut LogManager,
354 ) {
355 let b = buffer.get_inner_wl();
356 let page_pod = b.get(pid).unwrap().clone();
357
358 {
360 if let Some(tx) =
362 Unique::concurrent_status().get_page_tx(pid)
363 {
364 log_manager
365 .log_update(&tx, page_pod.clone())
366 .unwrap();
367 } else {
368 }
371 }
372
373 table.write_page_to_disk(pid, &page_pod.rl().get_page_data());
375 }
376
377 pub fn recover_page<PAGE: BTreePage>(
378 &self,
379 pid: &BTreePageID,
380 page: PAGE,
381 buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
382 ) {
383 let catalog = Unique::catalog();
385 let table_pod =
386 catalog.get_table(&pid.get_table_id()).unwrap();
387 let table = table_pod.read().unwrap();
388
389 let page_pod = Arc::new(RwLock::new(page));
390
391 self.insert_page_dispatch(pid, &page_pod, buffer);
392 self.force_flush_dispatch(pid, &table, buffer, page_pod);
393 }
394
395 fn force_flush_dispatch<PAGE: BTreePage>(
397 &self,
398 pid: &BTreePageID,
399 table: &BTreeTable,
400 buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
401 _page_pod: Arc<RwLock<PAGE>>,
402 ) {
403 let b = buffer.get_inner_wl();
404 let page_pod = b.get(pid).unwrap().clone();
405
406 table.write_page_to_disk(pid, &page_pod.rl().get_page_data());
408 }
409
410 fn insert_page_dispatch<PAGE: BTreePage + ?Sized>(
411 &self,
412 pid: &BTreePageID,
413 page: &Arc<RwLock<PAGE>>,
414 buffer: &ConcurrentHashMap<BTreePageID, Arc<RwLock<PAGE>>>,
415 ) {
416 let mut b = buffer.get_inner_wl();
417 b.insert(pid.clone(), page.clone());
418 }
419
420 fn all_keys(&self) -> Vec<Key> {
421 let mut keys = vec![];
422 keys.append(&mut self.root_pointer_buffer.keys());
423 keys.append(&mut self.header_buffer.keys());
424 keys.append(&mut self.leaf_buffer.keys());
425 keys.append(&mut self.internal_buffer.keys());
426 keys
427 }
428}