quill_sql/recovery/
undo.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::buffer::BufferManager;
5use crate::error::QuillSQLResult;
6use crate::recovery::resource_manager::{
7    ensure_default_resource_managers_registered, get_resource_manager, UndoContext,
8};
9use crate::recovery::wal::codec::{
10    decode_clr, decode_transaction, ClrPayload, ResourceManagerId, TransactionRecordKind, WalFrame,
11};
12use crate::recovery::wal_record::WalRecordPayload;
13use crate::recovery::{Lsn, WalManager};
14use crate::storage::disk_scheduler::DiskScheduler;
15
16#[derive(Default)]
17struct UndoRecord {
18    next: Option<Lsn>,
19    payload: Option<WalFrame>,
20}
21
22#[derive(Default)]
23struct UndoIndex {
24    heads: HashMap<u64, Option<Lsn>>,
25    entries: HashMap<Lsn, UndoRecord>,
26    active: HashSet<u64>,
27}
28
29impl UndoIndex {
30    fn observe(&mut self, frame: &WalFrame) -> QuillSQLResult<()> {
31        match frame.rmid {
32            ResourceManagerId::Clr => {
33                let clr = decode_clr(&frame.body)?;
34                let next = if clr.undo_next_lsn == 0 {
35                    None
36                } else {
37                    Some(clr.undo_next_lsn)
38                };
39                self.entries.insert(
40                    frame.lsn,
41                    UndoRecord {
42                        next,
43                        payload: None,
44                    },
45                );
46                self.heads.insert(clr.txn_id, next);
47                self.active.insert(clr.txn_id);
48            }
49            ResourceManagerId::Transaction => {
50                let tx = decode_transaction(&frame.body, frame.info)?;
51                match tx.marker {
52                    TransactionRecordKind::Begin => {
53                        self.active.insert(tx.txn_id);
54                        self.heads.entry(tx.txn_id).or_insert(None);
55                    }
56                    TransactionRecordKind::Commit | TransactionRecordKind::Abort => {
57                        self.active.remove(&tx.txn_id);
58                        self.heads.insert(tx.txn_id, None);
59                    }
60                }
61            }
62            _ => {
63                if let Some(manager) = get_resource_manager(frame.rmid) {
64                    if let Some(txn_id) = manager.transaction_id(frame) {
65                        let prev = self.head_for(txn_id);
66                        self.entries.insert(
67                            frame.lsn,
68                            UndoRecord {
69                                next: prev,
70                                payload: Some(frame.clone()),
71                            },
72                        );
73                        self.heads.insert(txn_id, Some(frame.lsn));
74                        self.active.insert(txn_id);
75                    }
76                }
77            }
78        }
79        Ok(())
80    }
81
82    fn head_for(&self, txn_id: u64) -> Option<Lsn> {
83        self.heads.get(&txn_id).copied().flatten()
84    }
85
86    fn entry(&self, lsn: Lsn) -> Option<&UndoRecord> {
87        self.entries.get(&lsn)
88    }
89
90    fn active_transactions(&self) -> Vec<u64> {
91        let mut txns: Vec<u64> = self.active.iter().copied().collect();
92        txns.sort_unstable();
93        txns
94    }
95}
96
97#[derive(Default)]
98pub struct UndoOutcome {
99    pub loser_transactions: Vec<u64>,
100    pub max_clr_lsn: Lsn,
101}
102
103pub struct UndoExecutor {
104    wal: Arc<WalManager>,
105    disk_scheduler: Arc<DiskScheduler>,
106    buffer_pool: Option<Arc<BufferManager>>,
107    index: UndoIndex,
108}
109
110impl UndoExecutor {
111    pub fn new(
112        wal: Arc<WalManager>,
113        disk_scheduler: Arc<DiskScheduler>,
114        buffer_pool: Option<Arc<BufferManager>>,
115    ) -> Self {
116        ensure_default_resource_managers_registered();
117        Self {
118            wal,
119            disk_scheduler,
120            buffer_pool,
121            index: UndoIndex::default(),
122        }
123    }
124
125    pub fn observe(&mut self, frame: &WalFrame) -> QuillSQLResult<()> {
126        self.index.observe(frame)
127    }
128
129    pub fn finalize(self) -> QuillSQLResult<UndoOutcome> {
130        let losers = self.index.active_transactions();
131        let mut max_clr_lsn = 0;
132
133        for txn_id in losers.iter().copied() {
134            let mut cursor = self.index.head_for(txn_id);
135            while let Some(lsn) = cursor {
136                let Some(entry) = self.index.entry(lsn) else {
137                    break;
138                };
139                if let Some(rec) = &entry.payload {
140                    if let Some(manager) = get_resource_manager(rec.rmid) {
141                        let ctx = UndoContext {
142                            disk_scheduler: self.disk_scheduler.clone(),
143                            buffer_pool: self.buffer_pool.clone(),
144                        };
145                        manager.undo(rec, &ctx)?;
146                    }
147                    let undo_next = entry.next.unwrap_or(0);
148                    let clr = self.wal.append_record_with(|_| {
149                        WalRecordPayload::Clr(ClrPayload {
150                            txn_id,
151                            undone_lsn: lsn,
152                            undo_next_lsn: undo_next,
153                        })
154                    })?;
155                    if clr.end_lsn > max_clr_lsn {
156                        max_clr_lsn = clr.end_lsn;
157                    }
158                }
159                cursor = entry.next;
160            }
161        }
162
163        Ok(UndoOutcome {
164            loser_transactions: losers,
165            max_clr_lsn,
166        })
167    }
168}