persy/
address.rs

1use crate::{
2    address::segment::{
3        segment_page_iterator::SegmentPageIterator, AllocatedSegmentPage, Segment, SegmentPage, SegmentPageRead,
4        Segments,
5    },
6    allocator::Allocator,
7    config::Config,
8    device::{Page, PageOps},
9    error::{PERes, ReadError, SegmentError, TimeoutError},
10    id::{PersyId, RecRef, SegmentId},
11    index::config::{is_index_name_data, is_index_name_meta},
12    journal::records::{DeleteRecord, InsertRecord, NewSegmentPage, RollbackPage, UpdateRecord},
13    locks::{LockManager, RwLockManager},
14    transaction::{
15        locks::Locks,
16        tx_impl::{CheckRecord, SegmentOperation},
17    },
18    PrepareError,
19};
20use std::{
21    collections::{hash_map::Entry, HashMap, HashSet},
22    sync::{Arc, RwLock},
23    time::Duration,
24};
25
26#[cfg(feature = "experimental_inspect")]
27use crate::{error::GenericError, inspect::RecordState};
28
29pub mod record_scanner;
30pub mod segment;
31pub mod segment_iter;
32#[cfg(test)]
33mod tests;
34
/// Size exponent requested from the allocator for the address root page (2^6).
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6;
/// Size exponent of an address page (2^10).
pub const ADDRESS_PAGE_EXP: u8 = 10;
/// Flag bit 0 of an address entry.
pub const FLAG_EXISTS: u8 = 0b0000_0001;
/// Flag bit 1 of an address entry.
pub const FLAG_DELETED: u8 = 0b0000_0010;
pub const SEGMENT_HASH_OFFSET: u32 = 16;
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 24;
pub const SEGMENT_DATA_OFFSET: u32 = 26;
/// Bytes taken by one address entry: pointer to the data page (8) + flags (1)
/// + version management (2).
// NOTE(review): the original comment marked the version as "not yet used";
// confirm whether that is still accurate.
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2;
43
/// Snapshot of the address entry of a record as it was found by
/// `Address::check_persistent_records`.
pub struct OldRecordInfo {
    /// Reference of the checked record.
    pub recref: RecRef,
    /// Segment the record was looked up in.
    pub segment: SegmentId,
    /// Page value read from the address entry of the record.
    pub record_page: u64,
    /// Version value read from the address entry of the record.
    pub version: u16,
}
50
51impl OldRecordInfo {
52    fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> OldRecordInfo {
53        OldRecordInfo {
54            recref: *recref,
55            segment,
56            record_page,
57            version,
58        }
59    }
60}
61
/// The address segment keeps the basic addressing of the data in the data segment for a
/// specific data block.
pub struct Address {
    // Allocator used to load, write and flush the address pages.
    allocator: Arc<Allocator>,
    // Locks keyed by record reference.
    record_locks: LockManager<RecRef>,
    // Locks keyed by segment name, taken for segments being created.
    create_segment_locks: LockManager<String>,
    // Read/write locks keyed by segment id.
    segment_locks: RwLockManager<SegmentId>,
    // Segment table, guarded by a read/write lock.
    segments: RwLock<Segments>,
}
71
72impl Address {
73    pub fn new(all: &Arc<Allocator>, _config: &Arc<Config>, page: u64) -> PERes<Address> {
74        let segments = Segments::new(page, all)?;
75        Ok(Address {
76            allocator: all.clone(),
77            record_locks: Default::default(),
78            create_segment_locks: Default::default(),
79            segment_locks: Default::default(),
80            segments: RwLock::new(segments),
81        })
82    }
83
84    pub fn init(all: &Allocator) -> PERes<u64> {
85        let page = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
86        let page_index = page.get_index();
87        Segments::init(page, all)?;
88        Ok(page_index)
89    }
90
91    pub fn scan_segment(&self, segment: Option<&Segment>) -> Result<SegmentPageIterator, SegmentError> {
92        if let Some(segment) = segment {
93            Ok(SegmentPageIterator::new(segment.first_page))
94        } else {
95            Err(SegmentError::SegmentNotFound)
96        }
97    }
98
99    pub fn scan(&self, segment: SegmentId) -> Result<SegmentPageIterator, SegmentError> {
100        let segments = self.segments.read().expect("lock not poisoned");
101        if let Some(segment) = segments.segment_by_id(segment) {
102            Ok(SegmentPageIterator::new(segment.first_page))
103        } else {
104            Err(SegmentError::SegmentNotFound)
105        }
106    }
107
108    pub fn scan_page_all(&self, cur_page: u64) -> PERes<(u64, Vec<(u32, bool)>)> {
109        // THIS IS ONLY FOR LOCK PROTECTION
110        let _lock = self.segments.read().expect("lock not poisoned");
111        let mut page = self.allocator.load_page(cur_page)?;
112        Ok(page.segment_scan_all_entries())
113    }
114
115    pub fn allocate_temp_seg(
116        &self,
117        segment: Option<&mut Segment>,
118    ) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
119        if let Some(found) = segment {
120            Ok(found.allocate_internal(&self.allocator)?)
121        } else {
122            Err(SegmentError::SegmentNotFound)
123        }
124    }
125
126    pub fn create_temp_segment(&self, segment: &str) -> PERes<Segment> {
127        self.segments
128            .write()
129            .expect("lock not poisoned")
130            .create_temp_segment(&self.allocator, segment)
131    }
132
133    pub fn allocate(&self, segment: SegmentId) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
134        if let Some(found) = self
135            .segments
136            .write()
137            .expect("lock not poisoned")
138            .segments
139            .get_mut(&segment)
140        {
141            Ok(found.allocate_internal(&self.allocator)?)
142        } else {
143            Err(SegmentError::SegmentNotFound)
144        }
145    }
146
147    pub(crate) fn acquire_locks(&self, locks: &Locks, timeout: Duration) -> Result<(), PrepareError> {
148        self.create_segment_locks.lock_all(locks.created_segments(), timeout)?;
149        if let Err(x) = self.segment_locks.lock_all_write(locks.dropped_segments(), timeout) {
150            self.create_segment_locks.unlock_all(locks.created_segments());
151            return Err(PrepareError::from(x));
152        }
153        if let Err(x) = self
154            .segment_locks
155            .lock_all_read(locks.created_updated_segments(), timeout)
156        {
157            self.create_segment_locks.unlock_all(locks.created_segments());
158            self.segment_locks.unlock_all_write(locks.dropped_segments());
159            return Err(PrepareError::from(x));
160        }
161
162        if let Err(x) = self.record_locks.lock_all(locks.records(), timeout) {
163            self.create_segment_locks.unlock_all(locks.created_segments());
164            self.segment_locks.unlock_all_write(locks.dropped_segments());
165            self.segment_locks.unlock_all_read(locks.created_updated_segments());
166            return Err(PrepareError::from(x));
167        }
168
169        Ok(())
170    }
171    pub fn check_segments(
172        &self,
173        created: &[String],
174        updated: impl Iterator<Item = SegmentId>,
175    ) -> Result<(), PrepareError> {
176        let segs = self.segments.read().expect("lock not poisoned");
177        for c in created {
178            if segs.has_segment(c) {
179                if is_index_name_meta(c) || is_index_name_data(c) {
180                    return Err(PrepareError::IndexAlreadyExists);
181                } else {
182                    return Err(PrepareError::SegmentAlreadyExists);
183                }
184            }
185        }
186        for u in updated {
187            if !segs.has_segment_by_id(&u) {
188                return Err(PrepareError::SegmentNotFound);
189            }
190        }
191        Ok(())
192    }
193
194    pub fn acquire_segment_read_lock(&self, segment: SegmentId, timeout: Duration) -> Result<(), TimeoutError> {
195        self.segment_locks.lock_all_read(&[segment], timeout)?;
196        Ok(())
197    }
198
199    pub fn acquire_segments_read_lock(&self, segments: &[SegmentId], timeout: Duration) -> Result<(), TimeoutError> {
200        self.segment_locks.lock_all_read(segments, timeout)?;
201        Ok(())
202    }
203    pub fn acquire_record_lock(&self, id: &RecRef, timeout: Duration) -> Result<(), TimeoutError> {
204        self.record_locks.lock_all(&[*id], timeout)?;
205        Ok(())
206    }
207
208    pub fn release_segment_read_lock(&self, segment: SegmentId) {
209        self.segment_locks.unlock_all_read(&[segment]);
210    }
211    pub fn release_record_lock(&self, id: &RecRef) {
212        self.record_locks.unlock_all(&[*id]);
213    }
214
215    pub fn recover_allocations(&self, segs: &[SegmentId], created: &mut HashMap<SegmentId, Segment>) -> PERes<()> {
216        let mut segments = self.segments.write().expect("lock not poisoned");
217        segments.recover_allocations(segs, created, &self.allocator)?;
218        Ok(())
219    }
220
221    pub fn recompute_last_pages(&self) -> PERes<()> {
222        let mut segments = self.segments.write().expect("lock not poisoned");
223        segments.recompute_last_pages(&self.allocator)?;
224        Ok(())
225    }
226
227    pub(crate) fn check_persistent_records(
228        &self,
229        records: &[CheckRecord],
230        check_version: bool,
231    ) -> Result<Vec<OldRecordInfo>, PrepareError> {
232        let mut current_record_pages = Vec::with_capacity(records.len());
233        for &CheckRecord {
234            segment_id,
235            ref record_id,
236            version,
237        } in records
238        {
239            let val = self.read(record_id, segment_id)?;
240            if let Some((record, pers_version)) = val {
241                current_record_pages.push(OldRecordInfo::new(record_id, segment_id, record, pers_version));
242                if check_version && pers_version != version {
243                    return Err(PrepareError::VersionNotLatest);
244                }
245            } else {
246                return Err(PrepareError::RecordNotFound(PersyId(*record_id)));
247            }
248        }
249        Ok(current_record_pages)
250    }
251
252    pub(crate) fn release_locks(&self, locks: &Locks) {
253        self.record_locks.unlock_all(locks.records());
254        self.segment_locks.unlock_all_read(locks.created_updated_segments());
255        self.segment_locks.unlock_all_write(locks.dropped_segments());
256        self.create_segment_locks.unlock_all(locks.created_segments());
257    }
258
259    pub fn rollback(&self, inserts: &[InsertRecord]) -> PERes<Vec<(SegmentId, u64)>> {
260        let segments = self.segments.write().expect("lock not poisoned");
261        let mut pages_to_remove = Vec::new();
262        let mut pages = HashMap::new();
263        for insert in inserts {
264            if segments.segments.contains_key(&insert.segment) {
265                let page = insert.recref.page;
266                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
267                if seg_page.segment_delete_entry(insert.segment, insert.recref.pos) && seg_page.get_next()? != 0 {
268                    pages_to_remove.push((insert.segment, page));
269                }
270            }
271        }
272        for (_, to_flush) in pages.into_iter() {
273            self.allocator.flush_page(to_flush)?;
274        }
275        Ok(pages_to_remove)
276    }
277
278    pub fn recover_rollback(&self, rollbacks: &[RollbackPage]) -> PERes<()> {
279        let mut pages = HashMap::new();
280        for rollback in rollbacks {
281            let page = rollback.recref.page;
282            let seg_page = self.get_or_insert_mut(&mut pages, page)?;
283            seg_page.segment_update_entry(rollback.segment, rollback.recref.pos, rollback.record_page);
284        }
285        for (_, to_flush) in pages.into_iter() {
286            self.allocator.flush_page(to_flush)?;
287        }
288        Ok(())
289    }
290
291    pub fn apply(
292        &self,
293        segs_new_pages: &[NewSegmentPage],
294        inserts: &[InsertRecord],
295        updates: &[UpdateRecord],
296        deletes: &[DeleteRecord],
297        seg_ops: &[SegmentOperation],
298        created: &mut HashMap<SegmentId, Segment>,
299        recover: bool,
300    ) -> PERes<Vec<(SegmentId, u64)>> {
301        let mut segments = self.segments.write().expect("lock not poisoned");
302        let mut dropped = HashSet::new();
303        for seg_op in seg_ops {
304            if let SegmentOperation::Drop(ref op) = *seg_op {
305                dropped.insert(op.segment_id);
306            }
307        }
308        let mut pages = HashMap::new();
309
310        if recover {
311            for new_page in segs_new_pages {
312                if new_page.previous == 0 {
313                    segments.set_first_page(new_page.segment, new_page.page);
314                } else {
315                    let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
316                    p_page.set_next(new_page.page)?;
317                    let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
318                    n_page.set_prev(new_page.previous)?;
319                    n_page.set_segment_id(new_page.segment)?;
320                }
321            }
322        }
323        for insert in inserts {
324            if !dropped.contains(&insert.segment) {
325                let page = insert.recref.page;
326                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
327                seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page);
328            }
329        }
330
331        for update in updates {
332            if !dropped.contains(&update.segment) {
333                let page = update.recref.page;
334                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
335                seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page);
336            }
337        }
338        let mut pages_to_remove = Vec::new();
339
340        for delete in deletes {
341            if !dropped.contains(&delete.segment) {
342                let page = delete.recref.page;
343                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
344                if seg_page.segment_delete_entry(delete.segment, delete.recref.pos) {
345                    // Avoid to remove last pages, for avoid concurrent operations with page
346                    // creation
347                    if seg_page.get_next()? != 0 {
348                        pages_to_remove.push((delete.segment, page));
349                    }
350                }
351            }
352        }
353
354        if recover {
355            for (_, mut to_flush) in pages.into_iter() {
356                to_flush.recalc_count()?;
357                self.allocator.flush_page(to_flush)?;
358            }
359
360            let recover_page = |record_page: u64| {
361                let page = self.allocator.load_page(record_page)?;
362                self.allocator.remove_from_free(record_page, page.get_size_exp())
363            };
364            let mut segs = HashSet::new();
365            for insert in inserts {
366                recover_page(insert.record_page)?;
367                segs.insert(insert.segment);
368            }
369            for update in updates {
370                recover_page(update.record_page)?;
371                segs.insert(update.segment);
372            }
373            for delete in deletes {
374                segs.insert(delete.segment);
375            }
376
377            segments.recover_allocations(&segs.into_iter().collect::<Vec<_>>(), created, &self.allocator)?;
378        } else {
379            for (_, to_flush) in pages.into_iter() {
380                self.allocator.flush_page(to_flush)?;
381            }
382        }
383
384        for seg_op in seg_ops {
385            if let SegmentOperation::Drop(ref op) = *seg_op {
386                segments.drop_segment(&op.name);
387            }
388        }
389
390        for seg_op in seg_ops {
391            if let SegmentOperation::Create(ref op) = *seg_op {
392                if let Some(s) = created.remove(&op.segment_id) {
393                    segments.finalize_create_segment(s);
394                }
395            }
396        }
397        segments.flush_segments(&self.allocator)?;
398
399        Ok(pages_to_remove)
400    }
401    pub fn recover_segment_operations(
402        &self,
403        seg_ops: &[SegmentOperation],
404        created: &mut HashMap<SegmentId, Segment>,
405        segs_new_pages: &[NewSegmentPage],
406    ) -> PERes<()> {
407        let mut segments = self.segments.write().expect("lock not poisoned");
408        for seg_op in seg_ops {
409            if let SegmentOperation::Drop(ref op) = *seg_op {
410                segments.drop_segment(&op.name);
411            }
412        }
413
414        for seg_op in seg_ops {
415            if let SegmentOperation::Create(ref op) = *seg_op {
416                if let Some(s) = created.remove(&op.segment_id) {
417                    segments.recover_finalize_create_segment(s);
418                }
419            }
420        }
421        for new_page in segs_new_pages {
422            if new_page.previous == 0 {
423                segments.set_first_page(new_page.segment, new_page.page);
424            }
425        }
426        segments.flush_segments(&self.allocator)?;
427        Ok(())
428    }
429
430    pub fn recover_remove_pages(&self, delete_pages: &[(SegmentId, u64)]) -> PERes<Vec<u64>> {
431        let mut segments = self.segments.write().expect("lock not poisoned");
432        segments.recover_remove_pages(&self.allocator, delete_pages)
433    }
434
435    pub fn collect_segment_pages(&self, segment: SegmentId) -> PERes<Vec<u64>> {
436        let segments = self.segments.read().expect("lock not poisoned");
437        segments.collect_segment_pages(&self.allocator, segment)
438    }
439
440    pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PERes<Vec<(SegmentId, u64)>> {
441        let mut segments = self.segments.write().expect("lock not poisoned");
442        segments.clear_empty(&self.allocator, empty)
443    }
444
445    pub fn flush_segments(&self) -> PERes<()> {
446        let mut segments = self.segments.write().expect("lock not poisoned");
447        segments.flush_segments(&self.allocator)
448    }
449
450    pub fn exists_segment(&self, segment: &str) -> bool {
451        self.segments.read().expect("lock not poisoned").has_segment(segment)
452    }
453
454    pub fn exists_segment_by_id(&self, segment: &SegmentId) -> bool {
455        self.segments
456            .read()
457            .expect("lock not poisoned")
458            .has_segment_by_id(segment)
459    }
460
461    pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
462        self.segments.read().expect("lock not poisoned").segment_id(segment)
463    }
464
465    pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
466        self.segments
467            .read()
468            .expect("lock not poisoned")
469            .segment_name_by_id(segment)
470    }
471
472    #[cfg(test)]
473    pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PERes<()> {
474        let mut page = self.allocator.write_page(recref.page)?;
475        page.segment_insert_entry(segment_id, recref.pos, record_page);
476        self.allocator.flush_page(page)?;
477        Ok(())
478    }
479
480    pub fn read(&self, recref: &RecRef, segment: SegmentId) -> Result<Option<(u64, u16)>, ReadError> {
481        if let Some(mut page) = self.allocator.load_page_not_free(recref.page)? {
482            if recref.pos > page.get_content_size() - ADDRESS_ENTRY_SIZE {
483                return Err(ReadError::InvalidPersyId(*recref));
484            }
485            Ok(page.segment_read_entry(segment, recref.pos))
486        } else {
487            Ok(None)
488        }
489    }
490
491    #[cfg(feature = "experimental_inspect")]
492    pub fn read_inspect(
493        &self,
494        recref: &RecRef,
495        segment: Option<SegmentId>,
496    ) -> Result<Result<(u64, u16), RecordState>, GenericError> {
497        if let Some(mut page) = self.allocator.load_page_not_free(recref.page)? {
498            if recref.pos > page.get_content_size() - ADDRESS_ENTRY_SIZE {
499                return Ok(Err(RecordState::IdPagePositionOutOfPage));
500            }
501            Ok(page.segment_read_entry_inspect(segment, recref.pos))
502        } else {
503            Ok(Err(RecordState::IdPageNotExists))
504        }
505    }
506
507    fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PERes<&'a mut Page> {
508        Ok(match map.entry(k) {
509            Entry::Occupied(entry) => entry.into_mut(),
510            Entry::Vacant(entry) => entry.insert(self.allocator.write_page(k)?),
511        })
512    }
513
514    pub fn list(&self) -> Vec<(String, SegmentId)> {
515        self.segments.read().expect("lock not poisoned").list()
516    }
517    pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
518        self.segments.read().expect("lock not poisoned").snapshot_list()
519    }
520}