Skip to main content

immich_lib/
executor.rs

1//! Execution pipeline for duplicate processing.
2//!
3//! This module provides the `Executor` struct which handles rate-limited,
4//! concurrent execution of duplicate processing operations including
5//! downloading backups and deleting duplicates.
6
7use std::num::NonZeroU32;
8use std::sync::Arc;
9
10use governor::{Quota, RateLimiter};
11use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
12use nonzero_ext::nonzero;
13use tokio::sync::Semaphore;
14
15use crate::client::ImmichClient;
16use crate::error::Result;
17use crate::models::{
18    ConsolidationResult, ExecutionConfig, ExecutionReport, GroupResult, OperationResult,
19};
20use crate::scoring::DuplicateAnalysis;
21
/// Concrete [`RateLimiter`] type stored in [`Executor`].
///
/// Spells out the three type parameters once: a not-keyed limiter with
/// in-memory state, driven by `governor`'s default clock. This is the type
/// that `RateLimiter::direct` is assigned to in [`Executor::new`].
type DirectRateLimiter = RateLimiter<
    governor::state::NotKeyed,
    governor::state::InMemoryState,
    governor::clock::DefaultClock,
>;
28
/// Executor for duplicate processing operations.
///
/// Every API call is routed through a rate limiter and a concurrency
/// semaphore. For each duplicate group the pipeline is:
/// 1. Consolidate metadata (GPS, datetime, description) from losers onto the winner
/// 2. Download backup copies of the loser assets
/// 3. Delete only the losers whose backup download succeeded
///
/// # Example
///
/// ```no_run
/// use immich_lib::{ImmichClient, Executor};
/// use immich_lib::models::ExecutionConfig;
///
/// # async fn example() -> immich_lib::Result<()> {
/// let client = ImmichClient::new("https://immich.example.com", "api-key")?;
/// let config = ExecutionConfig::default();
/// let executor = Executor::new(client, config);
///
/// // Execute analysis results
/// // let report = executor.execute_all(&analyses).await;
/// # Ok(())
/// # }
/// ```
pub struct Executor {
    /// The Immich API client used for every request
    client: ImmichClient,

    /// Rate limiter awaited before each API request
    rate_limiter: DirectRateLimiter,

    /// Semaphore bounding how many operations may hold a permit at once
    concurrency: Arc<Semaphore>,

