snops_checkpoint/
manager.rs

1use std::{collections::BTreeMap, fs, path::PathBuf};
2
3use chrono::{DateTime, TimeDelta, Utc};
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5use tracing::{error, trace};
6
7#[cfg(feature = "write")]
8use crate::errors::{ManagerCullError, ManagerInsertError, ManagerPollError};
9use crate::{errors::ManagerLoadError, path_from_height, CheckpointHeader, RetentionPolicy};
10
11#[derive(Debug, Clone)]
12pub struct CheckpointManager {
13    #[cfg(feature = "write")]
14    storage_path: PathBuf,
15    policy: RetentionPolicy,
16    /// timestamp -> checkpoint header
17    checkpoints: BTreeMap<DateTime<Utc>, (CheckpointHeader, PathBuf)>,
18}
19
20/// Block timestamps are seconds since Unix epoch UTC
21fn datetime_from_int(timestamp: i64) -> DateTime<Utc> {
22    DateTime::UNIX_EPOCH + TimeDelta::new(timestamp, 0).unwrap()
23}
24
25impl CheckpointManager {
26    /// Given a storage path, load headers from the available checkpoints into
27    /// memory
28    pub fn load(storage_path: PathBuf, policy: RetentionPolicy) -> Result<Self, ManagerLoadError> {
29        use ManagerLoadError::*;
30
31        // assemble glob checkpoint files based on the provided storage path
32        let Some(storage_glob) = path_from_height(&storage_path, "*") else {
33            return Err(InvalidStoragePath(storage_path));
34        };
35        let paths = glob::glob(&storage_glob.to_string_lossy())?;
36        // this ridiculous Path result from glob is NOT IntoParallelIterator
37        let paths = paths.into_iter().collect::<Vec<_>>();
38
39        // read checkpoint headers in parallel
40        let checkpoints = paths
41            .into_par_iter()
42            .filter_map(|path| {
43                let path = match path {
44                    Ok(path) => path,
45                    Err(err) => {
46                        error!("error globbing {storage_path:?}: {err}");
47                        return None;
48                    }
49                };
50
51                // parse only the header from the given path
52                let header = match CheckpointHeader::read_file(&path) {
53                    Ok(header) => header,
54                    Err(err) => {
55                        error!("error parsing {path:?}: {err}");
56                        return None;
57                    }
58                };
59
60                let timestamp = datetime_from_int(header.timestamp);
61                Some((timestamp, (header, path)))
62            })
63            .collect();
64
65        Ok(Self {
66            #[cfg(feature = "write")]
67            storage_path,
68            checkpoints,
69            policy,
70        })
71    }
72
73    /// Cull checkpoints that are incompatible with the current block database
74    #[cfg(feature = "write")]
75    pub fn cull_incompatible<N: crate::aleo::Network>(
76        &mut self,
77    ) -> Result<usize, ManagerCullError> {
78        use ManagerCullError::*;
79
80        use crate::aleo::*;
81
82        let blocks = BlockDB::<N>::open(StorageMode::Custom(self.storage_path.clone()))
83            .map_err(StorageOpenError)?;
84
85        let mut rejected = vec![];
86
87        for (time, (header, path)) in self.checkpoints.iter() {
88            let height = header.block_height;
89            let Some(block_hash): Option<BlockHash<N>> =
90                blocks.get_block_hash(height).map_err(ReadLedger)?
91            else {
92                trace!("checkpoint {path:?} at height {height} is taller than the ledger");
93                rejected.push(*time);
94                continue;
95            };
96            if block_bytes::<N>(&block_hash) != header.block_hash {
97                trace!("checkpoint {path:?} is incompatible with block at height {height}");
98                rejected.push(*time);
99            }
100        }
101
102        let count = rejected.len();
103        for time in rejected {
104            if let Some((_header, path)) = self.checkpoints.remove(&time) {
105                if let Err(err) = fs::remove_file(&path) {
106                    error!("error deleting incompatible checkpoint {path:?}: {err}");
107                }
108            }
109        }
110
111        Ok(count)
112    }
113
114    /// Delete all checkpoints stored by this manager
115    pub fn wipe(&mut self) {
116        for (_header, path) in self.checkpoints.values() {
117            if let Err(err) = fs::remove_file(path) {
118                error!("error deleting checkpoint {path:?}: {err}");
119            }
120        }
121        self.checkpoints.clear();
122    }
123
124    /// Poll the ledger for a new checkpoint and write it to disk
125    /// Also reject old checkpoints that are no longer needed
126    #[cfg(feature = "write")]
127    pub fn poll<N: crate::aleo::Network>(&mut self) -> Result<bool, ManagerPollError> {
128        let header = CheckpointHeader::read_ledger::<N>(self.storage_path.clone())?;
129        let time = header.time();
130
131        if !self.is_ready(&time) || header.block_height == 0 {
132            return Ok(false);
133        }
134
135        trace!("creating checkpoint @ {}...", header.block_height);
136        let checkpoint =
137            crate::Checkpoint::<N>::new_from_header(self.storage_path.clone(), header)?;
138        self.write_and_insert(checkpoint)?;
139        self.cull_timestamp(time);
140        Ok(true)
141    }
142
143    /// Check if the manager is ready to create a new checkpoint given the
144    /// current timestamp
145    pub fn is_ready(&self, timestamp: &DateTime<Utc>) -> bool {
146        let Some((last_time, _)) = self.checkpoints.last_key_value() else {
147            // if this is the first checkpoint, it is ready
148            return true;
149        };
150
151        self.policy.is_ready_with_time(timestamp, last_time)
152    }
153
154    /// Write a checkpoint to disk and insert it into the manager
155    #[cfg(feature = "write")]
156    pub fn write_and_insert<N: crate::aleo::Network>(
157        &mut self,
158        checkpoint: crate::Checkpoint<N>,
159    ) -> Result<(), ManagerInsertError> {
160        use ManagerInsertError::*;
161
162        use crate::aleo::ToBytes;
163
164        let Some(path) = path_from_height(&self.storage_path, checkpoint.height()) else {
165            return Err(InvalidStoragePath(self.storage_path.clone()));
166        };
167
168        // write to a file
169        let mut writer = fs::File::options()
170            .write(true)
171            .create(true)
172            .truncate(true)
173            .open(&path)
174            .map_err(FileError)?;
175        writer
176            .set_times(std::fs::FileTimes::new().set_modified(checkpoint.header.time().into()))
177            .map_err(ModifyError)?;
178        checkpoint.write_le(&mut writer).map_err(WriteError)?;
179
180        trace!(
181            "checkpoint on {} @ {} written to {path:?}",
182            checkpoint.header.time(),
183            checkpoint.height(),
184        );
185
186        self.checkpoints
187            .insert(checkpoint.header.time(), (checkpoint.header, path));
188        Ok(())
189    }
190
191    pub fn cull(&mut self) {
192        self.cull_timestamp(Utc::now());
193    }
194
195    /// Remove the oldest checkpoints that are no longer needed
196    pub fn cull_timestamp(&mut self, timestamp: DateTime<Utc>) {
197        let times = self.checkpoints.keys().collect();
198        let rejected = self.policy.reject_with_time(timestamp, times);
199        for time in rejected {
200            if let Some((_header, path)) = self.checkpoints.remove(&time) {
201                trace!("deleting rejected checkpoint {path:?}");
202                if let Err(err) = fs::remove_file(&path) {
203                    error!("error deleting rejected checkpoint {path:?}: {err}");
204                }
205            }
206        }
207    }
208
209    /// Get the retention policy used by this manager
210    pub fn policy(&self) -> &RetentionPolicy {
211        &self.policy
212    }
213
214    /// Iterate the checkpoints stored by this manager
215    pub fn checkpoints(&self) -> impl Iterator<Item = &(CheckpointHeader, PathBuf)> {
216        self.checkpoints.values()
217    }
218}
219
220impl std::fmt::Display for CheckpointManager {
221    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
222        let mut prev_time: Option<DateTime<Utc>> = None;
223        write!(f, "{} checkpoints:", self.checkpoints.len())?;
224        for (time, (header, _)) in &self.checkpoints {
225            write!(
226                f,
227                "\n  {time}: block {}, {}",
228                header.block_height,
229                if let Some(prev) = prev_time {
230                    format!(
231                        "{}hr later",
232                        time.signed_duration_since(prev).num_seconds() / 3600
233                    )
234                } else {
235                    "".to_string()
236                }
237            )?;
238            prev_time = Some(*time);
239        }
240        Ok(())
241    }
242}