use crate::services::putio;
use anyhow::Result;
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
const CONFIG_KEY: &str = "putioarr_transfers";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferState {
pub hash: String,
pub source_category: String,
pub download_dir: String,
}
#[derive(Debug, Clone)]
pub struct OrphanFile {
pub file_id: i64,
pub name: String,
pub hash: String,
pub size: i64,
pub download_dir: String,
}
#[derive(Clone)]
pub struct StateManager {
api_token: String,
transfers: Arc<RwLock<HashMap<String, TransferState>>>,
local_complete: Arc<RwLock<HashSet<u64>>>,
file_names: Arc<RwLock<HashMap<i64, String>>>,
failed_names: Arc<RwLock<HashMap<i64, Instant>>>,
orphans: Arc<RwLock<HashMap<i64, OrphanFile>>>,
arr_error_logged: Arc<RwLock<HashMap<String, Instant>>>,
}
impl StateManager {
pub fn new(api_token: String) -> Self {
Self {
api_token,
transfers: Arc::new(RwLock::new(HashMap::new())),
local_complete: Arc::new(RwLock::new(HashSet::new())),
file_names: Arc::new(RwLock::new(HashMap::new())),
failed_names: Arc::new(RwLock::new(HashMap::new())),
orphans: Arc::new(RwLock::new(HashMap::new())),
arr_error_logged: Arc::new(RwLock::new(HashMap::new())),
}
}
pub const ARR_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(300);
pub async fn should_log_arr_error(&self, app: &str) -> bool {
let mut map = self.arr_error_logged.write().await;
let now = Instant::now();
match map.get(app) {
Some(at) if now.saturating_duration_since(*at) < Self::ARR_ERROR_LOG_INTERVAL => false,
_ => {
map.insert(app.to_string(), now);
true
}
}
}
pub async fn add_orphan(&self, orphan: OrphanFile) {
self.orphans.write().await.insert(orphan.file_id, orphan);
}
pub async fn has_orphan(&self, file_id: i64) -> bool {
self.orphans.read().await.contains_key(&file_id)
}
pub async fn remove_orphan(&self, file_id: i64) {
self.orphans.write().await.remove(&file_id);
}
pub async fn orphans(&self) -> Vec<OrphanFile> {
self.orphans.read().await.values().cloned().collect()
}
const NAME_FAILURE_TTL: Duration = Duration::from_secs(600);
pub async fn get_file_name(&self, file_id: i64) -> Option<String> {
self.file_names.read().await.get(&file_id).cloned()
}
pub async fn set_file_name(&self, file_id: i64, name: String) {
self.file_names.write().await.insert(file_id, name);
self.failed_names.write().await.remove(&file_id);
}
pub async fn mark_name_failed(&self, file_id: i64) {
self.failed_names.write().await.insert(file_id, Instant::now());
}
pub async fn name_lookup_suppressed(&self, file_id: i64) -> bool {
match self.failed_names.read().await.get(&file_id) {
Some(at) => at.elapsed() < Self::NAME_FAILURE_TTL,
None => false,
}
}
pub async fn retain_file_names(&self, keep: &HashSet<i64>) {
self.file_names.write().await.retain(|id, _| keep.contains(id));
self.failed_names.write().await.retain(|id, _| keep.contains(id));
}
pub async fn mark_local_complete(&self, id: u64) {
self.local_complete.write().await.insert(id);
}
pub async fn is_local_complete(&self, id: u64) -> bool {
self.local_complete.read().await.contains(&id)
}
pub async fn clear_local_complete(&self, id: u64) {
self.local_complete.write().await.remove(&id);
}
pub async fn load(&self) -> Result<()> {
match putio::get_config_value::<HashMap<String, TransferState>>(&self.api_token, CONFIG_KEY)
.await
{
Ok(Some(map)) => {
let count = map.len();
*self.transfers.write().await = map;
info!("state: loaded {} transfer(s) from put.io config", count);
}
Ok(None) => debug!("state: no persisted state found in put.io config"),
Err(e) => warn!("state: failed to load persisted state from put.io: {}", e),
}
Ok(())
}
async fn persist(&self) {
let map = self.transfers.read().await.clone();
if let Err(e) = putio::set_config_value(&self.api_token, CONFIG_KEY, &map).await {
error!("state: failed to persist state to put.io: {}", e);
}
}
pub async fn add_transfer(
&self,
hash: String,
category: String,
download_dir: String,
) -> Result<()> {
let key = hash.to_lowercase();
debug!(
"state: add_transfer hash={} category={} dir={}",
key, category, download_dir
);
{
let mut transfers = self.transfers.write().await;
transfers.insert(
key.clone(),
TransferState {
hash: key,
source_category: category,
download_dir,
},
);
}
self.persist().await;
Ok(())
}
pub async fn get_transfer(&self, hash: &str) -> Option<TransferState> {
let transfers = self.transfers.read().await;
transfers.get(&hash.to_lowercase()).cloned()
}
pub async fn remove_transfer(&self, hash: &str) -> Result<()> {
{
let mut transfers = self.transfers.write().await;
transfers.remove(&hash.to_lowercase());
}
self.persist().await;
Ok(())
}
pub async fn get_download_dir_for_transfer(&self, hash: &str, default_dir: &str) -> String {
if let Some(state) = self.get_transfer(hash).await {
state.download_dir
} else {
debug!(
"state: no entry for hash={} (using default dir {})",
hash, default_dir
);
default_dir.to_string()
}
}
}