jsona_util/schema/
fetcher.rs1use arc_swap::ArcSwap;
2use std::sync::Arc;
3use tokio::sync::Semaphore;
4use url::Url;
5
6use crate::environment::Environment;
7
8#[derive(Clone)]
9pub struct Fetcher<E: Environment> {
10 env: E,
11 cache_path: Arc<ArcSwap<Option<Url>>>,
12 concurrent_requests: Arc<Semaphore>,
13}
14
15impl<E: Environment> Fetcher<E> {
16 pub fn new(env: E) -> Self {
17 Self {
18 env,
19 cache_path: Default::default(),
20 concurrent_requests: Arc::new(Semaphore::new(10)),
21 }
22 }
23
24 pub fn set_cache_path(&self, path: Option<Url>) {
25 self.cache_path.swap(Arc::new(path));
26 }
27
28 #[tracing::instrument(skip_all, fields(%url))]
29 pub async fn fetch(&self, url: &Url) -> Result<Vec<u8>, anyhow::Error> {
30 let data: Vec<u8> = match url.scheme() {
31 "http" | "https" => self.fetch_file(url).await?,
32 _ => self.env.read_file(url).await?,
33 };
34 Ok(data)
35 }
36
37 async fn fetch_file(&self, url: &Url) -> Result<Vec<u8>, anyhow::Error> {
38 let _permit = self.concurrent_requests.acquire().await?;
39 if let Some(cache_root) = &**self.cache_path.load() {
40 let cache_name = format!("{:x}", md5::compute(url.to_string().as_bytes()));
41 let cache_path = cache_root.join(&cache_name)?;
42 if let Ok(data) = self.env.read_file(&cache_path).await {
43 tracing::debug!("fetch file from cache {}", cache_path);
44 return Ok(data);
45 }
46 if let Ok(data) = self.env.fetch_file(url).await {
47 tracing::debug!("fetch file from remote");
48 if let Err(err) = self.env.write_file(&cache_path, &data).await {
49 tracing::warn!("failed to cache file {}, {}", cache_path, err);
50 }
51 return Ok(data);
52 }
53 }
54 self.env.fetch_file(url).await
55 }
56}