1use crate::{
2 address::segment::{
3 segment_page_iterator::SegmentPageIterator, AllocatedSegmentPage, Segment, SegmentPage, SegmentPageRead,
4 Segments,
5 },
6 allocator::Allocator,
7 config::Config,
8 device::{Page, PageOps},
9 error::{PERes, ReadError, SegmentError, TimeoutError},
10 id::{PersyId, RecRef, SegmentId},
11 index::config::{is_index_name_data, is_index_name_meta},
12 journal::records::{DeleteRecord, InsertRecord, NewSegmentPage, RollbackPage, UpdateRecord},
13 locks::{LockManager, RwLockManager},
14 transaction::{
15 locks::Locks,
16 tx_impl::{CheckRecord, SegmentOperation},
17 },
18 PrepareError,
19};
20use std::{
21 collections::{hash_map::Entry, HashMap, HashSet},
22 sync::{Arc, RwLock},
23 time::Duration,
24};
25
26#[cfg(feature = "experimental_inspect")]
27use crate::{error::GenericError, inspect::RecordState};
28
29pub mod record_scanner;
30pub mod segment;
31pub mod segment_iter;
32#[cfg(test)]
33mod tests;
34
/// Size exponent passed to the allocator when `Address::init` allocates the address
/// root page; presumably the page is `2^6` bytes — TODO confirm.
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6;
/// Size exponent of an address page; presumably `2^10` bytes — TODO confirm.
pub const ADDRESS_PAGE_EXP: u8 = 10;
/// Flag bit 0 of an address entry; presumably "entry exists" — TODO confirm.
pub const FLAG_EXISTS: u8 = 0b000_0001;
/// Flag bit 1 of an address entry; presumably "entry deleted" — TODO confirm.
pub const FLAG_DELETED: u8 = 0b000_0010;
/// Offset of the segment hash inside a segment page (presumably in bytes).
pub const SEGMENT_HASH_OFFSET: u32 = 16;
/// Offset of the per-page delete counter inside a segment page (presumably in bytes).
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 24;
/// Offset at which the entry data of a segment page starts (presumably in bytes).
pub const SEGMENT_DATA_OFFSET: u32 = 26;
42pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2; pub struct OldRecordInfo {
45 pub recref: RecRef,
46 pub segment: SegmentId,
47 pub record_page: u64,
48 pub version: u16,
49}
50
51impl OldRecordInfo {
52 fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> OldRecordInfo {
53 OldRecordInfo {
54 recref: *recref,
55 segment,
56 record_page,
57 version,
58 }
59 }
60}
61
62pub struct Address {
65 allocator: Arc<Allocator>,
66 record_locks: LockManager<RecRef>,
67 create_segment_locks: LockManager<String>,
68 segment_locks: RwLockManager<SegmentId>,
69 segments: RwLock<Segments>,
70}
71
72impl Address {
73 pub fn new(all: &Arc<Allocator>, _config: &Arc<Config>, page: u64) -> PERes<Address> {
74 let segments = Segments::new(page, all)?;
75 Ok(Address {
76 allocator: all.clone(),
77 record_locks: Default::default(),
78 create_segment_locks: Default::default(),
79 segment_locks: Default::default(),
80 segments: RwLock::new(segments),
81 })
82 }
83
84 pub fn init(all: &Allocator) -> PERes<u64> {
85 let page = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
86 let page_index = page.get_index();
87 Segments::init(page, all)?;
88 Ok(page_index)
89 }
90
91 pub fn scan_segment(&self, segment: Option<&Segment>) -> Result<SegmentPageIterator, SegmentError> {
92 if let Some(segment) = segment {
93 Ok(SegmentPageIterator::new(segment.first_page))
94 } else {
95 Err(SegmentError::SegmentNotFound)
96 }
97 }
98
99 pub fn scan(&self, segment: SegmentId) -> Result<SegmentPageIterator, SegmentError> {
100 let segments = self.segments.read().expect("lock not poisoned");
101 if let Some(segment) = segments.segment_by_id(segment) {
102 Ok(SegmentPageIterator::new(segment.first_page))
103 } else {
104 Err(SegmentError::SegmentNotFound)
105 }
106 }
107
108 pub fn scan_page_all(&self, cur_page: u64) -> PERes<(u64, Vec<(u32, bool)>)> {
109 let _lock = self.segments.read().expect("lock not poisoned");
111 let mut page = self.allocator.load_page(cur_page)?;
112 Ok(page.segment_scan_all_entries())
113 }
114
115 pub fn allocate_temp_seg(
116 &self,
117 segment: Option<&mut Segment>,
118 ) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
119 if let Some(found) = segment {
120 Ok(found.allocate_internal(&self.allocator)?)
121 } else {
122 Err(SegmentError::SegmentNotFound)
123 }
124 }
125
126 pub fn create_temp_segment(&self, segment: &str) -> PERes<Segment> {
127 self.segments
128 .write()
129 .expect("lock not poisoned")
130 .create_temp_segment(&self.allocator, segment)
131 }
132
133 pub fn allocate(&self, segment: SegmentId) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
134 if let Some(found) = self
135 .segments
136 .write()
137 .expect("lock not poisoned")
138 .segments
139 .get_mut(&segment)
140 {
141 Ok(found.allocate_internal(&self.allocator)?)
142 } else {
143 Err(SegmentError::SegmentNotFound)
144 }
145 }
146
147 pub(crate) fn acquire_locks(&self, locks: &Locks, timeout: Duration) -> Result<(), PrepareError> {
148 self.create_segment_locks.lock_all(locks.created_segments(), timeout)?;
149 if let Err(x) = self.segment_locks.lock_all_write(locks.dropped_segments(), timeout) {
150 self.create_segment_locks.unlock_all(locks.created_segments());
151 return Err(PrepareError::from(x));
152 }
153 if let Err(x) = self
154 .segment_locks
155 .lock_all_read(locks.created_updated_segments(), timeout)
156 {
157 self.create_segment_locks.unlock_all(locks.created_segments());
158 self.segment_locks.unlock_all_write(locks.dropped_segments());
159 return Err(PrepareError::from(x));
160 }
161
162 if let Err(x) = self.record_locks.lock_all(locks.records(), timeout) {
163 self.create_segment_locks.unlock_all(locks.created_segments());
164 self.segment_locks.unlock_all_write(locks.dropped_segments());
165 self.segment_locks.unlock_all_read(locks.created_updated_segments());
166 return Err(PrepareError::from(x));
167 }
168
169 Ok(())
170 }
171 pub fn check_segments(
172 &self,
173 created: &[String],
174 updated: impl Iterator<Item = SegmentId>,
175 ) -> Result<(), PrepareError> {
176 let segs = self.segments.read().expect("lock not poisoned");
177 for c in created {
178 if segs.has_segment(c) {
179 if is_index_name_meta(c) || is_index_name_data(c) {
180 return Err(PrepareError::IndexAlreadyExists);
181 } else {
182 return Err(PrepareError::SegmentAlreadyExists);
183 }
184 }
185 }
186 for u in updated {
187 if !segs.has_segment_by_id(&u) {
188 return Err(PrepareError::SegmentNotFound);
189 }
190 }
191 Ok(())
192 }
193
194 pub fn acquire_segment_read_lock(&self, segment: SegmentId, timeout: Duration) -> Result<(), TimeoutError> {
195 self.segment_locks.lock_all_read(&[segment], timeout)?;
196 Ok(())
197 }
198
199 pub fn acquire_segments_read_lock(&self, segments: &[SegmentId], timeout: Duration) -> Result<(), TimeoutError> {
200 self.segment_locks.lock_all_read(segments, timeout)?;
201 Ok(())
202 }
203 pub fn acquire_record_lock(&self, id: &RecRef, timeout: Duration) -> Result<(), TimeoutError> {
204 self.record_locks.lock_all(&[*id], timeout)?;
205 Ok(())
206 }
207
208 pub fn release_segment_read_lock(&self, segment: SegmentId) {
209 self.segment_locks.unlock_all_read(&[segment]);
210 }
211 pub fn release_record_lock(&self, id: &RecRef) {
212 self.record_locks.unlock_all(&[*id]);
213 }
214
215 pub fn recover_allocations(&self, segs: &[SegmentId], created: &mut HashMap<SegmentId, Segment>) -> PERes<()> {
216 let mut segments = self.segments.write().expect("lock not poisoned");
217 segments.recover_allocations(segs, created, &self.allocator)?;
218 Ok(())
219 }
220
221 pub fn recompute_last_pages(&self) -> PERes<()> {
222 let mut segments = self.segments.write().expect("lock not poisoned");
223 segments.recompute_last_pages(&self.allocator)?;
224 Ok(())
225 }
226
227 pub(crate) fn check_persistent_records(
228 &self,
229 records: &[CheckRecord],
230 check_version: bool,
231 ) -> Result<Vec<OldRecordInfo>, PrepareError> {
232 let mut current_record_pages = Vec::with_capacity(records.len());
233 for &CheckRecord {
234 segment_id,
235 ref record_id,
236 version,
237 } in records
238 {
239 let val = self.read(record_id, segment_id)?;
240 if let Some((record, pers_version)) = val {
241 current_record_pages.push(OldRecordInfo::new(record_id, segment_id, record, pers_version));
242 if check_version && pers_version != version {
243 return Err(PrepareError::VersionNotLatest);
244 }
245 } else {
246 return Err(PrepareError::RecordNotFound(PersyId(*record_id)));
247 }
248 }
249 Ok(current_record_pages)
250 }
251
252 pub(crate) fn release_locks(&self, locks: &Locks) {
253 self.record_locks.unlock_all(locks.records());
254 self.segment_locks.unlock_all_read(locks.created_updated_segments());
255 self.segment_locks.unlock_all_write(locks.dropped_segments());
256 self.create_segment_locks.unlock_all(locks.created_segments());
257 }
258
259 pub fn rollback(&self, inserts: &[InsertRecord]) -> PERes<Vec<(SegmentId, u64)>> {
260 let segments = self.segments.write().expect("lock not poisoned");
261 let mut pages_to_remove = Vec::new();
262 let mut pages = HashMap::new();
263 for insert in inserts {
264 if segments.segments.contains_key(&insert.segment) {
265 let page = insert.recref.page;
266 let seg_page = self.get_or_insert_mut(&mut pages, page)?;
267 if seg_page.segment_delete_entry(insert.segment, insert.recref.pos) && seg_page.get_next()? != 0 {
268 pages_to_remove.push((insert.segment, page));
269 }
270 }
271 }
272 for (_, to_flush) in pages.into_iter() {
273 self.allocator.flush_page(to_flush)?;
274 }
275 Ok(pages_to_remove)
276 }
277
278 pub fn recover_rollback(&self, rollbacks: &[RollbackPage]) -> PERes<()> {
279 let mut pages = HashMap::new();
280 for rollback in rollbacks {
281 let page = rollback.recref.page;
282 let seg_page = self.get_or_insert_mut(&mut pages, page)?;
283 seg_page.segment_update_entry(rollback.segment, rollback.recref.pos, rollback.record_page);
284 }
285 for (_, to_flush) in pages.into_iter() {
286 self.allocator.flush_page(to_flush)?;
287 }
288 Ok(())
289 }
290
291 pub fn apply(
292 &self,
293 segs_new_pages: &[NewSegmentPage],
294 inserts: &[InsertRecord],
295 updates: &[UpdateRecord],
296 deletes: &[DeleteRecord],
297 seg_ops: &[SegmentOperation],
298 created: &mut HashMap<SegmentId, Segment>,
299 recover: bool,
300 ) -> PERes<Vec<(SegmentId, u64)>> {
301 let mut segments = self.segments.write().expect("lock not poisoned");
302 let mut dropped = HashSet::new();
303 for seg_op in seg_ops {
304 if let SegmentOperation::Drop(ref op) = *seg_op {
305 dropped.insert(op.segment_id);
306 }
307 }
308 let mut pages = HashMap::new();
309
310 if recover {
311 for new_page in segs_new_pages {
312 if new_page.previous == 0 {
313 segments.set_first_page(new_page.segment, new_page.page);
314 } else {
315 let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
316 p_page.set_next(new_page.page)?;
317 let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
318 n_page.set_prev(new_page.previous)?;
319 n_page.set_segment_id(new_page.segment)?;
320 }
321 }
322 }
323 for insert in inserts {
324 if !dropped.contains(&insert.segment) {
325 let page = insert.recref.page;
326 let seg_page = self.get_or_insert_mut(&mut pages, page)?;
327 seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page);
328 }
329 }
330
331 for update in updates {
332 if !dropped.contains(&update.segment) {
333 let page = update.recref.page;
334 let seg_page = self.get_or_insert_mut(&mut pages, page)?;
335 seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page);
336 }
337 }
338 let mut pages_to_remove = Vec::new();
339
340 for delete in deletes {
341 if !dropped.contains(&delete.segment) {
342 let page = delete.recref.page;
343 let seg_page = self.get_or_insert_mut(&mut pages, page)?;
344 if seg_page.segment_delete_entry(delete.segment, delete.recref.pos) {
345 if seg_page.get_next()? != 0 {
348 pages_to_remove.push((delete.segment, page));
349 }
350 }
351 }
352 }
353
354 if recover {
355 for (_, mut to_flush) in pages.into_iter() {
356 to_flush.recalc_count()?;
357 self.allocator.flush_page(to_flush)?;
358 }
359
360 let recover_page = |record_page: u64| {
361 let page = self.allocator.load_page(record_page)?;
362 self.allocator.remove_from_free(record_page, page.get_size_exp())
363 };
364 let mut segs = HashSet::new();
365 for insert in inserts {
366 recover_page(insert.record_page)?;
367 segs.insert(insert.segment);
368 }
369 for update in updates {
370 recover_page(update.record_page)?;
371 segs.insert(update.segment);
372 }
373 for delete in deletes {
374 segs.insert(delete.segment);
375 }
376
377 segments.recover_allocations(&segs.into_iter().collect::<Vec<_>>(), created, &self.allocator)?;
378 } else {
379 for (_, to_flush) in pages.into_iter() {
380 self.allocator.flush_page(to_flush)?;
381 }
382 }
383
384 for seg_op in seg_ops {
385 if let SegmentOperation::Drop(ref op) = *seg_op {
386 segments.drop_segment(&op.name);
387 }
388 }
389
390 for seg_op in seg_ops {
391 if let SegmentOperation::Create(ref op) = *seg_op {
392 if let Some(s) = created.remove(&op.segment_id) {
393 segments.finalize_create_segment(s);
394 }
395 }
396 }
397 segments.flush_segments(&self.allocator)?;
398
399 Ok(pages_to_remove)
400 }
401 pub fn recover_segment_operations(
402 &self,
403 seg_ops: &[SegmentOperation],
404 created: &mut HashMap<SegmentId, Segment>,
405 segs_new_pages: &[NewSegmentPage],
406 ) -> PERes<()> {
407 let mut segments = self.segments.write().expect("lock not poisoned");
408 for seg_op in seg_ops {
409 if let SegmentOperation::Drop(ref op) = *seg_op {
410 segments.drop_segment(&op.name);
411 }
412 }
413
414 for seg_op in seg_ops {
415 if let SegmentOperation::Create(ref op) = *seg_op {
416 if let Some(s) = created.remove(&op.segment_id) {
417 segments.recover_finalize_create_segment(s);
418 }
419 }
420 }
421 for new_page in segs_new_pages {
422 if new_page.previous == 0 {
423 segments.set_first_page(new_page.segment, new_page.page);
424 }
425 }
426 segments.flush_segments(&self.allocator)?;
427 Ok(())
428 }
429
430 pub fn recover_remove_pages(&self, delete_pages: &[(SegmentId, u64)]) -> PERes<Vec<u64>> {
431 let mut segments = self.segments.write().expect("lock not poisoned");
432 segments.recover_remove_pages(&self.allocator, delete_pages)
433 }
434
435 pub fn collect_segment_pages(&self, segment: SegmentId) -> PERes<Vec<u64>> {
436 let segments = self.segments.read().expect("lock not poisoned");
437 segments.collect_segment_pages(&self.allocator, segment)
438 }
439
440 pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PERes<Vec<(SegmentId, u64)>> {
441 let mut segments = self.segments.write().expect("lock not poisoned");
442 segments.clear_empty(&self.allocator, empty)
443 }
444
445 pub fn flush_segments(&self) -> PERes<()> {
446 let mut segments = self.segments.write().expect("lock not poisoned");
447 segments.flush_segments(&self.allocator)
448 }
449
450 pub fn exists_segment(&self, segment: &str) -> bool {
451 self.segments.read().expect("lock not poisoned").has_segment(segment)
452 }
453
454 pub fn exists_segment_by_id(&self, segment: &SegmentId) -> bool {
455 self.segments
456 .read()
457 .expect("lock not poisoned")
458 .has_segment_by_id(segment)
459 }
460
461 pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
462 self.segments.read().expect("lock not poisoned").segment_id(segment)
463 }
464
465 pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
466 self.segments
467 .read()
468 .expect("lock not poisoned")
469 .segment_name_by_id(segment)
470 }
471
/// Test-only helper: writes one address entry for `recref` directly into its
/// page and flushes that page. No segment lock is taken here.
#[cfg(test)]
pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PERes<()> {
    let mut target = self.allocator.write_page(recref.page)?;
    target.segment_insert_entry(segment_id, recref.pos, record_page);
    self.allocator.flush_page(target)?;
    Ok(())
}
479
480 pub fn read(&self, recref: &RecRef, segment: SegmentId) -> Result<Option<(u64, u16)>, ReadError> {
481 if let Some(mut page) = self.allocator.load_page_not_free(recref.page)? {
482 if recref.pos > page.get_content_size() - ADDRESS_ENTRY_SIZE {
483 return Err(ReadError::InvalidPersyId(*recref));
484 }
485 Ok(page.segment_read_entry(segment, recref.pos))
486 } else {
487 Ok(None)
488 }
489 }
490
/// Inspect variant of `read`: reports why an entry cannot be read as a
/// `RecordState` instead of an error.
///
/// The inner result is `Err(RecordState::IdPageNotExists)` when
/// `load_page_not_free` yields no page, `Err(RecordState::IdPagePositionOutOfPage)`
/// when a full entry starting at `recref.pos` does not fit in the page content
/// (including a page content smaller than a single entry), and otherwise whatever
/// `segment_read_entry_inspect` returns. Errors from loading the page are propagated.
#[cfg(feature = "experimental_inspect")]
pub fn read_inspect(
    &self,
    recref: &RecRef,
    segment: Option<SegmentId>,
) -> Result<Result<(u64, u16), RecordState>, GenericError> {
    let mut page = match self.allocator.load_page_not_free(recref.page)? {
        Some(page) => page,
        None => return Ok(Err(RecordState::IdPageNotExists)),
    };
    // `checked_sub` instead of `-`: a content size below `ADDRESS_ENTRY_SIZE` must
    // be reported as out-of-page rather than underflow.
    match page.get_content_size().checked_sub(ADDRESS_ENTRY_SIZE) {
        Some(last_valid_pos) if recref.pos <= last_valid_pos => {
            Ok(page.segment_read_entry_inspect(segment, recref.pos))
        }
        _ => Ok(Err(RecordState::IdPagePositionOutOfPage)),
    }
}
506
507 fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PERes<&'a mut Page> {
508 Ok(match map.entry(k) {
509 Entry::Occupied(entry) => entry.into_mut(),
510 Entry::Vacant(entry) => entry.insert(self.allocator.write_page(k)?),
511 })
512 }
513
514 pub fn list(&self) -> Vec<(String, SegmentId)> {
515 self.segments.read().expect("lock not poisoned").list()
516 }
517 pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
518 self.segments.read().expect("lock not poisoned").snapshot_list()
519 }
520}