1use std::collections::HashSet;
18use std::sync::MutexGuard;
19use std::time::Instant;
20
21use obj_core::integrity::{
22 check_catalog_pointers, collect_primary_ids, cross_reference_index, walk_btree, walk_freelist,
23 IntegrityFailure, IntegrityReport, TreeContext,
24};
25use obj_core::pager::page::PageId;
26use obj_core::pager::Pager;
27use obj_core::platform::FileHandle;
28use obj_core::{Catalog, CollectionDescriptor, Error, IndexKind, IndexStatus, Result};
29
30use crate::Db;
31
32impl Db {
33 #[cfg_attr(
87 feature = "tracing",
88 tracing::instrument(name = "db.integrity_check", level = "info", skip_all)
89 )]
90 pub fn integrity_check(&self) -> Result<IntegrityReport> {
91 let start = Instant::now();
92 let mut state = IntegrityState::new();
93 let mut pager = lock_pager(self)?;
94 state.pages_checked = state.pages_checked.saturating_add(1); walk_catalog(&mut pager, &mut state)?;
96 walk_collections(&mut pager, &mut state)?;
97 walk_freelist_chain(&mut pager, &mut state)?;
98 detect_orphan_pages(&pager, &mut state);
99 Ok(IntegrityReport::new(
100 state.failures,
101 state.pages_checked,
102 start.elapsed(),
103 ))
104 }
105}
106
107struct IntegrityState {
110 failures: Vec<IntegrityFailure>,
111 reachable: HashSet<PageId>,
112 pages_checked: u64,
113}
114
115impl IntegrityState {
116 fn new() -> Self {
117 Self {
118 failures: Vec::new(),
119 reachable: HashSet::new(),
120 pages_checked: 0,
121 }
122 }
123}
124
125fn lock_pager(db: &Db) -> Result<MutexGuard<'_, Pager<FileHandle>>> {
126 db.env.pager().lock().map_err(|_| Error::Busy {
127 kind: obj_core::LockKind::WriterInProcess,
128 })
129}
130
131fn walk_catalog(pager: &mut Pager<FileHandle>, state: &mut IntegrityState) -> Result<()> {
132 let raw = pager.root_catalog();
133 let Some(root) = PageId::new(raw) else {
134 return Ok(());
135 };
136 let page_count = pager.page_count();
137 if root.get() >= page_count {
138 state
139 .failures
140 .push(IntegrityFailure::DanglingCatalogPointer {
141 collection: "<catalog>".to_owned(),
142 index: None,
143 page_id: root.get(),
144 });
145 return Ok(());
146 }
147 let ctx = TreeContext {
148 label: "catalog".to_owned(),
149 root,
150 };
151 let walked = walk_btree(pager, &ctx, &mut state.reachable, &mut state.failures)?;
152 state.pages_checked = state.pages_checked.saturating_add(walked);
153 Ok(())
154}
155
156fn walk_collections(pager: &mut Pager<FileHandle>, state: &mut IntegrityState) -> Result<()> {
157 let raw = pager.root_catalog();
158 if PageId::new(raw).is_none() {
159 return Ok(());
160 }
161 let catalog = match Catalog::<FileHandle>::open_or_init(pager) {
162 Ok(c) => c,
163 Err(Error::Corruption { .. }) => return Ok(()),
164 Err(e) => return Err(e),
165 };
166 let rows = match catalog.list_collections(pager) {
167 Ok(r) => r,
168 Err(Error::Corruption { .. }) => return Ok(()),
169 Err(e) => return Err(e),
170 };
171 let page_count = pager.page_count();
172 for (name, descriptor) in rows {
173 check_catalog_pointers(&name, &descriptor, page_count, &mut state.failures);
174 walk_one_collection(pager, &name, &descriptor, state)?;
175 }
176 Ok(())
177}
178
179fn walk_one_collection(
180 pager: &mut Pager<FileHandle>,
181 name: &str,
182 descriptor: &CollectionDescriptor,
183 state: &mut IntegrityState,
184) -> Result<()> {
185 walk_primary_tree(pager, name, descriptor, state)?;
186 let mut primary_ids: HashSet<u64> = HashSet::new();
187 let _scanned = collect_primary_ids(pager, descriptor, &mut primary_ids)?;
188 let mut per_index: Vec<(String, IndexKind, HashSet<u64>)> = Vec::new();
189 for index in &descriptor.indexes {
190 if index.status != IndexStatus::Active {
191 continue;
192 }
193 walk_index_tree(pager, name, descriptor, index, state)?;
194 let mut referenced: HashSet<u64> = HashSet::new();
195 let _entries = cross_reference_index::<FileHandle>(
196 pager,
197 name,
198 index,
199 &primary_ids,
200 &mut referenced,
201 &mut state.failures,
202 )?;
203 per_index.push((index.name.clone(), index.kind, referenced));
204 }
205 obj_core::integrity::check_primary_to_index(
206 name,
207 descriptor,
208 &primary_ids,
209 &per_index,
210 &mut state.failures,
211 );
212 Ok(())
213}
214
215fn walk_primary_tree(
216 pager: &mut Pager<FileHandle>,
217 name: &str,
218 descriptor: &CollectionDescriptor,
219 state: &mut IntegrityState,
220) -> Result<()> {
221 let page_count = pager.page_count();
222 let Some(root) = PageId::new(descriptor.primary_root) else {
223 return Ok(());
224 };
225 if root.get() >= page_count {
226 return Ok(());
227 }
228 let ctx = TreeContext {
229 label: format!("primary:{name}"),
230 root,
231 };
232 let walked = walk_btree(pager, &ctx, &mut state.reachable, &mut state.failures)?;
233 state.pages_checked = state.pages_checked.saturating_add(walked);
234 Ok(())
235}
236
237fn walk_index_tree(
238 pager: &mut Pager<FileHandle>,
239 collection: &str,
240 _descriptor: &CollectionDescriptor,
241 index: &obj_core::IndexDescriptor,
242 state: &mut IntegrityState,
243) -> Result<()> {
244 let page_count = pager.page_count();
245 let Some(root) = PageId::new(index.root_page_id) else {
246 return Ok(());
247 };
248 if root.get() >= page_count {
249 return Ok(());
250 }
251 let ctx = TreeContext {
252 label: format!("index:{}.{}", collection, index.name),
253 root,
254 };
255 let walked = walk_btree(pager, &ctx, &mut state.reachable, &mut state.failures)?;
256 state.pages_checked = state.pages_checked.saturating_add(walked);
257 Ok(())
258}
259
260fn walk_freelist_chain(pager: &mut Pager<FileHandle>, state: &mut IntegrityState) -> Result<()> {
261 let head = pager.freelist_head();
262 let page_count = pager.page_count();
263 let walked = walk_freelist(
264 pager,
265 head,
266 page_count,
267 &mut state.reachable,
268 &mut state.failures,
269 )?;
270 state.pages_checked = state.pages_checked.saturating_add(walked);
271 Ok(())
272}
273
274fn detect_orphan_pages(pager: &Pager<FileHandle>, state: &mut IntegrityState) {
275 let page_count = pager.page_count();
276 let mut id: u64 = 1;
278 while id < page_count {
279 if let Some(pid) = PageId::new(id) {
280 if !state.reachable.contains(&pid) {
281 state
282 .failures
283 .push(IntegrityFailure::OrphanPage { page_id: id });
284 }
285 }
286 id = id.saturating_add(1);
287 }
288}