use std::num::NonZeroU32;
use std::sync::Arc;
use governor::{Quota, RateLimiter};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use nonzero_ext::nonzero;
use tokio::sync::Semaphore;
use crate::client::ImmichClient;
use crate::error::Result;
use crate::models::{
ConsolidationResult, ExecutionConfig, ExecutionReport, GroupResult, OperationResult,
};
use crate::scoring::DuplicateAnalysis;
type DirectRateLimiter = RateLimiter<
governor::state::NotKeyed,
governor::state::InMemoryState,
governor::clock::DefaultClock,
>;
pub struct Executor {
client: ImmichClient,
rate_limiter: DirectRateLimiter,
concurrency: Arc<Semaphore>,
config: ExecutionConfig,
}
impl Executor {
pub fn new(client: ImmichClient, config: ExecutionConfig) -> Self {
let quota = Quota::per_second(
NonZeroU32::new(config.requests_per_sec).unwrap_or(nonzero!(10u32)),
);
let rate_limiter = RateLimiter::direct(quota);
let concurrency = Arc::new(Semaphore::new(config.max_concurrent));
Self {
client,
rate_limiter,
concurrency,
config,
}
}
async fn rate_limited<F, T>(&self, op: F) -> Result<T>
where
F: std::future::Future<Output = Result<T>>,
{
self.rate_limiter.until_ready().await;
let _permit = self.concurrency.acquire().await.expect("semaphore closed");
op.await
}
pub async fn execute_all(&self, groups: &[DuplicateAnalysis]) -> ExecutionReport {
let mut report = ExecutionReport::new();
if groups.is_empty() {
return report;
}
let multi_progress = MultiProgress::new();
let overall_style = ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} groups ({eta})")
.expect("valid template")
.progress_chars("##-");
let overall_pb = multi_progress.add(ProgressBar::new(groups.len() as u64));
overall_pb.set_style(overall_style);
let group_style = ProgressStyle::default_bar()
.template(" {spinner:.green} {msg}")
.expect("valid template");
let group_pb = multi_progress.add(ProgressBar::new_spinner());
group_pb.set_style(group_style);
if let Err(e) = tokio::fs::create_dir_all(&self.config.backup_dir).await {
overall_pb.finish_with_message(format!("Failed to create backup directory: {}", e));
return report;
}
for analysis in groups {
group_pb.set_message(format!(
"Processing group {} ({} losers)",
analysis.duplicate_id,
analysis.losers.len()
));
let result = self.execute_group(analysis, &group_pb).await;
report.add_group_result(result);
overall_pb.inc(1);
}
overall_pb.finish_with_message("Complete");
group_pb.finish_and_clear();
report
}
pub async fn execute_group(
&self,
analysis: &DuplicateAnalysis,
pb: &ProgressBar,
) -> GroupResult {
let mut download_results = Vec::new();
pb.set_message("Checking metadata consolidation");
let consolidation_result = self.consolidate_metadata(analysis).await;
for loser in &analysis.losers {
pb.set_message(format!("Downloading {}", loser.filename));
let result = self.download_loser(&loser.asset_id, &loser.filename).await;
download_results.push(result);
}
let downloaded_ids: Vec<String> = download_results
.iter()
.filter_map(|r| match r {
OperationResult::Success { id, .. } => Some(id.clone()),
_ => None,
})
.collect();
let delete_result = if downloaded_ids.is_empty() {
Some(OperationResult::Skipped {
id: analysis.duplicate_id.clone(),
reason: "No assets were successfully downloaded".to_string(),
})
} else {
pb.set_message(format!("Deleting {} assets", downloaded_ids.len()));
match self.delete_assets(&downloaded_ids).await {
Ok(()) => Some(OperationResult::Success {
id: analysis.duplicate_id.clone(),
path: None,
}),
Err(e) => Some(OperationResult::Failed {
id: analysis.duplicate_id.clone(),
error: e.to_string(),
}),
}
};
GroupResult {
duplicate_id: analysis.duplicate_id.clone(),
winner_id: analysis.winner.asset_id.clone(),
consolidation_result,
download_results,
delete_result,
}
}
async fn consolidate_metadata(
&self,
analysis: &DuplicateAnalysis,
) -> Option<ConsolidationResult> {
let winner_asset = match self
.rate_limited(async { self.client.get_asset(&analysis.winner.asset_id).await })
.await
{
Ok(asset) => asset,
Err(_) => return None, };
let winner_exif = winner_asset.exif_info.as_ref();
let winner_has_gps = winner_exif.map(|e| e.has_gps()).unwrap_or(false);
let winner_has_datetime = winner_exif
.and_then(|e| e.date_time_original.as_ref())
.is_some();
let winner_has_description = winner_exif.and_then(|e| e.description.as_ref()).is_some();
if winner_has_gps && winner_has_datetime && winner_has_description {
return None;
}
let mut best_gps: Option<(f64, f64, String)> = None;
let mut best_datetime: Option<(String, String)> = None;
let mut best_description: Option<(String, String)> = None;
for loser in &analysis.losers {
let loser_asset = match self
.rate_limited(async { self.client.get_asset(&loser.asset_id).await })
.await
{
Ok(asset) => asset,
Err(_) => continue, };
if let Some(exif) = &loser_asset.exif_info {
if !winner_has_gps
&& best_gps.is_none()
&& exif.has_gps()
&& let (Some(lat), Some(lon)) = (exif.latitude, exif.longitude)
{
best_gps = Some((lat, lon, loser.asset_id.clone()));
}
if !winner_has_datetime
&& best_datetime.is_none()
&& let Some(dt) = &exif.date_time_original
{
best_datetime = Some((dt.clone(), loser.asset_id.clone()));
}
if !winner_has_description
&& best_description.is_none()
&& let Some(desc) = &exif.description
{
best_description = Some((desc.clone(), loser.asset_id.clone()));
}
}
if (winner_has_gps || best_gps.is_some())
&& (winner_has_datetime || best_datetime.is_some())
&& (winner_has_description || best_description.is_some())
{
break;
}
}
if best_gps.is_none() && best_datetime.is_none() && best_description.is_none() {
return None;
}
let (latitude, longitude) = match &best_gps {
Some((lat, lon, _)) => (Some(*lat), Some(*lon)),
None => (None, None),
};
let date_time_original = best_datetime.as_ref().map(|(dt, _)| dt.as_str());
let description = best_description.as_ref().map(|(desc, _)| desc.as_str());
let source_asset_id = best_gps
.as_ref()
.map(|(_, _, id)| id.clone())
.or_else(|| best_datetime.as_ref().map(|(_, id)| id.clone()))
.or_else(|| best_description.as_ref().map(|(_, id)| id.clone()));
let update_result = self
.rate_limited(async {
self.client
.update_asset_metadata(
&analysis.winner.asset_id,
latitude,
longitude,
date_time_original,
description,
)
.await
})
.await;
if update_result.is_ok() {
Some(ConsolidationResult {
gps_transferred: best_gps.is_some(),
datetime_transferred: best_datetime.is_some(),
description_transferred: best_description.is_some(),
source_asset_id,
})
} else {
None }
}
async fn download_loser(&self, asset_id: &str, filename: &str) -> OperationResult {
let safe_filename = format!("{}_{}", asset_id, filename);
let path = self.config.backup_dir.join(&safe_filename);
let download_result = self
.rate_limited(async { self.client.download_asset(asset_id, &path).await })
.await;
match download_result {
Ok(_bytes) => OperationResult::Success {
id: asset_id.to_string(),
path: Some(path),
},
Err(e) => OperationResult::Failed {
id: asset_id.to_string(),
error: e.to_string(),
},
}
}
async fn delete_assets(&self, asset_ids: &[String]) -> Result<()> {
self.rate_limited(async {
self.client
.delete_assets(asset_ids, self.config.force_delete)
.await
})
.await
}
}