Skip to main content

citadel_txn/
integrity.rs

1use std::collections::HashSet;
2
3use citadel_core::types::{PageId, PageType, ValueType};
4use citadel_core::Result;
5use citadel_page::page::Page;
6use citadel_page::{branch_node, leaf_node};
7
8use crate::catalog::TableDescriptor;
9use crate::manager::TxnManager;
10
11#[derive(Debug)]
12pub struct IntegrityReport {
13    pub pages_checked: u64,
14    pub errors: Vec<IntegrityError>,
15}
16
17impl IntegrityReport {
18    pub fn is_ok(&self) -> bool {
19        self.errors.is_empty()
20    }
21}
22
23#[derive(Debug)]
24pub enum IntegrityError {
25    PageReadFailed {
26        page: PageId,
27        error: String,
28    },
29    KeyOrderViolation {
30        page: PageId,
31        index: usize,
32    },
33    DuplicatePageRef(PageId),
34    EntryCountMismatch {
35        expected: u64,
36        actual: u64,
37    },
38    InvalidPageType {
39        page: PageId,
40        expected: &'static str,
41    },
42}
43
44pub(crate) fn run_integrity_check(mgr: &TxnManager) -> Result<IntegrityReport> {
45    let slot = mgr.current_slot();
46    let mut visited = HashSet::new();
47    let mut errors = Vec::new();
48    let mut pages_checked: u64 = 0;
49
50    let default_count = walk_tree(
51        mgr,
52        slot.tree_root,
53        &mut visited,
54        &mut errors,
55        &mut pages_checked,
56    );
57
58    if default_count != slot.tree_entries {
59        errors.push(IntegrityError::EntryCountMismatch {
60            expected: slot.tree_entries,
61            actual: default_count,
62        });
63    }
64
65    if slot.catalog_root.is_valid() {
66        walk_catalog(
67            mgr,
68            slot.catalog_root,
69            &mut visited,
70            &mut errors,
71            &mut pages_checked,
72        );
73    }
74
75    if slot.pending_free_root.is_valid() {
76        walk_chain(
77            mgr,
78            slot.pending_free_root,
79            &mut visited,
80            &mut errors,
81            &mut pages_checked,
82        );
83    }
84
85    Ok(IntegrityReport {
86        pages_checked,
87        errors,
88    })
89}
90
91fn walk_tree(
92    mgr: &TxnManager,
93    root: PageId,
94    visited: &mut HashSet<PageId>,
95    errors: &mut Vec<IntegrityError>,
96    pages_checked: &mut u64,
97) -> u64 {
98    let mut entry_count: u64 = 0;
99    let mut stack = vec![root];
100
101    while let Some(page_id) = stack.pop() {
102        if !visited.insert(page_id) {
103            errors.push(IntegrityError::DuplicatePageRef(page_id));
104            continue;
105        }
106
107        let page = match mgr.read_page_from_disk(page_id) {
108            Ok(p) => p,
109            Err(e) => {
110                errors.push(IntegrityError::PageReadFailed {
111                    page: page_id,
112                    error: e.to_string(),
113                });
114                continue;
115            }
116        };
117        *pages_checked += 1;
118
119        match page.page_type() {
120            Some(PageType::Leaf) => {
121                let n = page.num_cells();
122                entry_count += n as u64;
123                check_leaf_ordering(&page, page_id, errors);
124            }
125            Some(PageType::Branch) => {
126                for i in 0..page.num_cells() as usize {
127                    stack.push(branch_node::get_child(&page, i));
128                }
129                let right = page.right_child();
130                if right.is_valid() {
131                    stack.push(right);
132                }
133            }
134            _ => {
135                errors.push(IntegrityError::InvalidPageType {
136                    page: page_id,
137                    expected: "Leaf or Branch",
138                });
139            }
140        }
141    }
142
143    entry_count
144}
145
146fn check_leaf_ordering(page: &Page, page_id: PageId, errors: &mut Vec<IntegrityError>) {
147    let n = page.num_cells();
148    for i in 1..n {
149        let prev = leaf_node::read_cell(page, i - 1);
150        let curr = leaf_node::read_cell(page, i);
151        if prev.key >= curr.key {
152            errors.push(IntegrityError::KeyOrderViolation {
153                page: page_id,
154                index: i as usize,
155            });
156        }
157    }
158}
159
160fn walk_catalog(
161    mgr: &TxnManager,
162    catalog_root: PageId,
163    visited: &mut HashSet<PageId>,
164    errors: &mut Vec<IntegrityError>,
165    pages_checked: &mut u64,
166) {
167    let table_roots = collect_named_table_roots(mgr, catalog_root, visited, errors, pages_checked);
168
169    for root in table_roots {
170        walk_tree(mgr, root, visited, errors, pages_checked);
171    }
172}
173
174fn collect_named_table_roots(
175    mgr: &TxnManager,
176    catalog_root: PageId,
177    visited: &mut HashSet<PageId>,
178    errors: &mut Vec<IntegrityError>,
179    pages_checked: &mut u64,
180) -> Vec<PageId> {
181    let mut roots = Vec::new();
182    let mut stack = vec![catalog_root];
183
184    while let Some(page_id) = stack.pop() {
185        if !visited.insert(page_id) {
186            errors.push(IntegrityError::DuplicatePageRef(page_id));
187            continue;
188        }
189
190        let page = match mgr.read_page_from_disk(page_id) {
191            Ok(p) => p,
192            Err(e) => {
193                errors.push(IntegrityError::PageReadFailed {
194                    page: page_id,
195                    error: e.to_string(),
196                });
197                continue;
198            }
199        };
200        *pages_checked += 1;
201
202        match page.page_type() {
203            Some(PageType::Leaf) => {
204                for i in 0..page.num_cells() {
205                    let cell = leaf_node::read_cell(&page, i);
206                    if cell.val_type != ValueType::Tombstone && cell.value.len() >= 4 {
207                        let desc = TableDescriptor::deserialize(cell.value);
208                        if desc.root_page.is_valid() {
209                            roots.push(desc.root_page);
210                        }
211                    }
212                }
213            }
214            Some(PageType::Branch) => {
215                for i in 0..page.num_cells() as usize {
216                    stack.push(branch_node::get_child(&page, i));
217                }
218                let right = page.right_child();
219                if right.is_valid() {
220                    stack.push(right);
221                }
222            }
223            _ => {
224                errors.push(IntegrityError::InvalidPageType {
225                    page: page_id,
226                    expected: "Leaf or Branch (catalog)",
227                });
228            }
229        }
230    }
231
232    roots
233}
234
235fn walk_chain(
236    mgr: &TxnManager,
237    root: PageId,
238    visited: &mut HashSet<PageId>,
239    errors: &mut Vec<IntegrityError>,
240    pages_checked: &mut u64,
241) {
242    let mut current = root;
243    while current.is_valid() {
244        if !visited.insert(current) {
245            errors.push(IntegrityError::DuplicatePageRef(current));
246            break;
247        }
248
249        let page = match mgr.read_page_from_disk(current) {
250            Ok(p) => p,
251            Err(e) => {
252                errors.push(IntegrityError::PageReadFailed {
253                    page: current,
254                    error: e.to_string(),
255                });
256                break;
257            }
258        };
259        *pages_checked += 1;
260
261        current = page.right_child();
262    }
263}