use std::{
fs,
io::{Read, Write},
net::TcpStream,
path::{Path, PathBuf},
time::Duration,
};
use anyhow::{Context, bail};
use bytes::Bytes;
use iterators_extended::bucket::Bucket;
use reqwest::{StatusCode, blocking::Client};
use url::Url;
use winnow::{
Parser,
binary::{le_u8, length_repeat, length_take},
combinator::terminated,
token::take,
};
use crate::file_parsers::shared::winnow::WinnowParser;
pub struct CDNLoader {
base_url: Url,
cache_dir: PathBuf,
fallback_cache_dirs_old: Vec<PathBuf>,
fallback_cache_dirs_new: Vec<PathBuf>,
}
fn parse_patch_parts(filename: &str) -> anyhow::Result<Vec<u64>> {
filename
.split('.')
.map(|x| {
x.parse::<u64>()
.with_context(|| format!("Failed to parse filename part as u64: {x:?}"))
})
.collect()
}
fn get_fallback_cache_dirs(cache_dir: &Path) -> anyhow::Result<(Vec<PathBuf>, Vec<PathBuf>)> {
let current_patch = parse_patch_parts(cache_dir.file_name().unwrap().to_str().unwrap())?;
let parent = cache_dir.parent().unwrap();
let [mut old, mut new] = parent
.read_dir()
.map(|d| d.collect::<Vec<_>>())
.unwrap_or(vec![])
.into_iter()
.filter_map(Result::ok)
.filter_map(|p| {
let binding = p.file_name();
let filename = binding.to_str().unwrap();
parse_patch_parts(filename)
.ok()
.map(|patch| (patch, p.path()))
})
.filter(|(patch, _)| *patch != current_patch)
.bucket_arr(|(patch, _)| if *patch < current_patch { 0 } else { 1 });
old.sort_unstable_by_key(|(patch, _)| patch.clone());
old.reverse();
let old = old.into_iter().map(|(_, path)| path).collect();
new.sort_unstable_by_key(|(patch, _)| patch.clone());
let new = new.into_iter().map(|(_, path)| path).collect();
Ok((old, new))
}
impl CDNLoader {
pub fn new(base_url: &Url, cache_dir: &str) -> anyhow::Result<Self> {
let cache_dir = PathBuf::from(cache_dir).join(format!(
"{}{}",
base_url.domain().context("CDN URL has no domain")?,
base_url.path()
));
let (fallback_cache_dirs_old, fallback_cache_dirs_new) =
get_fallback_cache_dirs(&cache_dir)?;
Ok(Self {
base_url: base_url.clone(),
cache_dir,
fallback_cache_dirs_old,
fallback_cache_dirs_new,
})
}
fn get_fallbacks(&self, path_stub: &Path) -> impl Iterator<Item = (PathBuf, String)> {
[&self.fallback_cache_dirs_old, &self.fallback_cache_dirs_new]
.into_iter()
.flat_map(move |cache_dirs| {
cache_dirs
.iter()
.flat_map(|cache_dir| {
let file_path = cache_dir.join(path_stub);
let etag_path = file_path.with_added_extension("etag");
fs::read_to_string(&etag_path)
.ok()
.map(|etag| (file_path, etag))
})
.next()
})
}
pub fn load(&self, path_stub: &Path) -> anyhow::Result<Bytes> {
let url = self.base_url.join(
path_stub
.to_str()
.context("Failed to parse path as string")?,
)?;
let cache_path = self.cache_dir.join(path_stub);
if let Ok(bytes) = fs::read(&cache_path) {
log::debug!("Using cached bundle: {:?}", cache_path);
return Ok(Bytes::from(bytes));
}
let client = Client::builder()
.connect_timeout(Duration::from_secs(10))
.timeout(None)
.build()?;
let fallback =
self.get_fallbacks(path_stub)
.try_find(|(_, fallback_etag)| -> anyhow::Result<_> {
let resp = client
.head(url.clone())
.header("If-None-Match", fallback_etag.clone())
.send()?
.error_for_status()?;
let resp_etag = resp.headers().get("etag").context("no etag")?.to_str()?;
let is_match = resp.status() == StatusCode::NOT_MODIFIED
|| (resp.status() == StatusCode::OK && resp_etag == fallback_etag);
Ok(is_match)
})?;
let bytes = if let Some((fallback_path, _)) = fallback {
log::debug!(
"Using cached bundle from different patch: {:?}",
fallback_path
);
fs::create_dir_all(cache_path.parent().context("Failed to get path parent")?)?;
fs::copy(&fallback_path, &cache_path)?;
fs::copy(
fallback_path.with_added_extension("etag"),
cache_path.with_added_extension("etag"),
)?;
Bytes::from(fs::read(&cache_path)?)
} else {
log::info!("Downloading bundle: {}", url);
let resp = client.get(url).send()?;
let etag = resp
.headers()
.get("etag")
.context("no etag")?
.to_str()?
.to_owned();
let bytes = resp.bytes()?;
fs::create_dir_all(cache_path.parent().context("Failed to get path parent")?)?;
fs::write(&cache_path, &bytes)?;
fs::write(cache_path.with_added_extension("etag"), etag.as_bytes())?;
bytes
};
Ok(bytes)
}
pub async fn load_async(&self, path_stub: &Path) -> anyhow::Result<Bytes> {
let url = self.base_url.join(
path_stub
.to_str()
.context("Failed to parse path as string")?,
)?;
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(10))
.build()?;
let cache_path = self.cache_dir.join(path_stub);
if let Ok(bytes) = tokio::fs::read(&cache_path).await {
log::debug!("Using cached bundle: {:?}", cache_path);
return Ok(Bytes::from(bytes));
}
let mut fallbacks = self.get_fallbacks(path_stub);
let fallback = loop {
let Some((fallback_path, fallback_etag)) = fallbacks.next() else {
break None;
};
let resp = client
.head(url.clone())
.header("If-None-Match", fallback_etag.clone())
.send()
.await?
.error_for_status()?;
let resp_etag = resp.headers().get("etag").context("no etag")?.to_str()?;
if resp.status() == StatusCode::NOT_MODIFIED
|| (resp.status() == StatusCode::OK && resp_etag == fallback_etag)
{
break Some((fallback_path, fallback_etag));
}
};
let bytes = if let Some((fallback_path, _)) = fallback {
log::debug!(
"Using cached bundle from different patch: {:?}",
fallback_path
);
tokio::fs::create_dir_all(cache_path.parent().context("Failed to get path parent")?)
.await?;
tokio::fs::copy(&fallback_path, &cache_path).await?;
tokio::fs::copy(
fallback_path.with_added_extension("etag"),
cache_path.with_added_extension("etag"),
)
.await?;
Bytes::from(fs::read(&cache_path)?)
} else {
log::debug!("Downloading bundle: {}", url);
let resp = client.get(url).send().await?;
let etag = resp
.headers()
.get("etag")
.context("no etag")?
.to_str()?
.to_owned();
let bytes = resp.bytes().await?;
tokio::fs::create_dir_all(cache_path.parent().context("Failed to get path parent")?)
.await?;
tokio::fs::write(&cache_path, &bytes).await?;
tokio::fs::write(cache_path.with_added_extension("etag"), etag.as_bytes()).await?;
bytes
};
Ok(bytes)
}
}
pub fn cdn_base_url(cache_dir: &Path, version: &str) -> anyhow::Result<Url> {
let cache_dir = cache_dir.join("cdn_url");
let cache_file = cache_dir.join(version);
if cache_file.exists() && fs::metadata(&cache_file)?.modified()?.elapsed()?.as_secs() < 3600 {
let url = Url::parse(fs::read_to_string(&cache_file)?.as_str())
.with_context(|| "Failed to parse URL")?;
log::debug!("Using cached CDN URL: {}", url);
return Ok(url);
}
let url = match version {
"1" => cur_url("patch.pathofexile.com:12995".to_string(), &[1, 6]),
"2" => cur_url("patch.pathofexile2.com:13060".to_string(), &[1, 7]),
v if v.starts_with("3.") => Url::parse(format!("https://patch.poecdn.com/{}/", v).as_str())
.with_context(|| "Failed to parse URL"),
v if v.starts_with("4.") => {
Url::parse(format!("https://patch-poe2.poecdn.com/{}/", v).as_str())
.with_context(|| "Failed to parse URL")
}
_ => panic!("Invalid version provided"),
}
.with_context(|| format!("Failed to get URL for version: {}", version))?;
fs::create_dir_all(&cache_dir).context("Failed to create cache directory")?;
fs::write(&cache_file, url.as_str()).context("Failed to write URL to cache")?;
log::debug!("Refreshed CDN URL: {}", url);
Ok(url)
}
fn parse_response<'a>() -> impl WinnowParser<&'a [u8], Vec<String>> {
length_repeat(
terminated(le_u8, take(33_usize)), parse_utf16_string(),
)
}
fn parse_utf16_string<'a>() -> impl WinnowParser<&'a [u8], String> {
winnow::trace!(
"parse_utf16_string",
length_take(le_u8.map(|l| l * 2)).try_map(String::from_utf16le)
)
}
fn cur_url(host: String, send: &[u8]) -> anyhow::Result<Url> {
let mut stream = TcpStream::connect(host)?;
stream.write_all(send)?;
let mut buf = [0; 1024];
let read = stream.read(&mut buf)?;
let strings = if let Ok(strings) = parse_response().parse_next(&mut &buf[..read]) {
strings
} else {
bail!("Failed to parse URLs from CDN")
};
strings
.into_iter()
.map(|s| Url::parse(&s).expect("Failed to parse URL"))
.next()
.with_context(|| "No URLs returned from CDN")
}