1use std::collections::HashSet;
2
3use citadel_core::types::{PageId, PageType, ValueType};
4use citadel_core::Result;
5use citadel_page::page::Page;
6use citadel_page::{branch_node, leaf_node};
7
8use crate::catalog::TableDescriptor;
9use crate::manager::TxnManager;
10
11#[derive(Debug)]
12pub struct IntegrityReport {
13 pub pages_checked: u64,
14 pub errors: Vec<IntegrityError>,
15}
16
17impl IntegrityReport {
18 pub fn is_ok(&self) -> bool {
19 self.errors.is_empty()
20 }
21}
22
23#[derive(Debug)]
24pub enum IntegrityError {
25 PageReadFailed {
26 page: PageId,
27 error: String,
28 },
29 KeyOrderViolation {
30 page: PageId,
31 index: usize,
32 },
33 DuplicatePageRef(PageId),
34 EntryCountMismatch {
35 expected: u64,
36 actual: u64,
37 },
38 InvalidPageType {
39 page: PageId,
40 expected: &'static str,
41 },
42}
43
44pub(crate) fn run_integrity_check(mgr: &TxnManager) -> Result<IntegrityReport> {
45 let slot = mgr.current_slot();
46 let mut visited = HashSet::new();
47 let mut errors = Vec::new();
48 let mut pages_checked: u64 = 0;
49
50 let default_count = walk_tree(
52 mgr,
53 slot.tree_root,
54 &mut visited,
55 &mut errors,
56 &mut pages_checked,
57 );
58
59 if default_count != slot.tree_entries {
61 errors.push(IntegrityError::EntryCountMismatch {
62 expected: slot.tree_entries,
63 actual: default_count,
64 });
65 }
66
67 if slot.catalog_root.is_valid() {
69 walk_catalog(
70 mgr,
71 slot.catalog_root,
72 &mut visited,
73 &mut errors,
74 &mut pages_checked,
75 );
76 }
77
78 if slot.pending_free_root.is_valid() {
80 walk_chain(
81 mgr,
82 slot.pending_free_root,
83 &mut visited,
84 &mut errors,
85 &mut pages_checked,
86 );
87 }
88
89 Ok(IntegrityReport {
90 pages_checked,
91 errors,
92 })
93}
94
95fn walk_tree(
96 mgr: &TxnManager,
97 root: PageId,
98 visited: &mut HashSet<PageId>,
99 errors: &mut Vec<IntegrityError>,
100 pages_checked: &mut u64,
101) -> u64 {
102 let mut entry_count: u64 = 0;
103 let mut stack = vec![root];
104
105 while let Some(page_id) = stack.pop() {
106 if !visited.insert(page_id) {
107 errors.push(IntegrityError::DuplicatePageRef(page_id));
108 continue;
109 }
110
111 let page = match mgr.read_page_from_disk(page_id) {
112 Ok(p) => p,
113 Err(e) => {
114 errors.push(IntegrityError::PageReadFailed {
115 page: page_id,
116 error: e.to_string(),
117 });
118 continue;
119 }
120 };
121 *pages_checked += 1;
122
123 match page.page_type() {
124 Some(PageType::Leaf) => {
125 let n = page.num_cells();
126 entry_count += n as u64;
127 check_leaf_ordering(&page, page_id, errors);
128 }
129 Some(PageType::Branch) => {
130 for i in 0..page.num_cells() as usize {
131 stack.push(branch_node::get_child(&page, i));
132 }
133 let right = page.right_child();
134 if right.is_valid() {
135 stack.push(right);
136 }
137 }
138 _ => {
139 errors.push(IntegrityError::InvalidPageType {
140 page: page_id,
141 expected: "Leaf or Branch",
142 });
143 }
144 }
145 }
146
147 entry_count
148}
149
150fn check_leaf_ordering(page: &Page, page_id: PageId, errors: &mut Vec<IntegrityError>) {
151 let n = page.num_cells();
152 for i in 1..n {
153 let prev = leaf_node::read_cell(page, i - 1);
154 let curr = leaf_node::read_cell(page, i);
155 if prev.key >= curr.key {
156 errors.push(IntegrityError::KeyOrderViolation {
157 page: page_id,
158 index: i as usize,
159 });
160 }
161 }
162}
163
164fn walk_catalog(
165 mgr: &TxnManager,
166 catalog_root: PageId,
167 visited: &mut HashSet<PageId>,
168 errors: &mut Vec<IntegrityError>,
169 pages_checked: &mut u64,
170) {
171 let table_roots = collect_named_table_roots(mgr, catalog_root, visited, errors, pages_checked);
173
174 for root in table_roots {
176 walk_tree(mgr, root, visited, errors, pages_checked);
177 }
178}
179
180fn collect_named_table_roots(
181 mgr: &TxnManager,
182 catalog_root: PageId,
183 visited: &mut HashSet<PageId>,
184 errors: &mut Vec<IntegrityError>,
185 pages_checked: &mut u64,
186) -> Vec<PageId> {
187 let mut roots = Vec::new();
188 let mut stack = vec![catalog_root];
189
190 while let Some(page_id) = stack.pop() {
191 if !visited.insert(page_id) {
192 errors.push(IntegrityError::DuplicatePageRef(page_id));
193 continue;
194 }
195
196 let page = match mgr.read_page_from_disk(page_id) {
197 Ok(p) => p,
198 Err(e) => {
199 errors.push(IntegrityError::PageReadFailed {
200 page: page_id,
201 error: e.to_string(),
202 });
203 continue;
204 }
205 };
206 *pages_checked += 1;
207
208 match page.page_type() {
209 Some(PageType::Leaf) => {
210 for i in 0..page.num_cells() {
211 let cell = leaf_node::read_cell(&page, i);
212 if cell.val_type != ValueType::Tombstone && cell.value.len() >= 4 {
213 let desc = TableDescriptor::deserialize(cell.value);
214 if desc.root_page.is_valid() {
215 roots.push(desc.root_page);
216 }
217 }
218 }
219 }
220 Some(PageType::Branch) => {
221 for i in 0..page.num_cells() as usize {
222 stack.push(branch_node::get_child(&page, i));
223 }
224 let right = page.right_child();
225 if right.is_valid() {
226 stack.push(right);
227 }
228 }
229 _ => {
230 errors.push(IntegrityError::InvalidPageType {
231 page: page_id,
232 expected: "Leaf or Branch (catalog)",
233 });
234 }
235 }
236 }
237
238 roots
239}
240
241fn walk_chain(
242 mgr: &TxnManager,
243 root: PageId,
244 visited: &mut HashSet<PageId>,
245 errors: &mut Vec<IntegrityError>,
246 pages_checked: &mut u64,
247) {
248 let mut current = root;
249 while current.is_valid() {
250 if !visited.insert(current) {
251 errors.push(IntegrityError::DuplicatePageRef(current));
252 break;
253 }
254
255 let page = match mgr.read_page_from_disk(current) {
256 Ok(p) => p,
257 Err(e) => {
258 errors.push(IntegrityError::PageReadFailed {
259 page: current,
260 error: e.to_string(),
261 });
262 break;
263 }
264 };
265 *pages_checked += 1;
266
267 current = page.right_child();
268 }
269}