1use crate::{
2 address::segment::Segment,
3 address::Address,
4 allocator::Allocator,
5 config::TxStrategy,
6 error::{IndexChangeError, PERes},
7 id::{IndexId, RecRef, SegmentId},
8 index::{
9 config::{IndexTypeInternal, ValueMode},
10 keeper_tx::IndexTransactionKeeper,
11 tree::nodes::Value,
12 },
13 journal::{
14 records::{
15 Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage,
16 PrepareCommit, ReadRecord, Rollback, RollbackPage, UpdateRecord,
17 },
18 Journal, JournalId,
19 },
20 persy::PersyImpl,
21 snapshot::data::{CleanInfo, SnapshotEntry},
22 snapshots::{SnapshotRef, Snapshots},
23 transaction::{index_locks::IndexDataLocks, iter::TransactionInsertScanner, locks::Locks},
24 DropSegmentError, GenericError, PrepareError,
25};
26use std::{
27 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
28 ops::RangeBounds,
29 sync::Arc,
30 time::Duration,
31 vec::IntoIter,
32};
33
/// A segment-level operation recorded by a transaction, kept in the order it was issued.
pub enum SegmentOperation {
    /// A segment created inside the transaction.
    Create(CreateSegment),
    /// A segment dropped inside the transaction.
    Drop(DropSegment),
}
38
/// Identifies a record together with the version the transaction expects it to have.
///
/// Instances are collected by `TransactionImpl::coll_locks` and handed to the
/// address layer when the transaction is prepared.
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub(crate) struct CheckRecord {
    /// Segment the record belongs to.
    pub(crate) segment_id: SegmentId,
    /// Reference of the record to check.
    pub(crate) record_id: RecRef,
    /// Version the transaction observed for the record.
    pub(crate) version: u16,
}
impl CheckRecord {
    /// Creates a check entry for `record_id` in `segment_id` at the given `version`.
    pub(crate) fn new(segment_id: SegmentId, record_id: RecRef, version: u16) -> Self {
        Self {
            segment_id,
            record_id,
            version,
        }
    }
}
54
/// State produced by the prepare phase and consumed by commit or rollback.
///
/// Every field is optional because the state can be built at different stages:
/// with index locks only, with data locks, or fully populated.
#[derive(Clone)]
pub struct PreparedState {
    /// Snapshot reserved for this transaction, when one was created.
    snapshot_ref: Option<SnapshotRef>,
    /// Locks held by the transaction; released on commit or rollback.
    data_locks: Option<Locks>,
    /// Snapshot entries to attach to the snapshot on commit.
    entries: Option<Vec<SnapshotEntry>>,
    /// Pages that become free once the transaction is committed.
    freed_pages: Option<Vec<FreedPage>>,
}
62
63impl PreparedState {
64 fn only_indexes(index: IndexDataLocks) -> Self {
65 let (index_record_locks, segments) = index.extract(&[]);
66 let data_locks = Some(Locks::new(
67 index_record_locks.into_iter().collect(),
68 Vec::new(),
69 segments.into_iter().collect(),
70 Vec::new(),
71 ));
72 Self {
73 snapshot_ref: None,
74 data_locks,
75 entries: None,
76 freed_pages: None,
77 }
78 }
79 fn locked(data: Locks, index: IndexDataLocks) -> Self {
80 Self::all_int(None, data, index, None, None)
81 }
82 fn all(
83 snapshot_ref: SnapshotRef,
84 data: Locks,
85 index: IndexDataLocks,
86 entries: Vec<SnapshotEntry>,
87 freed_pages: Vec<FreedPage>,
88 ) -> Self {
89 Self::all_int(Some(snapshot_ref), data, index, Some(entries), Some(freed_pages))
90 }
91
92 fn all_int(
93 snapshot_ref: Option<SnapshotRef>,
94 mut data: Locks,
95 index: IndexDataLocks,
96 entries: Option<Vec<SnapshotEntry>>,
97 freed_pages: Option<Vec<FreedPage>>,
98 ) -> Self {
99 let (index_record_locks, segments) = index.extract(data.records());
100 data.add_records(index_record_locks.into_iter());
101 data.add_create_update_segments(segments.into_iter());
102 Self {
103 snapshot_ref,
104 data_locks: Some(data),
105 entries,
106 freed_pages,
107 }
108 }
109
110 #[cfg(test)]
111 pub(crate) fn leak(&mut self) {
112 if let Some(sref) = &mut self.snapshot_ref {
113 sref.leak();
114 }
115 }
116}
117
/// How a transaction asks for its changes to be synced.
///
/// The value is forwarded to `PersyImpl::transaction_sync`; the exact behaviour
/// of each variant is defined there.
pub enum SyncMode {
    /// NOTE(review): presumably syncs before returning — confirm in `transaction_sync`.
    Sync,
    /// NOTE(review): presumably defers the sync to a background task — confirm in `transaction_sync`.
    BackgroundSync,
}
122
/// In-memory state of a single transaction: every operation logged to the
/// journal is mirrored here so it can be prepared, committed or rolled back.
pub struct TransactionImpl {
    /// Concurrency strategy; decides whether reads are tracked and versions are checked.
    strategy: TxStrategy,
    /// Sync mode forwarded to `PersyImpl::transaction_sync`.
    sync_mode: SyncMode,
    /// Caller-provided identifier, logged in the journal `Metadata` record.
    meta_id: Vec<u8>,
    /// Journal identifier of this transaction.
    id: JournalId,
    /// Records inserted by this transaction.
    inserted: Vec<InsertRecord>,
    /// Record updates performed by this transaction.
    updated: Vec<UpdateRecord>,
    /// Record deletions performed by this transaction.
    deleted: Vec<DeleteRecord>,
    /// Tracked reads by record reference (filled by `add_read` only under `VersionOnRead`).
    read: HashMap<RecRef, ReadRecord>,
    /// Segment create/drop operations, in the order they were issued.
    segments_operations: Vec<SegmentOperation>,
    /// Names of the segments created in this transaction, mapped to their ids.
    segs_created_names: HashMap<String, SegmentId>,
    /// Names of the segments dropped in this transaction.
    segs_dropped_names: HashSet<String>,
    /// Segments created in this transaction, by id.
    segs_created: HashMap<SegmentId, Segment>,
    /// Ids of the segments dropped in this transaction.
    segs_dropped: HashSet<SegmentId>,
    /// Ids of the segments touched by an insert, update or delete.
    segs_updated: HashSet<SegmentId>,
    /// Pages to free on commit; set by `prepare` or rebuilt during recovery.
    freed_pages: Option<Vec<FreedPage>>,
    /// Rollback pages read back from the journal during recovery.
    rollback_pages: Option<Vec<RollbackPage>>,
    /// Pending index changes; taken (set to `None`) by `prepare`.
    indexes: Option<IndexTransactionKeeper>,
    /// New segment pages allocated during this transaction.
    segs_new_pages: Vec<NewSegmentPage>,
    /// Timeout used when acquiring locks.
    timeout: Duration,
}
144
/// Result of looking up a record in the pending changes of a transaction.
pub enum TxRead {
    /// The record was inserted or updated in the transaction: `(record_page, version)`.
    Record((u64, u16)),
    /// The record was deleted in the transaction.
    Deleted,
    /// The transaction holds no change for the record.
    None,
}
150
/// Result of looking up a segment name in the pending changes of a transaction.
pub enum TxSegCheck {
    /// The segment was created in the transaction; carries its id.
    Created(SegmentId),
    /// The segment was dropped in the transaction.
    Dropped,
    /// The transaction neither created nor dropped a segment with that name.
    None,
}
156
157impl TransactionImpl {
158 pub fn new(
159 journal: &Journal,
160 strategy: &TxStrategy,
161 sync_mode: SyncMode,
162 meta_id: Vec<u8>,
163 timeout: Duration,
164 ) -> PERes<TransactionImpl> {
165 let id = journal.start()?;
166 journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
167 Ok(TransactionImpl {
168 strategy: strategy.clone(),
169 sync_mode,
170 meta_id,
171 id,
172 inserted: Vec::new(),
173 updated: Vec::new(),
174 deleted: Vec::new(),
175 read: HashMap::new(),
176 segments_operations: Vec::new(),
177 segs_created_names: HashMap::new(),
178 segs_dropped_names: HashSet::new(),
179 segs_created: HashMap::new(),
180 segs_dropped: HashSet::new(),
181 segs_updated: HashSet::new(),
182 freed_pages: None,
183 rollback_pages: None,
184 indexes: Some(IndexTransactionKeeper::new()),
185 segs_new_pages: Vec::new(),
186 timeout,
187 })
188 }
189
190 pub fn recover(id: JournalId) -> TransactionImpl {
191 TransactionImpl {
192 strategy: TxStrategy::LastWin,
193 sync_mode: SyncMode::Sync,
194 meta_id: Vec::new(),
195 id,
196 inserted: Vec::new(),
197 updated: Vec::new(),
198 deleted: Vec::new(),
199 read: HashMap::new(),
200 segments_operations: Vec::new(),
201 segs_created_names: HashMap::new(),
202 segs_dropped_names: HashSet::new(),
203 segs_created: HashMap::new(),
204 segs_dropped: HashSet::new(),
205 segs_updated: HashSet::new(),
206 freed_pages: None,
207 rollback_pages: None,
208 indexes: Some(IndexTransactionKeeper::new()),
209 segs_new_pages: Vec::new(),
210 timeout: Duration::from_secs(60 * 60 * 24),
211 }
212 }
213
/// Returns `true` when `segment` was created by this transaction.
pub fn segment_created_in_tx(&self, segment: SegmentId) -> bool {
    self.segs_created.contains_key(&segment)
}
217
/// Returns `true` when the meta segment of `index` was created by this transaction.
pub fn index_created_in_tx(&self, index: &IndexId) -> bool {
    self.segment_created_in_tx(index.get_meta_id())
}
221
222 pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
223 for info in &self.segments_operations {
224 if let SegmentOperation::Create(ref c) = info {
225 if c.segment_id == segment {
226 return Some(c.name.clone());
227 }
228 }
229 }
230 None
231 }
232 pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
233 if self.segs_created_names.contains_key(segment) {
234 for a in &self.segments_operations {
235 if let SegmentOperation::Create(ref c) = a {
236 if c.name == segment {
237 return TxSegCheck::Created(c.segment_id);
238 }
239 }
240 }
241 } else if self.segs_dropped_names.contains(segment) {
242 return TxSegCheck::Dropped;
243 }
244 TxSegCheck::None
245 }
246
247 pub fn add_create_segment(&mut self, journal: &Journal, segment: Segment) -> PERes<()> {
248 let create = CreateSegment::new(segment.get_name(), segment.get_segment_id(), segment.get_first_page());
249
250 journal.log(&create, &self.id)?;
251 self.segments_operations.push(SegmentOperation::Create(create));
252 self.segs_created_names
253 .insert(segment.get_name().to_owned(), segment.get_segment_id());
254 self.segs_created.insert(segment.get_segment_id(), segment);
255 Ok(())
256 }
257
258 pub fn recover_add(&mut self, create: &CreateSegment) {
259 self.segments_operations.push(SegmentOperation::Create(create.clone()));
260 let segment = Segment::new_allocation(create.first_page, create.segment_id, &create.name);
261 self.segs_created_names
262 .insert(create.name.clone(), segment.get_segment_id());
263 self.segs_created.insert(create.segment_id, segment);
264 }
265
266 pub fn add_drop_segment(
267 &mut self,
268 journal: &Journal,
269 name: &str,
270 segment_id: SegmentId,
271 ) -> Result<(), DropSegmentError> {
272 if self.segs_created_names.contains_key(name) {
273 Err(DropSegmentError::CannotDropSegmentCreatedInTx)
274 } else {
275 let drop = DropSegment::new(name, segment_id);
276 journal.log(&drop, &self.id)?;
277 self.segments_operations.push(SegmentOperation::Drop(drop));
278 self.segs_dropped.insert(segment_id);
279 self.segs_dropped_names.insert(name.into());
280 Ok(())
281 }
282 }
283
284 pub fn recover_drop(&mut self, drop: &DropSegment) {
285 self.segments_operations.push(SegmentOperation::Drop(drop.clone()));
286 self.segs_dropped.insert(drop.segment_id);
287 self.segs_dropped_names.insert(drop.name.clone());
288 }
289
290 pub fn add_read(&mut self, journal: &Journal, segment: SegmentId, recref: &RecRef, version: u16) -> PERes<()> {
291 if self.strategy == TxStrategy::VersionOnRead {
292 let read = ReadRecord::new(segment, recref, version);
293 journal.log(&read, &self.id)?;
294 self.read.insert(*recref, read);
295 }
296 Ok(())
297 }
298
/// Recovery counterpart of `add_read`: remembers a read record taken from the
/// journal, replacing any previous entry for the same record reference.
pub fn recover_read(&mut self, read: &ReadRecord) {
    self.read.insert(read.recref, read.clone());
}
302
303 pub fn add_insert(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, record: u64) -> PERes<()> {
304 self.segs_updated.insert(segment);
305 let insert = InsertRecord::new(segment, rec_ref, record);
306
307 journal.log(&insert, &self.id)?;
308 self.inserted.push(insert);
309 Ok(())
310 }
311
312 pub fn add_new_segment_page(
313 &mut self,
314 journal: &Journal,
315 segment: SegmentId,
316 new_page: u64,
317 previous_page: u64,
318 ) -> PERes<()> {
319 let new_page = NewSegmentPage::new(segment, new_page, previous_page);
320
321 journal.log(&new_page, &self.id)?;
322 self.segs_new_pages.push(new_page);
323 Ok(())
324 }
325
326 pub fn recover_insert(&mut self, insert: &InsertRecord) {
327 self.segs_updated.insert(insert.segment);
328 self.inserted.push(insert.clone());
329 }
330
331 pub fn add_update(
332 &mut self,
333 journal: &Journal,
334 segment: SegmentId,
335 rec_ref: &RecRef,
336 record: u64,
337 version: u16,
338 ) -> PERes<()> {
339 self.segs_updated.insert(segment);
340 let update = UpdateRecord::new(segment, rec_ref, record, version);
341 journal.log(&update, &self.id)?;
342 self.updated.push(update);
343 Ok(())
344 }
345
346 pub fn recover_update(&mut self, update: &UpdateRecord) {
347 self.segs_updated.insert(update.segment);
348 self.updated.push(update.clone());
349 }
350
351 pub fn add_delete(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, version: u16) -> PERes<()> {
352 self.segs_updated.insert(segment);
353 let delete = DeleteRecord::new(segment, rec_ref, version);
354 journal.log(&delete, &self.id)?;
355 self.deleted.push(delete);
356 Ok(())
357 }
358
359 pub fn add_put<K, V>(&mut self, index: IndexId, k: K, v: V)
360 where
361 K: IndexTypeInternal,
362 V: IndexTypeInternal,
363 {
364 if let Some(ref mut indexes) = self.indexes {
365 indexes.put(index, k, v);
366 }
367 }
368
369 pub fn add_remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
370 where
371 K: IndexTypeInternal,
372 V: IndexTypeInternal,
373 {
374 if let Some(ref mut indexes) = self.indexes {
375 indexes.remove(index, k, v);
376 }
377 }
378
379 pub fn apply_changes<K, V>(
380 &self,
381 vm: ValueMode,
382 index: IndexId,
383 index_name: &str,
384 k: &K,
385 pers: Option<Value<V>>,
386 ) -> Result<Option<Value<V>>, IndexChangeError>
387 where
388 K: IndexTypeInternal,
389 V: IndexTypeInternal,
390 {
391 if let Some(ref indexes) = self.indexes {
392 indexes.apply_changes(index, index_name, vm, k, pers)
393 } else {
394 Ok(pers)
395 }
396 }
397
398 pub fn index_range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
399 where
400 K: IndexTypeInternal,
401 V: IndexTypeInternal,
402 R: RangeBounds<K>,
403 {
404 if let Some(ind) = &self.indexes {
405 ind.range::<K, V, R>(index, range)
406 } else {
407 None
408 }
409 }
410
411 pub fn recover_delete(&mut self, delete: &DeleteRecord) {
412 self.segs_updated.insert(delete.segment);
413 self.deleted.push(delete.clone());
414 }
415
/// Returns a scanner over the records inserted by this transaction in segment `seg`.
///
/// The scanner receives a clone of the whole insert list together with `seg`.
pub fn scan_insert(&self, seg: SegmentId) -> TransactionInsertScanner {
    TransactionInsertScanner::new(self.inserted.clone(), seg)
}
419
420 pub fn read(&self, rec_ref: &RecRef) -> TxRead {
421 for ele in &self.deleted {
422 if ele.recref == *rec_ref {
423 return TxRead::Deleted;
424 }
425 }
426 if let Some(ele) = self.updated.iter().rev().find(|ele| ele.recref == *rec_ref) {
427 return TxRead::Record((ele.record_page, ele.version));
428 }
429 for ele in &self.inserted {
430 if ele.recref == *rec_ref {
431 return TxRead::Record((ele.record_page, 1));
432 }
433 }
434 TxRead::None
435 }
436
/// Recovery counterpart of `prepare`: collapses the recovered operations,
/// acquires the needed locks and recovers segment allocations.
///
/// No snapshot is created; the returned state only carries locks.
///
/// # Errors
/// * The lock acquisition error, after the transaction has been rolled back
///   with `recover_rollback`.
/// * The converted allocation recovery error, after the transaction has been
///   rolled back with `rollback_prepared` (which also releases the locks).
/// * The rollback error itself, if the rollback fails.
pub fn recover_prepare(&mut self, persy_impl: &PersyImpl) -> Result<PreparedState, PrepareError> {
    let address = persy_impl.address();

    // The pages freed by collapsing are not needed here and are discarded.
    let _ = self.collapse_operations();
    // A fresh, empty index lock set: no index change is applied in this path.
    let index_locks = IndexDataLocks::new(self.timeout);
    let (locks, _) = self.coll_locks(&index_locks);
    if let Err(x) = address.acquire_locks(&locks, self.timeout) {
        self.recover_rollback(persy_impl)?;
        return Err(x);
    }
    let crt_upd_segs = locks.created_updated_segments_cloned();
    if let Err(x) = address.recover_allocations(&crt_upd_segs, &mut self.segs_created) {
        self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
        return Err(PrepareError::from(x));
    }
    Ok(PreparedState::locked(locks, index_locks))
}
454
/// Runs the prepare phase of the transaction.
///
/// Steps, in order:
/// 1. apply the pending index changes (skipping indexes whose meta segment was dropped);
/// 2. collapse the operations that touch the same record;
/// 3. acquire the record and segment locks;
/// 4. check segments and persistent record versions;
/// 5. log rollback and freed pages, reserve a snapshot, log the prepare marker and sync.
///
/// On success returns the transaction together with the `PreparedState`
/// needed by `commit` or `rollback_prepared`.
///
/// # Errors
/// Failures in steps 1, 3 and 4 roll the transaction back with
/// `rollback_prepared` before the error is returned.
///
/// NOTE(review): the `?` operators in step 5 (and on `collect_segment_pages`)
/// return without calling `rollback_prepared`, so the locks acquired in
/// step 3 are not released on those paths — confirm this is intended.
pub fn prepare(mut self, persy_impl: &PersyImpl) -> Result<(TransactionImpl, PreparedState), PrepareError> {
    let journal = persy_impl.journal();
    let snapshots = persy_impl.snapshots();
    let address = persy_impl.address();

    // Take the index keeper out of the transaction: from here on index
    // operations on `self` become no-ops.
    let ind = self.indexes;
    self.indexes = None;
    let mut index_locks = IndexDataLocks::new(self.timeout);
    if let Some(mut ind_change) = ind {
        // Changes on indexes whose meta segment is dropped in this same
        // transaction are discarded.
        let changed_indexes = ind_change.changed_indexes();
        for check in changed_indexes {
            if self.segs_dropped.contains(&check.get_meta_id()) {
                ind_change.remove_changes(&check);
            }
        }

        let to_lock = ind_change.changed_indexes();
        if let Err(x) = index_locks.read_lock_indexes(persy_impl, &to_lock) {
            self.rollback_prepared(persy_impl, PreparedState::only_indexes(index_locks))?;
            return Err(PrepareError::from(x));
        }
        // Applying index changes records further operations on `self`.
        if let Err(x) = ind_change.apply(&mut index_locks, persy_impl, &mut self) {
            self.rollback_prepared(persy_impl, PreparedState::only_indexes(index_locks))?;
            return Err(PrepareError::from(x));
        }
    }

    let mut freed_pages = self.collapse_operations();

    let (locks, checks) = self.coll_locks(&index_locks);
    if let Err(x) = address.acquire_locks(&locks, self.timeout) {
        // Data locks were not acquired: only the index locks have to be released.
        let prepared = PreparedState::only_indexes(index_locks);
        self.rollback_prepared(persy_impl, prepared)?;
        return Err(x);
    };
    // Created segments whose name was also dropped in this transaction, and
    // updated segments created in this transaction, are excluded from the check.
    let mut created = locks.created_segments_cloned();
    created.retain(|x| !self.segs_dropped_names.contains(x));
    let mut updated_segs = self.segs_updated.clone();
    updated_segs.retain(|x| !self.segs_created.contains_key(x));
    if let Err(x) = address.check_segments(&created, updated_segs.into_iter()) {
        self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
        return Err(x);
    }
    // Record versions are verified for every strategy except `LastWin`.
    let check_version = self.strategy != TxStrategy::LastWin;
    let old_records = match address.check_persistent_records(&checks, check_version) {
        Ok(old) => old,
        Err(x) => {
            self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
            return Err(x);
        }
    };

    // Every page of a dropped segment becomes free on commit.
    for dropped_seg in &self.segs_dropped {
        let pages = address.collect_segment_pages(*dropped_seg)?;
        for p in pages.into_iter().map(FreedPage::new) {
            freed_pages.insert(p);
        }
    }
    let mut snapshot_entries = Vec::with_capacity(old_records.len());
    for old_record in &old_records {
        // The previous page of each changed record is freed on commit, kept in
        // the snapshot, and logged so it can be restored on rollback.
        freed_pages.insert(FreedPage::new(old_record.record_page));
        snapshot_entries.push(SnapshotEntry::change(
            &old_record.recref,
            old_record.record_page,
            old_record.version,
        ));
        let rollback = RollbackPage::new(old_record.segment, &old_record.recref, old_record.record_page);
        journal.log(&rollback, &self.id)?;
    }
    for insert in &self.inserted {
        snapshot_entries.push(SnapshotEntry::insert(&insert.recref));
    }
    for freed_page in &freed_pages {
        journal.log(freed_page, &self.id)?;
    }
    // The sorted set is turned into a vector in descending order.
    let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
    freed_pages_vec.reverse();
    let snapshot_ref = snapshots.new_snapshot();
    self.freed_pages = Some(freed_pages_vec.clone());
    journal.prepare(&PrepareCommit::new(), &self.id)?;
    self.sync(persy_impl, &snapshot_ref)?;

    Ok((
        self,
        PreparedState::all(snapshot_ref, locks, index_locks, snapshot_entries, freed_pages_vec),
    ))
}
542
/// Asks `persy_impl` to sync this transaction using its configured sync mode.
fn sync(&self, persy_impl: &PersyImpl, snapshot_ref: &SnapshotRef) -> PERes<()> {
    persy_impl.transaction_sync(&self.sync_mode, snapshot_ref)
}
546
547 fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
548 let mut pages_to_free = BTreeSet::new();
549 let mut inserted_by_id = HashMap::new();
550 for insert in self.inserted.drain(..) {
551 inserted_by_id.insert(insert.recref, insert);
552 }
553
554 let mut updated_by_id = HashMap::new();
555 for update in self.updated.drain(..) {
556 match updated_by_id.entry(update.recref) {
557 Entry::Vacant(e) => {
558 e.insert(update);
559 }
560 Entry::Occupied(mut e) => {
561 pages_to_free.insert(FreedPage::new(e.get().record_page));
562 e.get_mut().record_page = update.record_page;
563 }
564 }
565 }
566
567 for (k, insert) in &mut inserted_by_id {
568 if let Some(update) = updated_by_id.remove(k) {
569 pages_to_free.insert(FreedPage::new(insert.record_page));
570 insert.record_page = update.record_page;
571 }
572 }
573
574 let mut i = 0;
575 while i != self.deleted.len() {
576 if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
577 self.deleted.remove(i);
578 pages_to_free.insert(FreedPage::new(insert.record_page));
579 } else {
580 i += 1;
581 }
582 }
583
584 for delete in &self.deleted {
585 if let Some(update) = updated_by_id.remove(&delete.recref) {
586 pages_to_free.insert(FreedPage::new(update.record_page));
587 }
588 }
589
590 for (_, insert) in inserted_by_id.drain() {
591 if self.segs_dropped.contains(&insert.segment) {
592 pages_to_free.insert(FreedPage::new(insert.record_page));
593 } else {
594 self.inserted.push(insert);
595 }
596 }
597
598 for (_, update) in updated_by_id.drain() {
599 if self.segs_dropped.contains(&update.segment) {
600 pages_to_free.insert(FreedPage::new(update.record_page));
601 } else {
602 self.updated.push(update);
603 }
604 }
605 pages_to_free
606 }
607
608 fn coll_locks(&self, index_locks: &IndexDataLocks) -> (Locks, Vec<CheckRecord>) {
609 let mut crt_upd_segs = Vec::new();
610 for create in self.segs_created.keys() {
611 if !&self.segs_dropped.contains(create) && !index_locks.is_segment_locked(create) {
612 crt_upd_segs.push(*create);
613 }
614 }
615 for update in &self.segs_updated {
616 if !&self.segs_dropped.contains(update) && !index_locks.is_segment_locked(update) {
617 crt_upd_segs.push(*update);
618 }
619 }
620
621 let mut dropped_segs: Vec<_> = self.segs_dropped.iter().copied().collect();
622 let mut check_records = HashSet::new();
623 let mut lock_records = HashSet::new();
624
625 for update in &self.updated {
628 let mut version = update.version;
629 if let Some(read_v) = self.read.get(&update.recref) {
631 version = read_v.version;
632 }
633 if !index_locks.is_record_locked(&update.recref) {
634 lock_records.insert(update.recref);
635 }
636 check_records.insert(CheckRecord::new(update.segment, update.recref, version));
637 }
638
639 for delete in &self.deleted {
640 let mut version = delete.version;
641 if let Some(read_v) = self.read.get(&delete.recref) {
643 version = read_v.version;
644 }
645 if !index_locks.is_record_locked(&delete.recref) {
646 lock_records.insert(delete.recref);
647 }
648 check_records.insert(CheckRecord::new(delete.segment, delete.recref, version));
649 }
650
651 for insert in &self.inserted {
652 lock_records.remove(&insert.recref);
653 check_records.remove(&CheckRecord::new(insert.segment, insert.recref, 1));
654 }
655
656 let mut records: Vec<RecRef> = lock_records.into_iter().collect();
657 records.sort();
658 crt_upd_segs.sort();
659 dropped_segs.sort();
660 let created = self.segs_created_names.keys().cloned().collect();
661 (
662 Locks::new(records, created, crt_upd_segs, dropped_segs),
663 check_records.into_iter().collect(),
664 )
665 }
666
667 fn internal_rollback(&self, persy_impl: &PersyImpl) -> PERes<(Vec<(SegmentId, u64)>, Vec<u64>)> {
668 let address = persy_impl.address();
669
670 let dropped_segs: Vec<_> = self.segs_created.keys().copied().collect();
671 let address_to_free = address.rollback(&self.inserted)?;
672 let mut free_pages = Vec::new();
673 for insert in &self.inserted {
674 if dropped_segs.contains(&insert.segment) {
675 free_pages.push(insert.record_page);
676 }
677 }
678
679 for page in &self.segs_new_pages {
680 if self.segs_created.contains_key(&page.segment) {
681 free_pages.push(page.page);
682 }
683 }
684
685 for update in &self.updated {
686 if dropped_segs.contains(&update.segment) {
687 free_pages.push(update.record_page);
688 }
689 }
690
691 Ok((address_to_free, free_pages))
692 }
693
/// Rolls the transaction back during journal recovery.
///
/// Logs the rollback in the journal, frees the pages reported by
/// `internal_rollback`, marks the journal transaction as finished, restores
/// the recovered rollback pages (if any) and finally frees the address pages
/// that became empty.
///
/// # Errors
/// Stops at, and returns, the first error reported by the journal, address
/// or allocator.
pub fn recover_rollback(&self, persy_impl: &PersyImpl) -> PERes<()> {
    let journal = persy_impl.journal();
    let address = persy_impl.address();
    let allocator = persy_impl.allocator();

    journal.end(&Rollback::new(), &self.id)?;
    let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
    for page in free_pages {
        allocator.recover_free(page)?;
    }
    journal.finished_to_clean(&[self.id.clone()])?;

    // `rollback_pages` is only populated by `recover_rollback_page`.
    if let Some(rollback_pages) = &self.rollback_pages {
        address.recover_rollback(rollback_pages)?;
    }
    let to_free = address.recover_remove_pages(&address_to_free)?;
    for page in to_free {
        allocator.recover_free(page)?;
    }
    Ok(())
}
715
/// Rolls back a transaction that has not been prepared.
///
/// Logs the rollback in the journal, frees the pages reported by
/// `internal_rollback`, marks the journal transaction as finished and
/// registers a snapshot carrying the address entries to clean.
///
/// # Errors
/// Stops at, and returns, the first error reported by the journal, address
/// or allocator.
pub fn rollback(&mut self, persy_impl: &PersyImpl) -> PERes<()> {
    let journal = persy_impl.journal();
    let snapshots = persy_impl.snapshots();
    let allocator = persy_impl.allocator();
    journal.end(&Rollback::new(), &self.id)?;
    let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
    allocator.free_pages(&free_pages)?;
    journal.finished_to_clean(&[self.id.clone()])?;
    // The snapshot has no entries; it only carries the clean info. The
    // returned value is dropped at the end of the function.
    let _add_snap_id = snapshots.snapshot(Vec::new(), CleanInfo::new(Vec::new(), address_to_free), self.id.clone());
    Ok(())
}
727
/// Rolls back a transaction whose prepare phase started (or completed),
/// releasing the locks carried by `prepared`.
///
/// Logs the rollback in the journal, frees the pages reported by
/// `internal_rollback`, releases the locks, marks the journal transaction as
/// finished and, when `prepared` holds a snapshot, attaches the clean info to it.
///
/// # Errors
/// Stops at, and returns, the first error reported by the journal, address
/// or allocator.
///
/// NOTE(review): an early `?` return before `release_locks` leaves the locks
/// in `prepared` unreleased — confirm this is intended.
pub fn rollback_prepared(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PERes<()> {
    let journal = persy_impl.journal();
    let address = persy_impl.address();
    let snapshots = persy_impl.snapshots();
    let allocator = persy_impl.allocator();

    journal.end(&Rollback::new(), &self.id)?;

    let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
    allocator.free_pages(&free_pages)?;
    if let Some(locks) = &prepared.data_locks {
        address.release_locks(locks);
    }
    journal.finished_to_clean(&[self.id.clone()])?;

    let infos = CleanInfo::new(Vec::new(), address_to_free);
    // Without a snapshot the clean info is simply dropped.
    if let Some(snapshot_ref) = prepared.snapshot_ref {
        snapshots.fill_snapshot_clean_info(&snapshot_ref, infos);
    }
    Ok(())
}
749
/// Commits the transaction during journal recovery.
///
/// Applies the changes in recover mode (which also releases the locks in
/// `prepared`), logs the commit in the journal and then runs
/// `recover_commit_cleanup`.
///
/// # Errors
/// Stops at, and returns, the first error of any of the three steps.
pub fn recover_commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PERes<()> {
    let journal = persy_impl.journal();

    self.internal_commit(persy_impl, true, &prepared)?;
    journal.end(&Commit::new(), &self.id)?;
    self.recover_commit_cleanup(persy_impl)
}
757
758 pub(crate) fn remove_free_pages(&mut self, pages: Vec<u64>) {
759 if let Some(fp) = &mut self.freed_pages {
760 fp.retain(|x| !pages.contains(&x.page));
761 }
762 }
763 pub fn recover_commit_cleanup(&mut self, persy_impl: &PersyImpl) -> PERes<()> {
764 let allocator = persy_impl.allocator();
765 let address = persy_impl.address();
766 let journal = persy_impl.journal();
767 address.recover_segment_operations(&self.segments_operations, &mut self.segs_created, &self.segs_new_pages)?;
768 if let Some(ref up_free) = self.freed_pages {
769 for to_free in up_free {
770 if !journal.is_page_in_start_list(to_free.page) {
771 allocator.recover_free(to_free.page)?;
772 }
773 }
774 }
775 let delete_pages = self
776 .deleted
777 .iter()
778 .map(|r| (r.segment, r.recref.page))
779 .collect::<Vec<_>>();
780 let to_free = address.recover_remove_pages(&delete_pages)?;
781 for page in to_free {
782 allocator.recover_free(page)?;
783 }
784 persy_impl.allocator().device_sync()?;
785 Ok(())
786 }
787
/// Applies every tracked change to the address structure and releases the
/// locks held in `prepared`.
///
/// `recover` is forwarded to `Address::apply`. Returns the value produced by
/// `Address::apply`, which callers treat as the address pages to free.
///
/// # Errors
/// Propagates the error of `Address::apply`; the locks are not released in
/// that case.
fn internal_commit(
    &mut self,
    persy_impl: &PersyImpl,
    recover: bool,
    prepared: &PreparedState,
) -> PERes<Vec<(SegmentId, u64)>> {
    let address = persy_impl.address();
    let pages_to_unlink = address.apply(
        &self.segs_new_pages,
        &self.inserted,
        &self.updated,
        &self.deleted,
        &self.segments_operations,
        &mut self.segs_created,
        recover,
    )?;

    if let Some(locks) = &prepared.data_locks {
        address.release_locks(locks);
    }

    Ok(pages_to_unlink)
}
811
/// Releases the address pages listed in `address_to_free` inside a dedicated
/// journal transaction.
///
/// Does nothing when `address_to_free` is empty. Otherwise the address
/// structure is cleared, every page is logged as freed, the new segment pages
/// returned by `clear_empty` are logged, the journal transaction is closed
/// and a snapshot carrying the freed pages is registered.
///
/// # Errors
/// Stops at, and returns, the first error reported by the address or journal.
pub(crate) fn free_address_structures_impl(
    journal: &Journal,
    snapshots: &Arc<Snapshots>,
    address: &Address,
    allocator: &Allocator,
    address_to_free: &[(SegmentId, u64)],
) -> PERes<()> {
    if !address_to_free.is_empty() {
        let new_pages = address.clear_empty(address_to_free)?;
        let tx_id = journal.start()?;
        let mut freed = Vec::new();
        for (_, page) in address_to_free {
            let fp = FreedPage::new(*page);
            journal.log(&fp, &tx_id)?;
            freed.push(fp);
        }
        for (segment, page) in new_pages {
            // The previous page is always logged as 0 here.
            journal.log(&NewSegmentPage::new(segment, page, 0), &tx_id)?;
        }
        // NOTE(review): `prepare` records `PrepareCommit` through
        // `journal.prepare`, while here it goes through `journal.end` —
        // confirm this difference is intended.
        journal.end(&PrepareCommit::new(), &tx_id)?;
        journal.end(&Commit::new(), &tx_id)?;
        let add_snap_id = snapshots.snapshot(Vec::new(), CleanInfo::new(freed, Vec::new()), tx_id);
        if let Some(pc) = snapshots.pending_clean(add_snap_id.id()) {
            allocator.to_release_next_sync(pc);
        }
    }
    Ok(())
}
845
/// Commits a prepared transaction and returns the snapshot reference it used.
///
/// Fills the snapshot with the entries collected by `prepare`, applies the
/// changes (releasing the locks), attaches the clean info to the snapshot,
/// syncs again when segment operations are involved, logs the commit and
/// schedules any pending clean for release at the next sync.
///
/// # Errors
/// Returns the converted error of applying the changes, syncing or writing
/// the journal.
///
/// # Panics
/// Panics when `prepared` carries no snapshot reference; states built without
/// a snapshot (lock-only states) must not be passed here.
pub fn commit(&mut self, persy_impl: &PersyImpl, mut prepared: PreparedState) -> Result<SnapshotRef, GenericError> {
    let allocator = persy_impl.allocator();
    let journal = persy_impl.journal();
    let snapshots = persy_impl.snapshots();
    // The snapshot is filled before the changes are applied.
    if let (Some(entries), Some(snapshot_ref)) = (prepared.entries.take(), &prepared.snapshot_ref) {
        snapshots.fill_snapshot_address(snapshot_ref, entries, self.id.clone())
    }
    let address_to_free = self.internal_commit(persy_impl, false, &prepared)?;
    if let (Some(freed_pages), Some(snapshot_ref)) = (prepared.freed_pages.take(), &prepared.snapshot_ref) {
        let infos = CleanInfo::new(freed_pages, address_to_free);
        snapshots.fill_snapshot_clean_info(snapshot_ref, infos);
    }
    // An extra sync is requested only when segments were created or dropped.
    if !self.segments_operations.is_empty() {
        if let Some(snapshot_ref) = &prepared.snapshot_ref {
            self.sync(persy_impl, snapshot_ref)?;
        }
    }
    journal.end(&Commit::new(), &self.id)?;
    let snapshot_ref = prepared.snapshot_ref.take().expect("everytime has snapshot");
    if let Some(pc) = snapshots.pending_clean(snapshot_ref.id()) {
        allocator.to_release_next_sync(pc);
    }
    Ok(snapshot_ref)
}
870
871 pub fn recover_metadata(&mut self, metadata: &Metadata) {
872 self.strategy = metadata.strategy.clone();
873 self.meta_id = metadata.meta_id.clone();
874 }
875
876 pub fn recover_rollback_page(&mut self, rollback_page: &RollbackPage) {
877 if let Some(pages) = &mut &mut self.rollback_pages {
878 pages.push(rollback_page.clone());
879 } else {
880 self.rollback_pages = Some(vec![rollback_page.clone()]);
881 }
882 }
883
884 pub fn recover_freed_page(&mut self, freed: &FreedPage) {
885 self.freed_pages.get_or_insert(Vec::new()).push(freed.clone());
886 }
887
/// Recovery counterpart of `add_new_segment_page`: tracks a new segment page
/// read back from the journal, without logging anything.
pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
    self.segs_new_pages.push(new_page.clone());
}
891
/// Returns the caller-provided identifier of this transaction.
pub fn meta_id(&self) -> &Vec<u8> {
    &self.meta_id
}
/// Returns the journal identifier of this transaction.
pub fn id(&self) -> &JournalId {
    &self.id
}
898
899 pub fn filter_list<'a>(
900 &'a self,
901 pers: &'a [(String, SegmentId)],
902 ) -> impl Iterator<Item = (&'a str, SegmentId)> + 'a {
903 let outer = pers.iter().map(|(name, id)| (name.as_str(), *id));
904 let inner = self.segments_operations.iter().filter_map(|seg| {
905 if let SegmentOperation::Create(c) = seg {
906 Some((c.name.as_str(), c.segment_id))
907 } else {
908 None
909 }
910 });
911
912 outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
913 }
914
915 pub fn filter_list_snap<'a>(
916 &'a self,
917 pers: &'a [(String, SegmentId, u64)],
918 ) -> impl Iterator<Item = (&'a str, SegmentId, u64)> + 'a {
919 let outer = pers.iter().map(|(name, id, d)| (name.as_str(), *id, *d));
920 let inner = self.segments_operations.iter().filter_map(|seg| {
921 if let SegmentOperation::Create(c) = seg {
922 Some((c.name.as_str(), c.segment_id, c.first_page))
923 } else {
924 None
925 }
926 });
927
928 outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
929 }
930
/// Returns a mutable reference to a segment created in this transaction, if any.
pub(crate) fn get_segment_mut(&mut self, segment_id: SegmentId) -> Option<&mut Segment> {
    self.segs_created.get_mut(&segment_id)
}
/// Returns a segment created in this transaction, if any.
pub(crate) fn get_segment(&self, segment_id: SegmentId) -> Option<&Segment> {
    self.segs_created.get(&segment_id)
}
/// Returns the lock timeout configured for this transaction.
pub(crate) fn get_timeout(&self) -> Duration {
    self.timeout
}
940}
941
#[cfg(test)]
mod tests {
    use crate::{
        id::{RecRef, SegmentId},
        journal::{
            records::{DeleteRecord, FreedPage, InsertRecord, UpdateRecord},
            JournalId,
        },
        transaction::tx_impl::TransactionImpl,
    };