    /// Execution configuration (request rate, concurrency, backup directory, delete mode)
    config: ExecutionConfig,
}
64
65impl Executor {
66    /// Create a new executor with the given client and configuration.
67    ///
68    /// # Arguments
69    ///
70    /// * `client` - The Immich API client to use for operations
71    /// * `config` - Execution configuration (rate limits, concurrency, backup dir)
72    pub fn new(client: ImmichClient, config: ExecutionConfig) -> Self {
73        // Create rate limiter with configured requests per second
74        let quota = Quota::per_second(
75            NonZeroU32::new(config.requests_per_sec).unwrap_or(nonzero!(10u32)),
76        );
77        let rate_limiter = RateLimiter::direct(quota);
78
79        // Create semaphore for concurrency control
80        let concurrency = Arc::new(Semaphore::new(config.max_concurrent));
81
82        Self {
83            client,
84            rate_limiter,
85            concurrency,
86            config,
87        }
88    }
89
90    /// Wait for rate limit and acquire concurrency permit before executing an operation.
91    ///
92    /// This helper ensures all API operations respect rate limits and concurrency bounds.
93    async fn rate_limited<F, T>(&self, op: F) -> Result<T>
94    where
95        F: std::future::Future<Output = Result<T>>,
96    {
97        // Wait for rate limit allowance
98        self.rate_limiter.until_ready().await;
99
100        // Acquire concurrency permit (automatically released when dropped)
101        let _permit = self.concurrency.acquire().await.expect("semaphore closed");
102
103        // Execute the operation
104        op.await
105    }
106
107    /// Execute processing for all duplicate groups.
108    ///
109    /// Iterates through all groups, downloading backups and deleting duplicates
110    /// for each. Shows progress via console progress bars.
111    ///
112    /// # Arguments
113    ///
114    /// * `groups` - Slice of duplicate analysis results to process
115    ///
116    /// # Returns
117    ///
118    /// An execution report summarizing all operations and their outcomes.
119    pub async fn execute_all(&self, groups: &[DuplicateAnalysis]) -> ExecutionReport {
120        let mut report = ExecutionReport::new();
121
122        if groups.is_empty() {
123            return report;
124        }
125
126        // Create multi-progress container
127        let multi_progress = MultiProgress::new();
128
129        // Create overall progress bar
130        let overall_style = ProgressStyle::default_bar()
131            .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} groups ({eta})")
132            .expect("valid template")
133            .progress_chars("##-");
134
135        let overall_pb = multi_progress.add(ProgressBar::new(groups.len() as u64));
136        overall_pb.set_style(overall_style);
137
138        // Create progress bar for current group operations
139        let group_style = ProgressStyle::default_bar()
140            .template("  {spinner:.green} {msg}")
141            .expect("valid template");
142
143        let group_pb = multi_progress.add(ProgressBar::new_spinner());
144        group_pb.set_style(group_style);
145
146        // Ensure backup directory exists
147        if let Err(e) = tokio::fs::create_dir_all(&self.config.backup_dir).await {
148            overall_pb.finish_with_message(format!("Failed to create backup directory: {}", e));
149            return report;
150        }
151
152        // Process each group
153        for analysis in groups {
154            group_pb.set_message(format!(
155                "Processing group {} ({} losers)",
156                analysis.duplicate_id,
157                analysis.losers.len()
158            ));
159
160            let result = self.execute_group(analysis, &group_pb).await;
161            report.add_group_result(result);
162
163            overall_pb.inc(1);
164        }
165
166        overall_pb.finish_with_message("Complete");
167        group_pb.finish_and_clear();
168
169        report
170    }
171
172    /// Execute processing for a single duplicate group.
173    ///
174    /// 1. Consolidates metadata from losers to winner (GPS, datetime, description)
175    /// 2. Downloads backup copies of all loser assets
176    /// 3. Deletes only those that were successfully downloaded
177    ///
178    /// # Arguments
179    ///
180    /// * `analysis` - The duplicate analysis for this group
181    /// * `pb` - Progress bar to update with status messages
182    ///
183    /// # Returns
184    ///
185    /// A group result detailing the outcome of each operation.
186    pub async fn execute_group(
187        &self,
188        analysis: &DuplicateAnalysis,
189        pb: &ProgressBar,
190    ) -> GroupResult {
191        let mut download_results = Vec::new();
192
193        // Step 1: Consolidate metadata from losers to winner
194        pb.set_message("Checking metadata consolidation");
195        let consolidation_result = self.consolidate_metadata(analysis).await;
196
197        // Step 2: Download each loser asset
198        for loser in &analysis.losers {
199            pb.set_message(format!("Downloading {}", loser.filename));
200
201            let result = self.download_loser(&loser.asset_id, &loser.filename).await;
202            download_results.push(result);
203        }
204
205        // Collect successfully downloaded asset IDs for deletion
206        let downloaded_ids: Vec<String> = download_results
207            .iter()
208            .filter_map(|r| match r {
209                OperationResult::Success { id, .. } => Some(id.clone()),
210                _ => None,
211            })
212            .collect();
213
214        // Step 3: Only delete if we have successfully downloaded assets
215        let delete_result = if downloaded_ids.is_empty() {
216            Some(OperationResult::Skipped {
217                id: analysis.duplicate_id.clone(),
218                reason: "No assets were successfully downloaded".to_string(),
219            })
220        } else {
221            pb.set_message(format!("Deleting {} assets", downloaded_ids.len()));
222
223            match self.delete_assets(&downloaded_ids).await {
224                Ok(()) => Some(OperationResult::Success {
225                    id: analysis.duplicate_id.clone(),
226                    path: None,
227                }),
228                Err(e) => Some(OperationResult::Failed {
229                    id: analysis.duplicate_id.clone(),
230                    error: e.to_string(),
231                }),
232            }
233        };
234
235        GroupResult {
236            duplicate_id: analysis.duplicate_id.clone(),
237            winner_id: analysis.winner.asset_id.clone(),
238            consolidation_result,
239            download_results,
240            delete_result,
241        }
242    }
243
244    /// Consolidate metadata from loser assets to the winner.
245    ///
246    /// Checks if the winner lacks GPS, datetime, or description that any loser has,
247    /// and transfers the metadata to preserve it before deletion.
248    async fn consolidate_metadata(
249        &self,
250        analysis: &DuplicateAnalysis,
251    ) -> Option<ConsolidationResult> {
252        // Fetch winner asset to check what metadata it already has
253        let winner_asset = match self
254            .rate_limited(async { self.client.get_asset(&analysis.winner.asset_id).await })
255            .await
256        {
257            Ok(asset) => asset,
258            Err(_) => return None, // Can't consolidate if we can't fetch winner
259        };
260
261        let winner_exif = winner_asset.exif_info.as_ref();
262        let winner_has_gps = winner_exif.map(|e| e.has_gps()).unwrap_or(false);
263        let winner_has_datetime = winner_exif
264            .and_then(|e| e.date_time_original.as_ref())
265            .is_some();
266        let winner_has_description = winner_exif.and_then(|e| e.description.as_ref()).is_some();
267
268        // If winner has all metadata, no consolidation needed
269        if winner_has_gps && winner_has_datetime && winner_has_description {
270            return None;
271        }
272
273        // Find best source for each missing field from losers (owned values)
274        let mut best_gps: Option<(f64, f64, String)> = None;
275        let mut best_datetime: Option<(String, String)> = None;
276        let mut best_description: Option<(String, String)> = None;
277
278        for loser in &analysis.losers {
279            let loser_asset = match self
280                .rate_limited(async { self.client.get_asset(&loser.asset_id).await })
281                .await
282            {
283                Ok(asset) => asset,
284                Err(_) => continue, // Skip losers we can't fetch
285            };
286
287            if let Some(exif) = &loser_asset.exif_info {
288                // Check GPS
289                if !winner_has_gps
290                    && best_gps.is_none()
291                    && exif.has_gps()
292                    && let (Some(lat), Some(lon)) = (exif.latitude, exif.longitude)
293                {
294                    best_gps = Some((lat, lon, loser.asset_id.clone()));
295                }
296
297                // Check datetime
298                if !winner_has_datetime
299                    && best_datetime.is_none()
300                    && let Some(dt) = &exif.date_time_original
301                {
302                    best_datetime = Some((dt.clone(), loser.asset_id.clone()));
303                }
304
305                // Check description
306                if !winner_has_description
307                    && best_description.is_none()
308                    && let Some(desc) = &exif.description
309                {
310                    best_description = Some((desc.clone(), loser.asset_id.clone()));
311                }
312            }
313
314            // If we've found all we need, stop searching
315            if (winner_has_gps || best_gps.is_some())
316                && (winner_has_datetime || best_datetime.is_some())
317                && (winner_has_description || best_description.is_some())
318            {
319                break;
320            }
321        }
322
323        // Nothing to consolidate
324        if best_gps.is_none() && best_datetime.is_none() && best_description.is_none() {
325            return None;
326        }
327
328        // Prepare update parameters
329        let (latitude, longitude) = match &best_gps {
330            Some((lat, lon, _)) => (Some(*lat), Some(*lon)),
331            None => (None, None),
332        };
333        let date_time_original = best_datetime.as_ref().map(|(dt, _)| dt.as_str());
334        let description = best_description.as_ref().map(|(desc, _)| desc.as_str());
335
336        // Determine source asset ID (prefer GPS source, then datetime, then description)
337        let source_asset_id = best_gps
338            .as_ref()
339            .map(|(_, _, id)| id.clone())
340            .or_else(|| best_datetime.as_ref().map(|(_, id)| id.clone()))
341            .or_else(|| best_description.as_ref().map(|(_, id)| id.clone()));
342
343        // Update winner with consolidated metadata
344        let update_result = self
345            .rate_limited(async {
346                self.client
347                    .update_asset_metadata(
348                        &analysis.winner.asset_id,
349                        latitude,
350                        longitude,
351                        date_time_original,
352                        description,
353                    )
354                    .await
355            })
356            .await;
357
358        if update_result.is_ok() {
359            Some(ConsolidationResult {
360                gps_transferred: best_gps.is_some(),
361                datetime_transferred: best_datetime.is_some(),
362                description_transferred: best_description.is_some(),
363                source_asset_id,
364            })
365        } else {
366            None // Consolidation failed, but we can still proceed with download/delete
367        }
368    }
369
370    /// Download a loser asset to the backup directory.
371    ///
372    /// Files are named as `{asset_id}_{filename}` to avoid collisions.
373    async fn download_loser(&self, asset_id: &str, filename: &str) -> OperationResult {
374        // Build path with asset ID prefix to avoid collisions
375        let safe_filename = format!("{}_{}", asset_id, filename);
376        let path = self.config.backup_dir.join(&safe_filename);
377
378        let download_result = self
379            .rate_limited(async { self.client.download_asset(asset_id, &path).await })
380            .await;
381
382        match download_result {
383            Ok(_bytes) => OperationResult::Success {
384                id: asset_id.to_string(),
385                path: Some(path),
386            },
387            Err(e) => OperationResult::Failed {
388                id: asset_id.to_string(),
389                error: e.to_string(),
390            },
391        }
392    }
393
394    /// Delete assets using the API.
395    async fn delete_assets(&self, asset_ids: &[String]) -> Result<()> {
396        self.rate_limited(async {
397            self.client
398                .delete_assets(asset_ids, self.config.force_delete)
399                .await
400        })
401        .await
402    }
403}