Skip to main content

citadel_txn/
integrity.rs

1use std::collections::HashSet;
2
3use citadel_core::types::{PageId, PageType, ValueType};
4use citadel_core::Result;
5use citadel_page::page::Page;
6use citadel_page::{branch_node, leaf_node};
7
8use crate::catalog::TableDescriptor;
9use crate::manager::TxnManager;
10
11#[derive(Debug)]
12pub struct IntegrityReport {
13    pub pages_checked: u64,
14    pub errors: Vec<IntegrityError>,
15}
16
17impl IntegrityReport {
18    pub fn is_ok(&self) -> bool {
19        self.errors.is_empty()
20    }
21}
22
23#[derive(Debug)]
24pub enum IntegrityError {
25    PageReadFailed {
26        page: PageId,
27        error: String,
28    },
29    KeyOrderViolation {
30        page: PageId,
31        index: usize,
32    },
33    DuplicatePageRef(PageId),
34    EntryCountMismatch {
35        expected: u64,
36        actual: u64,
37    },
38    InvalidPageType {
39        page: PageId,
40        expected: &'static str,
41    },
42}
43
44pub(crate) fn run_integrity_check(mgr: &TxnManager) -> Result<IntegrityReport> {
45    let slot = mgr.current_slot();
46    let mut visited = HashSet::new();
47    let mut errors = Vec::new();
48    let mut pages_checked: u64 = 0;
49
50    // Walk the default tree
51    let default_count = walk_tree(
52        mgr,
53        slot.tree_root,
54        &mut visited,
55        &mut errors,
56        &mut pages_checked,
57    );
58
59    // Check default table entry count
60    if default_count != slot.tree_entries {
61        errors.push(IntegrityError::EntryCountMismatch {
62            expected: slot.tree_entries,
63            actual: default_count,
64        });
65    }
66
67    // Walk catalog tree and named tables
68    if slot.catalog_root.is_valid() {
69        walk_catalog(
70            mgr,
71            slot.catalog_root,
72            &mut visited,
73            &mut errors,
74            &mut pages_checked,
75        );
76    }
77
78    // Walk pending-free chain
79    if slot.pending_free_root.is_valid() {
80        walk_chain(
81            mgr,
82            slot.pending_free_root,
83            &mut visited,
84            &mut errors,
85            &mut pages_checked,
86        );
87    }
88
89    Ok(IntegrityReport {
90        pages_checked,
91        errors,
92    })
93}
94
95fn walk_tree(
96    mgr: &TxnManager,
97    root: PageId,
98    visited: &mut HashSet<PageId>,
99    errors: &mut Vec<IntegrityError>,
100    pages_checked: &mut u64,
101) -> u64 {
102    let mut entry_count: u64 = 0;
103    let mut stack = vec![root];
104
105    while let Some(page_id) = stack.pop() {
106        if !visited.insert(page_id) {
107            errors.push(IntegrityError::DuplicatePageRef(page_id));
108            continue;
109        }
110
111        let page = match mgr.read_page_from_disk(page_id) {
112            Ok(p) => p,
113            Err(e) => {
114                errors.push(IntegrityError::PageReadFailed {
115                    page: page_id,
116                    error: e.to_string(),
117                });
118                continue;
119            }
120        };
121        *pages_checked += 1;
122
123        match page.page_type() {
124            Some(PageType::Leaf) => {
125                let n = page.num_cells();
126                entry_count += n as u64;
127                check_leaf_ordering(&page, page_id, errors);
128            }
129            Some(PageType::Branch) => {
130                for i in 0..page.num_cells() as usize {
131                    stack.push(branch_node::get_child(&page, i));
132                }
133                let right = page.right_child();
134                if right.is_valid() {
135                    stack.push(right);
136                }
137            }
138            _ => {
139                errors.push(IntegrityError::InvalidPageType {
140                    page: page_id,
141                    expected: "Leaf or Branch",
142                });
143            }
144        }
145    }
146
147    entry_count
148}
149
150fn check_leaf_ordering(page: &Page, page_id: PageId, errors: &mut Vec<IntegrityError>) {
151    let n = page.num_cells();
152    for i in 1..n {
153        let prev = leaf_node::read_cell(page, i - 1);
154        let curr = leaf_node::read_cell(page, i);
155        if prev.key >= curr.key {
156            errors.push(IntegrityError::KeyOrderViolation {
157                page: page_id,
158                index: i as usize,
159            });
160        }
161    }
162}
163
164fn walk_catalog(
165    mgr: &TxnManager,
166    catalog_root: PageId,
167    visited: &mut HashSet<PageId>,
168    errors: &mut Vec<IntegrityError>,
169    pages_checked: &mut u64,
170) {
171    // First walk the catalog tree itself
172    let table_roots = collect_named_table_roots(mgr, catalog_root, visited, errors, pages_checked);
173
174    // Then walk each named table tree
175    for root in table_roots {
176        walk_tree(mgr, root, visited, errors, pages_checked);
177    }
178}
179
180fn collect_named_table_roots(
181    mgr: &TxnManager,
182    catalog_root: PageId,
183    visited: &mut HashSet<PageId>,
184    errors: &mut Vec<IntegrityError>,
185    pages_checked: &mut u64,
186) -> Vec<PageId> {
187    let mut roots = Vec::new();
188    let mut stack = vec![catalog_root];
189
190    while let Some(page_id) = stack.pop() {
191        if !visited.insert(page_id) {
192            errors.push(IntegrityError::DuplicatePageRef(page_id));
193            continue;
194        }
195
196        let page = match mgr.read_page_from_disk(page_id) {
197            Ok(p) => p,
198            Err(e) => {
199                errors.push(IntegrityError::PageReadFailed {
200                    page: page_id,
201                    error: e.to_string(),
202                });
203                continue;
204            }
205        };
206        *pages_checked += 1;
207
208        match page.page_type() {
209            Some(PageType::Leaf) => {
210                for i in 0..page.num_cells() {
211                    let cell = leaf_node::read_cell(&page, i);
212                    if cell.val_type != ValueType::Tombstone && cell.value.len() >= 4 {
213                        let desc = TableDescriptor::deserialize(cell.value);
214                        if desc.root_page.is_valid() {
215                            roots.push(desc.root_page);
216                        }
217                    }
218                }
219            }
220            Some(PageType::Branch) => {
221                for i in 0..page.num_cells() as usize {
222                    stack.push(branch_node::get_child(&page, i));
223                }
224                let right = page.right_child();
225                if right.is_valid() {
226                    stack.push(right);
227                }
228            }
229            _ => {
230                errors.push(IntegrityError::InvalidPageType {
231                    page: page_id,
232                    expected: "Leaf or Branch (catalog)",
233                });
234            }
235        }
236    }
237
238    roots
239}
240
241fn walk_chain(
242    mgr: &TxnManager,
243    root: PageId,
244    visited: &mut HashSet<PageId>,
245    errors: &mut Vec<IntegrityError>,
246    pages_checked: &mut u64,
247) {
248    let mut current = root;
249    while current.is_valid() {
250        if !visited.insert(current) {
251            errors.push(IntegrityError::DuplicatePageRef(current));
252            break;
253        }
254
255        let page = match mgr.read_page_from_disk(current) {
256            Ok(p) => p,
257            Err(e) => {
258                errors.push(IntegrityError::PageReadFailed {
259                    page: current,
260                    error: e.to_string(),
261                });
262                break;
263            }
264        };
265        *pages_checked += 1;
266
267        current = page.right_child();
268    }
269}