asimov_snapshot/
snapshot.rs

// This is free and unencumbered software released into the public domain.

3use asimov_module::resolve::Resolver;
4use asimov_registry::Registry;
5use asimov_runner::GraphOutput;
6use jiff::{Span, Timestamp, ToSpan};
7use std::{
8    format,
9    io::{self, Result},
10    string::{String, ToString},
11    vec::Vec,
12};
13
14#[derive(Clone, Debug, PartialEq, Eq, bon::Builder)]
15pub struct Snapshot {
16    #[builder(into)]
17    pub url: std::string::String,
18    #[builder(into)]
19    pub data: std::vec::Vec<u8>,
20    pub start_timestamp: jiff::Timestamp,
21    pub end_timestamp: Option<jiff::Timestamp>,
22}
23
24#[derive(Clone, Debug, bon::Builder)]
25pub struct Options {
26    /// Controls maximum age of the "current" snapshot that is allowed to be
27    /// returned from `Snapshotter.read_current`.
28    #[builder(required, default = Some(1.minute()))]
29    #[builder(with = |duration: std::time::Duration| -> core::result::Result<_, jiff::Error> { Span::try_from(duration).map(Option::Some) })]
30    pub max_current_age: Option<Span>,
31}
32
33impl Default for Options {
34    fn default() -> Self {
35        Self {
36            max_current_age: Some(1.minute()),
37        }
38    }
39}
40
41pub struct Snapshotter<S> {
42    // TODO: not critical but would be nice to have the ability to inject the fetcher impl:
43    // fetcher: F,
44    registry: Registry,
45    storage: S,
46    options: Options,
47
48    cached_resolver: Option<Resolver>,
49}
50
51impl<S> Snapshotter<S> {
52    pub fn new(registry: Registry, storage: S, options: Options) -> Self {
53        Self {
54            registry,
55            storage,
56            options,
57            cached_resolver: None,
58        }
59    }
60}
61
62impl<S: crate::storage::Storage> Snapshotter<S> {
63    /// Fetches the content from an URL and saves it to the snapshot storage.
64    #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
65    pub async fn snapshot(&mut self, url: impl AsRef<str>) -> Result<Snapshot> {
66        if self.cached_resolver.is_none() {
67            let modules = self
68                .registry
69                .enabled_modules()
70                .await
71                .map_err(io::Error::other)?
72                .into_iter()
73                .map(|enabled| enabled.manifest)
74                .filter(|manifest| {
75                    manifest
76                        .provides
77                        .programs
78                        .iter()
79                        .any(|p| p.ends_with("-fetcher") || p.ends_with("-cataloger"))
80                });
81            let resolver = Resolver::try_from_iter(modules).map_err(io::Error::other)?;
82            self.cached_resolver = Some(resolver);
83        }
84        let resolver = self.cached_resolver.as_ref().unwrap();
85
86        let module = resolver
87            .resolve(url.as_ref())
88            .map_err(std::io::Error::other)?
89            .first()
90            .cloned()
91            .ok_or_else(|| std::io::Error::other("No module found for creating snapshot"))?;
92
93        let programs = self
94            .registry
95            .read_manifest(&module.name)
96            .await
97            .map_err(io::Error::other)?
98            .manifest
99            .provides
100            .programs;
101
102        let url = url.as_ref().to_string();
103
104        let fetcher = programs.iter().find(|p| p.ends_with("-fetcher"));
105        let cataloger = programs.iter().find(|p| p.ends_with("-cataloger"));
106
107        if fetcher.is_none() && cataloger.is_none() {
108            return Err(std::io::Error::other(
109                "No module found for creating snapshot",
110            ));
111        }
112
113        // try fetcher if available
114        let fetcher_error = if let Some(program) = fetcher {
115            tracing::debug!("attempting to capture a snapshot with fetcher");
116            let start_timestamp = Timestamp::now();
117            match asimov_runner::Fetcher::new(
118                program,
119                &url,
120                GraphOutput::Captured,
121                Default::default(),
122            )
123            .execute()
124            .await
125            .inspect_err(|e| tracing::debug!("failed creating a snapshot with fetcher: {e}"))
126            {
127                Ok(result) => {
128                    let snapshot = Snapshot {
129                        url,
130                        start_timestamp,
131                        end_timestamp: Some(Timestamp::now()),
132                        data: result.into_inner(),
133                    };
134                    self.storage.save(&snapshot)?;
135
136                    return Ok(snapshot);
137                },
138                Err(e) => Some(e),
139            }
140        } else {
141            None
142        };
143
144        // if fetcher failed or doesn't exist, try cataloger
145        let cataloger_error = if let Some(program) = cataloger {
146            tracing::debug!("attempting to capture a snapshot with cataloger");
147            let start_timestamp = Timestamp::now();
148            match asimov_runner::Cataloger::new(
149                program,
150                &url,
151                GraphOutput::Captured,
152                Default::default(),
153            )
154            .execute()
155            .await
156            .inspect_err(|e| tracing::debug!("failed creating a snapshot with cataloger: {e}"))
157            {
158                Ok(result) => {
159                    let snapshot = Snapshot {
160                        url,
161                        start_timestamp,
162                        end_timestamp: Some(Timestamp::now()),
163                        data: result.into_inner(),
164                    };
165                    self.storage.save(&snapshot)?;
166
167                    return Ok(snapshot);
168                },
169                Err(e) => Some(e),
170            }
171        } else {
172            None
173        };
174
175        let error_msg = match (fetcher_error, cataloger_error) {
176            (Some(fe), Some(ce)) => {
177                format!("both fetcher and cataloger failed - fetcher: {fe}, cataloger: {ce}")
178            },
179            (Some(fe), None) => format!("fetcher failed: {fe}"),
180            (None, Some(ce)) => format!("cataloger failed: {ce}"),
181            (None, None) => unreachable!("At least one program should exist at this point"),
182        };
183
184        Err(std::io::Error::other(error_msg.replace('\n', " ")))
185    }
186
187    /// Returns the snapshot content of an URL at the given timestamp.
188    #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
189    pub async fn read(&self, url: impl AsRef<str>, timestamp: Timestamp) -> Result<Snapshot> {
190        self.storage.read(url, timestamp)
191    }
192
193    /// Returns the latest snapshot content of an URL.
194    ///
195    /// A fresh snapshot is first created if [`Options::max_current_age`]
196    /// is set and the latest snapshot is older than the maximum age.
197    #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
198    pub async fn read_current(&mut self, url: impl AsRef<str>) -> Result<Snapshot> {
199        if let Some(max_age) = &self.options.max_current_age {
200            let ts = self
201                .storage
202                .current_version(&url)?
203                .to_zoned(jiff::tz::TimeZone::UTC);
204            let now = jiff::Zoned::now();
205
206            let diff = &now - &ts;
207
208            // should never error, we provide `(jiff::Span, &jiff::Zoned)` to `compare`.
209            // doc:
210            // > If either of the spans being compared have a non-zero calendar
211            // > unit (units bigger than hours), then this routine requires a
212            // > relative datetime. If one is not provided, then an error is
213            // > returned.
214            match diff.compare((max_age, &now)) {
215                Ok(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal) => {
216                    tracing::debug!("Updating current snapshot...");
217                    return self.snapshot(&url).await;
218                },
219                Ok(std::cmp::Ordering::Less) => return self.storage.read_current(url),
220                Err(err) => {
221                    tracing::error!(?err, "unable to compare timestamps, not updating snapshot")
222                },
223            };
224        };
225
226        self.storage.read_current(url)
227    }
228
229    /// Returns the list of snapshotted URLs along with their latest timestamps.
230    /// Entries can be read by [`Self::read`].
231    #[tracing::instrument(skip(self))]
232    pub async fn list(&self) -> Result<Vec<(String, Timestamp)>> {
233        self.storage.list_urls()
234    }
235
236    /// Returns the log of timestamps for a given URL.
237    /// Entries can be read by [`Self::read`].
238    #[tracing::instrument(skip(self))]
239    pub async fn log(&self, url: &str) -> Result<Vec<Timestamp>> {
240        self.storage.list_snapshots(url)
241    }
242
243    /// Deletes old snapshots for a given URL.
244    /// Currently everything but the latest snapshot is deleted.
245    #[tracing::instrument(skip(self), fields(url = url.as_ref()))]
246    pub async fn compact(&self, url: impl AsRef<str>) -> Result<()> {
247        // TODO: max hourly/daily/weekly/monthly/yearly snapshots
248        let timestamps = self.storage.list_snapshots(&url)?;
249        let Some(latest) = timestamps.iter().max() else {
250            return Ok(());
251        };
252        tracing::debug!("Deleting snapshots older than `{latest}`");
253        for &ts in timestamps.iter().filter(|&ts| ts != latest) {
254            self.storage.delete(&url, ts)?;
255        }
256        Ok(())
257    }
258}