    /// `scan_insert` must yield only the inserts that belong to the requested segment.
    #[test]
    fn test_scan_insert() {
        let segment_id = SegmentId::new(10);
        let segment_id_other = SegmentId::new(20);
        // `recover` builds an empty transaction without touching a journal.
        let mut tx = TransactionImpl::recover(JournalId::new(0, 0));
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(4, 2), 2));
        tx.inserted
            .push(InsertRecord::new(segment_id_other, &RecRef::new(0, 1), 3));
        let mut count = 0;
        // Both records of `segment_id` were created with 2 as second `RecRef` argument.
        for x in tx.scan_insert(segment_id) {
            assert_eq!(x.pos, 2);
            count += 1;
        }
        assert_eq!(count, 2);
    }

    /// `collapse_operations` must merge the operations on the same record and
    /// report every page that is no longer referenced.
    #[test]
    fn test_collapse() {
        let segment_id = SegmentId::new(10);
        let segment_id_other = SegmentId::new(20);
        let mut tx = TransactionImpl::recover(JournalId::new(0, 0));
        // Four inserts on pages 1..=4.
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 1), 1));
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
        tx.inserted
            .push(InsertRecord::new(segment_id_other, &RecRef::new(3, 3), 3));
        tx.inserted
            .push(InsertRecord::new(segment_id_other, &RecRef::new(3, 4), 4));
        // Seven updates on pages 5..=11; (3, 1) and (3, 5) are updated twice.
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 5, 1));
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 6, 1));
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 2), 7, 1));
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 8, 1));
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 9, 1));
        tx.updated
            .push(UpdateRecord::new(segment_id, &RecRef::new(3, 6), 10, 1));
        tx.updated
            .push(UpdateRecord::new(segment_id, &RecRef::new(3, 7), 11, 1));
        // Four deletes: two hit inserted records, one an updated record, one nothing.
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 1), 0));
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 3), 1));
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 6), 2));
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 8), 2));
        let free = tx.collapse_operations();
        // Freed: 5 and 8 (superseded updates), 1 and 2 (inserts replaced by
        // their update), 6 and 3 (inserts cancelled by a delete), 10 (update
        // cancelled by a delete).
        assert_eq!(free.len(), 7);
        for e in [1, 2, 3, 5, 6, 8, 10].iter().map(|x| FreedPage::new(*x)) {
            assert!(free.contains(&e));
        }
        // Survivors: inserts (3, 2) and (3, 4); updates (3, 5) and (3, 7);
        // deletes (3, 6) and (3, 8).
        assert_eq!(tx.inserted.len(), 2);
        assert_eq!(tx.updated.len(), 2);
        assert_eq!(tx.deleted.len(), 2);
    }
}