1use std::num::NonZeroU32;
8use std::sync::Arc;
9
10use governor::{Quota, RateLimiter};
11use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
12use nonzero_ext::nonzero;
13use tokio::sync::Semaphore;
14
15use crate::client::ImmichClient;
16use crate::error::Result;
17use crate::models::{
18 ConsolidationResult, ExecutionConfig, ExecutionReport, GroupResult, OperationResult,
19};
20use crate::scoring::DuplicateAnalysis;
21
22type DirectRateLimiter = RateLimiter<
24 governor::state::NotKeyed,
25 governor::state::InMemoryState,
26 governor::clock::DefaultClock,
27>;
28
29pub struct Executor {
52 client: ImmichClient,
54
55 rate_limiter: DirectRateLimiter,
57
58 concurrency: Arc<Semaphore>,
60
61 config: ExecutionConfig,
63}
64
65impl Executor {
66 pub fn new(client: ImmichClient, config: ExecutionConfig) -> Self {
73 let quota = Quota::per_second(
75 NonZeroU32::new(config.requests_per_sec).unwrap_or(nonzero!(10u32)),
76 );
77 let rate_limiter = RateLimiter::direct(quota);
78
79 let concurrency = Arc::new(Semaphore::new(config.max_concurrent));
81
82 Self {
83 client,
84 rate_limiter,
85 concurrency,
86 config,
87 }
88 }
89
90 async fn rate_limited<F, T>(&self, op: F) -> Result<T>
94 where
95 F: std::future::Future<Output = Result<T>>,
96 {
97 self.rate_limiter.until_ready().await;
99
100 let _permit = self.concurrency.acquire().await.expect("semaphore closed");
102
103 op.await
105 }
106
107 pub async fn execute_all(&self, groups: &[DuplicateAnalysis]) -> ExecutionReport {
120 let mut report = ExecutionReport::new();
121
122 if groups.is_empty() {
123 return report;
124 }
125
126 let multi_progress = MultiProgress::new();
128
129 let overall_style = ProgressStyle::default_bar()
131 .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} groups ({eta})")
132 .expect("valid template")
133 .progress_chars("##-");
134
135 let overall_pb = multi_progress.add(ProgressBar::new(groups.len() as u64));
136 overall_pb.set_style(overall_style);
137
138 let group_style = ProgressStyle::default_bar()
140 .template(" {spinner:.green} {msg}")
141 .expect("valid template");
142
143 let group_pb = multi_progress.add(ProgressBar::new_spinner());
144 group_pb.set_style(group_style);
145
146 if let Err(e) = tokio::fs::create_dir_all(&self.config.backup_dir).await {
148 overall_pb.finish_with_message(format!("Failed to create backup directory: {}", e));
149 return report;
150 }
151
152 for analysis in groups {
154 group_pb.set_message(format!(
155 "Processing group {} ({} losers)",
156 analysis.duplicate_id,
157 analysis.losers.len()
158 ));
159
160 let result = self.execute_group(analysis, &group_pb).await;
161 report.add_group_result(result);
162
163 overall_pb.inc(1);
164 }
165
166 overall_pb.finish_with_message("Complete");
167 group_pb.finish_and_clear();
168
169 report
170 }
171
172 pub async fn execute_group(
187 &self,
188 analysis: &DuplicateAnalysis,
189 pb: &ProgressBar,
190 ) -> GroupResult {
191 let mut download_results = Vec::new();
192
193 pb.set_message("Checking metadata consolidation");
195 let consolidation_result = self.consolidate_metadata(analysis).await;
196
197 for loser in &analysis.losers {
199 pb.set_message(format!("Downloading {}", loser.filename));
200
201 let result = self.download_loser(&loser.asset_id, &loser.filename).await;
202 download_results.push(result);
203 }
204
205 let downloaded_ids: Vec<String> = download_results
207 .iter()
208 .filter_map(|r| match r {
209 OperationResult::Success { id, .. } => Some(id.clone()),
210 _ => None,
211 })
212 .collect();
213
214 let delete_result = if downloaded_ids.is_empty() {
216 Some(OperationResult::Skipped {
217 id: analysis.duplicate_id.clone(),
218 reason: "No assets were successfully downloaded".to_string(),
219 })
220 } else {
221 pb.set_message(format!("Deleting {} assets", downloaded_ids.len()));
222
223 match self.delete_assets(&downloaded_ids).await {
224 Ok(()) => Some(OperationResult::Success {
225 id: analysis.duplicate_id.clone(),
226 path: None,
227 }),
228 Err(e) => Some(OperationResult::Failed {
229 id: analysis.duplicate_id.clone(),
230 error: e.to_string(),
231 }),
232 }
233 };
234
235 GroupResult {
236 duplicate_id: analysis.duplicate_id.clone(),
237 winner_id: analysis.winner.asset_id.clone(),
238 consolidation_result,
239 download_results,
240 delete_result,
241 }
242 }
243
244 async fn consolidate_metadata(
249 &self,
250 analysis: &DuplicateAnalysis,
251 ) -> Option<ConsolidationResult> {
252 let winner_asset = match self
254 .rate_limited(async { self.client.get_asset(&analysis.winner.asset_id).await })
255 .await
256 {
257 Ok(asset) => asset,
258 Err(_) => return None, };
260
261 let winner_exif = winner_asset.exif_info.as_ref();
262 let winner_has_gps = winner_exif.map(|e| e.has_gps()).unwrap_or(false);
263 let winner_has_datetime = winner_exif
264 .and_then(|e| e.date_time_original.as_ref())
265 .is_some();
266 let winner_has_description = winner_exif.and_then(|e| e.description.as_ref()).is_some();
267
268 if winner_has_gps && winner_has_datetime && winner_has_description {
270 return None;
271 }
272
273 let mut best_gps: Option<(f64, f64, String)> = None;
275 let mut best_datetime: Option<(String, String)> = None;
276 let mut best_description: Option<(String, String)> = None;
277
278 for loser in &analysis.losers {
279 let loser_asset = match self
280 .rate_limited(async { self.client.get_asset(&loser.asset_id).await })
281 .await
282 {
283 Ok(asset) => asset,
284 Err(_) => continue, };
286
287 if let Some(exif) = &loser_asset.exif_info {
288 if !winner_has_gps
290 && best_gps.is_none()
291 && exif.has_gps()
292 && let (Some(lat), Some(lon)) = (exif.latitude, exif.longitude)
293 {
294 best_gps = Some((lat, lon, loser.asset_id.clone()));
295 }
296
297 if !winner_has_datetime
299 && best_datetime.is_none()
300 && let Some(dt) = &exif.date_time_original
301 {
302 best_datetime = Some((dt.clone(), loser.asset_id.clone()));
303 }
304
305 if !winner_has_description
307 && best_description.is_none()
308 && let Some(desc) = &exif.description
309 {
310 best_description = Some((desc.clone(), loser.asset_id.clone()));
311 }
312 }
313
314 if (winner_has_gps || best_gps.is_some())
316 && (winner_has_datetime || best_datetime.is_some())
317 && (winner_has_description || best_description.is_some())
318 {
319 break;
320 }
321 }
322
323 if best_gps.is_none() && best_datetime.is_none() && best_description.is_none() {
325 return None;
326 }
327
328 let (latitude, longitude) = match &best_gps {
330 Some((lat, lon, _)) => (Some(*lat), Some(*lon)),
331 None => (None, None),
332 };
333 let date_time_original = best_datetime.as_ref().map(|(dt, _)| dt.as_str());
334 let description = best_description.as_ref().map(|(desc, _)| desc.as_str());
335
336 let source_asset_id = best_gps
338 .as_ref()
339 .map(|(_, _, id)| id.clone())
340 .or_else(|| best_datetime.as_ref().map(|(_, id)| id.clone()))
341 .or_else(|| best_description.as_ref().map(|(_, id)| id.clone()));
342
343 let update_result = self
345 .rate_limited(async {
346 self.client
347 .update_asset_metadata(
348 &analysis.winner.asset_id,
349 latitude,
350 longitude,
351 date_time_original,
352 description,
353 )
354 .await
355 })
356 .await;
357
358 if update_result.is_ok() {
359 Some(ConsolidationResult {
360 gps_transferred: best_gps.is_some(),
361 datetime_transferred: best_datetime.is_some(),
362 description_transferred: best_description.is_some(),
363 source_asset_id,
364 })
365 } else {
366 None }
368 }
369
370 async fn download_loser(&self, asset_id: &str, filename: &str) -> OperationResult {
374 let safe_filename = format!("{}_{}", asset_id, filename);
376 let path = self.config.backup_dir.join(&safe_filename);
377
378 let download_result = self
379 .rate_limited(async { self.client.download_asset(asset_id, &path).await })
380 .await;
381
382 match download_result {
383 Ok(_bytes) => OperationResult::Success {
384 id: asset_id.to_string(),
385 path: Some(path),
386 },
387 Err(e) => OperationResult::Failed {
388 id: asset_id.to_string(),
389 error: e.to_string(),
390 },
391 }
392 }
393
394 async fn delete_assets(&self, asset_ids: &[String]) -> Result<()> {
396 self.rate_limited(async {
397 self.client
398 .delete_assets(asset_ids, self.config.force_delete)
399 .await
400 })
401 .await
402 }
403}