1mod query;
62
63use std::error::Error as StdError;
64use std::fs;
65use std::fs::TryLockError;
66use std::io;
67use std::path::{Path, PathBuf};
68use std::thread;
69use std::time::{Duration, Instant, SystemTime};
70
71use flagset::FlagSet;
72use serde::{Deserialize, Serialize};
73use serde_json as json;
74use thiserror::Error;
75
76use powerpack_detach as detach;
77use powerpack_env as env;
78
79pub use crate::query::{Query, QueryError, QueryPolicy};
80
/// File name of the serialized entry stored inside each key's cache directory.
const DATA: &str = "v2.json";
83
/// A boxed, one-shot closure that produces a fresh value for a cache entry.
///
/// It receives the previously stored entry (`None` when no cache file
/// existed) and returns the new data or an error.
pub type UpdateFn<'f, T, E> = Box<dyn FnOnce(Option<PrevEntry<T>>) -> Result<T, E> + 'f>;
86
/// Errors that can occur while building a [`Cache`] from a [`Builder`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum BuildError {
    /// No explicit directory was configured and the default cache location
    /// could not be determined.
    #[error("home directory not found")]
    NoHomeDir,
}
95
/// Internal failure modes of a cache update; these are only logged, never
/// returned to the caller of [`Cache::query`].
#[derive(Debug, Error)]
#[non_exhaustive]
enum UpdateError {
    /// A filesystem operation (directory creation, locking, rename, ...) failed.
    #[error("io error")]
    Io(#[from] io::Error),

    /// `serde_json` reported an error while encoding the new entry.
    #[error("serialization error")]
    Serialize(#[from] json::Error),

    /// The caller-supplied update function returned an error.
    #[error("update fn failed: {0}")]
    UpdateFn(#[from] Box<dyn StdError + Send + Sync + 'static>),
}
112
/// Configures and constructs a [`Cache`].
#[derive(Debug, Clone)]
pub struct Builder {
    /// Explicit cache directory; when `None` a default location is resolved
    /// at build time.
    directory: Option<PathBuf>,
    /// Policy applied to queries that do not specify their own.
    query_policy: FlagSet<QueryPolicy>,
    /// Time-to-live applied to queries that do not specify their own.
    ttl: Duration,
    /// Initial poll duration applied to queries that do not specify their own.
    initial_poll: Option<Duration>,
}
121
/// A file-backed cache. Each key is stored in its own sub-directory of
/// `directory`.
#[derive(Debug)]
pub struct Cache {
    /// Root directory under which per-key directories are created.
    directory: PathBuf,
    /// Default policy for queries that do not override it.
    query_policy: FlagSet<QueryPolicy>,
    /// Default time-to-live for queries that do not override it.
    ttl: Duration,
    /// Default initial poll duration for queries that do not override it.
    initial_poll: Option<Duration>,
}
132
/// The entry found on disk for a key, together with the parameters of the
/// query that read it.
#[derive(Debug)]
#[non_exhaustive]
pub struct PrevEntry<T> {
    /// The stored entry, or the error produced when deserializing it.
    pub entry: Result<Entry<T>, json::Error>,

    /// Checksum supplied by the current query, compared against the stored one.
    query_checksum: Option<String>,
    /// Time-to-live of the current query, used to decide whether the entry expired.
    query_ttl: Duration,
}
145
/// The on-disk representation of a cached value.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[non_exhaustive]
pub struct Entry<T> {
    /// Time recorded immediately before the update function was called.
    pub pre_update_time: SystemTime,
    /// Time recorded immediately after the update function returned.
    pub post_update_time: SystemTime,
    /// Checksum of the query that produced this entry, if one was given.
    pub checksum: Option<String>,
    /// The cached value itself.
    pub data: T,
}
161
162impl Default for Builder {
163 #[inline]
164 fn default() -> Self {
165 Self::new()
166 }
167}
168
169impl Builder {
170 #[inline]
172 pub fn new() -> Self {
173 Builder {
174 directory: None,
175 query_policy: QueryPolicy::default_set(),
176 ttl: Duration::from_secs(60),
177 initial_poll: None,
178 }
179 }
180
181 #[inline]
191 pub fn directory(mut self, directory: impl Into<PathBuf>) -> Self {
192 self.directory = Some(directory.into());
193 self
194 }
195
196 pub fn policy(mut self, query_policy: impl Into<FlagSet<QueryPolicy>>) -> Self {
201 self.query_policy = query_policy.into();
202 self
203 }
204
205 #[inline]
214 pub fn ttl(mut self, ttl: Duration) -> Self {
215 self.ttl = ttl;
216 self
217 }
218
219 #[inline]
231 pub fn initial_poll(mut self, initial_poll: Duration) -> Self {
232 self.initial_poll = Some(initial_poll);
233 self
234 }
235
236 pub fn try_build(self) -> Result<Cache, BuildError> {
240 let Self {
241 directory,
242 query_policy,
243 ttl,
244 initial_poll,
245 } = self;
246
247 let directory = match directory {
248 Some(directory) => directory,
249 None => env::try_workflow_cache_or_default()
250 .ok_or(BuildError::NoHomeDir)?
251 .join("cache"),
252 };
253
254 Ok(Cache {
255 directory,
256 query_policy,
257 ttl,
258 initial_poll,
259 })
260 }
261
262 #[track_caller]
268 #[inline]
269 pub fn build(self) -> Cache {
270 self.try_build().expect("failed to build cache")
271 }
272}
273
274impl<T> PrevEntry<T> {
275 fn build(buf: &[u8], query_checksum: Option<String>, query_ttl: Duration) -> Self
276 where
277 T: for<'de> Deserialize<'de>,
278 {
279 let entry: Result<Entry<T>, _> = json::from_slice(buf);
280 Self {
281 entry,
282 query_checksum,
283 query_ttl,
284 }
285 }
286
287 fn should_update(&self, policy: FlagSet<QueryPolicy>) -> bool {
288 policy.contains(QueryPolicy::UpdateAlways)
289 || self.is_bad_data() && policy.contains(QueryPolicy::UpdateBadData)
290 || self.is_checksum_mismatch() && policy.contains(QueryPolicy::UpdateChecksumMismatch)
291 || self.is_expired() && policy.contains(QueryPolicy::UpdateExpired)
292 }
293
294 #[rustfmt::skip]
295 fn should_return(&self, policy: FlagSet<QueryPolicy>) -> bool {
296 policy.contains(QueryPolicy::ReturnAlways) || {
297 (!self.is_bad_data() || policy.contains(QueryPolicy::ReturnBadDataErr))
298 && (!self.is_checksum_mismatch() || policy.contains(QueryPolicy::ReturnChecksumMismatch))
299 && (!self.is_expired() || policy.contains(QueryPolicy::ReturnExpired))
300 }
301 }
302
303 fn into_data(self, policy: FlagSet<QueryPolicy>) -> Result<T, QueryError> {
304 if self.should_return(policy) {
305 Ok(self.entry.map(|c| c.data)?)
306 } else {
307 Err(QueryError::Miss)
308 }
309 }
310
311 #[inline]
313 pub fn is_bad_data(&self) -> bool {
314 self.entry.is_err()
315 }
316
317 #[inline]
319 pub fn is_checksum_mismatch(&self) -> bool {
320 self.entry.as_ref().is_ok_and(|entry| {
321 self.query_checksum.is_some() && entry.checksum.as_ref() != self.query_checksum.as_ref()
322 })
323 }
324
325 #[inline]
327 pub fn is_expired(&self) -> bool {
328 self.entry.as_ref().is_ok_and(|entry| {
329 entry
330 .post_update_time
331 .elapsed()
332 .map_or(true, |d| d > self.query_ttl)
333 })
334 }
335}
336
337impl Cache {
338 pub fn query<'a, T, E>(&self, query: Query<'a, T, E>) -> Result<T, QueryError>
340 where
341 T: Serialize + for<'de> Deserialize<'de>,
342 E: Into<Box<dyn std::error::Error + Send + Sync>>,
343 {
344 let Query {
345 key,
346 checksum,
347 policy,
348 ttl,
349 initial_poll,
350 update_fn,
351 } = query;
352
353 let directory = self.directory.join(key);
354 let path = directory.join(DATA);
355
356 let policy = policy.unwrap_or(self.query_policy);
357 let ttl = ttl.unwrap_or(self.ttl);
358 let initial_poll = initial_poll.or(self.initial_poll).map(|d| {
359 let sleep = (d / 5).min(Duration::from_millis(100)).min(d);
360 (d, sleep)
361 });
362
363 let update_fn = update_fn.map(|f| {
364 let checksum = checksum.clone();
365 |prev_data| match update(&directory, &path, checksum, prev_data, f) {
366 Ok(true) => log::debug!("cache: updated {key}"),
367 Ok(false) => log::debug!("cache: another process updated {key}"),
368 Err(err) => log::error!(
369 "cache: failed to update {key}: {}",
370 detach::format_err(&err)
371 ),
372 }
373 });
374
375 match fs::read(&path) {
376 Ok(buf) => {
377 let prev = PrevEntry::build(&buf, checksum, ttl);
378 match update_fn {
379 Some(update_fn) if prev.should_update(policy) => {
380 detach::spawn_with(prev, |prev| update_fn(Some(prev)))?
381 }
382 _ => prev,
383 }
384 .into_data(policy)
385 }
386
387 Err(err) if err.kind() == io::ErrorKind::NotFound => {
388 if let Some(update_fn) = update_fn {
389 detach::spawn(|| update_fn(None))?;
390 }
391
392 if let Some((poll_duration, poll_sleep)) = initial_poll {
394 let start = Instant::now();
395 while Instant::now().duration_since(start) < poll_duration {
396 thread::sleep(poll_sleep);
397 match fs::read(&path) {
398 Ok(buf) => {
399 return PrevEntry::build(&buf, checksum, ttl).into_data(policy);
400 }
401 Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
402 Err(err) => return Err(err.into()),
403 }
404 }
405 }
406
407 Err(QueryError::Miss)
408 }
409
410 Err(err) => Err(err.into()),
411 }
412 }
413}
414
415fn update<'d, 'f, T, E>(
416 directory: &Path,
417 path: &Path,
418 checksum: Option<String>,
419 prev_entry: Option<PrevEntry<T>>,
420 update_fn: UpdateFn<'f, T, E>,
421) -> Result<bool, UpdateError>
422where
423 T: Serialize + for<'de> Deserialize<'de>,
424 E: Into<Box<dyn std::error::Error + Send + Sync>>,
425{
426 fs::create_dir_all(directory)?;
427 let tmp = path.with_extension("tmp");
428
429 match fs::File::open(directory)?.try_lock() {
430 Ok(()) => {
431 let pre_update_time = SystemTime::now();
432 let data = update_fn(prev_entry).map_err(Into::into)?;
433 let post_update_time = SystemTime::now();
434 let file = fs::File::create(&tmp)?;
435 json::to_writer(
436 &file,
437 &Entry {
438 pre_update_time,
439 post_update_time,
440 checksum,
441 data,
442 },
443 )?;
444 fs::rename(tmp, path)?;
445 Ok(true)
446 }
447 Err(TryLockError::Error(err)) => Err(err.into()),
448 Err(TryLockError::WouldBlock) => Ok(false),
449 }
450}