yama/operations/
checking.rs

1/*
2This file is part of Yama.
3
4Yama is free software: you can redistribute it and/or modify
5it under the terms of the GNU General Public License as published by
6the Free Software Foundation, either version 3 of the License, or
7(at your option) any later version.
8
9Yama is distributed in the hope that it will be useful,
10but WITHOUT ANY WARRANTY; without even the implied warranty of
11MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License
15along with Yama.  If not, see <https://www.gnu.org/licenses/>.
16*/
17
18
19use crate::chunking::RecursiveUnchunker;
20use crate::commands::retrieve_tree_node;
21use crate::definitions::{ChunkId, TreeNode};
22use crate::pile::{Keyspace, Pile, RawPile};
23use anyhow::bail;
24use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
25use log::{error, info, warn};
26use std::collections::HashSet;
27use std::io::{Read, Write};
28use std::sync::Mutex;
29
30#[derive(PartialEq, Eq, Copy, Clone, Debug)]
31pub enum VacuumMode {
32    NoVacuum,
33    DryRunVacuum,
34    Vacuum,
35}
36
37pub struct NullWriter {}
38
39impl Write for NullWriter {
40    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
41        Ok(buf.len())
42    }
43
44    fn flush(&mut self) -> std::io::Result<()> {
45        Ok(())
46    }
47}
48
49/// Mark-and-sweep style vacuuming system.
50pub struct VacuumRawPile<RP: RawPile> {
51    underlying: RP,
52    vacuum_tracking_enabled: bool,
53    retrieved_chunks: Mutex<HashSet<ChunkId>>,
54}
55
56impl<RP: RawPile> VacuumRawPile<RP> {
57    pub fn new(underlying: RP, vacuum_tracking_enabled: bool) -> Self {
58        VacuumRawPile {
59            underlying,
60            vacuum_tracking_enabled,
61            retrieved_chunks: Default::default(),
62        }
63    }
64
65    pub fn calculate_vacuum_for_sweeping(&self) -> anyhow::Result<HashSet<ChunkId>> {
66        if !self.vacuum_tracking_enabled {
67            bail!("Vacuum tracking not enabled, you can't calculate the vacuum set!");
68        }
69
70        let mut to_sweep = HashSet::new();
71
72        let retrieved_chunks = self.retrieved_chunks.lock().unwrap();
73
74        let mut chunk_id: ChunkId = Default::default();
75        for key in self.list_keys(Keyspace::Chunk)? {
76            chunk_id.clone_from_slice(&key?);
77            if !retrieved_chunks.contains(&chunk_id) {
78                to_sweep.insert(chunk_id.clone());
79            }
80        }
81
82        Ok(to_sweep)
83    }
84}
85
86impl<RP: RawPile> RawPile for VacuumRawPile<RP> {
87    fn exists(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<bool> {
88        self.underlying.exists(kind, key)
89    }
90
91    fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
92        if self.vacuum_tracking_enabled && kind == Keyspace::Chunk {
93            let mut chunk_id: ChunkId = Default::default();
94            chunk_id.clone_from_slice(key);
95            self.retrieved_chunks.lock().unwrap().insert(chunk_id);
96        }
97        self.underlying.read(kind, key)
98    }
99
100    fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> {
101        self.underlying.write(kind, key, value)
102    }
103
104    fn delete(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<()> {
105        self.underlying.delete(kind, key)
106    }
107
108    fn list_keys(
109        &self,
110        kind: Keyspace,
111    ) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<Vec<u8>>>>> {
112        self.underlying.list_keys(kind)
113    }
114
115    fn flush(&self) -> anyhow::Result<()> {
116        self.underlying.flush()
117    }
118
119    fn check_lowlevel(&self) -> anyhow::Result<bool> {
120        self.underlying.check_lowlevel()
121    }
122}
123
124pub fn check_deep<RP: RawPile>(
125    pile: Pile<RP>,
126    vacuum: VacuumMode,
127    make_progress_bar: bool,
128) -> anyhow::Result<u32> {
129    let pile = Pile::new(VacuumRawPile::new(
130        pile.raw_pile,
131        vacuum != VacuumMode::NoVacuum,
132    ));
133
134    let mut errors = 0;
135
136    let mut to_check = Vec::new();
137    let pointer_list = pile.list_pointers()?;
138
139    for pointer in pointer_list.iter() {
140        info!("Checking pointer {:?}", pointer);
141        match pile.read_pointer(&pointer)? {
142            Some(pointer_data) => {
143                if let Some(parent) = pointer_data.parent_pointer {
144                    if !pointer_list.contains(&parent) {
145                        errors += 1;
146                        error!(
147                            "Pointer {:?} has a parent {:?} which does not exist.",
148                            pointer, parent
149                        );
150                    }
151                }
152
153                let tree_node = retrieve_tree_node(&pile, pointer_data.chunk_ref.clone())?;
154                tree_node.node.visit(
155                    &mut |node, _| {
156                        if let TreeNode::NormalFile { content, .. } = node {
157                            to_check.push(content.clone());
158                        }
159                        Ok(())
160                    },
161                    "".to_owned(),
162                )?;
163            }
164            None => {
165                errors += 1;
166                error!("Pointer {:?} does not seem to exist.", pointer);
167            }
168        }
169    }
170
171    let pbar = if make_progress_bar {
172        ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10))
173    } else {
174        ProgressBar::hidden()
175    };
176    pbar.set_style(
177        ProgressStyle::default_bar()
178            .template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
179    );
180    pbar.set_message("checking");
181
182    let mut done = 0;
183
184    while let Some(next_to_check) = to_check.pop() {
185        done += 1;
186        pbar.set_length(done + to_check.len() as u64);
187        pbar.set_position(done);
188
189        let mut unchunker = RecursiveUnchunker::new(&pile, next_to_check.clone());
190        match std::io::copy(&mut unchunker, &mut NullWriter {}) {
191            Ok(_) => {}
192            Err(err) => {
193                errors += 1;
194                warn!(
195                    "Error occurred when reading {:?}: {:?}.",
196                    next_to_check, err
197                );
198            }
199        }
200    }
201
202    pbar.finish_and_clear();
203
204    if errors > 0 {
205        error!("There were {:?}", errors);
206    } else {
207        info!("No errors.");
208    }
209
210    if errors == 0 && vacuum != VacuumMode::NoVacuum {
211        info!("Calculating sweep set for vacuuming.");
212        let to_vacuum = pile.raw_pile.calculate_vacuum_for_sweeping()?;
213        info!("{} chunks are ready to be vacuumed.", to_vacuum.len());
214        if vacuum == VacuumMode::Vacuum {
215            let pbar = if make_progress_bar {
216                ProgressBar::with_draw_target(
217                    to_vacuum.len() as u64,
218                    ProgressDrawTarget::stdout_with_hz(10),
219                )
220            } else {
221                ProgressBar::hidden()
222            };
223            pbar.set_style(
224                ProgressStyle::default_bar().template(
225                    "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
226                ),
227            );
228            pbar.set_message("vacuuming");
229
230            // actually do the vacuum!
231            info!("Going to vacuum them up.");
232            for vacuum_id in to_vacuum {
233                pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
234                pbar.inc(1);
235            }
236            pbar.finish_and_clear();
237        }
238    }
239
240    Ok(errors)
241}
242
243/// A shallower check than the deep one. This avoids reading the last layer of chunks.
244/// (they are simply assumed to be OK.).
245/// This leads to much faster performance and is mostly intended for GC.
246/// We can check existence for those leaf chunks if desired. This still avoids the
247/// overhead of decryption, decompression and reading from disk/network.
248pub fn check_shallow<RP: RawPile>(
249    pile: Pile<RP>,
250    vacuum: VacuumMode,
251    make_progress_bar: bool,
252    check_existence: bool,
253) -> anyhow::Result<u32> {
254    let pile = Pile::new(VacuumRawPile::new(
255        pile.raw_pile,
256        vacuum != VacuumMode::NoVacuum,
257    ));
258
259    let mut additional_seen: HashSet<ChunkId> = HashSet::new();
260
261    let mut errors = 0;
262
263    let mut to_check = Vec::new();
264    let pointer_list = pile.list_pointers()?;
265
266    for pointer in pointer_list.iter() {
267        info!("Checking pointer {:?}", pointer);
268        match pile.read_pointer(&pointer)? {
269            Some(pointer_data) => {
270                if let Some(parent) = pointer_data.parent_pointer {
271                    if !pointer_list.contains(&parent) {
272                        errors += 1;
273                        error!(
274                            "Pointer {:?} has a parent {:?} which does not exist.",
275                            pointer, parent
276                        );
277                    }
278                }
279
280                let tree_node = retrieve_tree_node(&pile, pointer_data.chunk_ref.clone())?;
281                tree_node.node.visit(
282                    &mut |node, _| {
283                        if let TreeNode::NormalFile { content, .. } = node {
284                            to_check.push(content.clone());
285                        }
286                        Ok(())
287                    },
288                    "".to_owned(),
289                )?;
290            }
291            None => {
292                errors += 1;
293                error!("Pointer {:?} does not seem to exist.", pointer);
294            }
295        }
296    }
297
298    let pbar = if make_progress_bar {
299        ProgressBar::with_draw_target(1000 as u64, ProgressDrawTarget::stdout_with_hz(10))
300    } else {
301        ProgressBar::hidden()
302    };
303    pbar.set_style(
304        ProgressStyle::default_bar()
305            .template("[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}"),
306    );
307    pbar.set_message("checking");
308
309    let mut done = 0;
310
311    while let Some(next_to_check) = to_check.pop() {
312        done += 1;
313        pbar.set_length(done + to_check.len() as u64);
314        pbar.set_position(done);
315
316        if next_to_check.depth > 0 {
317            let mut reduced_height = next_to_check.clone();
318            reduced_height.depth -= 1;
319
320            let mut chunk_id_buf: ChunkId = Default::default();
321
322            let mut unchunker = RecursiveUnchunker::new(&pile, reduced_height);
323            loop {
324                let read_bytes = unchunker.read(&mut chunk_id_buf)?;
325
326                if read_bytes == 0 {
327                    // end of chunks, because return of zero here means EOF
328                    break;
329                }
330
331                if read_bytes < chunk_id_buf.len() {
332                    // any error, including EOF at this point, is an error
333                    unchunker.read_exact(&mut chunk_id_buf[read_bytes..])?;
334                }
335
336                if check_existence && !pile.chunk_exists(&chunk_id_buf)? {
337                    errors += 1;
338                    warn!("Chunk missing: {:?}", &chunk_id_buf);
339                }
340                additional_seen.insert(chunk_id_buf.clone());
341            }
342        } else {
343            // already shallowest, just add the reference to the seen list.
344            additional_seen.insert(next_to_check.chunk_id);
345        }
346    }
347
348    pbar.finish_and_clear();
349
350    if errors > 0 {
351        error!("There were {:?}", errors);
352    } else {
353        info!("No errors.");
354    }
355
356    if errors == 0 && vacuum != VacuumMode::NoVacuum {
357        info!("Calculating sweep set for vacuuming.");
358        let mut to_vacuum = pile.raw_pile.calculate_vacuum_for_sweeping()?;
359        // don't forget to include the leaves that we didn't actually visit!
360        for element in additional_seen {
361            to_vacuum.remove(&element);
362        }
363        info!("{} chunks are ready to be vacuumed.", to_vacuum.len());
364        if vacuum == VacuumMode::Vacuum {
365            let pbar = if make_progress_bar {
366                ProgressBar::with_draw_target(
367                    to_vacuum.len() as u64,
368                    ProgressDrawTarget::stdout_with_hz(10),
369                )
370            } else {
371                ProgressBar::hidden()
372            };
373            pbar.set_style(
374                ProgressStyle::default_bar().template(
375                    "[{elapsed_precise}]/[{eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
376                ),
377            );
378            pbar.set_message("vacuuming");
379
380            // actually do the vacuum!
381            info!("Going to vacuum them up.");
382            for vacuum_id in to_vacuum {
383                pile.raw_pile.delete(Keyspace::Chunk, &vacuum_id)?;
384                pbar.inc(1);
385            }
386            pbar.finish_and_clear();
387        }
388    }
389
390    Ok(errors)
391}