jsona_util/schema/fetcher.rs

1use arc_swap::ArcSwap;
2use std::sync::Arc;
3use tokio::sync::Semaphore;
4use url::Url;
5
6use crate::environment::Environment;
7
8#[derive(Clone)]
9pub struct Fetcher<E: Environment> {
10    env: E,
11    cache_path: Arc<ArcSwap<Option<Url>>>,
12    concurrent_requests: Arc<Semaphore>,
13}
14
15impl<E: Environment> Fetcher<E> {
16    pub fn new(env: E) -> Self {
17        Self {
18            env,
19            cache_path: Default::default(),
20            concurrent_requests: Arc::new(Semaphore::new(10)),
21        }
22    }
23
24    pub fn set_cache_path(&self, path: Option<Url>) {
25        self.cache_path.swap(Arc::new(path));
26    }
27
28    #[tracing::instrument(skip_all, fields(%url))]
29    pub async fn fetch(&self, url: &Url) -> Result<Vec<u8>, anyhow::Error> {
30        let data: Vec<u8> = match url.scheme() {
31            "http" | "https" => self.fetch_file(url).await?,
32            _ => self.env.read_file(url).await?,
33        };
34        Ok(data)
35    }
36
37    async fn fetch_file(&self, url: &Url) -> Result<Vec<u8>, anyhow::Error> {
38        let _permit = self.concurrent_requests.acquire().await?;
39        if let Some(cache_root) = &**self.cache_path.load() {
40            let cache_name = format!("{:x}", md5::compute(url.to_string().as_bytes()));
41            let cache_path = cache_root.join(&cache_name)?;
42            if let Ok(data) = self.env.read_file(&cache_path).await {
43                tracing::debug!("fetch file from cache {}", cache_path);
44                return Ok(data);
45            }
46            if let Ok(data) = self.env.fetch_file(url).await {
47                tracing::debug!("fetch file from remote");
48                if let Err(err) = self.env.write_file(&cache_path, &data).await {
49                    tracing::warn!("failed to cache file {}, {}", cache_path, err);
50                }
51                return Ok(data);
52            }
53        }
54        self.env.fetch_file(url).await
55    }
56}