persy/transaction/
tx_impl.rs

1use crate::{
2    address::segment::Segment,
3    address::Address,
4    allocator::Allocator,
5    config::TxStrategy,
6    error::{IndexChangeError, PERes},
7    id::{IndexId, RecRef, SegmentId},
8    index::{
9        config::{IndexTypeInternal, ValueMode},
10        keeper_tx::IndexTransactionKeeper,
11        tree::nodes::Value,
12    },
13    journal::{
14        records::{
15            Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage,
16            PrepareCommit, ReadRecord, Rollback, RollbackPage, UpdateRecord,
17        },
18        Journal, JournalId,
19    },
20    persy::PersyImpl,
21    snapshot::data::{CleanInfo, SnapshotEntry},
22    snapshots::{SnapshotRef, Snapshots},
23    transaction::{index_locks::IndexDataLocks, iter::TransactionInsertScanner, locks::Locks},
24    DropSegmentError, GenericError, PrepareError,
25};
26use std::{
27    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
28    ops::RangeBounds,
29    sync::Arc,
30    time::Duration,
31    vec::IntoIter,
32};
33
/// A segment-level operation recorded by a transaction.
///
/// Operations are appended to `TransactionImpl::segments_operations` in the
/// order they are registered.
pub enum SegmentOperation {
    /// A segment created inside the transaction.
    Create(CreateSegment),
    /// A segment dropped inside the transaction.
    Drop(DropSegment),
}
38
39#[derive(Clone, Hash, Eq, PartialEq, Debug)]
40pub(crate) struct CheckRecord {
41    pub(crate) segment_id: SegmentId,
42    pub(crate) record_id: RecRef,
43    pub(crate) version: u16,
44}
45impl CheckRecord {
46    pub(crate) fn new(segment_id: SegmentId, record_id: RecRef, version: u16) -> Self {
47        Self {
48            segment_id,
49            record_id,
50            version,
51        }
52    }
53}
54
55#[derive(Clone)]
56pub struct PreparedState {
57    snapshot_ref: Option<SnapshotRef>,
58    data_locks: Option<Locks>,
59    entries: Option<Vec<SnapshotEntry>>,
60    freed_pages: Option<Vec<FreedPage>>,
61}
62
63impl PreparedState {
64    fn only_indexes(index: IndexDataLocks) -> Self {
65        let (index_record_locks, segments) = index.extract(&[]);
66        let data_locks = Some(Locks::new(
67            index_record_locks.into_iter().collect(),
68            Vec::new(),
69            segments.into_iter().collect(),
70            Vec::new(),
71        ));
72        Self {
73            snapshot_ref: None,
74            data_locks,
75            entries: None,
76            freed_pages: None,
77        }
78    }
79    fn locked(data: Locks, index: IndexDataLocks) -> Self {
80        Self::all_int(None, data, index, None, None)
81    }
82    fn all(
83        snapshot_ref: SnapshotRef,
84        data: Locks,
85        index: IndexDataLocks,
86        entries: Vec<SnapshotEntry>,
87        freed_pages: Vec<FreedPage>,
88    ) -> Self {
89        Self::all_int(Some(snapshot_ref), data, index, Some(entries), Some(freed_pages))
90    }
91
92    fn all_int(
93        snapshot_ref: Option<SnapshotRef>,
94        mut data: Locks,
95        index: IndexDataLocks,
96        entries: Option<Vec<SnapshotEntry>>,
97        freed_pages: Option<Vec<FreedPage>>,
98    ) -> Self {
99        let (index_record_locks, segments) = index.extract(data.records());
100        data.add_records(index_record_locks.into_iter());
101        data.add_create_update_segments(segments.into_iter());
102        Self {
103            snapshot_ref,
104            data_locks: Some(data),
105            entries,
106            freed_pages,
107        }
108    }
109
110    #[cfg(test)]
111    pub(crate) fn leak(&mut self) {
112        if let Some(sref) = &mut self.snapshot_ref {
113            sref.leak();
114        }
115    }
116}
117
/// How a transaction asks for its changes to be synced.
///
/// The value is forwarded to `PersyImpl::transaction_sync`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Sync as part of the transaction flow.
    Sync,
    /// Sync in the background.
    BackgroundSync,
}
122
/// In-memory state of a transaction: the operations registered so far plus the
/// bookkeeping needed to prepare, commit, roll back or recover it.
pub struct TransactionImpl {
    /// Concurrency strategy; checked by `add_read` and `prepare`.
    strategy: TxStrategy,
    /// Sync mode forwarded to `PersyImpl::transaction_sync`.
    sync_mode: SyncMode,
    /// Opaque metadata bytes logged in the `Metadata` journal record.
    meta_id: Vec<u8>,
    /// Journal id under which every record of this transaction is logged.
    id: JournalId,
    /// Records inserted in this transaction.
    inserted: Vec<InsertRecord>,
    /// Records updated in this transaction.
    updated: Vec<UpdateRecord>,
    /// Records deleted in this transaction.
    deleted: Vec<DeleteRecord>,
    /// Read records keyed by record reference; filled only for `VersionOnRead`.
    read: HashMap<RecRef, ReadRecord>,
    /// Segment create/drop operations in registration order.
    segments_operations: Vec<SegmentOperation>,
    /// Name to id of the segments created in this transaction.
    segs_created_names: HashMap<String, SegmentId>,
    /// Names of the segments dropped in this transaction.
    segs_dropped_names: HashSet<String>,
    /// Segments created in this transaction, keyed by id.
    segs_created: HashMap<SegmentId, Segment>,
    /// Ids of the segments dropped in this transaction.
    segs_dropped: HashSet<SegmentId>,
    /// Ids of the segments touched by an insert, update or delete.
    segs_updated: HashSet<SegmentId>,
    /// Pages to free; set by `prepare` or filled during recovery.
    freed_pages: Option<Vec<FreedPage>>,
    /// Rollback pages collected during recovery.
    rollback_pages: Option<Vec<RollbackPage>>,
    /// Pending index changes; taken out (left `None`) by `prepare`.
    indexes: Option<IndexTransactionKeeper>,
    /// New segment pages registered in this transaction.
    segs_new_pages: Vec<NewSegmentPage>,
    /// Timeout used when acquiring locks.
    timeout: Duration,
}
144
/// Result of looking a record up in the pending changes of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxRead {
    /// The record was inserted or updated in the transaction: `(record_page, version)`.
    Record((u64, u16)),
    /// The record was deleted in the transaction.
    Deleted,
    /// The transaction holds no change for the record.
    None,
}
150
151pub enum TxSegCheck {
152    Created(SegmentId),
153    Dropped,
154    None,
155}
156
157impl TransactionImpl {
158    pub fn new(
159        journal: &Journal,
160        strategy: &TxStrategy,
161        sync_mode: SyncMode,
162        meta_id: Vec<u8>,
163        timeout: Duration,
164    ) -> PERes<TransactionImpl> {
165        let id = journal.start()?;
166        journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
167        Ok(TransactionImpl {
168            strategy: strategy.clone(),
169            sync_mode,
170            meta_id,
171            id,
172            inserted: Vec::new(),
173            updated: Vec::new(),
174            deleted: Vec::new(),
175            read: HashMap::new(),
176            segments_operations: Vec::new(),
177            segs_created_names: HashMap::new(),
178            segs_dropped_names: HashSet::new(),
179            segs_created: HashMap::new(),
180            segs_dropped: HashSet::new(),
181            segs_updated: HashSet::new(),
182            freed_pages: None,
183            rollback_pages: None,
184            indexes: Some(IndexTransactionKeeper::new()),
185            segs_new_pages: Vec::new(),
186            timeout,
187        })
188    }
189
190    pub fn recover(id: JournalId) -> TransactionImpl {
191        TransactionImpl {
192            strategy: TxStrategy::LastWin,
193            sync_mode: SyncMode::Sync,
194            meta_id: Vec::new(),
195            id,
196            inserted: Vec::new(),
197            updated: Vec::new(),
198            deleted: Vec::new(),
199            read: HashMap::new(),
200            segments_operations: Vec::new(),
201            segs_created_names: HashMap::new(),
202            segs_dropped_names: HashSet::new(),
203            segs_created: HashMap::new(),
204            segs_dropped: HashSet::new(),
205            segs_updated: HashSet::new(),
206            freed_pages: None,
207            rollback_pages: None,
208            indexes: Some(IndexTransactionKeeper::new()),
209            segs_new_pages: Vec::new(),
210            timeout: Duration::from_secs(60 * 60 * 24),
211        }
212    }
213
214    pub fn segment_created_in_tx(&self, segment: SegmentId) -> bool {
215        self.segs_created.contains_key(&segment)
216    }
217
218    pub fn index_created_in_tx(&self, index: &IndexId) -> bool {
219        self.segment_created_in_tx(index.get_meta_id())
220    }
221
222    pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
223        for info in &self.segments_operations {
224            if let SegmentOperation::Create(ref c) = info {
225                if c.segment_id == segment {
226                    return Some(c.name.clone());
227                }
228            }
229        }
230        None
231    }
232    pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
233        if self.segs_created_names.contains_key(segment) {
234            for a in &self.segments_operations {
235                if let SegmentOperation::Create(ref c) = a {
236                    if c.name == segment {
237                        return TxSegCheck::Created(c.segment_id);
238                    }
239                }
240            }
241        } else if self.segs_dropped_names.contains(segment) {
242            return TxSegCheck::Dropped;
243        }
244        TxSegCheck::None
245    }
246
247    pub fn add_create_segment(&mut self, journal: &Journal, segment: Segment) -> PERes<()> {
248        let create = CreateSegment::new(segment.get_name(), segment.get_segment_id(), segment.get_first_page());
249
250        journal.log(&create, &self.id)?;
251        self.segments_operations.push(SegmentOperation::Create(create));
252        self.segs_created_names
253            .insert(segment.get_name().to_owned(), segment.get_segment_id());
254        self.segs_created.insert(segment.get_segment_id(), segment);
255        Ok(())
256    }
257
258    pub fn recover_add(&mut self, create: &CreateSegment) {
259        self.segments_operations.push(SegmentOperation::Create(create.clone()));
260        let segment = Segment::new_allocation(create.first_page, create.segment_id, &create.name);
261        self.segs_created_names
262            .insert(create.name.clone(), segment.get_segment_id());
263        self.segs_created.insert(create.segment_id, segment);
264    }
265
266    pub fn add_drop_segment(
267        &mut self,
268        journal: &Journal,
269        name: &str,
270        segment_id: SegmentId,
271    ) -> Result<(), DropSegmentError> {
272        if self.segs_created_names.contains_key(name) {
273            Err(DropSegmentError::CannotDropSegmentCreatedInTx)
274        } else {
275            let drop = DropSegment::new(name, segment_id);
276            journal.log(&drop, &self.id)?;
277            self.segments_operations.push(SegmentOperation::Drop(drop));
278            self.segs_dropped.insert(segment_id);
279            self.segs_dropped_names.insert(name.into());
280            Ok(())
281        }
282    }
283
284    pub fn recover_drop(&mut self, drop: &DropSegment) {
285        self.segments_operations.push(SegmentOperation::Drop(drop.clone()));
286        self.segs_dropped.insert(drop.segment_id);
287        self.segs_dropped_names.insert(drop.name.clone());
288    }
289
290    pub fn add_read(&mut self, journal: &Journal, segment: SegmentId, recref: &RecRef, version: u16) -> PERes<()> {
291        if self.strategy == TxStrategy::VersionOnRead {
292            let read = ReadRecord::new(segment, recref, version);
293            journal.log(&read, &self.id)?;
294            self.read.insert(*recref, read);
295        }
296        Ok(())
297    }
298
299    pub fn recover_read(&mut self, read: &ReadRecord) {
300        self.read.insert(read.recref, read.clone());
301    }
302
303    pub fn add_insert(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, record: u64) -> PERes<()> {
304        self.segs_updated.insert(segment);
305        let insert = InsertRecord::new(segment, rec_ref, record);
306
307        journal.log(&insert, &self.id)?;
308        self.inserted.push(insert);
309        Ok(())
310    }
311
312    pub fn add_new_segment_page(
313        &mut self,
314        journal: &Journal,
315        segment: SegmentId,
316        new_page: u64,
317        previous_page: u64,
318    ) -> PERes<()> {
319        let new_page = NewSegmentPage::new(segment, new_page, previous_page);
320
321        journal.log(&new_page, &self.id)?;
322        self.segs_new_pages.push(new_page);
323        Ok(())
324    }
325
326    pub fn recover_insert(&mut self, insert: &InsertRecord) {
327        self.segs_updated.insert(insert.segment);
328        self.inserted.push(insert.clone());
329    }
330
331    pub fn add_update(
332        &mut self,
333        journal: &Journal,
334        segment: SegmentId,
335        rec_ref: &RecRef,
336        record: u64,
337        version: u16,
338    ) -> PERes<()> {
339        self.segs_updated.insert(segment);
340        let update = UpdateRecord::new(segment, rec_ref, record, version);
341        journal.log(&update, &self.id)?;
342        self.updated.push(update);
343        Ok(())
344    }
345
346    pub fn recover_update(&mut self, update: &UpdateRecord) {
347        self.segs_updated.insert(update.segment);
348        self.updated.push(update.clone());
349    }
350
351    pub fn add_delete(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, version: u16) -> PERes<()> {
352        self.segs_updated.insert(segment);
353        let delete = DeleteRecord::new(segment, rec_ref, version);
354        journal.log(&delete, &self.id)?;
355        self.deleted.push(delete);
356        Ok(())
357    }
358
359    pub fn add_put<K, V>(&mut self, index: IndexId, k: K, v: V)
360    where
361        K: IndexTypeInternal,
362        V: IndexTypeInternal,
363    {
364        if let Some(ref mut indexes) = self.indexes {
365            indexes.put(index, k, v);
366        }
367    }
368
369    pub fn add_remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
370    where
371        K: IndexTypeInternal,
372        V: IndexTypeInternal,
373    {
374        if let Some(ref mut indexes) = self.indexes {
375            indexes.remove(index, k, v);
376        }
377    }
378
379    pub fn apply_changes<K, V>(
380        &self,
381        vm: ValueMode,
382        index: IndexId,
383        index_name: &str,
384        k: &K,
385        pers: Option<Value<V>>,
386    ) -> Result<Option<Value<V>>, IndexChangeError>
387    where
388        K: IndexTypeInternal,
389        V: IndexTypeInternal,
390    {
391        if let Some(ref indexes) = self.indexes {
392            indexes.apply_changes(index, index_name, vm, k, pers)
393        } else {
394            Ok(pers)
395        }
396    }
397
398    pub fn index_range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
399    where
400        K: IndexTypeInternal,
401        V: IndexTypeInternal,
402        R: RangeBounds<K>,
403    {
404        if let Some(ind) = &self.indexes {
405            ind.range::<K, V, R>(index, range)
406        } else {
407            None
408        }
409    }
410
411    pub fn recover_delete(&mut self, delete: &DeleteRecord) {
412        self.segs_updated.insert(delete.segment);
413        self.deleted.push(delete.clone());
414    }
415
416    pub fn scan_insert(&self, seg: SegmentId) -> TransactionInsertScanner {
417        TransactionInsertScanner::new(self.inserted.clone(), seg)
418    }
419
420    pub fn read(&self, rec_ref: &RecRef) -> TxRead {
421        for ele in &self.deleted {
422            if ele.recref == *rec_ref {
423                return TxRead::Deleted;
424            }
425        }
426        if let Some(ele) = self.updated.iter().rev().find(|ele| ele.recref == *rec_ref) {
427            return TxRead::Record((ele.record_page, ele.version));
428        }
429        for ele in &self.inserted {
430            if ele.recref == *rec_ref {
431                return TxRead::Record((ele.record_page, 1));
432            }
433        }
434        TxRead::None
435    }
436
437    pub fn recover_prepare(&mut self, persy_impl: &PersyImpl) -> Result<PreparedState, PrepareError> {
438        let address = persy_impl.address();
439
440        let _ = self.collapse_operations();
441        let index_locks = IndexDataLocks::new(self.timeout);
442        let (locks, _) = self.coll_locks(&index_locks);
443        if let Err(x) = address.acquire_locks(&locks, self.timeout) {
444            self.recover_rollback(persy_impl)?;
445            return Err(x);
446        }
447        let crt_upd_segs = locks.created_updated_segments_cloned();
448        if let Err(x) = address.recover_allocations(&crt_upd_segs, &mut self.segs_created) {
449            self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
450            return Err(PrepareError::from(x));
451        }
452        Ok(PreparedState::locked(locks, index_locks))
453    }
454
455    pub fn prepare(mut self, persy_impl: &PersyImpl) -> Result<(TransactionImpl, PreparedState), PrepareError> {
456        let journal = persy_impl.journal();
457        let snapshots = persy_impl.snapshots();
458        let address = persy_impl.address();
459
460        let ind = self.indexes;
461        self.indexes = None;
462        let mut index_locks = IndexDataLocks::new(self.timeout);
463        if let Some(mut ind_change) = ind {
464            let changed_indexes = ind_change.changed_indexes();
465            for check in changed_indexes {
466                if self.segs_dropped.contains(&check.get_meta_id()) {
467                    ind_change.remove_changes(&check);
468                }
469            }
470
471            let to_lock = ind_change.changed_indexes();
472            if let Err(x) = index_locks.read_lock_indexes(persy_impl, &to_lock) {
473                self.rollback_prepared(persy_impl, PreparedState::only_indexes(index_locks))?;
474                return Err(PrepareError::from(x));
475            }
476            if let Err(x) = ind_change.apply(&mut index_locks, persy_impl, &mut self) {
477                self.rollback_prepared(persy_impl, PreparedState::only_indexes(index_locks))?;
478                return Err(PrepareError::from(x));
479            }
480        }
481
482        let mut freed_pages = self.collapse_operations();
483
484        let (locks, checks) = self.coll_locks(&index_locks);
485        if let Err(x) = address.acquire_locks(&locks, self.timeout) {
486            let prepared = PreparedState::only_indexes(index_locks);
487            self.rollback_prepared(persy_impl, prepared)?;
488            return Err(x);
489        };
490        let mut created = locks.created_segments_cloned();
491        created.retain(|x| !self.segs_dropped_names.contains(x));
492        let mut updated_segs = self.segs_updated.clone();
493        updated_segs.retain(|x| !self.segs_created.contains_key(x));
494        if let Err(x) = address.check_segments(&created, updated_segs.into_iter()) {
495            self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
496            return Err(x);
497        }
498        let check_version = self.strategy != TxStrategy::LastWin;
499        let old_records = match address.check_persistent_records(&checks, check_version) {
500            Ok(old) => old,
501            Err(x) => {
502                self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
503                return Err(x);
504            }
505        };
506
507        for dropped_seg in &self.segs_dropped {
508            let pages = address.collect_segment_pages(*dropped_seg)?;
509            for p in pages.into_iter().map(FreedPage::new) {
510                freed_pages.insert(p);
511            }
512        }
513        let mut snapshot_entries = Vec::with_capacity(old_records.len());
514        for old_record in &old_records {
515            freed_pages.insert(FreedPage::new(old_record.record_page));
516            snapshot_entries.push(SnapshotEntry::change(
517                &old_record.recref,
518                old_record.record_page,
519                old_record.version,
520            ));
521            let rollback = RollbackPage::new(old_record.segment, &old_record.recref, old_record.record_page);
522            journal.log(&rollback, &self.id)?;
523        }
524        for insert in &self.inserted {
525            snapshot_entries.push(SnapshotEntry::insert(&insert.recref));
526        }
527        for freed_page in &freed_pages {
528            journal.log(freed_page, &self.id)?;
529        }
530        let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
531        freed_pages_vec.reverse();
532        let snapshot_ref = snapshots.new_snapshot();
533        self.freed_pages = Some(freed_pages_vec.clone());
534        journal.prepare(&PrepareCommit::new(), &self.id)?;
535        self.sync(persy_impl, &snapshot_ref)?;
536
537        Ok((
538            self,
539            PreparedState::all(snapshot_ref, locks, index_locks, snapshot_entries, freed_pages_vec),
540        ))
541    }
542
543    fn sync(&self, persy_impl: &PersyImpl, snapshot_ref: &SnapshotRef) -> PERes<()> {
544        persy_impl.transaction_sync(&self.sync_mode, snapshot_ref)
545    }
546
547    fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
548        let mut pages_to_free = BTreeSet::new();
549        let mut inserted_by_id = HashMap::new();
550        for insert in self.inserted.drain(..) {
551            inserted_by_id.insert(insert.recref, insert);
552        }
553
554        let mut updated_by_id = HashMap::new();
555        for update in self.updated.drain(..) {
556            match updated_by_id.entry(update.recref) {
557                Entry::Vacant(e) => {
558                    e.insert(update);
559                }
560                Entry::Occupied(mut e) => {
561                    pages_to_free.insert(FreedPage::new(e.get().record_page));
562                    e.get_mut().record_page = update.record_page;
563                }
564            }
565        }
566
567        for (k, insert) in &mut inserted_by_id {
568            if let Some(update) = updated_by_id.remove(k) {
569                pages_to_free.insert(FreedPage::new(insert.record_page));
570                insert.record_page = update.record_page;
571            }
572        }
573
574        let mut i = 0;
575        while i != self.deleted.len() {
576            if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
577                self.deleted.remove(i);
578                pages_to_free.insert(FreedPage::new(insert.record_page));
579            } else {
580                i += 1;
581            }
582        }
583
584        for delete in &self.deleted {
585            if let Some(update) = updated_by_id.remove(&delete.recref) {
586                pages_to_free.insert(FreedPage::new(update.record_page));
587            }
588        }
589
590        for (_, insert) in inserted_by_id.drain() {
591            if self.segs_dropped.contains(&insert.segment) {
592                pages_to_free.insert(FreedPage::new(insert.record_page));
593            } else {
594                self.inserted.push(insert);
595            }
596        }
597
598        for (_, update) in updated_by_id.drain() {
599            if self.segs_dropped.contains(&update.segment) {
600                pages_to_free.insert(FreedPage::new(update.record_page));
601            } else {
602                self.updated.push(update);
603            }
604        }
605        pages_to_free
606    }
607
608    fn coll_locks(&self, index_locks: &IndexDataLocks) -> (Locks, Vec<CheckRecord>) {
609        let mut crt_upd_segs = Vec::new();
610        for create in self.segs_created.keys() {
611            if !&self.segs_dropped.contains(create) && !index_locks.is_segment_locked(create) {
612                crt_upd_segs.push(*create);
613            }
614        }
615        for update in &self.segs_updated {
616            if !&self.segs_dropped.contains(update) && !index_locks.is_segment_locked(update) {
617                crt_upd_segs.push(*update);
618            }
619        }
620
621        let mut dropped_segs: Vec<_> = self.segs_dropped.iter().copied().collect();
622        let mut check_records = HashSet::new();
623        let mut lock_records = HashSet::new();
624
625        // No need to lock on inserted records the new id unique is managed by the address.
626        //
627        for update in &self.updated {
628            let mut version = update.version;
629            // I found values in the read only for VersionOnRead
630            if let Some(read_v) = self.read.get(&update.recref) {
631                version = read_v.version;
632            }
633            if !index_locks.is_record_locked(&update.recref) {
634                lock_records.insert(update.recref);
635            }
636            check_records.insert(CheckRecord::new(update.segment, update.recref, version));
637        }
638
639        for delete in &self.deleted {
640            let mut version = delete.version;
641            // I found values in the read only for VersionOnRead
642            if let Some(read_v) = self.read.get(&delete.recref) {
643                version = read_v.version;
644            }
645            if !index_locks.is_record_locked(&delete.recref) {
646                lock_records.insert(delete.recref);
647            }
648            check_records.insert(CheckRecord::new(delete.segment, delete.recref, version));
649        }
650
651        for insert in &self.inserted {
652            lock_records.remove(&insert.recref);
653            check_records.remove(&CheckRecord::new(insert.segment, insert.recref, 1));
654        }
655
656        let mut records: Vec<RecRef> = lock_records.into_iter().collect();
657        records.sort();
658        crt_upd_segs.sort();
659        dropped_segs.sort();
660        let created = self.segs_created_names.keys().cloned().collect();
661        (
662            Locks::new(records, created, crt_upd_segs, dropped_segs),
663            check_records.into_iter().collect(),
664        )
665    }
666
667    fn internal_rollback(&self, persy_impl: &PersyImpl) -> PERes<(Vec<(SegmentId, u64)>, Vec<u64>)> {
668        let address = persy_impl.address();
669
670        let dropped_segs: Vec<_> = self.segs_created.keys().copied().collect();
671        let address_to_free = address.rollback(&self.inserted)?;
672        let mut free_pages = Vec::new();
673        for insert in &self.inserted {
674            if dropped_segs.contains(&insert.segment) {
675                free_pages.push(insert.record_page);
676            }
677        }
678
679        for page in &self.segs_new_pages {
680            if self.segs_created.contains_key(&page.segment) {
681                free_pages.push(page.page);
682            }
683        }
684
685        for update in &self.updated {
686            if dropped_segs.contains(&update.segment) {
687                free_pages.push(update.record_page);
688            }
689        }
690
691        Ok((address_to_free, free_pages))
692    }
693
694    pub fn recover_rollback(&self, persy_impl: &PersyImpl) -> PERes<()> {
695        let journal = persy_impl.journal();
696        let address = persy_impl.address();
697        let allocator = persy_impl.allocator();
698
699        journal.end(&Rollback::new(), &self.id)?;
700        let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
701        for page in free_pages {
702            allocator.recover_free(page)?;
703        }
704        journal.finished_to_clean(&[self.id.clone()])?;
705
706        if let Some(rollback_pages) = &self.rollback_pages {
707            address.recover_rollback(rollback_pages)?;
708        }
709        let to_free = address.recover_remove_pages(&address_to_free)?;
710        for page in to_free {
711            allocator.recover_free(page)?;
712        }
713        Ok(())
714    }
715
716    pub fn rollback(&mut self, persy_impl: &PersyImpl) -> PERes<()> {
717        let journal = persy_impl.journal();
718        let snapshots = persy_impl.snapshots();
719        let allocator = persy_impl.allocator();
720        journal.end(&Rollback::new(), &self.id)?;
721        let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
722        allocator.free_pages(&free_pages)?;
723        journal.finished_to_clean(&[self.id.clone()])?;
724        let _add_snap_id = snapshots.snapshot(Vec::new(), CleanInfo::new(Vec::new(), address_to_free), self.id.clone());
725        Ok(())
726    }
727
728    pub fn rollback_prepared(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PERes<()> {
729        let journal = persy_impl.journal();
730        let address = persy_impl.address();
731        let snapshots = persy_impl.snapshots();
732        let allocator = persy_impl.allocator();
733
734        journal.end(&Rollback::new(), &self.id)?;
735
736        let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
737        allocator.free_pages(&free_pages)?;
738        if let Some(locks) = &prepared.data_locks {
739            address.release_locks(locks);
740        }
741        journal.finished_to_clean(&[self.id.clone()])?;
742
743        let infos = CleanInfo::new(Vec::new(), address_to_free);
744        if let Some(snapshot_ref) = prepared.snapshot_ref {
745            snapshots.fill_snapshot_clean_info(&snapshot_ref, infos);
746        }
747        Ok(())
748    }
749
750    pub fn recover_commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PERes<()> {
751        let journal = persy_impl.journal();
752
753        self.internal_commit(persy_impl, true, &prepared)?;
754        journal.end(&Commit::new(), &self.id)?;
755        self.recover_commit_cleanup(persy_impl)
756    }
757
758    pub(crate) fn remove_free_pages(&mut self, pages: Vec<u64>) {
759        if let Some(fp) = &mut self.freed_pages {
760            fp.retain(|x| !pages.contains(&x.page));
761        }
762    }
763    pub fn recover_commit_cleanup(&mut self, persy_impl: &PersyImpl) -> PERes<()> {
764        let allocator = persy_impl.allocator();
765        let address = persy_impl.address();
766        let journal = persy_impl.journal();
767        address.recover_segment_operations(&self.segments_operations, &mut self.segs_created, &self.segs_new_pages)?;
768        if let Some(ref up_free) = self.freed_pages {
769            for to_free in up_free {
770                if !journal.is_page_in_start_list(to_free.page) {
771                    allocator.recover_free(to_free.page)?;
772                }
773            }
774        }
775        let delete_pages = self
776            .deleted
777            .iter()
778            .map(|r| (r.segment, r.recref.page))
779            .collect::<Vec<_>>();
780        let to_free = address.recover_remove_pages(&delete_pages)?;
781        for page in to_free {
782            allocator.recover_free(page)?;
783        }
784        persy_impl.allocator().device_sync()?;
785        Ok(())
786    }
787
788    fn internal_commit(
789        &mut self,
790        persy_impl: &PersyImpl,
791        recover: bool,
792        prepared: &PreparedState,
793    ) -> PERes<Vec<(SegmentId, u64)>> {
794        let address = persy_impl.address();
795        let pages_to_unlink = address.apply(
796            &self.segs_new_pages,
797            &self.inserted,
798            &self.updated,
799            &self.deleted,
800            &self.segments_operations,
801            &mut self.segs_created,
802            recover,
803        )?;
804
805        if let Some(locks) = &prepared.data_locks {
806            address.release_locks(locks);
807        }
808
809        Ok(pages_to_unlink)
810    }
811
812    pub(crate) fn free_address_structures_impl(
813        journal: &Journal,
814        snapshots: &Arc<Snapshots>,
815        address: &Address,
816        allocator: &Allocator,
817        address_to_free: &[(SegmentId, u64)],
818    ) -> PERes<()> {
819        if !address_to_free.is_empty() {
820            let new_pages = address.clear_empty(address_to_free)?;
821            let tx_id = journal.start()?;
822            let mut freed = Vec::new();
823            for (_, page) in address_to_free {
824                let fp = FreedPage::new(*page);
825                journal.log(&fp, &tx_id)?;
826                freed.push(fp);
827            }
828            //TODO: double check this, we are logging here pages that are removed from the address
829            // the recover of a new segment page may add it back to the address structure, when it
830            // should remove it.
831            for (segment, page) in new_pages {
832                journal.log(&NewSegmentPage::new(segment, page, 0), &tx_id)?;
833            }
834            journal.end(&PrepareCommit::new(), &tx_id)?;
835            journal.end(&Commit::new(), &tx_id)?;
836            //This need to still use a snapshot, in case there are other snapshots holding back the
837            //cleaning process
838            let add_snap_id = snapshots.snapshot(Vec::new(), CleanInfo::new(freed, Vec::new()), tx_id);
839            if let Some(pc) = snapshots.pending_clean(add_snap_id.id()) {
840                allocator.to_release_next_sync(pc);
841            }
842        }
843        Ok(())
844    }
845
846    pub fn commit(&mut self, persy_impl: &PersyImpl, mut prepared: PreparedState) -> Result<SnapshotRef, GenericError> {
847        let allocator = persy_impl.allocator();
848        let journal = persy_impl.journal();
849        let snapshots = persy_impl.snapshots();
850        if let (Some(entries), Some(snapshot_ref)) = (prepared.entries.take(), &prepared.snapshot_ref) {
851            snapshots.fill_snapshot_address(snapshot_ref, entries, self.id.clone())
852        }
853        let address_to_free = self.internal_commit(persy_impl, false, &prepared)?;
854        if let (Some(freed_pages), Some(snapshot_ref)) = (prepared.freed_pages.take(), &prepared.snapshot_ref) {
855            let infos = CleanInfo::new(freed_pages, address_to_free);
856            snapshots.fill_snapshot_clean_info(snapshot_ref, infos);
857        }
858        if !self.segments_operations.is_empty() {
859            if let Some(snapshot_ref) = &prepared.snapshot_ref {
860                self.sync(persy_impl, snapshot_ref)?;
861            }
862        }
863        journal.end(&Commit::new(), &self.id)?;
864        let snapshot_ref = prepared.snapshot_ref.take().expect("everytime has snapshot");
865        if let Some(pc) = snapshots.pending_clean(snapshot_ref.id()) {
866            allocator.to_release_next_sync(pc);
867        }
868        Ok(snapshot_ref)
869    }
870
871    pub fn recover_metadata(&mut self, metadata: &Metadata) {
872        self.strategy = metadata.strategy.clone();
873        self.meta_id = metadata.meta_id.clone();
874    }
875
876    pub fn recover_rollback_page(&mut self, rollback_page: &RollbackPage) {
877        if let Some(pages) = &mut &mut self.rollback_pages {
878            pages.push(rollback_page.clone());
879        } else {
880            self.rollback_pages = Some(vec![rollback_page.clone()]);
881        }
882    }
883
884    pub fn recover_freed_page(&mut self, freed: &FreedPage) {
885        self.freed_pages.get_or_insert(Vec::new()).push(freed.clone());
886    }
887
888    pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
889        self.segs_new_pages.push(new_page.clone());
890    }
891
892    pub fn meta_id(&self) -> &Vec<u8> {
893        &self.meta_id
894    }
895    pub fn id(&self) -> &JournalId {
896        &self.id
897    }
898
899    pub fn filter_list<'a>(
900        &'a self,
901        pers: &'a [(String, SegmentId)],
902    ) -> impl Iterator<Item = (&'a str, SegmentId)> + 'a {
903        let outer = pers.iter().map(|(name, id)| (name.as_str(), *id));
904        let inner = self.segments_operations.iter().filter_map(|seg| {
905            if let SegmentOperation::Create(c) = seg {
906                Some((c.name.as_str(), c.segment_id))
907            } else {
908                None
909            }
910        });
911
912        outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
913    }
914
915    pub fn filter_list_snap<'a>(
916        &'a self,
917        pers: &'a [(String, SegmentId, u64)],
918    ) -> impl Iterator<Item = (&'a str, SegmentId, u64)> + 'a {
919        let outer = pers.iter().map(|(name, id, d)| (name.as_str(), *id, *d));
920        let inner = self.segments_operations.iter().filter_map(|seg| {
921            if let SegmentOperation::Create(c) = seg {
922                Some((c.name.as_str(), c.segment_id, c.first_page))
923            } else {
924                None
925            }
926        });
927
928        outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
929    }
930
931    pub(crate) fn get_segment_mut(&mut self, segment_id: SegmentId) -> Option<&mut Segment> {
932        self.segs_created.get_mut(&segment_id)
933    }
934    pub(crate) fn get_segment(&self, segment_id: SegmentId) -> Option<&Segment> {
935        self.segs_created.get(&segment_id)
936    }
937    pub(crate) fn get_timeout(&self) -> Duration {
938        self.timeout
939    }
940}
941
#[cfg(test)]
mod tests {
    use crate::{
        id::{RecRef, SegmentId},
        journal::{
            records::{DeleteRecord, FreedPage, InsertRecord, UpdateRecord},
            JournalId,
        },
        transaction::tx_impl::TransactionImpl,
    };

