use std::{collections::HashSet, ops::Deref as DerefTrait, sync::Arc};
use dashmap::DashMap;
use tokio::sync::{Mutex as TokioMutex, OnceCell, RwLock as TokioRwLock};
use crate::{
INIT_LOGGER_DOMAIN,
cache::{
GeneralCache, MetadataCache, RateLimiterCache,
metadata::MetadataPrefetcher,
},
config::core::Config,
core::backend::{constants::DISK_BACKEND_TYPE, upstream_proxy, webdav},
info_log,
util::path_rewriter::PathRewriter,
};
/// Built-in client identifiers that are always part of the problematic-client list.
///
/// `AppState::get_problematic_clients` lowercases these entries (so `"Emby/"`
/// ends up as `"emby/"`) and merges them with `backend.problematic_clients`
/// from the configuration.
// NOTE(review): how callers match these against a request (presumably a
// substring check on the client / User-Agent string) is not visible here — confirm.
const PROBLEMATIC_CLIENTS: &[&str] =
&["yamby", "hills", "embytolocalplayer", "Emby/"];
/// Shared application state: the configuration plus lazily created caches.
///
/// Every `OnceCell` field starts empty in `AppState::new` and is filled on
/// first use by the matching `get_*` accessor.
pub struct AppState {
/// Application configuration behind an async read-write lock.
config: TokioRwLock<Config>,
/// Path rewriters built from `frontend.path_rewrites`; an empty list when
/// no frontend section is configured.
frontend_path_rewrite_cache: OnceCell<Vec<PathRewriter>>,
/// Lowercased, de-duplicated, non-empty client identifiers: the built-in
/// `PROBLEMATIC_CLIENTS` plus `backend.problematic_clients` from the config.
problematic_clients_cache: OnceCell<Vec<String>>,
/// Metadata cache, sized by `get_cache_settings`.
metadata_cache: OnceCell<MetadataCache>,
/// Prefetcher built on a clone of the metadata cache; its prefetch task is
/// started when this cell is initialised.
metadata_prefetcher: OnceCell<Arc<MetadataPrefetcher>>,
/// General cache sized by `get_cache_settings`.
encrypt_cache: OnceCell<GeneralCache>,
/// General cache sized by `get_cache_settings`.
decrypt_cache: OnceCell<GeneralCache>,
/// General cache sized by `get_cache_settings`.
strm_file_cache: OnceCell<GeneralCache>,
/// General cache sized by `get_cache_settings`.
forward_info_cache: OnceCell<GeneralCache>,
/// General cache sized by `get_cache_settings`.
open_list_cache: OnceCell<GeneralCache>,
/// General cache with its own memory-mode sizing table
/// (see `get_api_response_cache`).
api_response_cache: OnceCell<GeneralCache>,
/// Rate limiters keyed by backend node UUID; only nodes whose backend type
/// matches `DISK_BACKEND_TYPE` get an entry.
rate_limiter_cache: OnceCell<DashMap<String, RateLimiterCache>>,
// NOTE(review): the two maps below are not used in this part of the file.
// Presumably they hold cached WebDAV auth data and per-key probe locks —
// confirm key/value meaning against the WebDAV backend code.
pub(crate) webdav_auth_cache: DashMap<String, String>,
pub(crate) webdav_auth_probe_locks: DashMap<String, Arc<TokioMutex<()>>>,
}
impl AppState {
/// Builds the application state around `config`.
///
/// Only the configuration lock and the two WebDAV maps are populated here;
/// every `OnceCell` cache is left empty and is filled by its accessor on
/// first use.
pub async fn new(config: Config) -> Self {
    let config = TokioRwLock::new(config);
    Self {
        config,
        // Lazily initialised, config-derived lists.
        problematic_clients_cache: OnceCell::new(),
        frontend_path_rewrite_cache: OnceCell::new(),
        // Lazily initialised caches.
        metadata_prefetcher: OnceCell::new(),
        metadata_cache: OnceCell::new(),
        decrypt_cache: OnceCell::new(),
        encrypt_cache: OnceCell::new(),
        forward_info_cache: OnceCell::new(),
        strm_file_cache: OnceCell::new(),
        api_response_cache: OnceCell::new(),
        open_list_cache: OnceCell::new(),
        rate_limiter_cache: OnceCell::new(),
        // Eagerly created (empty) WebDAV maps.
        webdav_auth_probe_locks: DashMap::new(),
        webdav_auth_cache: DashMap::new(),
    }
}
pub async fn get_config(&self) -> impl DerefTrait<Target = Config> + '_ {
self.config.read().await
}
/// Returns the `(capacity, ttl)` pair used to size the caches, selected by
/// `general.memory_mode`: `"low"`, `"high"`, or a fallback for any other value.
pub async fn get_cache_settings(&self) -> (u64, u64) {
    // Common factor of every TTL below (presumably seconds per hour — the
    // unit is defined by the cache constructors, not here; confirm).
    const TTL_UNIT: u64 = 60 * 60;

    let cfg = self.get_config().await;
    let mode = cfg.general.memory_mode.as_str();
    if mode == "low" {
        (256, TTL_UNIT * 4)
    } else if mode == "high" {
        (2048, TTL_UNIT * 12)
    } else {
        (512, TTL_UNIT * 8)
    }
}
/// Returns the frontend path rewriters, building them on first use.
///
/// The list is derived from `frontend.path_rewrites` in the configuration and
/// is empty when no frontend section is configured. Once built it is never
/// rebuilt.
pub async fn get_frontend_path_rewrite_cache(&self) -> &Vec<PathRewriter> {
    self.frontend_path_rewrite_cache
        .get_or_init(|| async move {
            // The config lock is taken only inside the initialiser, so calls
            // made after the cell is filled never touch the lock.
            let config = self.get_config().await;
            match config.frontend.as_ref() {
                Some(frontend_config) => frontend_config
                    .path_rewrites
                    // Clone only the rewrite rules, not the whole frontend config.
                    .clone()
                    .into_iter()
                    .map(|path_rewrite| {
                        PathRewriter::new(
                            path_rewrite.enable,
                            &path_rewrite.pattern,
                            &path_rewrite.replacement,
                        )
                    })
                    .collect::<Vec<_>>(),
                None => Vec::new(),
            }
        })
        .await
}
/// Returns the problematic-client identifiers, building the list on first use.
///
/// The list is the union of the built-in `PROBLEMATIC_CLIENTS` and
/// `backend.problematic_clients` from the configuration (when a backend
/// section exists). Entries are lowercased, de-duplicated, and empty strings
/// are removed. The order of the returned entries is unspecified because the
/// list is collected from a `HashSet`.
pub async fn get_problematic_clients(&self) -> &Vec<String> {
    self.problematic_clients_cache
        .get_or_init(|| async move {
            // The config lock is taken only inside the initialiser, so calls
            // made after the cell is filled never touch the lock.
            let config = self.get_config().await;
            let mut clients: HashSet<String> = PROBLEMATIC_CLIENTS
                .iter()
                .map(|s| s.to_lowercase())
                .collect();
            if let Some(backend_config) = config.backend.as_ref() {
                clients.extend(
                    backend_config
                        .problematic_clients
                        .iter()
                        .map(|s| s.to_lowercase()),
                );
            }
            clients.into_iter().filter(|s| !s.is_empty()).collect()
        })
        .await
}
/// Returns the metadata cache, creating it on first use with the
/// `(capacity, ttl)` pair from `get_cache_settings`.
pub async fn get_metadata_cache(&self) -> &MetadataCache {
    self.metadata_cache
        .get_or_init(|| async move {
            // Settings are read only when the cache is actually created, so
            // later calls do not wait on the config lock.
            let (capacity, ttl) = self.get_cache_settings().await;
            MetadataCache::new(capacity, ttl)
        })
        .await
}
pub async fn get_metadata_prefetcher(&self) -> &Arc<MetadataPrefetcher> {
self.metadata_prefetcher
.get_or_init(|| async {
let cache = Arc::new(self.get_metadata_cache().await.clone());
let prefetcher = Arc::new(MetadataPrefetcher::new(cache));
prefetcher.clone().start_prefetch_task();
prefetcher
})
.await
}
/// Returns the `encrypt_cache`, creating it on first use with the
/// `(capacity, ttl)` pair from `get_cache_settings`.
pub async fn get_encrypt_cache(&self) -> &GeneralCache {
    self.encrypt_cache
        .get_or_init(|| async move {
            // Settings are read only when the cache is actually created, so
            // later calls do not wait on the config lock.
            let (capacity, ttl) = self.get_cache_settings().await;
            GeneralCache::new(capacity, ttl)
        })
        .await
}
/// Returns the `decrypt_cache`, creating it on first use with the
/// `(capacity, ttl)` pair from `get_cache_settings`.
pub async fn get_decrypt_cache(&self) -> &GeneralCache {
    self.decrypt_cache
        .get_or_init(|| async move {
            // Settings are read only when the cache is actually created, so
            // later calls do not wait on the config lock.
            let (capacity, ttl) = self.get_cache_settings().await;
            GeneralCache::new(capacity, ttl)
        })
        .await
}
/// Returns the `strm_file_cache`, creating it on first use with the
/// `(capacity, ttl)` pair from `get_cache_settings`.
pub async fn get_strm_file_cache(&self) -> &GeneralCache {
    self.strm_file_cache
        .get_or_init(|| async move {
            // Settings are read only when the cache is actually created, so
            // later calls do not wait on the config lock.
            let (capacity, ttl) = self.get_cache_settings().await;
            GeneralCache::new(capacity, ttl)
        })
        .await
}
/// Returns the `forward_info_cache`, creating it on first use with the
/// `(capacity, ttl)` pair from `get_cache_settings`.
pub async fn get_forward_info_cache(&self) -> &GeneralCache {
    self.forward_info_cache
        .get_or_init(|| async move {
            // Settings are read only when the cache is actually created, so
            // later calls do not wait on the config lock.
            let (capacity, ttl) = self.get_cache_settings().await;
            GeneralCache::new(capacity, ttl)
        })
        .await
}
/// Returns the `open_list_cache`, creating it on first use with the
/// `(capacity, ttl)` pair from `get_cache_settings`.
pub async fn get_open_list_cache(&self) -> &GeneralCache {
    self.open_list_cache
        .get_or_init(|| async move {
            // Settings are read only when the cache is actually created, so
            // later calls do not wait on the config lock.
            let (capacity, ttl) = self.get_cache_settings().await;
            GeneralCache::new(capacity, ttl)
        })
        .await
}
/// Returns the API response cache, creating it on first use.
///
/// Unlike the other general caches this one does not use
/// `get_cache_settings`; it picks its own `(max_capacity, default_ttl)` pair
/// from `general.memory_mode` (`"low"`, `"high"`, or a fallback for any other
/// value).
pub async fn get_api_response_cache(&self) -> &GeneralCache {
    let build = || async move {
        let cfg = self.get_config().await;
        let mode = cfg.general.memory_mode.as_str();
        let (max_capacity, default_ttl) = if mode == "low" {
            (2048, 60 * 60 * 2)
        } else if mode == "high" {
            (8192, 60 * 60 * 4)
        } else {
            (4096, 60 * 60 * 2)
        };
        GeneralCache::new(max_capacity, default_ttl)
    };
    self.api_response_cache.get_or_init(build).await
}
/// Looks up the rate limiter for the backend node identified by `node_uuid`.
///
/// The first call builds the whole map: one `RateLimiterCache` per configured
/// backend node whose `backend_type` matches `DISK_BACKEND_TYPE`
/// (case-insensitively), each with its refill task started. Later calls only
/// perform the lookup.
///
/// Returns a clone of the node's limiter, or `None` when no entry exists for
/// `node_uuid`.
pub async fn get_rate_limiter_cache(
    &self,
    node_uuid: &str,
) -> Option<RateLimiterCache> {
    let cache_map = self
        .rate_limiter_cache
        .get_or_init(|| async move {
            // Read the sizing BEFORE taking the config guard below.
            // `get_cache_settings` acquires its own read guard, and tokio's
            // `RwLock` is write-preferring: requesting a second read guard
            // while still holding the first can deadlock if a writer is
            // queued between the two.
            let (capacity, ttl) = self.get_cache_settings().await;
            let config = self.get_config().await;
            let map = DashMap::new();
            let disk_nodes = config.backend_nodes.iter().filter(|node| {
                node.backend_type.eq_ignore_ascii_case(DISK_BACKEND_TYPE)
            });
            for node in disk_nodes {
                let cache = RateLimiterCache::new(
                    capacity * 2,
                    ttl,
                    node.client_speed_limit_kbs,
                    node.client_burst_speed_kbs,
                );
                cache.start_refill_task();
                map.insert(node.uuid.clone(), cache);
            }
            map
        })
        .await;
    cache_map
        .get(node_uuid)
        .map(|entry| entry.value().clone())
}
/// Forces the rate limiter map to be built now rather than on first lookup.
pub async fn init_rate_limiters(&self) {
    // The lookup result is discarded; the call is made only for its side
    // effect of initialising the map.
    let _ = self.get_rate_limiter_cache("").await;
}
/// Warms up a connection to every configured WebDAV backend node.
///
/// One task is spawned per node whose `backend_type` matches
/// `webdav::BACKEND_TYPE` (case-insensitively) and whose `base_url` parses;
/// nodes with an unparsable URL are skipped. The method waits for all spawned
/// tasks and ignores their results, so warm-up is best-effort. It returns
/// immediately, without logging, when no WebDAV node is configured.
pub async fn warmup_webdav_connections(&self) {
    // Keep the config read guard only while spawning. Waiting on network
    // warm-up with the guard held would block any config writer for the
    // whole duration.
    let tasks = {
        let config = self.get_config().await;
        let webdav_nodes: Vec<_> = config
            .backend_nodes
            .iter()
            .filter(|node| {
                node.backend_type.eq_ignore_ascii_case(webdav::BACKEND_TYPE)
            })
            .collect();
        if webdav_nodes.is_empty() {
            return;
        }
        info_log!(
            INIT_LOGGER_DOMAIN,
            "Warming up {} WebDAV connections...",
            webdav_nodes.len()
        );
        let mut tasks = Vec::with_capacity(webdav_nodes.len());
        for node in webdav_nodes {
            if let Ok(uri) = node.base_url.parse() {
                tasks.push(tokio::spawn(async move {
                    let _ = upstream_proxy::warmup_connection(uri).await;
                }));
            }
        }
        tasks
    };
    for task in tasks {
        // A panicked or cancelled warm-up task is not fatal.
        let _ = task.await;
    }
    info_log!(INIT_LOGGER_DOMAIN, "WebDAV connection warmup completed");
}
}