vdsl_sync/application/sdk_impl/
sync_ops.rs1use std::path::Path;
8
9use async_trait::async_trait;
10use tracing::{error, info, trace, warn};
11
12use super::SdkImpl;
13use crate::application::error::SyncError;
14use crate::application::sdk::{PutReport, SyncReport, SyncReportError, SyncStoreSdk};
15use crate::application::topology_store::TopologyFileView;
16use crate::application::transfer_engine::PreparedTransfer;
17use crate::domain::file_type::FileType;
18use crate::domain::fingerprint::FileFingerprint;
19use crate::domain::location::{LocationId, SyncSummary};
20use crate::domain::view::{ErrorEntry, PendingEntry, PresenceState};
21use crate::infra::backend::ProgressFn;
22
23#[async_trait]
24impl SyncStoreSdk for SdkImpl {
25 async fn sync(&self) -> Result<SyncReport, SyncError> {
30 info!("sdk_impl::sync: pipeline start");
31 self.report_progress("ensure: checking locations");
32
33 let location_ids: Vec<String> = self.locations.iter().map(|l| l.id().to_string()).collect();
36 info!(
37 location_count = self.locations.len(),
38 locations = %location_ids.join(", "),
39 "sdk_impl::sync: ensure start"
40 );
41 let mut failed_locations: std::collections::HashSet<LocationId> =
42 std::collections::HashSet::new();
43 for loc in &self.locations {
44 info!(
45 location = %loc.id(),
46 kind = ?loc.kind(),
47 "sdk_impl::sync: ensure checking"
48 );
49 match loc.ensure().await {
50 Ok(()) => {
51 info!(location = %loc.id(), "sdk_impl::sync: ensure ok");
52 }
53 Err(e) => {
54 error!(
55 location = %loc.id(),
56 kind = ?loc.kind(),
57 error = %e,
58 "sdk_impl::sync: ensure FAILED — this location will be excluded from sync"
59 );
60 failed_locations.insert(loc.id().clone());
61 }
62 }
63 }
64 if failed_locations.is_empty() {
65 info!("sdk_impl::sync: ensure done — all locations reachable");
66 } else {
67 let excluded: Vec<String> = failed_locations.iter().map(|l| l.to_string()).collect();
68 warn!(
69 excluded = %excluded.join(", "),
70 "sdk_impl::sync: ensure done — {} location(s) excluded due to ensure failure",
71 failed_locations.len()
72 );
73 }
74
75 let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
77 if cancelled > 0 {
78 info!(
79 cancelled_count = cancelled,
80 "sdk_impl::sync: cancelled orphaned InFlight transfers"
81 );
82 }
83
84 self.report_progress("scan: scanning locations");
86 info!("sdk_impl::sync: phase1 scan start");
87 let progress_cb = self.progress.lock().ok().and_then(|g| g.clone());
88 let scan_result = self
89 .scanner
90 .scan_all(&self.scan_excludes, &failed_locations, progress_cb.as_ref())
91 .await?;
92 info!(
93 scanned = scan_result.scanned,
94 deltas = scan_result.deltas.len(),
95 scan_errors = scan_result.scan_errors.len(),
96 "sdk_impl::sync: phase1 scan done"
97 );
98 for delta in &scan_result.deltas {
100 trace!(delta = ?delta, "sdk_impl::sync: delta");
101 }
102
103 self.report_progress(&format!(
105 "plan: {} files scanned, {} deltas",
106 scan_result.scanned,
107 scan_result.deltas.len()
108 ));
109 info!(
110 delta_count = scan_result.deltas.len(),
111 "sdk_impl::sync: phase2 plan start"
112 );
113 let plan_result = self.topology.sync(&scan_result.deltas).await?;
114 info!(
115 transfers_created = plan_result.transfers_created,
116 conflicts = plan_result.conflicts.len(),
117 "sdk_impl::sync: phase2 plan done"
118 );
119
120 if let Ok(guard) = self.progress.lock() {
123 self.engine.set_progress_callback(guard.clone());
124 }
125 self.report_progress(&format!(
126 "execute: {} transfers queued",
127 plan_result.transfers_created
128 ));
129 info!("sdk_impl::sync: phase3 execute start");
130 let (transferred, failed, errors) = self.execute_bfs(&failed_locations).await?;
131 self.engine.set_progress_callback(None);
133 info!(
134 transferred = transferred,
135 failed = failed,
136 error_count = errors.len(),
137 "sdk_impl::sync: phase3 execute done"
138 );
139
140 Ok(SyncReport {
141 scanned: scan_result.scanned,
142 scan_errors: scan_result
143 .scan_errors
144 .iter()
145 .map(|e| SyncReportError {
146 path: e.path.clone(),
147 error: e.error.clone(),
148 })
149 .collect(),
150 transfers_created: plan_result.transfers_created,
151 transferred,
152 failed,
153 errors,
154 conflicts: plan_result
155 .conflicts
156 .iter()
157 .map(crate::application::sdk::SyncReportConflict::from)
158 .collect(),
159 })
160 }
161
162 async fn sync_route(
163 &self,
164 src: &LocationId,
165 dest: &LocationId,
166 ) -> Result<SyncReport, SyncError> {
167 let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
169 if cancelled > 0 {
170 info!(
171 cancelled_count = cancelled,
172 "sync_route: cancelled orphaned InFlight transfers"
173 );
174 }
175
176 self.report_progress(&format!("plan: route {src} → {dest}"));
178 let plan_result = self.topology.sync_route(src, dest).await?;
179
180 if let Ok(guard) = self.progress.lock() {
183 self.engine.set_progress_callback(guard.clone());
184 }
185 let queued = self.transfer_store.queued_transfers(dest).await?;
186 let eligible: Vec<_> = queued.into_iter().filter(|t| t.src() == src).collect();
187
188 let mut prepared = Vec::with_capacity(eligible.len());
189 let mut total_failed = 0usize;
190 let mut all_errors: Vec<SyncReportError> = Vec::new();
191
192 for transfer in eligible {
193 match self.topology_files.get_by_id(transfer.file_id()).await {
194 Ok(Some(file)) => {
195 prepared.push(PreparedTransfer {
196 transfer,
197 relative_path: file.relative_path().to_string(),
198 });
199 }
200 Ok(None) => {
201 total_failed += 1;
202 all_errors.push(SyncReportError {
203 path: transfer.file_id().to_string(),
204 error: format!("file {} not found in store", transfer.file_id()),
205 });
206 }
207 Err(e) => {
208 total_failed += 1;
209 all_errors.push(SyncReportError {
210 path: transfer.file_id().to_string(),
211 error: e.to_string(),
212 });
213 }
214 }
215 }
216
217 self.report_progress(&format!(
218 "execute: {} transfers ({src} → {dest})",
219 prepared.len()
220 ));
221 let outcomes = self.engine.execute_prepared(prepared).await;
222 self.engine.set_progress_callback(None);
224 let mut total_transferred = 0usize;
225
226 self.persist_outcomes(
227 &outcomes,
228 &mut total_transferred,
229 &mut total_failed,
230 &mut all_errors,
231 )
232 .await?;
233
234 Ok(SyncReport {
235 scanned: 0,
236 scan_errors: Vec::new(),
237 transfers_created: plan_result.transfers_created,
238 transferred: total_transferred,
239 failed: total_failed,
240 errors: all_errors,
241 conflicts: plan_result
242 .conflicts
243 .iter()
244 .map(crate::application::sdk::SyncReportConflict::from)
245 .collect(),
246 })
247 }
248
249 async fn put(
254 &self,
255 path: &str,
256 file_type: FileType,
257 fingerprint: FileFingerprint,
258 origin: &LocationId,
259 embedded_id: Option<String>,
260 ) -> Result<PutReport, SyncError> {
261 let result = self
262 .topology
263 .put(path, file_type, fingerprint, origin, embedded_id)
264 .await?;
265 Ok(PutReport {
266 file_id: result.topology_file_id,
267 is_new: result.is_new,
268 transfers_created: result.transfers_created,
269 })
270 }
271
    /// Mark `path` deleted in the topology. Returns the count reported by
    /// `topology.delete` (presumably transfers/entries affected — confirm
    /// against the TopologyStore contract).
    async fn delete(&self, path: &str) -> Result<usize, SyncError> {
        self.topology.delete(path).await
    }
275
276 async fn restore(&self, path: &str, revision: &str) -> Result<(), SyncError> {
277 info!(path = %path, revision = %revision, "sdk_impl::restore: start");
278
279 let route = self.engine.archive_route().ok_or_else(|| -> SyncError {
281 crate::infra::error::InfraError::Transfer {
282 reason: "restore: no route with archive_root configured".into(),
283 }
284 .into()
285 })?;
286
287 route.restore_from_archive(path, revision).await?;
289 info!(path = %path, "sdk_impl::restore: physical restore done");
290
291 let deleted_tfs = self.topology_files.list_deleted().await?;
296 match deleted_tfs.into_iter().find(|t| t.relative_path() == path) {
297 Some(mut tf) => {
298 tf.unmark_deleted();
299 self.topology_files.upsert(&tf).await?;
300 info!(path = %path, file_id = %tf.id(), "sdk_impl::restore: TopologyFile unmarked");
301 }
302 None => {
303 warn!(
304 path = %path,
305 "sdk_impl::restore: TopologyFile not in deleted list (likely hard-deleted after delete transfers). Physical restore succeeded — next full sync will re-register."
306 );
307 }
308 }
309
310 Ok(())
311 }
312
    /// Look up a single tracked file by relative path; `None` if untracked.
    async fn get(&self, path: &str) -> Result<Option<TopologyFileView>, SyncError> {
        self.topology.get(path).await
    }
320
    /// List tracked files, optionally filtered by `file_type` and capped at
    /// `limit` entries; both filters delegate to the topology store.
    async fn list(
        &self,
        file_type: Option<FileType>,
        limit: Option<usize>,
    ) -> Result<Vec<TopologyFileView>, SyncError> {
        self.topology.list(file_type, limit).await
    }
328
    /// Aggregate a whole-store status summary: per-location present /
    /// pending / syncing / failed counts plus flattened error and pending
    /// entry lists.
    async fn status(&self) -> Result<SyncSummary, SyncError> {
        use crate::domain::location::LocationSummary;
        use crate::domain::transfer::TransferState;
        use std::collections::HashMap;

        let retry_policy = self.config.retry_policy();
        let total_files = self.topology.file_count().await?;
        let stats = self.transfer_store.transfer_stats().await?;
        let present_counts = self.transfer_store.present_counts_by_location().await?;
        let failed = self.transfer_store.failed_transfers().await?;
        let pending = self.transfer_store.all_pending_transfers().await?;

        let mut locations: HashMap<LocationId, LocationSummary> = HashMap::new();
        let mut total_errors = 0usize;

        // Seed each location's summary with the files already present there.
        for (loc, count) in &present_counts {
            let summary = locations.entry(loc.clone()).or_default();
            summary.present = *count;
        }

        for row in &stats {
            // Terminal rows carry no pending/syncing/failed weight.
            if row.state == TransferState::Completed || row.state == TransferState::Cancelled {
                continue;
            }
            let dest_state = match row.state {
                TransferState::Blocked | TransferState::Queued => PresenceState::Pending,
                TransferState::InFlight => PresenceState::Syncing,
                TransferState::Failed => {
                    // A failure is only terminal once retries are exhausted;
                    // a "permanent" error kind is exhausted immediately.
                    let exhausted = match row.error_kind.as_deref() {
                        Some("permanent") => true,
                        _ => row.attempt >= retry_policy.max_attempts(),
                    };
                    if exhausted {
                        PresenceState::Failed
                    } else {
                        PresenceState::Pending
                    }
                }
                // Unreachable: filtered out by the `continue` above. Kept so
                // the match stays exhaustive over TransferState.
                TransferState::Completed | TransferState::Cancelled => PresenceState::Absent,
            };

            let dest_summary = locations.entry(row.dest.clone()).or_default();
            match dest_state {
                PresenceState::Pending => {
                    dest_summary.pending = dest_summary.pending.saturating_add(row.file_count);
                }
                PresenceState::Syncing => {
                    dest_summary.syncing = dest_summary.syncing.saturating_add(row.file_count);
                }
                PresenceState::Failed => {
                    dest_summary.failed = dest_summary.failed.saturating_add(row.file_count);
                    total_errors = total_errors.saturating_add(row.file_count);
                }
                PresenceState::Absent => {
                    // Effectively dead: dest_state can only be Absent via the
                    // unreachable arm above. Kept for exhaustiveness.
                    dest_summary.absent = dest_summary.absent.saturating_add(row.file_count);
                }
                PresenceState::Present => {}
            }
        }

        // Failed transfers whose retries are exhausted become error entries…
        let error_entries: Vec<ErrorEntry> = failed
            .iter()
            .filter(|t| {
                let state = PresenceState::from_transfer(t, &retry_policy);
                state == PresenceState::Failed
            })
            .map(ErrorEntry::from_transfer)
            .collect();

        // …while failed transfers that still have retry budget are surfaced
        // alongside the ordinary pending transfers.
        let mut pending_entries: Vec<PendingEntry> =
            pending.iter().map(PendingEntry::from_transfer).collect();
        for t in &failed {
            let state = PresenceState::from_transfer(t, &retry_policy);
            if state == PresenceState::Pending {
                pending_entries.push(PendingEntry::from_transfer(t));
            }
        }

        Ok(SyncSummary {
            locations,
            total_entries: total_files,
            total_errors,
            error_entries,
            pending_entries,
        })
    }
415
416 async fn errors(&self) -> Result<Vec<ErrorEntry>, SyncError> {
417 let retry_policy = self.config.retry_policy();
418 let failed = self.transfer_store.failed_transfers().await?;
419 Ok(failed
420 .iter()
421 .filter(|t| {
422 let state = PresenceState::from_transfer(t, &retry_policy);
423 state == PresenceState::Failed
424 })
425 .map(ErrorEntry::from_transfer)
426 .collect())
427 }
428
429 async fn pending(&self, dest: &LocationId) -> Result<Vec<PendingEntry>, SyncError> {
430 let retry_policy = self.config.retry_policy();
431
432 let all_pending = self.transfer_store.all_pending_transfers().await?;
434 let mut entries: Vec<PendingEntry> = all_pending
435 .iter()
436 .filter(|t| t.dest() == dest)
437 .map(PendingEntry::from_transfer)
438 .collect();
439
440 let failed = self.transfer_store.failed_transfers().await?;
442 for t in &failed {
443 if t.dest() == dest {
444 let state = PresenceState::from_transfer(t, &retry_policy);
445 if state == PresenceState::Pending {
446 entries.push(PendingEntry::from_transfer(t));
447 }
448 }
449 }
450
451 Ok(entries)
452 }
453
454 fn locations(&self) -> Vec<LocationId> {
459 self.topology.locations().to_vec()
460 }
461
    /// Every configured transfer route as (src, dest) pairs, straight from
    /// the engine.
    fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
        self.engine.all_edges()
    }
465
    /// Root path of the local location, if the engine has one configured.
    fn local_root(&self) -> Option<&Path> {
        self.engine.local_root()
    }
469
470 fn set_progress_callback(&self, callback: Option<ProgressFn>) {
471 if let Ok(mut guard) = self.progress.lock() {
472 *guard = callback;
473 }
474 }
475}