1use crate::chunking::RecursiveUnchunker;
20use crate::commands::retrieve_tree_node;
21use crate::definitions::{ChunkId, TreeNode};
22use crate::pile::{Keyspace, Pile, RawPile};
23use anyhow::bail;
24use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
25use log::{error, info, warn};
26use std::collections::HashSet;
27use std::io::{Read, Write};
28use std::sync::Mutex;
29
/// Selects whether a pile check also vacuums (sweeps) chunks it did not encounter.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum VacuumMode {
    /// No chunk-access tracking; no sweep set is calculated and nothing is deleted.
    NoVacuum,
    /// Calculate the sweep set and report its size, but delete nothing.
    DryRunVacuum,
    /// Calculate the sweep set and delete every chunk in it.
    Vacuum,
}
36
/// A [`Write`] sink that accepts and discards every byte written to it.
///
/// Used to drain a reader (e.g. with [`std::io::copy`]) purely for the side effect
/// of reading it. Behaves like [`std::io::sink`], but is a nameable public type.
#[derive(Debug, Default, Clone, Copy)]
pub struct NullWriter {}

impl Write for NullWriter {
    /// Discards `buf`, reporting that all of it was written.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        Ok(buf.len())
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
48
49pub struct VacuumRawPile<RP: RawPile> {
51 underlying: RP,
52 vacuum_tracking_enabled: bool,
53 retrieved_chunks: Mutex<HashSet<ChunkId>>,
54}
55
56impl<RP: RawPile> VacuumRawPile<RP> {
57 pub fn new(underlying: RP, vacuum_tracking_enabled: bool) -> Self {
58 VacuumRawPile {
59 underlying,
60 vacuum_tracking_enabled,
61 retrieved_chunks: Default::default(),
62 }
63 }
64
65 pub fn calculate_vacuum_for_sweeping(&self) -> anyhow::Result<HashSet<ChunkId>> {
66 if !self.vacuum_tracking_enabled {
67 bail!("Vacuum tracking not enabled, you can't calculate the vacuum set!");
68 }
69
70 let mut to_sweep = HashSet::new();
71
72 let retrieved_chunks = self.retrieved_chunks.lock().unwrap();
73
74 let mut chunk_id: ChunkId = Default::default();
75 for key in self.list_keys(Keyspace::Chunk)? {
76 chunk_id.clone_from_slice(&key?);
77 if !retrieved_chunks.contains(&chunk_id) {
78 to_sweep.insert(chunk_id.clone());
79 }
80 }
81
82 Ok(to_sweep)
83 }
84}
85
86impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
87 fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<bool> {
88 self.underlying.exists(kind, key)
89 }
90
91 fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
92 if self.vacuum_tracking_enabled && kind == Keyspace::Chunk {
93 let mut chunk_id: ChunkId = Default::default();
94 chunk_id.clone_from_slice(key);
95 self.retrieved_chunks.lock().unwrap().insert(chunk_id);
96 }
97 self.underlying.read(kind, key)
98 }
99
100 fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> {
101 self.underlying.write(kind, key, value)
102 }
103
104 fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
105 self.underlying.delete(kind, key)
106 }
107
108 fn list_keys(
109 &self,
110 kind: Keyspace,
111 ) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<Vec<u8>>>>> {
112 self.underlying.list_keys(kind)
113 }
114
115 fn flush(&self) -> anyhow::Result<()> {
116 self.underlying.flush()
117 }
118
119 fn check_lowlevel(&self) -> anyhow::Result<bool> {
120 self.underlying.check_lowlevel()
121 }
122}
123
124pub fn check_deep<RP: RawPile>(
125 pile: Pile<RP>,
126 vacuum: VacuumMode,
127 make_progress_bar: bool,
128) -> anyhow::Result<u32> {
129 let pile = Pile::new(VacuumRawPile::new(
130 pile.raw_pile,
131 vacuum != VacuumMode::NoVacuum,
132 ));
133
134 let mut errors = 0;
135
136 let mut to_check = Vec::new();
137 let pointer_list = pile.list_pointers()?;
138
139 for pointer in pointer_list.iter() {
140 info!("Checking pointer {:?}", pointer);
141 match pile.read_pointer(&pointer)? {
142 Some(pointer_data) => {
143 if let Some(parent) = pointer_data.parent_pointer {
144 if !pointer_list.contains(&parent) {
145 errors += 1;
146 error!(
147 "Pointer {:?} has a parent {:?} which does not exist.",
148 pointer, parent
149 );
150 }
151 }
152
153 let tree_node = retrieve_tree_node(&pile, pointer_data.chunk_ref.clone())?;
154 tree_node.node.visit(
155 &mut |node, _| {
156 if let TreeNode::NormalFile { content, .. } = node {
157 to_check.push(content.clone());
158 }
159 Ok(())
160 },
161 "".to_owned(),
162 )?;
163 }
164 None => {
165 errors += 1;
166 error!("Pointer {:?} does not seem to exist.", pointer);
167 }
168 }
169 }
170
171 let pbar = if make_progress_bar {
172 ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10))
173 } else {
174 ProgressBar::hidden()
175 };
176 pbar.set_style(
177 ProgressStyle::default_bar()
178 .template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
179 );
180 pbar.set_message("checking");
181
182 let mut done = 0;
183
184 while let Some(next_to_check) = to_check.pop() {
185 done += 1;
186 pbar.set_length(done + to_check.len() as u64);
187 pbar.set_position(done);
188
189 let mut unchunker = RecursiveUnchunker::new(&pile, next_to_check.clone());
190 match std::io::copy(&mut unchunker, &mut NullWriter {}) {
191 Ok(_) => {}
192 Err(err) => {
193 errors += 1;
194 warn!(
195 "Error occurred when reading {:?}: {:?}.",
196 next_to_check, err
197 );
198 }
199 }
200 }
201
202 pbar.finish_and_clear();
203
204 if errors > 0 {
205 error!("There were {:?}", errors);
206 } else {
207 info!("No errors.");
208 }
209
210 if errors == 0 && vacuum != VacuumMode::NoVacuum {
211 info!("Calculating sweep set for vacuuming.");
212 let to_vacuum = pile.raw_pile.calculate_vacuum_for_sweeping()?;
213 info!("{} chunks are ready to be vacuumed.", to_vacuum.len());
214 if vacuum == VacuumMode::Vacuum {
215 let pbar = if make_progress_bar {
216 ProgressBar::with_draw_target(
217 to_vacuum.len() as u64,
218 ProgressDrawTarget::stdout_with_hz(10),
219 )
220 } else {
221 ProgressBar::hidden()
222 };
223 pbar.set_style(
224 ProgressStyle::default_bar().template(
225 "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
226 ),
227 );
228 pbar.set_message("vacuuming");
229
230 info!("Going to vacuum them up.");
232 for vacuum_id in to_vacuum {
233 pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
234 pbar.inc(1);
235 }
236 pbar.finish_and_clear();
237 }
238 }
239
240 Ok(errors)
241}
242
243pub fn check_shallow<RP: RawPile>(
249 pile: Pile<RP>,
250 vacuum: VacuumMode,
251 make_progress_bar: bool,
252 check_existence: bool,
253) -> anyhow::Result<u32> {
254 let pile = Pile::new(VacuumRawPile::new(
255 pile.raw_pile,
256 vacuum != VacuumMode::NoVacuum,
257 ));
258
259 let mut additional_seen: HashSet<ChunkId> = HashSet::new();
260
261 let mut errors = 0;
262
263 let mut to_check = Vec::new();
264 let pointer_list = pile.list_pointers()?;
265
266 for pointer in pointer_list.iter() {
267 info!("Checking pointer {:?}", pointer);
268 match pile.read_pointer(&pointer)? {
269 Some(pointer_data) => {
270 if let Some(parent) = pointer_data.parent_pointer {
271 if !pointer_list.contains(&parent) {
272 errors += 1;
273 error!(
274 "Pointer {:?} has a parent {:?} which does not exist.",
275 pointer, parent
276 );
277 }
278 }
279
280 let tree_node = retrieve_tree_node(&pile, pointer_data.chunk_ref.clone())?;
281 tree_node.node.visit(
282 &mut |node, _| {
283 if let TreeNode::NormalFile { content, .. } = node {
284 to_check.push(content.clone());
285 }
286 Ok(())
287 },
288 "".to_owned(),
289 )?;
290 }
291 None => {
292 errors += 1;
293 error!("Pointer {:?} does not seem to exist.", pointer);
294 }
295 }
296 }
297
298 let pbar = if make_progress_bar {
299 ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10))
300 } else {
301 ProgressBar::hidden()
302 };
303 pbar.set_style(
304 ProgressStyle::default_bar()
305 .template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
306 );
307 pbar.set_message("checking");
308
309 let mut done = 0;
310
311 while let Some(next_to_check) = to_check.pop() {
312 done += 1;
313 pbar.set_length(done + to_check.len() as u64);
314 pbar.set_position(done);
315
316 if next_to_check.depth > 0 {
317 let mut reduced_height = next_to_check.clone();
318 reduced_height.depth -= 1;
319
320 let mut chunk_id_buf: ChunkId = Default::default();
321
322 let mut unchunker = RecursiveUnchunker::new(&pile, reduced_height);
323 loop {
324 let read_bytes = unchunker.read(&mut chunk_id_buf)?;
325
326 if read_bytes == 0 {
327 break;
329 }
330
331 if read_bytes < chunk_id_buf.len() {
332 unchunker.read_exact(&mut chunk_id_buf[read_bytes..])?;
334 }
335
336 if check_existence && !pile.chunk_exists(&chunk_id_buf)? {
337 errors += 1;
338 warn!("Chunk missing: {:?}", &chunk_id_buf);
339 }
340 additional_seen.insert(chunk_id_buf.clone());
341 }
342 } else {
343 additional_seen.insert(next_to_check.chunk_id);
345 }
346 }
347
348 pbar.finish_and_clear();
349
350 if errors > 0 {
351 error!("There were {:?}", errors);
352 } else {
353 info!("No errors.");
354 }
355
356 if errors == 0 && vacuum != VacuumMode::NoVacuum {
357 info!("Calculating sweep set for vacuuming.");
358 let mut to_vacuum = pile.raw_pile.calculate_vacuum_for_sweeping()?;
359 for element in additional_seen {
361 to_vacuum.remove(&element);
362 }
363 info!("{} chunks are ready to be vacuumed.", to_vacuum.len());
364 if vacuum == VacuumMode::Vacuum {
365 let pbar = if make_progress_bar {
366 ProgressBar::with_draw_target(
367 to_vacuum.len() as u64,
368 ProgressDrawTarget::stdout_with_hz(10),
369 )
370 } else {
371 ProgressBar::hidden()
372 };
373 pbar.set_style(
374 ProgressStyle::default_bar().template(
375 "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
376 ),
377 );
378 pbar.set_message("vacuuming");
379
380 info!("Going to vacuum them up.");
382 for vacuum_id in to_vacuum {
383 pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
384 pbar.inc(1);
385 }
386 pbar.finish_and_clear();
387 }
388 }
389
390 Ok(errors)
391}