vdsl_sync/application/sdk_impl/
sync_ops.rs1use std::path::Path;
8
9use async_trait::async_trait;
10use tracing::{error, info, trace, warn};
11
12use super::SdkImpl;
13use crate::application::error::SyncError;
14use crate::application::sdk::{PutReport, SyncReport, SyncReportError, SyncStoreSdk};
15use crate::application::topology_store::TopologyFileView;
16use crate::application::transfer_engine::PreparedTransfer;
17use crate::domain::file_type::FileType;
18use crate::domain::fingerprint::FileFingerprint;
19use crate::domain::location::{LocationId, SyncSummary};
20use crate::domain::view::{ErrorEntry, PendingEntry, PresenceState};
21use crate::infra::backend::ProgressFn;
22
23#[async_trait]
24impl SyncStoreSdk for SdkImpl {
25 async fn sync(&self) -> Result<SyncReport, SyncError> {
30 info!("sdk_impl::sync: pipeline start");
31 self.report_progress("ensure: checking locations");
32
33 let location_ids: Vec<String> =
36 self.locations.iter().map(|l| l.id().to_string()).collect();
37 info!(
38 location_count = self.locations.len(),
39 locations = %location_ids.join(", "),
40 "sdk_impl::sync: ensure start"
41 );
42 let mut failed_locations: std::collections::HashSet<LocationId> =
43 std::collections::HashSet::new();
44 for loc in &self.locations {
45 info!(
46 location = %loc.id(),
47 kind = ?loc.kind(),
48 "sdk_impl::sync: ensure checking"
49 );
50 match loc.ensure().await {
51 Ok(()) => {
52 info!(location = %loc.id(), "sdk_impl::sync: ensure ok");
53 }
54 Err(e) => {
55 error!(
56 location = %loc.id(),
57 kind = ?loc.kind(),
58 error = %e,
59 "sdk_impl::sync: ensure FAILED — this location will be excluded from sync"
60 );
61 failed_locations.insert(loc.id().clone());
62 }
63 }
64 }
65 if failed_locations.is_empty() {
66 info!("sdk_impl::sync: ensure done — all locations reachable");
67 } else {
68 let excluded: Vec<String> =
69 failed_locations.iter().map(|l| l.to_string()).collect();
70 warn!(
71 excluded = %excluded.join(", "),
72 "sdk_impl::sync: ensure done — {} location(s) excluded due to ensure failure",
73 failed_locations.len()
74 );
75 }
76
77 let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
79 if cancelled > 0 {
80 info!(
81 cancelled_count = cancelled,
82 "sdk_impl::sync: cancelled orphaned InFlight transfers"
83 );
84 }
85
86 self.report_progress("scan: scanning locations");
88 info!("sdk_impl::sync: phase1 scan start");
89 let progress_cb = self.progress.lock().ok().and_then(|g| g.clone());
90 let scan_result = self
91 .scanner
92 .scan_all(&self.scan_excludes, &failed_locations, progress_cb.as_ref())
93 .await?;
94 info!(
95 scanned = scan_result.scanned,
96 deltas = scan_result.deltas.len(),
97 scan_errors = scan_result.scan_errors.len(),
98 "sdk_impl::sync: phase1 scan done"
99 );
100 for delta in &scan_result.deltas {
102 trace!(delta = ?delta, "sdk_impl::sync: delta");
103 }
104
105 self.report_progress(&format!(
107 "plan: {} files scanned, {} deltas",
108 scan_result.scanned,
109 scan_result.deltas.len()
110 ));
111 info!(
112 delta_count = scan_result.deltas.len(),
113 "sdk_impl::sync: phase2 plan start"
114 );
115 let plan_result = self.topology.sync(&scan_result.deltas).await?;
116 info!(
117 transfers_created = plan_result.transfers_created,
118 conflicts = plan_result.conflicts.len(),
119 "sdk_impl::sync: phase2 plan done"
120 );
121
122 if let Ok(guard) = self.progress.lock() {
125 self.engine.set_progress_callback(guard.clone());
126 }
127 self.report_progress(&format!(
128 "execute: {} transfers queued",
129 plan_result.transfers_created
130 ));
131 info!("sdk_impl::sync: phase3 execute start");
132 let (transferred, failed, errors) = self.execute_bfs(&failed_locations).await?;
133 self.engine.set_progress_callback(None);
135 info!(
136 transferred = transferred,
137 failed = failed,
138 error_count = errors.len(),
139 "sdk_impl::sync: phase3 execute done"
140 );
141
142 Ok(SyncReport {
143 scanned: scan_result.scanned,
144 scan_errors: scan_result
145 .scan_errors
146 .iter()
147 .map(|e| SyncReportError {
148 path: e.path.clone(),
149 error: e.error.clone(),
150 })
151 .collect(),
152 transfers_created: plan_result.transfers_created,
153 transferred,
154 failed,
155 errors,
156 conflicts: plan_result
157 .conflicts
158 .iter()
159 .map(crate::application::sdk::SyncReportConflict::from)
160 .collect(),
161 })
162 }
163
164 async fn sync_route(
165 &self,
166 src: &LocationId,
167 dest: &LocationId,
168 ) -> Result<SyncReport, SyncError> {
169 let cancelled = self.transfer_store.cancel_orphaned_inflight().await?;
171 if cancelled > 0 {
172 info!(
173 cancelled_count = cancelled,
174 "sync_route: cancelled orphaned InFlight transfers"
175 );
176 }
177
178 self.report_progress(&format!("plan: route {src} → {dest}"));
180 let plan_result = self.topology.sync_route(src, dest).await?;
181
182 if let Ok(guard) = self.progress.lock() {
185 self.engine.set_progress_callback(guard.clone());
186 }
187 let queued = self.transfer_store.queued_transfers(dest).await?;
188 let eligible: Vec<_> = queued.into_iter().filter(|t| t.src() == src).collect();
189
190 let mut prepared = Vec::with_capacity(eligible.len());
191 let mut total_failed = 0usize;
192 let mut all_errors: Vec<SyncReportError> = Vec::new();
193
194 for transfer in eligible {
195 match self.topology_files.get_by_id(transfer.file_id()).await {
196 Ok(Some(file)) => {
197 prepared.push(PreparedTransfer {
198 transfer,
199 relative_path: file.relative_path().to_string(),
200 });
201 }
202 Ok(None) => {
203 total_failed += 1;
204 all_errors.push(SyncReportError {
205 path: transfer.file_id().to_string(),
206 error: format!("file {} not found in store", transfer.file_id()),
207 });
208 }
209 Err(e) => {
210 total_failed += 1;
211 all_errors.push(SyncReportError {
212 path: transfer.file_id().to_string(),
213 error: e.to_string(),
214 });
215 }
216 }
217 }
218
219 self.report_progress(&format!(
220 "execute: {} transfers ({src} → {dest})",
221 prepared.len()
222 ));
223 let outcomes = self.engine.execute_prepared(prepared).await;
224 self.engine.set_progress_callback(None);
226 let mut total_transferred = 0usize;
227
228 self.persist_outcomes(
229 &outcomes,
230 &mut total_transferred,
231 &mut total_failed,
232 &mut all_errors,
233 )
234 .await?;
235
236 Ok(SyncReport {
237 scanned: 0,
238 scan_errors: Vec::new(),
239 transfers_created: plan_result.transfers_created,
240 transferred: total_transferred,
241 failed: total_failed,
242 errors: all_errors,
243 conflicts: plan_result
244 .conflicts
245 .iter()
246 .map(crate::application::sdk::SyncReportConflict::from)
247 .collect(),
248 })
249 }
250
251 async fn put(
256 &self,
257 path: &str,
258 file_type: FileType,
259 fingerprint: FileFingerprint,
260 origin: &LocationId,
261 embedded_id: Option<String>,
262 ) -> Result<PutReport, SyncError> {
263 let result = self
264 .topology
265 .put(path, file_type, fingerprint, origin, embedded_id)
266 .await?;
267 Ok(PutReport {
268 file_id: result.topology_file_id,
269 is_new: result.is_new,
270 transfers_created: result.transfers_created,
271 })
272 }
273
274 async fn delete(&self, path: &str) -> Result<usize, SyncError> {
275 self.topology.delete(path).await
276 }
277
278 async fn restore(&self, path: &str, revision: &str) -> Result<(), SyncError> {
279 info!(path = %path, revision = %revision, "sdk_impl::restore: start");
280
281 let route = self.engine.archive_route().ok_or_else(|| -> SyncError {
283 crate::infra::error::InfraError::Transfer {
284 reason: "restore: no route with archive_root configured".into(),
285 }
286 .into()
287 })?;
288
289 route.restore_from_archive(path, revision).await?;
291 info!(path = %path, "sdk_impl::restore: physical restore done");
292
293 let deleted_tfs = self.topology_files.list_deleted().await?;
298 match deleted_tfs.into_iter().find(|t| t.relative_path() == path) {
299 Some(mut tf) => {
300 tf.unmark_deleted();
301 self.topology_files.upsert(&tf).await?;
302 info!(path = %path, file_id = %tf.id(), "sdk_impl::restore: TopologyFile unmarked");
303 }
304 None => {
305 warn!(
306 path = %path,
307 "sdk_impl::restore: TopologyFile not in deleted list (likely hard-deleted after delete transfers). Physical restore succeeded — next full sync will re-register."
308 );
309 }
310 }
311
312 Ok(())
313 }
314
315 async fn get(&self, path: &str) -> Result<Option<TopologyFileView>, SyncError> {
320 self.topology.get(path).await
321 }
322
323 async fn list(
324 &self,
325 file_type: Option<FileType>,
326 limit: Option<usize>,
327 ) -> Result<Vec<TopologyFileView>, SyncError> {
328 self.topology.list(file_type, limit).await
329 }
330
331 async fn status(&self) -> Result<SyncSummary, SyncError> {
332 use crate::domain::location::LocationSummary;
333 use crate::domain::transfer::TransferState;
334 use std::collections::HashMap;
335
336 let retry_policy = self.config.retry_policy();
337 let total_files = self.topology.file_count().await?;
338 let stats = self.transfer_store.transfer_stats().await?;
339 let present_counts = self.transfer_store.present_counts_by_location().await?;
340 let failed = self.transfer_store.failed_transfers().await?;
341 let pending = self.transfer_store.all_pending_transfers().await?;
342
343 let mut locations: HashMap<LocationId, LocationSummary> = HashMap::new();
344 let mut total_errors = 0usize;
345
346 for (loc, count) in &present_counts {
347 let summary = locations.entry(loc.clone()).or_default();
348 summary.present = *count;
349 }
350
351 for row in &stats {
352 if row.state == TransferState::Completed || row.state == TransferState::Cancelled {
353 continue;
354 }
355 let dest_state = match row.state {
356 TransferState::Blocked | TransferState::Queued => PresenceState::Pending,
357 TransferState::InFlight => PresenceState::Syncing,
358 TransferState::Failed => {
359 let exhausted = match row.error_kind.as_deref() {
360 Some("permanent") => true,
361 _ => row.attempt >= retry_policy.max_attempts(),
362 };
363 if exhausted {
364 PresenceState::Failed
365 } else {
366 PresenceState::Pending
367 }
368 }
369 TransferState::Completed | TransferState::Cancelled => PresenceState::Absent,
370 };
371
372 let dest_summary = locations.entry(row.dest.clone()).or_default();
373 match dest_state {
374 PresenceState::Pending => {
375 dest_summary.pending = dest_summary.pending.saturating_add(row.file_count);
376 }
377 PresenceState::Syncing => {
378 dest_summary.syncing = dest_summary.syncing.saturating_add(row.file_count);
379 }
380 PresenceState::Failed => {
381 dest_summary.failed = dest_summary.failed.saturating_add(row.file_count);
382 total_errors = total_errors.saturating_add(row.file_count);
383 }
384 PresenceState::Absent => {
385 dest_summary.absent = dest_summary.absent.saturating_add(row.file_count);
386 }
387 PresenceState::Present => {}
388 }
389 }
390
391 let error_entries: Vec<ErrorEntry> = failed
392 .iter()
393 .filter(|t| {
394 let state = PresenceState::from_transfer(t, &retry_policy);
395 state == PresenceState::Failed
396 })
397 .map(ErrorEntry::from_transfer)
398 .collect();
399
400 let mut pending_entries: Vec<PendingEntry> =
401 pending.iter().map(PendingEntry::from_transfer).collect();
402 for t in &failed {
403 let state = PresenceState::from_transfer(t, &retry_policy);
404 if state == PresenceState::Pending {
405 pending_entries.push(PendingEntry::from_transfer(t));
406 }
407 }
408
409 Ok(SyncSummary {
410 locations,
411 total_entries: total_files,
412 total_errors,
413 error_entries,
414 pending_entries,
415 })
416 }
417
418 async fn errors(&self) -> Result<Vec<ErrorEntry>, SyncError> {
419 let retry_policy = self.config.retry_policy();
420 let failed = self.transfer_store.failed_transfers().await?;
421 Ok(failed
422 .iter()
423 .filter(|t| {
424 let state = PresenceState::from_transfer(t, &retry_policy);
425 state == PresenceState::Failed
426 })
427 .map(ErrorEntry::from_transfer)
428 .collect())
429 }
430
431 async fn pending(&self, dest: &LocationId) -> Result<Vec<PendingEntry>, SyncError> {
432 let retry_policy = self.config.retry_policy();
433
434 let all_pending = self.transfer_store.all_pending_transfers().await?;
436 let mut entries: Vec<PendingEntry> = all_pending
437 .iter()
438 .filter(|t| t.dest() == dest)
439 .map(PendingEntry::from_transfer)
440 .collect();
441
442 let failed = self.transfer_store.failed_transfers().await?;
444 for t in &failed {
445 if t.dest() == dest {
446 let state = PresenceState::from_transfer(t, &retry_policy);
447 if state == PresenceState::Pending {
448 entries.push(PendingEntry::from_transfer(t));
449 }
450 }
451 }
452
453 Ok(entries)
454 }
455
456 fn locations(&self) -> Vec<LocationId> {
461 self.topology.locations().to_vec()
462 }
463
464 fn all_edges(&self) -> Vec<(LocationId, LocationId)> {
465 self.engine.all_edges()
466 }
467
468 fn local_root(&self) -> Option<&Path> {
469 self.engine.local_root()
470 }
471
472 fn set_progress_callback(&self, callback: Option<ProgressFn>) {
473 if let Ok(mut guard) = self.progress.lock() {
474 *guard = callback;
475 }
476 }
477}