asimov_snapshot/
snapshot.rs1use asimov_module::resolve::Resolver;
4use asimov_registry::Registry;
5use asimov_runner::GraphOutput;
6use jiff::{Span, Timestamp, ToSpan};
7use std::{
8 format,
9 io::{self, Result},
10 string::{String, ToString},
11 vec::Vec,
12};
13
14#[derive(Clone, Debug, PartialEq, Eq, bon::Builder)]
15pub struct Snapshot {
16 #[builder(into)]
17 pub url: std::string::String,
18 #[builder(into)]
19 pub data: std::vec::Vec<u8>,
20 pub start_timestamp: jiff::Timestamp,
21 pub end_timestamp: Option<jiff::Timestamp>,
22}
23
24#[derive(Clone, Debug, bon::Builder)]
25pub struct Options {
26 #[builder(required, default = Some(1.minute()))]
29 #[builder(with = |duration: std::time::Duration| -> core::result::Result<_, jiff::Error> { Span::try_from(duration).map(Option::Some) })]
30 pub max_current_age: Option<Span>,
31}
32
33impl Default for Options {
34 fn default() -> Self {
35 Self {
36 max_current_age: Some(1.minute()),
37 }
38 }
39}
40
41pub struct Snapshotter<S> {
42 registry: Registry,
45 storage: S,
46 options: Options,
47
48 cached_resolver: Option<Resolver>,
49}
50
51impl<S> Snapshotter<S> {
52 pub fn new(registry: Registry, storage: S, options: Options) -> Self {
53 Self {
54 registry,
55 storage,
56 options,
57 cached_resolver: None,
58 }
59 }
60}
61
62impl<S: crate::storage::Storage> Snapshotter<S> {
63 #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
65 pub async fn snapshot(&mut self, url: impl AsRef<str>) -> Result<Snapshot> {
66 if self.cached_resolver.is_none() {
67 let modules = self
68 .registry
69 .enabled_modules()
70 .await
71 .map_err(io::Error::other)?
72 .into_iter()
73 .map(|enabled| enabled.manifest)
74 .filter(|manifest| {
75 manifest
76 .provides
77 .programs
78 .iter()
79 .any(|p| p.ends_with("-fetcher") || p.ends_with("-cataloger"))
80 });
81 let resolver = Resolver::try_from_iter(modules).map_err(io::Error::other)?;
82 self.cached_resolver = Some(resolver);
83 }
84 let resolver = self.cached_resolver.as_ref().unwrap();
85
86 let module = resolver
87 .resolve(url.as_ref())
88 .map_err(std::io::Error::other)?
89 .first()
90 .cloned()
91 .ok_or_else(|| std::io::Error::other("No module found for creating snapshot"))?;
92
93 let programs = self
94 .registry
95 .read_manifest(&module.name)
96 .await
97 .map_err(io::Error::other)?
98 .manifest
99 .provides
100 .programs;
101
102 let url = url.as_ref().to_string();
103
104 let fetcher = programs.iter().find(|p| p.ends_with("-fetcher"));
105 let cataloger = programs.iter().find(|p| p.ends_with("-cataloger"));
106
107 if fetcher.is_none() && cataloger.is_none() {
108 return Err(std::io::Error::other(
109 "No module found for creating snapshot",
110 ));
111 }
112
113 let fetcher_error = if let Some(program) = fetcher {
115 tracing::debug!("attempting to capture a snapshot with fetcher");
116 let start_timestamp = Timestamp::now();
117 match asimov_runner::Fetcher::new(
118 program,
119 &url,
120 GraphOutput::Captured,
121 Default::default(),
122 )
123 .execute()
124 .await
125 .inspect_err(|e| tracing::debug!("failed creating a snapshot with fetcher: {e}"))
126 {
127 Ok(result) => {
128 let snapshot = Snapshot {
129 url,
130 start_timestamp,
131 end_timestamp: Some(Timestamp::now()),
132 data: result.into_inner(),
133 };
134 self.storage.save(&snapshot)?;
135
136 return Ok(snapshot);
137 },
138 Err(e) => Some(e),
139 }
140 } else {
141 None
142 };
143
144 let cataloger_error = if let Some(program) = cataloger {
146 tracing::debug!("attempting to capture a snapshot with cataloger");
147 let start_timestamp = Timestamp::now();
148 match asimov_runner::Cataloger::new(
149 program,
150 &url,
151 GraphOutput::Captured,
152 Default::default(),
153 )
154 .execute()
155 .await
156 .inspect_err(|e| tracing::debug!("failed creating a snapshot with cataloger: {e}"))
157 {
158 Ok(result) => {
159 let snapshot = Snapshot {
160 url,
161 start_timestamp,
162 end_timestamp: Some(Timestamp::now()),
163 data: result.into_inner(),
164 };
165 self.storage.save(&snapshot)?;
166
167 return Ok(snapshot);
168 },
169 Err(e) => Some(e),
170 }
171 } else {
172 None
173 };
174
175 let error_msg = match (fetcher_error, cataloger_error) {
176 (Some(fe), Some(ce)) => {
177 format!("both fetcher and cataloger failed - fetcher: {fe}, cataloger: {ce}")
178 },
179 (Some(fe), None) => format!("fetcher failed: {fe}"),
180 (None, Some(ce)) => format!("cataloger failed: {ce}"),
181 (None, None) => unreachable!("At least one program should exist at this point"),
182 };
183
184 Err(std::io::Error::other(error_msg.replace('\n', " ")))
185 }
186
187 #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
189 pub async fn read(&self, url: impl AsRef<str>, timestamp: Timestamp) -> Result<Snapshot> {
190 self.storage.read(url, timestamp)
191 }
192
193 #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
198 pub async fn read_current(&mut self, url: impl AsRef<str>) -> Result<Snapshot> {
199 if let Some(max_age) = &self.options.max_current_age {
200 let ts = self
201 .storage
202 .current_version(&url)?
203 .to_zoned(jiff::tz::TimeZone::UTC);
204 let now = jiff::Zoned::now();
205
206 let diff = &now - &ts;
207
208 match diff.compare((max_age, &now)) {
215 Ok(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal) => {
216 tracing::debug!("Updating current snapshot...");
217 return self.snapshot(&url).await;
218 },
219 Ok(std::cmp::Ordering::Less) => return self.storage.read_current(url),
220 Err(err) => {
221 tracing::error!(?err, "unable to compare timestamps, not updating snapshot")
222 },
223 };
224 };
225
226 self.storage.read_current(url)
227 }
228
229 #[tracing::instrument(skip(self))]
232 pub async fn list(&self) -> Result<Vec<(String, Timestamp)>> {
233 self.storage.list_urls()
234 }
235
236 #[tracing::instrument(skip(self))]
239 pub async fn log(&self, url: &str) -> Result<Vec<Timestamp>> {
240 self.storage.list_snapshots(url)
241 }
242
243 #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
246 pub async fn compact(&self, url: impl AsRef<str>) -> Result<()> {
247 let timestamps = self.storage.list_snapshots(&url)?;
249 let Some(latest) = timestamps.iter().max() else {
250 return Ok(());
251 };
252 tracing::debug!("Deleting snapshots older than `{latest}`");
253 for &ts in timestamps.iter().filter(|&ts| ts != latest) {
254 self.storage.delete(&url, ts)?;
255 }
256 Ok(())
257 }
258}