snops_checkpoint/
manager.rs1use std::{collections::BTreeMap, fs, path::PathBuf};
2
3use chrono::{DateTime, TimeDelta, Utc};
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5use tracing::{error, trace};
6
7#[cfg(feature = "write")]
8use crate::errors::{ManagerCullError, ManagerInsertError, ManagerPollError};
9use crate::{errors::ManagerLoadError, path_from_height, CheckpointHeader, RetentionPolicy};
10
11#[derive(Debug, Clone)]
12pub struct CheckpointManager {
13 #[cfg(feature = "write")]
14 storage_path: PathBuf,
15 policy: RetentionPolicy,
16 checkpoints: BTreeMap<DateTime<Utc>, (CheckpointHeader, PathBuf)>,
18}
19
20fn datetime_from_int(timestamp: i64) -> DateTime<Utc> {
22 DateTime::UNIX_EPOCH + TimeDelta::new(timestamp, 0).unwrap()
23}
24
25impl CheckpointManager {
26 pub fn load(storage_path: PathBuf, policy: RetentionPolicy) -> Result<Self, ManagerLoadError> {
29 use ManagerLoadError::*;
30
31 let Some(storage_glob) = path_from_height(&storage_path, "*") else {
33 return Err(InvalidStoragePath(storage_path));
34 };
35 let paths = glob::glob(&storage_glob.to_string_lossy())?;
36 let paths = paths.into_iter().collect::<Vec<_>>();
38
39 let checkpoints = paths
41 .into_par_iter()
42 .filter_map(|path| {
43 let path = match path {
44 Ok(path) => path,
45 Err(err) => {
46 error!("error globbing {storage_path:?}: {err}");
47 return None;
48 }
49 };
50
51 let header = match CheckpointHeader::read_file(&path) {
53 Ok(header) => header,
54 Err(err) => {
55 error!("error parsing {path:?}: {err}");
56 return None;
57 }
58 };
59
60 let timestamp = datetime_from_int(header.timestamp);
61 Some((timestamp, (header, path)))
62 })
63 .collect();
64
65 Ok(Self {
66 #[cfg(feature = "write")]
67 storage_path,
68 checkpoints,
69 policy,
70 })
71 }
72
73 #[cfg(feature = "write")]
75 pub fn cull_incompatible<N: crate::aleo::Network>(
76 &mut self,
77 ) -> Result<usize, ManagerCullError> {
78 use ManagerCullError::*;
79
80 use crate::aleo::*;
81
82 let blocks = BlockDB::<N>::open(StorageMode::Custom(self.storage_path.clone()))
83 .map_err(StorageOpenError)?;
84
85 let mut rejected = vec![];
86
87 for (time, (header, path)) in self.checkpoints.iter() {
88 let height = header.block_height;
89 let Some(block_hash): Option<BlockHash<N>> =
90 blocks.get_block_hash(height).map_err(ReadLedger)?
91 else {
92 trace!("checkpoint {path:?} at height {height} is taller than the ledger");
93 rejected.push(*time);
94 continue;
95 };
96 if block_bytes::<N>(&block_hash) != header.block_hash {
97 trace!("checkpoint {path:?} is incompatible with block at height {height}");
98 rejected.push(*time);
99 }
100 }
101
102 let count = rejected.len();
103 for time in rejected {
104 if let Some((_header, path)) = self.checkpoints.remove(&time) {
105 if let Err(err) = fs::remove_file(&path) {
106 error!("error deleting incompatible checkpoint {path:?}: {err}");
107 }
108 }
109 }
110
111 Ok(count)
112 }
113
114 pub fn wipe(&mut self) {
116 for (_header, path) in self.checkpoints.values() {
117 if let Err(err) = fs::remove_file(path) {
118 error!("error deleting checkpoint {path:?}: {err}");
119 }
120 }
121 self.checkpoints.clear();
122 }
123
124 #[cfg(feature = "write")]
127 pub fn poll<N: crate::aleo::Network>(&mut self) -> Result<bool, ManagerPollError> {
128 let header = CheckpointHeader::read_ledger::<N>(self.storage_path.clone())?;
129 let time = header.time();
130
131 if !self.is_ready(&time) || header.block_height == 0 {
132 return Ok(false);
133 }
134
135 trace!("creating checkpoint @ {}...", header.block_height);
136 let checkpoint =
137 crate::Checkpoint::<N>::new_from_header(self.storage_path.clone(), header)?;
138 self.write_and_insert(checkpoint)?;
139 self.cull_timestamp(time);
140 Ok(true)
141 }
142
143 pub fn is_ready(&self, timestamp: &DateTime<Utc>) -> bool {
146 let Some((last_time, _)) = self.checkpoints.last_key_value() else {
147 return true;
149 };
150
151 self.policy.is_ready_with_time(timestamp, last_time)
152 }
153
154 #[cfg(feature = "write")]
156 pub fn write_and_insert<N: crate::aleo::Network>(
157 &mut self,
158 checkpoint: crate::Checkpoint<N>,
159 ) -> Result<(), ManagerInsertError> {
160 use ManagerInsertError::*;
161
162 use crate::aleo::ToBytes;
163
164 let Some(path) = path_from_height(&self.storage_path, checkpoint.height()) else {
165 return Err(InvalidStoragePath(self.storage_path.clone()));
166 };
167
168 let mut writer = fs::File::options()
170 .write(true)
171 .create(true)
172 .truncate(true)
173 .open(&path)
174 .map_err(FileError)?;
175 writer
176 .set_times(std::fs::FileTimes::new().set_modified(checkpoint.header.time().into()))
177 .map_err(ModifyError)?;
178 checkpoint.write_le(&mut writer).map_err(WriteError)?;
179
180 trace!(
181 "checkpoint on {} @ {} written to {path:?}",
182 checkpoint.header.time(),
183 checkpoint.height(),
184 );
185
186 self.checkpoints
187 .insert(checkpoint.header.time(), (checkpoint.header, path));
188 Ok(())
189 }
190
191 pub fn cull(&mut self) {
192 self.cull_timestamp(Utc::now());
193 }
194
195 pub fn cull_timestamp(&mut self, timestamp: DateTime<Utc>) {
197 let times = self.checkpoints.keys().collect();
198 let rejected = self.policy.reject_with_time(timestamp, times);
199 for time in rejected {
200 if let Some((_header, path)) = self.checkpoints.remove(&time) {
201 trace!("deleting rejected checkpoint {path:?}");
202 if let Err(err) = fs::remove_file(&path) {
203 error!("error deleting rejected checkpoint {path:?}: {err}");
204 }
205 }
206 }
207 }
208
209 pub fn policy(&self) -> &RetentionPolicy {
211 &self.policy
212 }
213
214 pub fn checkpoints(&self) -> impl Iterator<Item = &(CheckpointHeader, PathBuf)> {
216 self.checkpoints.values()
217 }
218}
219
220impl std::fmt::Display for CheckpointManager {
221 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
222 let mut prev_time: Option<DateTime<Utc>> = None;
223 write!(f, "{} checkpoints:", self.checkpoints.len())?;
224 for (time, (header, _)) in &self.checkpoints {
225 write!(
226 f,
227 "\n {time}: block {}, {}",
228 header.block_height,
229 if let Some(prev) = prev_time {
230 format!(
231 "{}hr later",
232 time.signed_duration_since(prev).num_seconds() / 3600
233 )
234 } else {
235 "".to_string()
236 }
237 )?;
238 prev_time = Some(*time);
239 }
240 Ok(())
241 }
242}