1use std::collections::HashSet;
2
3use citadel_core::types::{PageId, PageType, ValueType};
4use citadel_core::Result;
5use citadel_page::page::Page;
6use citadel_page::{branch_node, leaf_node};
7
8use crate::catalog::TableDescriptor;
9use crate::manager::TxnManager;
10
11#[derive(Debug)]
12pub struct IntegrityReport {
13 pub pages_checked: u64,
14 pub errors: Vec<IntegrityError>,
15}
16
17impl IntegrityReport {
18 pub fn is_ok(&self) -> bool {
19 self.errors.is_empty()
20 }
21}
22
23#[derive(Debug)]
24pub enum IntegrityError {
25 PageReadFailed {
26 page: PageId,
27 error: String,
28 },
29 KeyOrderViolation {
30 page: PageId,
31 index: usize,
32 },
33 DuplicatePageRef(PageId),
34 EntryCountMismatch {
35 expected: u64,
36 actual: u64,
37 },
38 InvalidPageType {
39 page: PageId,
40 expected: &'static str,
41 },
42}
43
44pub(crate) fn run_integrity_check(mgr: &TxnManager) -> Result<IntegrityReport> {
45 let slot = mgr.current_slot();
46 let mut visited = HashSet::new();
47 let mut errors = Vec::new();
48 let mut pages_checked: u64 = 0;
49
50 let default_count = walk_tree(
51 mgr,
52 slot.tree_root,
53 &mut visited,
54 &mut errors,
55 &mut pages_checked,
56 );
57
58 if default_count != slot.tree_entries {
59 errors.push(IntegrityError::EntryCountMismatch {
60 expected: slot.tree_entries,
61 actual: default_count,
62 });
63 }
64
65 if slot.catalog_root.is_valid() {
66 walk_catalog(
67 mgr,
68 slot.catalog_root,
69 &mut visited,
70 &mut errors,
71 &mut pages_checked,
72 );
73 }
74
75 if slot.pending_free_root.is_valid() {
76 walk_chain(
77 mgr,
78 slot.pending_free_root,
79 &mut visited,
80 &mut errors,
81 &mut pages_checked,
82 );
83 }
84
85 Ok(IntegrityReport {
86 pages_checked,
87 errors,
88 })
89}
90
91fn walk_tree(
92 mgr: &TxnManager,
93 root: PageId,
94 visited: &mut HashSet<PageId>,
95 errors: &mut Vec<IntegrityError>,
96 pages_checked: &mut u64,
97) -> u64 {
98 let mut entry_count: u64 = 0;
99 let mut stack = vec![root];
100
101 while let Some(page_id) = stack.pop() {
102 if !visited.insert(page_id) {
103 errors.push(IntegrityError::DuplicatePageRef(page_id));
104 continue;
105 }
106
107 let page = match mgr.read_page_from_disk(page_id) {
108 Ok(p) => p,
109 Err(e) => {
110 errors.push(IntegrityError::PageReadFailed {
111 page: page_id,
112 error: e.to_string(),
113 });
114 continue;
115 }
116 };
117 *pages_checked += 1;
118
119 match page.page_type() {
120 Some(PageType::Leaf) => {
121 let n = page.num_cells();
122 entry_count += n as u64;
123 check_leaf_ordering(&page, page_id, errors);
124 }
125 Some(PageType::Branch) => {
126 for i in 0..page.num_cells() as usize {
127 stack.push(branch_node::get_child(&page, i));
128 }
129 let right = page.right_child();
130 if right.is_valid() {
131 stack.push(right);
132 }
133 }
134 _ => {
135 errors.push(IntegrityError::InvalidPageType {
136 page: page_id,
137 expected: "Leaf or Branch",
138 });
139 }
140 }
141 }
142
143 entry_count
144}
145
146fn check_leaf_ordering(page: &Page, page_id: PageId, errors: &mut Vec<IntegrityError>) {
147 let n = page.num_cells();
148 for i in 1..n {
149 let prev = leaf_node::read_cell(page, i - 1);
150 let curr = leaf_node::read_cell(page, i);
151 if prev.key >= curr.key {
152 errors.push(IntegrityError::KeyOrderViolation {
153 page: page_id,
154 index: i as usize,
155 });
156 }
157 }
158}
159
160fn walk_catalog(
161 mgr: &TxnManager,
162 catalog_root: PageId,
163 visited: &mut HashSet<PageId>,
164 errors: &mut Vec<IntegrityError>,
165 pages_checked: &mut u64,
166) {
167 let table_roots = collect_named_table_roots(mgr, catalog_root, visited, errors, pages_checked);
168
169 for root in table_roots {
170 walk_tree(mgr, root, visited, errors, pages_checked);
171 }
172}
173
174fn collect_named_table_roots(
175 mgr: &TxnManager,
176 catalog_root: PageId,
177 visited: &mut HashSet<PageId>,
178 errors: &mut Vec<IntegrityError>,
179 pages_checked: &mut u64,
180) -> Vec<PageId> {
181 let mut roots = Vec::new();
182 let mut stack = vec![catalog_root];
183
184 while let Some(page_id) = stack.pop() {
185 if !visited.insert(page_id) {
186 errors.push(IntegrityError::DuplicatePageRef(page_id));
187 continue;
188 }
189
190 let page = match mgr.read_page_from_disk(page_id) {
191 Ok(p) => p,
192 Err(e) => {
193 errors.push(IntegrityError::PageReadFailed {
194 page: page_id,
195 error: e.to_string(),
196 });
197 continue;
198 }
199 };
200 *pages_checked += 1;
201
202 match page.page_type() {
203 Some(PageType::Leaf) => {
204 for i in 0..page.num_cells() {
205 let cell = leaf_node::read_cell(&page, i);
206 if cell.val_type != ValueType::Tombstone && cell.value.len() >= 4 {
207 let desc = TableDescriptor::deserialize(cell.value);
208 if desc.root_page.is_valid() {
209 roots.push(desc.root_page);
210 }
211 }
212 }
213 }
214 Some(PageType::Branch) => {
215 for i in 0..page.num_cells() as usize {
216 stack.push(branch_node::get_child(&page, i));
217 }
218 let right = page.right_child();
219 if right.is_valid() {
220 stack.push(right);
221 }
222 }
223 _ => {
224 errors.push(IntegrityError::InvalidPageType {
225 page: page_id,
226 expected: "Leaf or Branch (catalog)",
227 });
228 }
229 }
230 }
231
232 roots
233}
234
235fn walk_chain(
236 mgr: &TxnManager,
237 root: PageId,
238 visited: &mut HashSet<PageId>,
239 errors: &mut Vec<IntegrityError>,
240 pages_checked: &mut u64,
241) {
242 let mut current = root;
243 while current.is_valid() {
244 if !visited.insert(current) {
245 errors.push(IntegrityError::DuplicatePageRef(current));
246 break;
247 }
248
249 let page = match mgr.read_page_from_disk(current) {
250 Ok(p) => p,
251 Err(e) => {
252 errors.push(IntegrityError::PageReadFailed {
253 page: current,
254 error: e.to_string(),
255 });
256 break;
257 }
258 };
259 *pages_checked += 1;
260
261 current = page.right_child();
262 }
263}