gravityfile_ops/
copy.rs

1//! Async copy operation with progress reporting.
2
3use std::fs;
4use std::path::PathBuf;
5
6use tokio::sync::mpsc;
7
8use crate::conflict::{auto_rename_path, Conflict, ConflictKind, ConflictResolution};
9use crate::progress::{OperationComplete, OperationProgress, OperationType};
10use crate::{OperationError, OPERATION_CHANNEL_SIZE};
11
12/// Result sent through the channel during copy operations.
13#[derive(Debug)]
14pub enum CopyResult {
15    /// Progress update.
16    Progress(OperationProgress),
17    /// A conflict was detected that needs resolution.
18    Conflict(Conflict),
19    /// The operation completed.
20    Complete(OperationComplete),
21}
22
23/// Options for copy operations.
24#[derive(Debug, Clone, Default)]
25pub struct CopyOptions {
26    /// How to handle conflicts (None means ask for each).
27    pub conflict_resolution: Option<ConflictResolution>,
28    /// Whether to preserve timestamps.
29    pub preserve_timestamps: bool,
30}
31
32/// Start an async copy operation.
33///
34/// Returns a receiver for progress updates and results.
35pub fn start_copy(
36    sources: Vec<PathBuf>,
37    destination: PathBuf,
38    options: CopyOptions,
39) -> mpsc::Receiver<CopyResult> {
40    let (tx, rx) = mpsc::channel(OPERATION_CHANNEL_SIZE);
41
42    if sources.is_empty() {
43        // Send immediate completion for empty sources
44        let complete = OperationComplete {
45            operation_type: OperationType::Copy,
46            succeeded: 0,
47            failed: 0,
48            bytes_processed: 0,
49            errors: vec![],
50        };
51        tokio::spawn(async move {
52            let _ = tx.send(CopyResult::Complete(complete)).await;
53        });
54        return rx;
55    }
56
57    tokio::spawn(async move {
58        copy_impl(sources, destination, options, tx).await;
59    });
60
61    rx
62}
63
64/// Internal implementation of copy operation.
65async fn copy_impl(
66    sources: Vec<PathBuf>,
67    destination: PathBuf,
68    options: CopyOptions,
69    tx: mpsc::Sender<CopyResult>,
70) {
71    // First, calculate total size and file count
72    let (total_files, total_bytes) = calculate_totals(&sources);
73
74    let mut progress = OperationProgress::new(OperationType::Copy, total_files, total_bytes);
75    let global_resolution: Option<ConflictResolution> = options.conflict_resolution;
76    let mut succeeded = 0;
77    let mut failed = 0;
78
79    // Ensure destination exists and is a directory
80    if !destination.exists() {
81        if let Err(e) = fs::create_dir_all(&destination) {
82            progress.add_error(OperationError::new(
83                destination.clone(),
84                format!("Failed to create destination: {}", e),
85            ));
86            let _ = tx
87                .send(CopyResult::Complete(OperationComplete {
88                    operation_type: OperationType::Copy,
89                    succeeded: 0,
90                    failed: sources.len(),
91                    bytes_processed: 0,
92                    errors: progress.errors.clone(),
93                }))
94                .await;
95            return;
96        }
97    }
98
99    for source in sources {
100        let dest_path = destination.join(source.file_name().unwrap_or_default());
101
102        // Check for conflicts
103        if dest_path.exists() {
104            let conflict_kind = if dest_path.is_dir() {
105                ConflictKind::DirectoryExists
106            } else {
107                ConflictKind::FileExists
108            };
109
110            let resolution = if let Some(res) = global_resolution {
111                res.to_single()
112            } else {
113                // Send conflict and wait (in real impl, would need response channel)
114                // For now, default to skip
115                let _ = tx
116                    .send(CopyResult::Conflict(Conflict::new(
117                        source.clone(),
118                        dest_path.clone(),
119                        conflict_kind,
120                    )))
121                    .await;
122                ConflictResolution::Skip
123            };
124
125            match resolution {
126                ConflictResolution::Skip | ConflictResolution::SkipAll => {
127                    failed += 1;
128                    continue;
129                }
130                ConflictResolution::Abort => {
131                    let _ = tx
132                        .send(CopyResult::Complete(OperationComplete {
133                            operation_type: OperationType::Copy,
134                            succeeded,
135                            failed: failed + 1,
136                            bytes_processed: progress.bytes_processed,
137                            errors: progress.errors.clone(),
138                        }))
139                        .await;
140                    return;
141                }
142                ConflictResolution::AutoRename => {
143                    let new_dest = auto_rename_path(&dest_path);
144                    if let Err(e) = copy_item(&source, &new_dest, &mut progress, &tx).await {
145                        progress.add_error(OperationError::new(source.clone(), e));
146                        failed += 1;
147                    } else {
148                        succeeded += 1;
149                    }
150                    continue;
151                }
152                ConflictResolution::Overwrite | ConflictResolution::OverwriteAll => {
153                    // Remove existing before copy
154                    let _ = if dest_path.is_dir() {
155                        fs::remove_dir_all(&dest_path)
156                    } else {
157                        fs::remove_file(&dest_path)
158                    };
159                }
160            }
161        }
162
163        // Perform the copy
164        progress.set_current_file(Some(source.clone()));
165        let _ = tx.send(CopyResult::Progress(progress.clone())).await;
166
167        if let Err(e) = copy_item(&source, &dest_path, &mut progress, &tx).await {
168            progress.add_error(OperationError::new(source.clone(), e));
169            failed += 1;
170        } else {
171            succeeded += 1;
172        }
173    }
174
175    // Send completion
176    let _ = tx
177        .send(CopyResult::Complete(OperationComplete {
178            operation_type: OperationType::Copy,
179            succeeded,
180            failed,
181            bytes_processed: progress.bytes_processed,
182            errors: progress.errors,
183        }))
184        .await;
185}
186
187/// Copy a single item (file or directory).
188async fn copy_item(
189    source: &PathBuf,
190    dest: &PathBuf,
191    progress: &mut OperationProgress,
192    tx: &mpsc::Sender<CopyResult>,
193) -> Result<(), String> {
194    let source = source.clone();
195    let dest = dest.clone();
196
197    let result = tokio::task::spawn_blocking(move || {
198        if source.is_dir() {
199            copy_dir_recursive(&source, &dest)
200        } else {
201            copy_file(&source, &dest)
202        }
203    })
204    .await
205    .map_err(|e| format!("Task failed: {}", e))?;
206
207    match result {
208        Ok(bytes) => {
209            progress.complete_file(bytes);
210            let _ = tx.send(CopyResult::Progress(progress.clone())).await;
211            Ok(())
212        }
213        Err(e) => Err(e),
214    }
215}
216
217/// Copy a single file.
218fn copy_file(source: &PathBuf, dest: &PathBuf) -> Result<u64, String> {
219    let metadata = fs::metadata(source).map_err(|e| format!("Failed to read metadata: {}", e))?;
220    let size = metadata.len();
221
222    fs::copy(source, dest).map_err(|e| format!("Failed to copy: {}", e))?;
223
224    Ok(size)
225}
226
227/// Recursively copy a directory.
228fn copy_dir_recursive(source: &PathBuf, dest: &PathBuf) -> Result<u64, String> {
229    fs::create_dir_all(dest).map_err(|e| format!("Failed to create directory: {}", e))?;
230
231    let mut total_bytes = 0u64;
232
233    let entries =
234        fs::read_dir(source).map_err(|e| format!("Failed to read directory: {}", e))?;
235
236    for entry in entries {
237        let entry = entry.map_err(|e| format!("Failed to read entry: {}", e))?;
238        let path = entry.path();
239        let dest_path = dest.join(entry.file_name());
240
241        if path.is_dir() {
242            total_bytes += copy_dir_recursive(&path, &dest_path)?;
243        } else {
244            total_bytes += copy_file(&path, &dest_path)?;
245        }
246    }
247
248    Ok(total_bytes)
249}
250
251/// Calculate total files and bytes for a list of sources.
252fn calculate_totals(sources: &[PathBuf]) -> (usize, u64) {
253    let mut files = 0;
254    let mut bytes = 0u64;
255
256    for source in sources {
257        if source.is_dir() {
258            let (f, b) = calculate_dir_totals(source);
259            files += f;
260            bytes += b;
261        } else if let Ok(metadata) = fs::metadata(source) {
262            files += 1;
263            bytes += metadata.len();
264        }
265    }
266
267    (files, bytes)
268}
269
270/// Calculate totals for a directory recursively.
271fn calculate_dir_totals(dir: &PathBuf) -> (usize, u64) {
272    let mut files = 0;
273    let mut bytes = 0u64;
274
275    if let Ok(entries) = fs::read_dir(dir) {
276        for entry in entries.flatten() {
277            let path = entry.path();
278            if path.is_dir() {
279                let (f, b) = calculate_dir_totals(&path);
280                files += f;
281                bytes += b;
282            } else if let Ok(metadata) = fs::metadata(&path) {
283                files += 1;
284                bytes += metadata.len();
285            }
286        }
287    }
288
289    (files, bytes)
290}