use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
use par_term_config::{ConflictResolution, DynamicProfileSource};
use crate::profile::dynamic::fetch::fetch_profiles;
#[derive(Debug, Clone)]
pub struct DynamicProfileUpdate {
pub url: String,
pub profiles: Vec<par_term_config::Profile>,
pub conflict_resolution: ConflictResolution,
pub error: Option<String>,
}
#[derive(Debug, Clone)]
pub struct SourceStatus {
pub url: String,
pub enabled: bool,
pub last_fetch: Option<SystemTime>,
pub last_error: Option<String>,
pub profile_count: usize,
pub fetching: bool,
}
pub struct DynamicProfileManager {
pub update_rx: mpsc::UnboundedReceiver<DynamicProfileUpdate>,
update_tx: mpsc::UnboundedSender<DynamicProfileUpdate>,
pub statuses: HashMap<String, SourceStatus>,
task_handles: Vec<tokio::task::JoinHandle<()>>,
}
impl DynamicProfileManager {
pub fn new() -> Self {
let (update_tx, update_rx) = mpsc::unbounded_channel();
Self {
update_rx,
update_tx,
statuses: HashMap::new(),
task_handles: Vec::new(),
}
}
pub fn start(
&mut self,
sources: &[DynamicProfileSource],
runtime: &Arc<tokio::runtime::Runtime>,
) {
use crate::profile::dynamic::cache::read_cache;
self.stop();
for source in sources {
if !source.enabled || source.url.is_empty() {
continue;
}
self.statuses.insert(
source.url.clone(),
SourceStatus {
url: source.url.clone(),
enabled: source.enabled,
last_fetch: None,
last_error: None,
profile_count: 0,
fetching: false,
},
);
if let Ok((profiles, meta)) = read_cache(&source.url) {
let update = DynamicProfileUpdate {
url: source.url.clone(),
profiles,
conflict_resolution: source.conflict_resolution.clone(),
error: None,
};
let _ = self.update_tx.send(update);
if let Some(status) = self.statuses.get_mut(&source.url) {
status.last_fetch = Some(meta.last_fetched);
status.profile_count = meta.profile_count;
}
}
let tx = self.update_tx.clone();
let source_clone = source.clone();
let url_for_log = source.url.clone();
let handle = runtime.spawn(async move {
let src = source_clone.clone();
let conflict = source_clone.conflict_resolution.clone();
match tokio::task::spawn_blocking(move || fetch_profiles(&src)).await {
Ok(result) => {
if tx
.send(DynamicProfileUpdate {
url: result.url.clone(),
profiles: result.profiles,
conflict_resolution: conflict,
error: result.error,
})
.is_err()
{
return; }
}
Err(e) => {
log::error!(
"Dynamic profile fetch task panicked for {}: {}",
url_for_log,
e
);
}
}
let mut interval =
tokio::time::interval(Duration::from_secs(source_clone.refresh_interval_secs));
interval.tick().await; loop {
interval.tick().await;
let src = source_clone.clone();
let source_clone2 = source_clone.clone();
let tx_clone = tx.clone();
match tokio::task::spawn_blocking(move || fetch_profiles(&src)).await {
Ok(result) => {
if tx_clone
.send(DynamicProfileUpdate {
url: result.url.clone(),
profiles: result.profiles,
conflict_resolution: source_clone2.conflict_resolution.clone(),
error: result.error,
})
.is_err()
{
break; }
}
Err(e) => {
log::error!(
"Dynamic profile fetch task panicked for {}: {}",
url_for_log,
e
);
}
}
}
});
self.task_handles.push(handle);
if let Some(status) = self.statuses.get_mut(&source.url) {
status.fetching = true;
}
}
}
pub fn stop(&mut self) {
for handle in self.task_handles.drain(..) {
handle.abort();
}
}
pub fn refresh_all(
&mut self,
sources: &[DynamicProfileSource],
runtime: &Arc<tokio::runtime::Runtime>,
) {
for source in sources {
if !source.enabled || source.url.is_empty() {
continue;
}
self.refresh_source(source, runtime);
}
}
pub fn refresh_source(
&mut self,
source: &DynamicProfileSource,
runtime: &Arc<tokio::runtime::Runtime>,
) {
let tx = self.update_tx.clone();
let source_clone = source.clone();
let url_for_log = source.url.clone();
runtime.spawn(async move {
let conflict = source_clone.conflict_resolution.clone();
match tokio::task::spawn_blocking(move || fetch_profiles(&source_clone)).await {
Ok(result) => {
let _ = tx.send(DynamicProfileUpdate {
url: result.url.clone(),
profiles: result.profiles,
conflict_resolution: conflict,
error: result.error,
});
}
Err(e) => {
log::error!(
"Dynamic profile fetch task panicked for {}: {}",
url_for_log,
e
);
}
}
});
if let Some(status) = self.statuses.get_mut(&source.url) {
status.fetching = true;
}
}
pub fn try_recv(&mut self) -> Option<DynamicProfileUpdate> {
self.update_rx.try_recv().ok()
}
pub fn update_status(&mut self, update: &DynamicProfileUpdate) {
if let Some(status) = self.statuses.get_mut(&update.url) {
status.fetching = false;
status.last_error = update.error.clone();
if update.error.is_none() {
status.last_fetch = Some(SystemTime::now());
status.profile_count = update.profiles.len();
}
}
}
}
impl Default for DynamicProfileManager {
fn default() -> Self {
Self::new()
}
}
impl Drop for DynamicProfileManager {
fn drop(&mut self) {
self.stop();
}
}