    /// `scan_insert` yields the two records inserted in the requested segment and
    /// not the one inserted in the other segment.
    #[test]
    fn test_scan_insert() {
        let target = SegmentId::new(10);
        let unrelated = SegmentId::new(20);
        let mut tx = TransactionImpl::recover(JournalId::new(0, 0));

        // Two inserts in the scanned segment, both with `pos == 2`.
        for &first in [3, 4].iter() {
            tx.inserted.push(InsertRecord::new(target, &RecRef::new(first, 2), 2));
        }
        // One insert in another segment, with a different `pos`.
        tx.inserted.push(InsertRecord::new(unrelated, &RecRef::new(0, 1), 3));

        let mut seen = 0;
        for record in tx.scan_insert(target) {
            assert_eq!(record.pos, 2);
            seen += 1;
        }
        assert_eq!(seen, 2);
    }

    /// `collapse_operations` reduces the logged inserts, updates and deletes and
    /// returns the expected set of freed pages.
    #[test]
    fn test_collapse() {
        let segment_id = SegmentId::new(10);
        let segment_id_other = SegmentId::new(20);
        let mut tx = TransactionImpl::recover(JournalId::new(0, 0));

        // (segment, record pos, third `InsertRecord::new` argument — presumably the page)
        let inserts = [
            (segment_id, 1, 1),
            (segment_id, 2, 2),
            (segment_id_other, 3, 3),
            (segment_id_other, 4, 4),
        ];
        for &(segment, pos, page) in inserts.iter() {
            tx.inserted.push(InsertRecord::new(segment, &RecRef::new(3, pos), page));
        }

        // (record pos, third `UpdateRecord::new` argument — presumably the page)
        let updates = [(1, 5), (1, 6), (2, 7), (5, 8), (5, 9), (6, 10), (7, 11)];
        for &(pos, page) in updates.iter() {
            tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, pos), page, 1));
        }

        // (record pos, third `DeleteRecord::new` argument — presumably the version)
        let deletes = [(1, 0), (3, 1), (6, 2), (8, 2)];
        for &(pos, version) in deletes.iter() {
            tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, pos), version));
        }

        let free = tx.collapse_operations();

        assert_eq!(free.len(), 7);
        for page in [1, 2, 3, 5, 6, 8, 10].iter().copied() {
            assert!(free.contains(&FreedPage::new(page)));
        }
        assert_eq!(tx.inserted.len(), 2);
        assert_eq!(tx.updated.len(), 2);
        assert_eq!(tx.deleted.len(), 2);
    }
}