quill_sql/recovery/
undo.rs1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::buffer::BufferManager;
5use crate::error::QuillSQLResult;
6use crate::recovery::resource_manager::{
7 ensure_default_resource_managers_registered, get_resource_manager, UndoContext,
8};
9use crate::recovery::wal::codec::{
10 decode_clr, decode_transaction, ClrPayload, ResourceManagerId, TransactionRecordKind, WalFrame,
11};
12use crate::recovery::wal_record::WalRecordPayload;
13use crate::recovery::{Lsn, WalManager};
14use crate::storage::disk_scheduler::DiskScheduler;
15
16#[derive(Default)]
17struct UndoRecord {
18 next: Option<Lsn>,
19 payload: Option<WalFrame>,
20}
21
22#[derive(Default)]
23struct UndoIndex {
24 heads: HashMap<u64, Option<Lsn>>,
25 entries: HashMap<Lsn, UndoRecord>,
26 active: HashSet<u64>,
27}
28
29impl UndoIndex {
30 fn observe(&mut self, frame: &WalFrame) -> QuillSQLResult<()> {
31 match frame.rmid {
32 ResourceManagerId::Clr => {
33 let clr = decode_clr(&frame.body)?;
34 let next = if clr.undo_next_lsn == 0 {
35 None
36 } else {
37 Some(clr.undo_next_lsn)
38 };
39 self.entries.insert(
40 frame.lsn,
41 UndoRecord {
42 next,
43 payload: None,
44 },
45 );
46 self.heads.insert(clr.txn_id, next);
47 self.active.insert(clr.txn_id);
48 }
49 ResourceManagerId::Transaction => {
50 let tx = decode_transaction(&frame.body, frame.info)?;
51 match tx.marker {
52 TransactionRecordKind::Begin => {
53 self.active.insert(tx.txn_id);
54 self.heads.entry(tx.txn_id).or_insert(None);
55 }
56 TransactionRecordKind::Commit | TransactionRecordKind::Abort => {
57 self.active.remove(&tx.txn_id);
58 self.heads.insert(tx.txn_id, None);
59 }
60 }
61 }
62 _ => {
63 if let Some(manager) = get_resource_manager(frame.rmid) {
64 if let Some(txn_id) = manager.transaction_id(frame) {
65 let prev = self.head_for(txn_id);
66 self.entries.insert(
67 frame.lsn,
68 UndoRecord {
69 next: prev,
70 payload: Some(frame.clone()),
71 },
72 );
73 self.heads.insert(txn_id, Some(frame.lsn));
74 self.active.insert(txn_id);
75 }
76 }
77 }
78 }
79 Ok(())
80 }
81
82 fn head_for(&self, txn_id: u64) -> Option<Lsn> {
83 self.heads.get(&txn_id).copied().flatten()
84 }
85
86 fn entry(&self, lsn: Lsn) -> Option<&UndoRecord> {
87 self.entries.get(&lsn)
88 }
89
90 fn active_transactions(&self) -> Vec<u64> {
91 let mut txns: Vec<u64> = self.active.iter().copied().collect();
92 txns.sort_unstable();
93 txns
94 }
95}
96
97#[derive(Default)]
98pub struct UndoOutcome {
99 pub loser_transactions: Vec<u64>,
100 pub max_clr_lsn: Lsn,
101}
102
103pub struct UndoExecutor {
104 wal: Arc<WalManager>,
105 disk_scheduler: Arc<DiskScheduler>,
106 buffer_pool: Option<Arc<BufferManager>>,
107 index: UndoIndex,
108}
109
110impl UndoExecutor {
111 pub fn new(
112 wal: Arc<WalManager>,
113 disk_scheduler: Arc<DiskScheduler>,
114 buffer_pool: Option<Arc<BufferManager>>,
115 ) -> Self {
116 ensure_default_resource_managers_registered();
117 Self {
118 wal,
119 disk_scheduler,
120 buffer_pool,
121 index: UndoIndex::default(),
122 }
123 }
124
125 pub fn observe(&mut self, frame: &WalFrame) -> QuillSQLResult<()> {
126 self.index.observe(frame)
127 }
128
129 pub fn finalize(self) -> QuillSQLResult<UndoOutcome> {
130 let losers = self.index.active_transactions();
131 let mut max_clr_lsn = 0;
132
133 for txn_id in losers.iter().copied() {
134 let mut cursor = self.index.head_for(txn_id);
135 while let Some(lsn) = cursor {
136 let Some(entry) = self.index.entry(lsn) else {
137 break;
138 };
139 if let Some(rec) = &entry.payload {
140 if let Some(manager) = get_resource_manager(rec.rmid) {
141 let ctx = UndoContext {
142 disk_scheduler: self.disk_scheduler.clone(),
143 buffer_pool: self.buffer_pool.clone(),
144 };
145 manager.undo(rec, &ctx)?;
146 }
147 let undo_next = entry.next.unwrap_or(0);
148 let clr = self.wal.append_record_with(|_| {
149 WalRecordPayload::Clr(ClrPayload {
150 txn_id,
151 undone_lsn: lsn,
152 undo_next_lsn: undo_next,
153 })
154 })?;
155 if clr.end_lsn > max_clr_lsn {
156 max_clr_lsn = clr.end_lsn;
157 }
158 }
159 cursor = entry.next;
160 }
161 }
162
163 Ok(UndoOutcome {
164 loser_transactions: losers,
165 max_clr_lsn,
166 })
167 }
168}