mod query;
use std::error::Error as StdError;
use std::fs;
use std::fs::TryLockError;
use std::io;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, Instant, SystemTime};
use flagset::FlagSet;
use serde::{Deserialize, Serialize};
use serde_json as json;
use thiserror::Error;
use powerpack_detach as detach;
use powerpack_env as env;
pub use crate::query::{Query, QueryError, QueryPolicy};
/// File name of the serialized cache entry stored inside each key's directory.
const DATA: &str = "v2.json";
/// Boxed one-shot callback that produces fresh data for a cache entry.
///
/// It is called with the previously stored entry when one was read from disk
/// (`None` otherwise) and returns either the new data or an error.
pub type UpdateFn<'f, T, E> = Box<dyn FnOnce(Option<PrevEntry<T>>) -> Result<T, E> + 'f>;
/// Error returned by [`Builder::try_build`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum BuildError {
/// No directory was configured on the builder and no default location
/// could be determined.
#[error("home directory not found")]
NoHomeDir,
}
/// Reasons a cache update can fail.
///
/// This type is private: `Cache::query` logs these errors instead of
/// returning them to the caller.
#[derive(Debug, Error)]
#[non_exhaustive]
enum UpdateError {
/// A filesystem operation (creating the directory, locking, writing or
/// renaming the entry file) failed.
#[error("io error")]
Io(#[from] io::Error),
/// The new entry could not be serialized to JSON.
#[error("serialization error")]
Serialize(#[from] json::Error),
/// The caller-supplied update function returned an error.
#[error("update fn failed: {0}")]
UpdateFn(#[from] Box<dyn StdError + Send + Sync + 'static>),
}
/// Configures and constructs a [`Cache`].
#[derive(Debug, Clone)]
pub struct Builder {
// Root directory for cache entries; `None` means "resolve a default in `try_build`".
directory: Option<PathBuf>,
// Policy flags handed to the built cache as its default query policy.
query_policy: FlagSet<QueryPolicy>,
// Time-to-live handed to the built cache as its default.
ttl: Duration,
// Optional default for how long a query polls for a first value when no entry file exists.
initial_poll: Option<Duration>,
}
/// A file-backed cache; each key gets its own sub-directory under `directory`.
#[derive(Debug)]
pub struct Cache {
// Root directory; a query's key is joined onto it to locate the entry.
directory: PathBuf,
// Policy used by queries that do not specify one.
query_policy: FlagSet<QueryPolicy>,
// Time-to-live used by queries that do not specify one.
ttl: Duration,
// Poll duration used by queries that do not specify one; `None` disables polling.
initial_poll: Option<Duration>,
}
/// The entry found on disk for a key, together with the parameters of the
/// query that is inspecting it.
#[derive(Debug)]
#[non_exhaustive]
pub struct PrevEntry<T> {
/// The deserialized entry, or the JSON error if the stored bytes could not
/// be parsed.
pub entry: Result<Entry<T>, json::Error>,
// Checksum supplied by the current query; compared against the stored checksum.
query_checksum: Option<String>,
// TTL in effect for the current query; used to decide whether the entry is expired.
query_ttl: Duration,
}
/// The value persisted on disk for a single cache key.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[non_exhaustive]
pub struct Entry<T> {
/// Time sampled immediately before the update function was called.
pub pre_update_time: SystemTime,
/// Time sampled immediately after the update function returned; expiry is
/// measured from this instant.
pub post_update_time: SystemTime,
/// Checksum supplied by the query that produced this entry, if any.
pub checksum: Option<String>,
/// The data returned by the update function.
pub data: T,
}
impl Default for Builder {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl Builder {
    /// Creates a builder with the default settings: no explicit directory,
    /// the default query policy set, a 60 second TTL and no initial polling.
    #[inline]
    pub fn new() -> Self {
        Self {
            directory: None,
            query_policy: QueryPolicy::default_set(),
            ttl: Duration::from_secs(60),
            initial_poll: None,
        }
    }

    /// Sets the directory the cache stores its entries in.
    ///
    /// When this is never called, [`Builder::try_build`] resolves a default
    /// location instead.
    #[inline]
    pub fn directory(self, directory: impl Into<PathBuf>) -> Self {
        Self {
            directory: Some(directory.into()),
            ..self
        }
    }

    /// Sets the policy used by queries that do not carry their own.
    pub fn policy(self, query_policy: impl Into<FlagSet<QueryPolicy>>) -> Self {
        Self {
            query_policy: query_policy.into(),
            ..self
        }
    }

    /// Sets the time-to-live used by queries that do not carry their own.
    #[inline]
    pub fn ttl(self, ttl: Duration) -> Self {
        Self { ttl, ..self }
    }

    /// Sets how long a query polls for a first value when no entry file
    /// exists yet, for queries that do not carry their own setting.
    #[inline]
    pub fn initial_poll(self, initial_poll: Duration) -> Self {
        Self {
            initial_poll: Some(initial_poll),
            ..self
        }
    }

    /// Builds the [`Cache`].
    ///
    /// # Errors
    ///
    /// Returns [`BuildError::NoHomeDir`] when no directory was configured and
    /// no default location could be determined.
    pub fn try_build(self) -> Result<Cache, BuildError> {
        let directory = match self.directory {
            Some(explicit) => explicit,
            None => {
                // Fall back to a `cache` sub-directory of the default location.
                let base = env::try_workflow_cache_or_default().ok_or(BuildError::NoHomeDir)?;
                base.join("cache")
            }
        };

        Ok(Cache {
            directory,
            query_policy: self.query_policy,
            ttl: self.ttl,
            initial_poll: self.initial_poll,
        })
    }

    /// Builds the [`Cache`].
    ///
    /// # Panics
    ///
    /// Panics if [`Builder::try_build`] fails.
    #[track_caller]
    #[inline]
    pub fn build(self) -> Cache {
        let built = self.try_build();
        built.expect("failed to build cache")
    }
}
impl<T> PrevEntry<T> {
    /// Parses `buf` as a JSON-encoded [`Entry`] and pairs the outcome with the
    /// checksum and TTL of the query inspecting it.
    ///
    /// A parse failure is not reported here; it is stored in `entry` so that
    /// the policy checks can decide what to do with it.
    fn build(buf: &[u8], query_checksum: Option<String>, query_ttl: Duration) -> Self
    where
        T: for<'de> Deserialize<'de>,
    {
        Self {
            entry: json::from_slice::<Entry<T>>(buf),
            query_checksum,
            query_ttl,
        }
    }

    /// Returns `true` when `policy` asks for this entry to be refreshed.
    fn should_update(&self, policy: FlagSet<QueryPolicy>) -> bool {
        if policy.contains(QueryPolicy::UpdateAlways) {
            return true;
        }
        if policy.contains(QueryPolicy::UpdateBadData) && self.is_bad_data() {
            return true;
        }
        if policy.contains(QueryPolicy::UpdateChecksumMismatch) && self.is_checksum_mismatch() {
            return true;
        }
        policy.contains(QueryPolicy::UpdateExpired) && self.is_expired()
    }

    /// Returns `true` when `policy` allows this entry to be handed back to
    /// the caller.
    ///
    /// Unless `ReturnAlways` is set, every problem the entry has (bad data,
    /// checksum mismatch, expiry) must be explicitly permitted.
    fn should_return(&self, policy: FlagSet<QueryPolicy>) -> bool {
        if policy.contains(QueryPolicy::ReturnAlways) {
            return true;
        }
        if self.is_bad_data() && !policy.contains(QueryPolicy::ReturnBadDataErr) {
            return false;
        }
        if self.is_checksum_mismatch() && !policy.contains(QueryPolicy::ReturnChecksumMismatch) {
            return false;
        }
        !self.is_expired() || policy.contains(QueryPolicy::ReturnExpired)
    }

    /// Consumes the entry and yields its data if `policy` allows returning it.
    ///
    /// Yields `QueryError::Miss` when the policy rejects the entry, and the
    /// deserialization error when the policy accepts an unparsable entry.
    fn into_data(self, policy: FlagSet<QueryPolicy>) -> Result<T, QueryError> {
        if !self.should_return(policy) {
            return Err(QueryError::Miss);
        }
        let entry = self.entry?;
        Ok(entry.data)
    }

    /// Whether the stored bytes failed to deserialize.
    #[inline]
    pub fn is_bad_data(&self) -> bool {
        matches!(self.entry, Err(_))
    }

    /// Whether the query supplied a checksum that differs from the stored one.
    ///
    /// Always `false` for unparsable entries and for queries without a
    /// checksum.
    #[inline]
    pub fn is_checksum_mismatch(&self) -> bool {
        match (&self.entry, &self.query_checksum) {
            (Ok(entry), Some(expected)) => entry.checksum.as_ref() != Some(expected),
            _ => false,
        }
    }

    /// Whether more than the query's TTL has passed since the entry was
    /// written.
    ///
    /// Always `false` for unparsable entries. An entry whose timestamp cannot
    /// be compared with the current time counts as expired.
    #[inline]
    pub fn is_expired(&self) -> bool {
        let Ok(entry) = &self.entry else {
            return false;
        };
        match entry.post_update_time.elapsed() {
            Ok(age) => age > self.query_ttl,
            Err(_) => true,
        }
    }
}
impl Cache {
/// Looks up the cached value for `query`, scheduling a refresh when the
/// policy asks for one.
///
/// The query's own `policy`, `ttl` and `initial_poll` take precedence over
/// the cache-wide defaults when they are set.
///
/// * If the entry file exists it is parsed and, when an update function was
///   supplied and the policy says the entry should be updated, the update is
///   handed to `detach::spawn_with`. The entry is then returned or rejected
///   according to the policy.
/// * If the entry file does not exist, the update (if any) is handed to
///   `detach::spawn`. When an initial poll duration is in effect, the file is
///   then re-read periodically until it appears or the duration elapses.
///
/// # Errors
///
/// Returns [`QueryError::Miss`] when there is no entry to return or the
/// policy rejects the one that was found. Other errors come from reading the
/// entry file (anything but "not found"), from a failed `detach` call, and
/// from deserialization when the policy accepts an unparsable entry.
pub fn query<'a, T, E>(&self, query: Query<'a, T, E>) -> Result<T, QueryError>
where
T: Serialize + for<'de> Deserialize<'de>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let Query {
key,
checksum,
policy,
ttl,
initial_poll,
update_fn,
} = query;
// Each key owns a sub-directory; the entry itself lives in the `DATA` file.
let directory = self.directory.join(key);
let path = directory.join(DATA);
let policy = policy.unwrap_or(self.query_policy);
let ttl = ttl.unwrap_or(self.ttl);
// Pair the poll window with a sleep interval: a fifth of the window,
// capped at 100ms.
let initial_poll = initial_poll.or(self.initial_poll).map(|d| {
let sleep = (d / 5).min(Duration::from_millis(100)).min(d);
(d, sleep)
});
// Wrap the caller's update function so that it runs `update` and logs the
// outcome; update failures are logged, not returned to the caller.
let update_fn = update_fn.map(|f| {
// The wrapper gets its own copy; the original is still needed to build
// the `PrevEntry` below.
let checksum = checksum.clone();
|prev_data| match update(&directory, &path, checksum, prev_data, f) {
Ok(true) => log::debug!("cache: updated {key}"),
Ok(false) => log::debug!("cache: another process updated {key}"),
Err(err) => log::error!(
"cache: failed to update {key}: {}",
detach::format_err(&err)
),
}
});
match fs::read(&path) {
Ok(buf) => {
let prev = PrevEntry::build(&buf, checksum, ttl);
match update_fn {
Some(update_fn) if prev.should_update(policy) => {
// NOTE(review): presumably runs the update detached from this call
// and hands `prev` back for the caller — confirm against
// `powerpack_detach::spawn_with`.
detach::spawn_with(prev, |prev| update_fn(Some(prev)))?
}
_ => prev,
}
.into_data(policy)
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {
// Nothing cached yet: start an update if one was supplied.
if let Some(update_fn) = update_fn {
detach::spawn(|| update_fn(None))?;
}
if let Some((poll_duration, poll_sleep)) = initial_poll {
let start = Instant::now();
while Instant::now().duration_since(start) < poll_duration {
// Sleep before each re-read of the entry file.
thread::sleep(poll_sleep);
match fs::read(&path) {
Ok(buf) => {
return PrevEntry::build(&buf, checksum, ttl).into_data(policy);
}
Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
Err(err) => return Err(err.into()),
}
}
}
// No file appeared within the poll window, or polling was not requested.
Err(QueryError::Miss)
}
Err(err) => Err(err.into()),
}
}
}
fn update<'d, 'f, T, E>(
directory: &Path,
path: &Path,
checksum: Option<String>,
prev_entry: Option<PrevEntry<T>>,
update_fn: UpdateFn<'f, T, E>,
) -> Result<bool, UpdateError>
where
T: Serialize + for<'de> Deserialize<'de>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
fs::create_dir_all(directory)?;
let tmp = path.with_extension("tmp");
match fs::File::open(directory)?.try_lock() {
Ok(()) => {
let pre_update_time = SystemTime::now();
let data = update_fn(prev_entry).map_err(Into::into)?;
let post_update_time = SystemTime::now();
let file = fs::File::create(&tmp)?;
json::to_writer(
&file,
&Entry {
pre_update_time,
post_update_time,
checksum,
data,
},
)?;
fs::rename(tmp, path)?;
Ok(true)
}
Err(TryLockError::Error(err)) => Err(err.into()),
Err(TryLockError::WouldBlock) => Ok(false),
}
}