1use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22
23use crate::application::error::SyncError;
24use crate::domain::file_type::FileType;
25use crate::domain::fingerprint::FileFingerprint;
26use crate::domain::graph::RouteGraph;
27use crate::domain::location::LocationId;
28use crate::domain::location_file::{self, LocationFile};
29use crate::domain::plan::{plan_distribution, PlannedTransfer};
30use tracing::{debug, info, trace};
31
32use crate::domain::distribute::distribute_actions;
33use crate::domain::topology_delta::TopologyDelta;
34use crate::domain::topology_file::TopologyFile;
35use crate::domain::transfer::{Transfer, TransferKind};
36use crate::infra::location_file_store::LocationFileStore;
37use crate::infra::topology_file_store::TopologyFileStore;
38use crate::infra::transfer_store::TransferStore;
39
/// Aggregate counters produced by one topology sync pass.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct TopologySyncResult {
    // Number of files scanned. NOTE(review): every constructor in this
    // file sets this to 0 — confirm whether a caller fills it in.
    pub scanned: usize,
    // Number of deltas applied during phase 1 (ingest).
    pub ingested: usize,
    // Number of distribute actions produced during phase 2.
    pub distributed: usize,
    // Number of transfers persisted during phase 3 (includes delete transfers).
    pub transfers_created: usize,
    // Conflicts reported by `distribute_actions`.
    pub conflicts: Vec<crate::domain::distribute::ConflictEntry>,
}
61
/// Outcome of `TopologyStore::put` for a single file.
#[derive(Debug, serde::Serialize)]
pub struct TopologyPutResult {
    // Id of the created or updated topology_file.
    pub topology_file_id: String,
    // True when `relative_path` was not previously registered.
    pub is_new: bool,
    // Number of transfers queued to propagate this file.
    pub transfers_created: usize,
}
72
/// Orchestrates the ingest → distribute → route pipeline over the
/// topology_file / location_file / transfer stores.
pub struct TopologyStore {
    // Persistence for logical files.
    topology_files: Arc<dyn TopologyFileStore>,
    // Persistence for per-location replicas of those files.
    location_files: Arc<dyn LocationFileStore>,
    // Persistence for queued/blocked transfers.
    transfers: Arc<dyn TransferStore>,
    // Reachability graph consumed by `plan_distribution`.
    graph: RouteGraph,
    // All known locations that files should be distributed across.
    locations: Vec<LocationId>,
}
88
89impl TopologyStore {
90 pub fn new(
91 topology_files: Arc<dyn TopologyFileStore>,
92 location_files: Arc<dyn LocationFileStore>,
93 transfers: Arc<dyn TransferStore>,
94 graph: RouteGraph,
95 locations: Vec<LocationId>,
96 ) -> Self {
97 Self {
98 topology_files,
99 location_files,
100 transfers,
101 graph,
102 locations,
103 }
104 }
105
    /// Runs a full sync pass over the supplied scanner deltas:
    /// 1. ingest the deltas into the stores (`apply_ingest`),
    /// 2. compute distribute actions across all known locations,
    /// 3. plan routes and persist transfers.
    ///
    /// Additionally queues delete transfers for soft-deleted topology_files
    /// that still have location_file rows.
    pub async fn sync(&self, deltas: &[TopologyDelta]) -> Result<TopologySyncResult, SyncError> {
        info!(delta_count = deltas.len(), "topology_store::sync: start");

        // Phase 1: apply deltas; returns which origins touched which files.
        let ingest_origins = self.apply_ingest(deltas).await?;
        info!(
            origins = ingest_origins.len(),
            "topology_store::sync: phase1 apply done"
        );

        // Load every non-deleted topology_file plus its replicas.
        let active_tfs = self.topology_files.list_active(None, None).await?;
        let active_tf_refs: Vec<&TopologyFile> = active_tfs.iter().collect();
        let file_ids: Vec<&str> = active_tfs.iter().map(|tf| tf.id()).collect();
        debug!(
            active_files = active_tfs.len(),
            "topology_store::sync: loaded active topology_files"
        );

        let lf_map = self.location_files.list_by_files(&file_ids).await?;
        let lf_ref_map = to_ref_map(&lf_map);
        debug!(
            location_file_groups = lf_map.len(),
            "topology_store::sync: loaded location_files"
        );

        // Phase 2: decide which file needs to exist (or be removed) where.
        let dist_result = distribute_actions(
            &active_tf_refs,
            &lf_ref_map,
            &self.locations,
            &ingest_origins,
        );
        info!(
            actions = dist_result.actions.len(),
            conflicts = dist_result.conflicts.len(),
            "topology_store::sync: phase2 distribute done"
        );

        // Soft-deleted files: queue a delete transfer per remaining replica,
        // unless a transfer targeting that destination is already pending.
        let deleted_tfs = self.topology_files.list_deleted().await?;
        trace!(
            deleted_tfs = deleted_tfs.len(),
            "topology_store::sync: checking deleted topology_files for delete transfers"
        );
        let mut delete_transfers_created = 0;
        let pending_dests = self.collect_pending_dests().await?;
        for dtf in &deleted_tfs {
            let lfs = self.location_files.list_by_file(dtf.id()).await?;
            let empty = HashSet::new();
            let pending = pending_dests.get(dtf.id()).unwrap_or(&empty);
            for lf in &lfs {
                let dest = lf.location_id().clone();
                if pending.contains(&dest) {
                    trace!(
                        file_id = %dtf.id(),
                        dest = %dest,
                        "topology_store::sync: delete transfer skipped (pending)"
                    );
                    continue;
                }
                // Pick any configured location other than `dest` to issue the
                // delete from. NOTE(review): presence of the file at `src` is
                // not checked — confirm `Transfer::new_delete` semantics.
                let src = self
                    .locations
                    .iter()
                    .find(|l| *l != &dest)
                    .cloned()
                    .unwrap_or_else(|| dest.clone());
                if src == dest {
                    // Single-location deployments have no peer to issue from.
                    trace!(
                        file_id = %dtf.id(),
                        dest = %dest,
                        "topology_store::sync: delete transfer skipped (single location)"
                    );
                    continue;
                }
                trace!(
                    file_id = %dtf.id(),
                    src = %src,
                    dest = %dest,
                    "topology_store::sync: creating delete transfer"
                );
                let transfer = Transfer::new_delete(dtf.id().to_string(), src, dest)?;
                self.transfers.insert_transfer(&transfer).await?;
                delete_transfers_created += 1;
            }
        }
        if delete_transfers_created > 0 {
            debug!(
                count = delete_transfers_created,
                "topology_store::sync: delete transfers created"
            );
        }

        let distributed = dist_result.actions.len();

        // Presence map: where each file already has a source-eligible replica.
        let existing_presences: HashMap<String, HashSet<LocationId>> = lf_map
            .iter()
            .map(|(file_id, lfs)| {
                let locs: HashSet<LocationId> = lfs
                    .iter()
                    .filter(|lf| lf.state().is_source_eligible())
                    .map(|lf| lf.location_id().clone())
                    .collect();
                (file_id.clone(), locs)
            })
            .collect();
        // Phase 3: route the distribute actions over the graph.
        let planned = plan_distribution(
            &dist_result.actions,
            &self.graph,
            &pending_dests,
            &existing_presences,
        );
        debug!(
            planned_count = planned.len(),
            "topology_store::sync: phase3 route planned"
        );

        let transfers_created = self.create_transfers(&planned).await? + delete_transfers_created;
        info!(
            transfers_created = transfers_created,
            "topology_store::sync: phase3 route done"
        );

        Ok(TopologySyncResult {
            // `scanned` is not tracked by this method.
            scanned: 0,
            ingested: deltas.len(),
            distributed,
            transfers_created,
            conflicts: dist_result.conflicts,
        })
    }
254
    /// Syncs a single `src -> dest` edge: treats every active file that has a
    /// source-eligible replica at `src` as an ingest origin, distributes only
    /// toward `dest`, and creates direct transfers for the resulting actions.
    ///
    /// NOTE(review): unlike `sync`, this bypasses `plan_distribution` and
    /// always builds a direct `src -> dest` hop — confirm that is intended
    /// for routes that are not adjacent in the graph.
    pub async fn sync_route(
        &self,
        src: &LocationId,
        dest: &LocationId,
    ) -> Result<TopologySyncResult, SyncError> {
        let active_tfs = self.topology_files.list_active(None, None).await?;
        let active_tf_refs: Vec<&TopologyFile> = active_tfs.iter().collect();
        let file_ids: Vec<&str> = active_tfs.iter().map(|tf| tf.id()).collect();
        let lf_map = self.location_files.list_by_files(&file_ids).await?;
        let lf_ref_map = to_ref_map(&lf_map);

        // Treat `src` as the origin for every file it can serve.
        let mut ingest_origins = HashMap::new();
        for tf in &active_tfs {
            if let Some(lfs) = lf_map.get(tf.id()) {
                if lfs
                    .iter()
                    .any(|lf| lf.location_id() == src && lf.state().is_source_eligible())
                {
                    ingest_origins
                        .entry(tf.id().to_string())
                        .or_insert_with(HashSet::new)
                        .insert(src.clone());
                }
            }
        }

        // Distribute only toward the single destination.
        let dist_result = distribute_actions(
            &active_tf_refs,
            &lf_ref_map,
            std::slice::from_ref(dest),
            &ingest_origins,
        );

        let distributed = dist_result.actions.len();

        // Build direct transfers, skipping files that already have a pending
        // transfer targeting `dest`.
        let pending_dests = self.collect_pending_dests().await?;
        let transfers: Vec<PlannedTransfer> = dist_result
            .actions
            .iter()
            .filter_map(|action| {
                let file_id = action.topology_file_id();
                let empty = HashSet::new();
                let pending = pending_dests.get(file_id).unwrap_or(&empty);
                if pending.contains(dest) {
                    return None;
                }
                Some(PlannedTransfer {
                    file_id: file_id.to_string(),
                    src: src.clone(),
                    dest: dest.clone(),
                    kind: if action.is_delete() {
                        TransferKind::Delete
                    } else {
                        TransferKind::Sync
                    },
                    depends_on_index: None,
                })
            })
            .collect();

        let transfers_created = self.create_transfers(&transfers).await?;

        Ok(TopologySyncResult {
            scanned: 0,
            ingested: 0,
            distributed,
            transfers_created,
            conflicts: dist_result.conflicts,
        })
    }
335
    /// Registers (or refreshes) a single file at `origin` and immediately
    /// runs the distribute/plan pipeline for just that file.
    ///
    /// Upserts the topology_file (promoting the canonical digest), upserts
    /// the origin's location_file, then creates the resulting transfers.
    pub async fn put(
        &self,
        relative_path: &str,
        file_type: FileType,
        fingerprint: FileFingerprint,
        origin: &LocationId,
        embedded_id: Option<String>,
    ) -> Result<TopologyPutResult, SyncError> {
        let existing = self.topology_files.get_by_path(relative_path).await?;

        // Reuse the registered topology_file for this path, or create one.
        let (tf, is_new) = if let Some(mut tf) = existing {
            tf.promote_canonical_digest(&fingerprint);
            self.topology_files.upsert(&tf).await?;
            (tf, false)
        } else {
            let mut tf = TopologyFile::new(relative_path.to_string(), file_type)
                .map_err(SyncError::Domain)?;
            tf.promote_canonical_digest(&fingerprint);
            self.topology_files.upsert(&tf).await?;
            (tf, true)
        };

        // Upsert the replica at the origin location.
        let existing_lf = self.location_files.get(tf.id(), origin).await?;
        match existing_lf {
            Some(mut lf) => {
                lf.update_fingerprint(fingerprint.clone(), embedded_id);
                self.location_files.upsert(&lf).await?;
            }
            None => {
                let lf = tf
                    .materialize(
                        origin.clone(),
                        relative_path.to_string(),
                        fingerprint.clone(),
                        embedded_id,
                    )
                    .map_err(SyncError::Domain)?;
                self.location_files.upsert(&lf).await?;
            }
        }

        // Single-file distribute: origin is the only ingest origin.
        let mut ingest_origins = HashMap::new();
        ingest_origins.insert(tf.id().to_string(), HashSet::from([origin.clone()]));

        let lfs = self.location_files.list_by_file(tf.id()).await?;
        let mut lf_map: HashMap<String, Vec<&LocationFile>> = HashMap::new();
        lf_map.insert(tf.id().to_string(), lfs.iter().collect());

        let dist_result = distribute_actions(&[&tf], &lf_map, &self.locations, &ingest_origins);

        let pending_dests = self.collect_pending_dests().await?;
        // Delete actions are ignored here; `put` only propagates content.
        let sync_actions: Vec<_> = dist_result
            .actions
            .iter()
            .filter(|a| !a.is_delete())
            .cloned()
            .collect();
        // NOTE(review): unlike `sync`, presence here includes replicas of any
        // state (no `is_source_eligible` filter) — confirm this is intended.
        let existing_presences: HashMap<String, HashSet<LocationId>> = {
            let locs: HashSet<LocationId> = lfs.iter().map(|lf| lf.location_id().clone()).collect();
            let mut m = HashMap::new();
            m.insert(tf.id().to_string(), locs);
            m
        };
        let planned = plan_distribution(
            &sync_actions,
            &self.graph,
            &pending_dests,
            &existing_presences,
        );

        let transfers_created = self.create_transfers(&planned).await?;

        Ok(TopologyPutResult {
            topology_file_id: tf.id().to_string(),
            is_new,
            transfers_created,
        })
    }
427
428 pub async fn get(&self, relative_path: &str) -> Result<Option<TopologyFileView>, SyncError> {
430 let tf = match self.topology_files.get_by_path(relative_path).await? {
431 Some(tf) => tf,
432 None => return Ok(None),
433 };
434 let lfs = self.location_files.list_by_file(tf.id()).await?;
435 Ok(Some(TopologyFileView {
436 topology_file: tf,
437 location_files: lfs,
438 }))
439 }
440
441 pub async fn list(
443 &self,
444 file_type: Option<FileType>,
445 limit: Option<usize>,
446 ) -> Result<Vec<TopologyFileView>, SyncError> {
447 let tfs = self.topology_files.list_active(file_type, limit).await?;
448 let file_ids: Vec<&str> = tfs.iter().map(|tf| tf.id()).collect();
449 let lf_map = self.location_files.list_by_files(&file_ids).await?;
450
451 let views = tfs
452 .into_iter()
453 .map(|tf| {
454 let lfs = lf_map.get(tf.id()).cloned().unwrap_or_default();
455 TopologyFileView {
456 topology_file: tf,
457 location_files: lfs,
458 }
459 })
460 .collect();
461 Ok(views)
462 }
463
    /// Soft-deletes the file at `relative_path` and queues a delete transfer
    /// for every location that still holds a replica (skipping destinations
    /// that already have a pending transfer). Returns the number of delete
    /// transfers created.
    ///
    /// # Errors
    /// `SyncError::NotRegistered` when the path has no active topology_file.
    pub async fn delete(&self, relative_path: &str) -> Result<usize, SyncError> {
        let mut tf = self
            .topology_files
            .get_by_path(relative_path)
            .await?
            .ok_or_else(|| SyncError::NotRegistered(relative_path.to_string()))?;

        // Soft delete; actual replica removal is driven by delete transfers.
        tf.mark_deleted();
        self.topology_files.upsert(&tf).await?;

        let lfs = self.location_files.list_by_file(tf.id()).await?;
        let pending_dests = self.collect_pending_dests().await?;
        let empty = HashSet::new();
        let pending = pending_dests.get(tf.id()).unwrap_or(&empty);

        let mut created = 0;
        for lf in &lfs {
            let dest = lf.location_id().clone();
            if pending.contains(&dest) {
                continue;
            }
            // Same src-selection as `sync`'s deleted-file pass: first
            // configured location other than dest, no presence check.
            let src = self
                .locations
                .iter()
                .find(|l| *l != &dest)
                .cloned()
                .unwrap_or_else(|| dest.clone());
            if src == dest {
                // Single-location deployment: nothing to issue the delete from.
                continue;
            }
            let transfer = Transfer::new_delete(tf.id().to_string(), src, dest)?;
            self.transfers.insert_transfer(&transfer).await?;
            created += 1;
        }

        Ok(created)
    }
508
509 pub async fn file_count(&self) -> Result<usize, SyncError> {
515 Ok(self.topology_files.count_active().await?)
516 }
517
518 pub fn locations(&self) -> &[LocationId] {
520 &self.locations
521 }
522
    /// Phase 1 of `sync`: applies scanner deltas to the topology_file and
    /// location_file stores.
    ///
    /// Returns a map of `topology_file_id -> origins` recording which
    /// locations ingested content, later used to seed `distribute_actions`.
    async fn apply_ingest(
        &self,
        deltas: &[TopologyDelta],
    ) -> Result<HashMap<String, HashSet<LocationId>>, SyncError> {
        let mut ingest_origins: HashMap<String, HashSet<LocationId>> = HashMap::new();

        // Apply deltas in a fixed order: renames first, vanishes last.
        // NOTE(review): presumably so path updates land before content/
        // discovery changes — confirm against the scanner's delta semantics.
        let mut sorted_deltas: Vec<&TopologyDelta> = deltas.iter().collect();
        sorted_deltas.sort_by_key(|d| match d {
            TopologyDelta::Renamed(_) => 0,
            TopologyDelta::ContentChanged(_) => 1,
            TopologyDelta::Discovered(_) => 2,
            TopologyDelta::Vanished(_) => 3,
        });

        let total = sorted_deltas.len();
        // Log roughly every 10% of progress (every item for small batches).
        let log_interval = (total / 10).max(1);

        for (i, delta) in sorted_deltas.iter().enumerate() {
            if i % log_interval == 0 {
                info!(progress = i, total = total, "apply_ingest: processing");
            }
            match delta {
                TopologyDelta::Discovered(d) => {
                    // Reuse a topology_file already registered at this path,
                    // otherwise create a fresh one.
                    let existing = self.topology_files.get_by_path(&d.relative_path).await?;
                    let is_new = existing.is_none();
                    let mut tf = if let Some(existing) = existing {
                        trace!(
                            path = %d.relative_path,
                            tf_id = %existing.id(),
                            origin = %d.origin,
                            "apply_ingest: Discovered — reusing existing TopologyFile"
                        );
                        existing
                    } else {
                        trace!(
                            path = %d.relative_path,
                            origin = %d.origin,
                            size = d.fingerprint.size,
                            content_digest = ?d.fingerprint.content_digest,
                            "apply_ingest: Discovered — creating new TopologyFile"
                        );
                        TopologyFile::new(d.relative_path.clone(), d.file_type)
                            .map_err(SyncError::Domain)?
                    };
                    tf.promote_canonical_digest(&d.fingerprint);
                    self.topology_files.upsert(&tf).await?;

                    // Record the replica at the discovering location.
                    let lf = tf
                        .materialize(
                            d.origin.clone(),
                            d.relative_path.clone(),
                            d.fingerprint.clone(),
                            d.embedded_id.clone(),
                        )
                        .map_err(SyncError::Domain)?;
                    self.location_files.upsert(&lf).await?;

                    if is_new {
                        debug!(
                            path = %d.relative_path,
                            tf_id = %tf.id(),
                            origin = %d.origin,
                            "apply_ingest: NEW file registered"
                        );
                    }

                    ingest_origins
                        .entry(tf.id().to_string())
                        .or_default()
                        .insert(d.origin.clone());
                }
                TopologyDelta::ContentChanged(c) => {
                    trace!(
                        path = %c.relative_path,
                        tf_id = %c.topology_file_id,
                        origin = %c.origin,
                        old_size = c.old_fingerprint.size,
                        new_size = c.new_fingerprint.size,
                        "apply_ingest: ContentChanged"
                    );
                    // Promote the new digest on the logical file, if it still exists.
                    if let Some(mut tf) = self.topology_files.get_by_id(&c.topology_file_id).await?
                    {
                        tf.promote_canonical_digest(&c.new_fingerprint);
                        self.topology_files.upsert(&tf).await?;
                    }

                    // Refresh the replica at the origin, materializing one if absent.
                    let existing_lf = self
                        .location_files
                        .get(&c.topology_file_id, &c.origin)
                        .await?;
                    match existing_lf {
                        Some(mut lf) => {
                            lf.update_fingerprint(c.new_fingerprint.clone(), c.embedded_id.clone());
                            self.location_files.upsert(&lf).await?;
                        }
                        None => {
                            if let Some(tf) =
                                self.topology_files.get_by_id(&c.topology_file_id).await?
                            {
                                let lf = tf
                                    .materialize(
                                        c.origin.clone(),
                                        c.relative_path.clone(),
                                        c.new_fingerprint.clone(),
                                        c.embedded_id.clone(),
                                    )
                                    .map_err(SyncError::Domain)?;
                                self.location_files.upsert(&lf).await?;
                            }
                        }
                    }

                    // Mark out-of-date replicas elsewhere stale so they re-sync;
                    // which replicas qualify is decided by `stale_candidates`.
                    let all_lfs = self
                        .location_files
                        .list_by_file(&c.topology_file_id)
                        .await?;
                    for stale_lf in
                        location_file::stale_candidates(&all_lfs, &c.origin, &c.new_fingerprint)
                    {
                        let mut lf = stale_lf.clone();
                        lf.mark_stale();
                        self.location_files.upsert(&lf).await?;
                    }

                    ingest_origins
                        .entry(c.topology_file_id.clone())
                        .or_default()
                        .insert(c.origin.clone());
                }
                TopologyDelta::Renamed(r) => {
                    trace!(
                        tf_id = %r.topology_file_id,
                        old_path = %r.old_path,
                        new_path = %r.new_path,
                        origin = %r.origin,
                        "apply_ingest: Renamed"
                    );
                    // Move the logical file to its new path and refresh the digest.
                    if let Some(mut tf) = self.topology_files.get_by_id(&r.topology_file_id).await?
                    {
                        tf.update_path(r.new_path.clone());
                        tf.promote_canonical_digest(&r.fingerprint);
                        self.topology_files.upsert(&tf).await?;
                    }

                    // Refresh the origin replica, materializing one if absent.
                    let existing_lf = self
                        .location_files
                        .get(&r.topology_file_id, &r.origin)
                        .await?;
                    match existing_lf {
                        Some(mut lf) => {
                            lf.update_fingerprint(r.fingerprint.clone(), r.embedded_id.clone());
                            self.location_files.upsert(&lf).await?;
                        }
                        None => {
                            if let Some(tf) =
                                self.topology_files.get_by_id(&r.topology_file_id).await?
                            {
                                let lf = tf
                                    .materialize(
                                        r.origin.clone(),
                                        r.new_path.clone(),
                                        r.fingerprint.clone(),
                                        r.embedded_id.clone(),
                                    )
                                    .map_err(SyncError::Domain)?;
                                self.location_files.upsert(&lf).await?;
                            }
                        }
                    }

                    ingest_origins
                        .entry(r.topology_file_id.clone())
                        .or_default()
                        .insert(r.origin.clone());
                }
                TopologyDelta::Vanished(v) => {
                    trace!(
                        path = %v.relative_path,
                        tf_id = %v.topology_file_id,
                        origin = %v.origin,
                        "apply_ingest: Vanished"
                    );
                    // Only the replica at the reporting location is marked
                    // missing; the topology_file itself is left untouched.
                    let existing_lf = self
                        .location_files
                        .get(&v.topology_file_id, &v.origin)
                        .await?;
                    if let Some(mut lf) = existing_lf {
                        lf.mark_missing();
                        self.location_files.upsert(&lf).await?;
                    }
                }
            }
        }

        info!(
            processed = total,
            origins = ingest_origins.len(),
            "apply_ingest: done"
        );
        Ok(ingest_origins)
    }
748
749 async fn create_transfers(&self, planned: &[PlannedTransfer]) -> Result<usize, SyncError> {
755 let mut created = 0;
756 let mut transfer_ids: Vec<String> = Vec::with_capacity(planned.len());
758
759 for pt in planned.iter() {
760 trace!(
761 file_id = %pt.file_id,
762 src = %pt.src,
763 dest = %pt.dest,
764 kind = ?pt.kind,
765 depends_on = ?pt.depends_on_index,
766 "create_transfers: creating transfer"
767 );
768 let transfer = if let Some(dep_idx) = pt.depends_on_index {
769 let dep_id = &transfer_ids[dep_idx];
770 Transfer::with_dependency(
771 pt.file_id.clone(),
772 pt.src.clone(),
773 pt.dest.clone(),
774 pt.kind,
775 dep_id.clone(),
776 )?
777 } else {
778 Transfer::with_kind(pt.file_id.clone(), pt.src.clone(), pt.dest.clone(), pt.kind)?
779 };
780 self.transfers.insert_transfer(&transfer).await?;
781 transfer_ids.push(transfer.id().to_string());
782 created += 1;
783 }
784
785 trace!(created = created, "create_transfers: done");
786 Ok(created)
787 }
788
789 async fn collect_pending_dests(
791 &self,
792 ) -> Result<HashMap<String, HashSet<LocationId>>, SyncError> {
793 let pending = self.transfers.all_pending_transfers().await?;
794 let mut map: HashMap<String, HashSet<LocationId>> = HashMap::new();
795 for t in &pending {
796 map.entry(t.file_id().to_string())
797 .or_default()
798 .insert(t.dest().clone());
799 }
800 Ok(map)
801 }
802}
803
/// Read-model pairing a logical file with all of its per-location replicas,
/// as returned by `TopologyStore::get` / `TopologyStore::list`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TopologyFileView {
    // The logical file.
    pub topology_file: TopologyFile,
    // Its replicas, one per location that holds (or held) the file.
    pub location_files: Vec<LocationFile>,
}
814
815fn to_ref_map(map: &HashMap<String, Vec<LocationFile>>) -> HashMap<String, Vec<&LocationFile>> {
821 map.iter()
822 .map(|(k, v)| (k.clone(), v.iter().collect()))
823 .collect()
824}
825
826#[cfg(test)]
831mod tests {
832 use super::*;
833 use async_trait::async_trait;
834 use chrono::{DateTime, Utc};
835 use tokio::sync::Mutex;
836
837 use crate::domain::location_file::LocationFileState;
838 use crate::domain::topology_delta::{ContentChangedFile, DiscoveredFile, VanishedFile};
839 use crate::domain::transfer::TransferState;
840 use crate::infra::error::InfraError;
841 use crate::infra::transfer_store::TransferStatRow;
842
843 struct MockTopologyFileStore {
848 files: Mutex<Vec<TopologyFile>>,
849 }
850
851 impl MockTopologyFileStore {
852 fn new() -> Self {
853 Self {
854 files: Mutex::new(Vec::new()),
855 }
856 }
857 }
858
    // Straightforward Vec-scan implementation of the store trait; each
    // method locks, filters/mutates, and returns clones.
    #[async_trait]
    impl TopologyFileStore for MockTopologyFileStore {
        // Insert, or replace the entry with a matching id.
        async fn upsert(&self, file: &TopologyFile) -> Result<(), InfraError> {
            let mut files = self.files.lock().await;
            if let Some(pos) = files.iter().position(|f| f.id() == file.id()) {
                files[pos] = file.clone();
            } else {
                files.push(file.clone());
            }
            Ok(())
        }

        async fn get_by_id(&self, id: &str) -> Result<Option<TopologyFile>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .find(|f| f.id() == id)
                .cloned())
        }

        // Path lookup ignores soft-deleted entries.
        async fn get_by_path(&self, path: &str) -> Result<Option<TopologyFile>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .find(|f| f.relative_path() == path && f.deleted_at().is_none())
                .cloned())
        }

        async fn find_by_canonical_hash(
            &self,
            hash: &str,
        ) -> Result<Option<TopologyFile>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .find(|f| f.canonical_hash() == Some(hash) && f.deleted_at().is_none())
                .cloned())
        }

        // Active = not soft-deleted; optional type filter and truncation.
        async fn list_active(
            &self,
            file_type: Option<FileType>,
            limit: Option<usize>,
        ) -> Result<Vec<TopologyFile>, InfraError> {
            let files = self.files.lock().await;
            let mut result: Vec<_> = files
                .iter()
                .filter(|f| f.deleted_at().is_none())
                .filter(|f| file_type.is_none_or(|ft| f.file_type() == ft))
                .cloned()
                .collect();
            if let Some(n) = limit {
                result.truncate(n);
            }
            Ok(result)
        }

        async fn list_deleted(&self) -> Result<Vec<TopologyFile>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .filter(|f| f.deleted_at().is_some())
                .cloned()
                .collect())
        }

        // Physically removes an entry, but only if it is soft-deleted;
        // returns whether anything was removed.
        async fn hard_delete(&self, id: &str) -> Result<bool, InfraError> {
            let mut files = self.files.lock().await;
            let len_before = files.len();
            files.retain(|f| !(f.id() == id && f.deleted_at().is_some()));
            Ok(files.len() < len_before)
        }

        async fn count_active(&self) -> Result<usize, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .filter(|f| f.deleted_at().is_none())
                .count())
        }

        async fn list_active_paths(&self) -> Result<Vec<String>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .filter(|f| f.deleted_at().is_none())
                .map(|f| f.relative_path().to_string())
                .collect())
        }
    }
961
962 struct MockLocationFileStore {
963 files: Mutex<Vec<LocationFile>>,
964 }
965
966 impl MockLocationFileStore {
967 fn new() -> Self {
968 Self {
969 files: Mutex::new(Vec::new()),
970 }
971 }
972 }
973
    // Vec-scan implementation keyed by the (file_id, location_id) pair.
    #[async_trait]
    impl LocationFileStore for MockLocationFileStore {
        // Insert, or replace the entry matching (file_id, location_id).
        async fn upsert(&self, file: &LocationFile) -> Result<(), InfraError> {
            let mut files = self.files.lock().await;
            if let Some(pos) = files.iter().position(|f| {
                f.file_id() == file.file_id() && f.location_id() == file.location_id()
            }) {
                files[pos] = file.clone();
            } else {
                files.push(file.clone());
            }
            Ok(())
        }

        async fn get(
            &self,
            file_id: &str,
            location_id: &LocationId,
        ) -> Result<Option<LocationFile>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .find(|f| f.file_id() == file_id && f.location_id() == location_id)
                .cloned())
        }

        async fn list_by_file(&self, file_id: &str) -> Result<Vec<LocationFile>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .filter(|f| f.file_id() == file_id)
                .cloned()
                .collect())
        }

        async fn list_by_location(
            &self,
            location_id: &LocationId,
        ) -> Result<Vec<LocationFile>, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .filter(|f| f.location_id() == location_id)
                .cloned()
                .collect())
        }

        // Groups matching entries by file id; files with no replicas simply
        // have no key in the returned map.
        async fn list_by_files(
            &self,
            file_ids: &[&str],
        ) -> Result<HashMap<String, Vec<LocationFile>>, InfraError> {
            let files = self.files.lock().await;
            let mut map: HashMap<String, Vec<LocationFile>> = HashMap::new();
            for f in files.iter() {
                if file_ids.contains(&f.file_id()) {
                    map.entry(f.file_id().to_string())
                        .or_default()
                        .push(f.clone());
                }
            }
            Ok(map)
        }

        // Returns whether an entry was actually removed.
        async fn delete(
            &self,
            file_id: &str,
            location_id: &LocationId,
        ) -> Result<bool, InfraError> {
            let mut files = self.files.lock().await;
            let before = files.len();
            files.retain(|f| !(f.file_id() == file_id && f.location_id() == location_id));
            Ok(files.len() < before)
        }

        async fn count_by_location(&self, location_id: &LocationId) -> Result<usize, InfraError> {
            Ok(self
                .files
                .lock()
                .await
                .iter()
                .filter(|f| f.location_id() == location_id)
                .count())
        }
    }
1064
1065 struct MockTransferStore {
1066 transfers: Mutex<Vec<Transfer>>,
1067 }
1068
1069 impl MockTransferStore {
1070 fn new() -> Self {
1071 Self {
1072 transfers: Mutex::new(Vec::new()),
1073 }
1074 }
1075 }
1076
    // Minimal transfer store: only the methods these tests exercise do real
    // work; the rest return empty/zero stubs.
    #[async_trait]
    impl TransferStore for MockTransferStore {
        async fn insert_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
            self.transfers.lock().await.push(transfer.clone());
            Ok(())
        }

        // Replace by id; silently ignores unknown ids.
        async fn update_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
            let mut transfers = self.transfers.lock().await;
            if let Some(pos) = transfers.iter().position(|t| t.id() == transfer.id()) {
                transfers[pos] = transfer.clone();
            }
            Ok(())
        }

        async fn queued_transfers(&self, dest: &LocationId) -> Result<Vec<Transfer>, InfraError> {
            Ok(self
                .transfers
                .lock()
                .await
                .iter()
                .filter(|t| t.dest() == dest && t.state() == TransferState::Queued)
                .cloned()
                .collect())
        }

        async fn latest_transfers_by_file(
            &self,
            file_id: &str,
        ) -> Result<Vec<Transfer>, InfraError> {
            Ok(self
                .transfers
                .lock()
                .await
                .iter()
                .filter(|t| t.file_id() == file_id)
                .cloned()
                .collect())
        }

        // Stub: tests never create failed transfers.
        async fn failed_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
            Ok(Vec::new())
        }

        // Stub: pruning is not exercised.
        async fn prune_completed(&self, _before: DateTime<Utc>) -> Result<usize, InfraError> {
            Ok(0)
        }

        async fn count_queued(&self) -> Result<usize, InfraError> {
            Ok(self
                .transfers
                .lock()
                .await
                .iter()
                .filter(|t| t.state() == TransferState::Queued)
                .count())
        }

        // Stub.
        async fn cancel_orphaned_inflight(&self) -> Result<usize, InfraError> {
            Ok(0)
        }

        // Stub.
        async fn unblock_dependents(
            &self,
            _completed_transfer_id: &str,
        ) -> Result<usize, InfraError> {
            Ok(0)
        }

        // Pending = queued or blocked; mirrors what `collect_pending_dests`
        // in the store under test relies on.
        async fn all_pending_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
            Ok(self
                .transfers
                .lock()
                .await
                .iter()
                .filter(|t| {
                    t.state() == TransferState::Queued || t.state() == TransferState::Blocked
                })
                .cloned()
                .collect())
        }

        // Stub.
        async fn transfer_stats(&self) -> Result<Vec<TransferStatRow>, InfraError> {
            Ok(Vec::new())
        }

        // Stub.
        async fn present_counts_by_location(
            &self,
        ) -> Result<HashMap<LocationId, usize>, InfraError> {
            Ok(HashMap::new())
        }
    }
1169
1170 fn loc(name: &str) -> LocationId {
1175 LocationId::new(name).expect("valid location name")
1176 }
1177
1178 fn fp(hash: &str, size: u64) -> FileFingerprint {
1179 use crate::domain::digest::{ByteDigest, ContentDigest};
1180 FileFingerprint {
1181 byte_digest: Some(ByteDigest::Djb2(hash.to_string())),
1182 content_digest: Some(ContentDigest(hash.to_string())),
1183 meta_digest: None,
1184 size,
1185 modified_at: None,
1186 }
1187 }
1188
1189 fn three_loc_setup() -> (RouteGraph, Vec<LocationId>) {
1191 let local = loc("local");
1192 let pod = loc("pod");
1193 let cloud = loc("cloud");
1194 let mut g = RouteGraph::new();
1195 g.add(local.clone(), pod.clone());
1196 g.add(pod.clone(), cloud.clone());
1197 g.add(pod.clone(), local.clone());
1198 g.add(cloud.clone(), pod.clone());
1199 (g, vec![local, pod, cloud])
1200 }
1201
1202 fn make_store(
1203 tf: Arc<MockTopologyFileStore>,
1204 lf: Arc<MockLocationFileStore>,
1205 tr: Arc<MockTransferStore>,
1206 ) -> TopologyStore {
1207 let (graph, locations) = three_loc_setup();
1208 TopologyStore::new(tf, lf, tr, graph, locations)
1209 }
1210
1211 fn discovered(path: &str, hash: &str, origin: &str) -> TopologyDelta {
1212 TopologyDelta::Discovered(DiscoveredFile {
1213 relative_path: path.to_string(),
1214 file_type: FileType::Image,
1215 fingerprint: fp(hash, 1024),
1216 origin: loc(origin),
1217 embedded_id: None,
1218 })
1219 }
1220
    // A single Discovered delta registers a topology_file + origin replica
    // and queues transfers toward the other locations.
    #[tokio::test]
    async fn pipeline_discovered_creates_topology_and_routes_transfers() {
        let tf_s = Arc::new(MockTopologyFileStore::new());
        let lf_s = Arc::new(MockLocationFileStore::new());
        let tr_s = Arc::new(MockTransferStore::new());
        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());

        let result = store
            .sync(&[discovered("output/gen-001.png", "abc123", "local")])
            .await
            .unwrap();

        // Logical file registered with a promoted canonical hash.
        let tfs = tf_s.files.lock().await;
        assert_eq!(tfs.len(), 1);
        assert_eq!(tfs[0].relative_path(), "output/gen-001.png");
        assert!(
            tfs[0].canonical_hash().is_some(),
            "canonical_hash should be promoted from fingerprint"
        );

        // Exactly one active replica, at the discovering location.
        let lfs = lf_s.files.lock().await;
        assert_eq!(lfs.len(), 1);
        assert_eq!(lfs[0].location_id(), &loc("local"));
        assert_eq!(lfs[0].state(), LocationFileState::Active);

        assert_eq!(result.ingested, 1);
        assert!(result.distributed >= 2, "pod + cloud へのDistributeAction");

        // Transfers exist and are sane: same file id, never self-routed.
        let transfers = tr_s.transfers.lock().await;
        assert!(
            !transfers.is_empty(),
            "Transfers should be created for reachable locations"
        );
        let tf_id = tfs[0].id();
        for t in transfers.iter() {
            assert_eq!(t.file_id(), tf_id, "All transfers for same file");
            assert_ne!(t.src(), t.dest(), "No self-transfer");
        }
    }
1270
    // A ContentChanged delta from one location marks replicas elsewhere
    // stale and queues transfers to re-propagate the new content.
    #[tokio::test]
    async fn pipeline_content_changed_stales_others_and_creates_transfers() {
        let tf_s = Arc::new(MockTopologyFileStore::new());
        let lf_s = Arc::new(MockLocationFileStore::new());
        let tr_s = Arc::new(MockTransferStore::new());
        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());

        // Seed: the file exists at "local" with v1 content.
        store
            .sync(&[discovered("output/img.png", "v1_hash", "local")])
            .await
            .unwrap();

        let tf_id = tf_s.files.lock().await[0].id().to_string();

        // Simulate a replica at "pod" still holding v1.
        let pod_lf = LocationFile::new(
            tf_id.clone(),
            loc("pod"),
            "output/img.png".to_string(),
            fp("v1_hash", 1024),
            None,
        )
        .unwrap();
        lf_s.upsert(&pod_lf).await.unwrap();

        // Clear transfers from the seeding sync so assertions below only see
        // transfers caused by the content change.
        tr_s.transfers.lock().await.clear();

        let delta = TopologyDelta::ContentChanged(ContentChangedFile {
            topology_file_id: tf_id.clone(),
            relative_path: "output/img.png".to_string(),
            file_type: FileType::Image,
            old_fingerprint: fp("v1_hash", 1024),
            new_fingerprint: fp("v2_hash", 2048),
            origin: loc("local"),
            embedded_id: None,
        });

        let result = store.sync(&[delta]).await.unwrap();

        // The pod replica (old content) is marked stale.
        let pod_lf = lf_s.get(&tf_id, &loc("pod")).await.unwrap().unwrap();
        assert_eq!(pod_lf.state(), LocationFileState::Stale);

        // The origin replica stays active.
        let local_lf = lf_s.get(&tf_id, &loc("local")).await.unwrap().unwrap();
        assert_eq!(local_lf.state(), LocationFileState::Active);

        assert!(result.distributed > 0);
        assert!(result.transfers_created > 0);
    }
1332
    // A Vanished delta only marks the reporting location's replica missing;
    // the topology_file itself is not soft-deleted.
    #[tokio::test]
    async fn pipeline_vanished_marks_location_file_missing() {
        let tf_s = Arc::new(MockTopologyFileStore::new());
        let lf_s = Arc::new(MockLocationFileStore::new());
        let tr_s = Arc::new(MockTransferStore::new());
        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());

        store
            .sync(&[discovered("output/gone.png", "gone_hash", "local")])
            .await
            .unwrap();

        let tf_id = tf_s.files.lock().await[0].id().to_string();

        let delta = TopologyDelta::Vanished(VanishedFile {
            topology_file_id: tf_id.clone(),
            relative_path: "output/gone.png".to_string(),
            origin: loc("local"),
        });

        store.sync(&[delta]).await.unwrap();

        // Replica is flagged missing …
        let lf = lf_s.get(&tf_id, &loc("local")).await.unwrap().unwrap();
        assert_eq!(lf.state(), LocationFileState::Missing);

        // … but the logical file remains active.
        let tf = tf_s.files.lock().await;
        assert!(tf[0].deleted_at().is_none());
    }
1370
    // A batch of Discovered deltas from two different origins ingests every
    // file and produces distribution/transfers for each.
    #[tokio::test]
    async fn pipeline_batch_discovered_multi_origin() {
        let tf_s = Arc::new(MockTopologyFileStore::new());
        let lf_s = Arc::new(MockLocationFileStore::new());
        let tr_s = Arc::new(MockTransferStore::new());
        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());

        let deltas = vec![
            discovered("a.png", "ha", "local"),
            discovered("b.png", "hb", "local"),
            discovered("c.png", "hc", "pod"),
        ];

        let result = store.sync(&deltas).await.unwrap();

        // One topology_file and one replica per delta.
        assert_eq!(result.ingested, 3);
        assert_eq!(tf_s.files.lock().await.len(), 3);
        assert_eq!(lf_s.files.lock().await.len(), 3);

        assert!(result.distributed >= 3);
        assert!(result.transfers_created >= 3);
    }
1400
    // `put` registers a new file and queues transfers; a subsequent sync
    // with no deltas must report zero ingests (no spurious re-ingest).
    #[tokio::test]
    async fn pipeline_put_then_empty_sync_is_consistent() {
        let tf_s = Arc::new(MockTopologyFileStore::new());
        let lf_s = Arc::new(MockLocationFileStore::new());
        let tr_s = Arc::new(MockTransferStore::new());
        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());

        let put_result = store
            .put("x.png", FileType::Image, fp("xh", 100), &loc("local"), None)
            .await
            .unwrap();

        assert!(put_result.is_new);
        let initial_transfers = tr_s.transfers.lock().await.len();
        assert!(initial_transfers > 0);

        let sync_result = store.sync(&[]).await.unwrap();

        assert_eq!(sync_result.ingested, 0);
    }
1431
    // `delete` soft-deletes the topology_file and queues one delete transfer
    // per location that still holds a replica.
    #[tokio::test]
    async fn pipeline_delete_creates_delete_transfers() {
        let tf_s = Arc::new(MockTopologyFileStore::new());
        let lf_s = Arc::new(MockLocationFileStore::new());
        let tr_s = Arc::new(MockTransferStore::new());
        let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());

        store
            .put(
                "del.png",
                FileType::Image,
                fp("dh", 100),
                &loc("local"),
                None,
            )
            .await
            .unwrap();

        // Simulate an additional replica at "pod".
        let tf_id = tf_s.files.lock().await[0].id().to_string();
        let pod_lf = LocationFile::new(
            tf_id.clone(),
            loc("pod"),
            "del.png".to_string(),
            fp("dh", 100),
            None,
        )
        .unwrap();
        lf_s.upsert(&pod_lf).await.unwrap();

        // Drop the put's transfers so only delete transfers are observed.
        tr_s.transfers.lock().await.clear();

        let delete_count = store.delete("del.png").await.unwrap();

        // Soft-deleted, not removed.
        let tf = tf_s.files.lock().await;
        assert!(tf[0].deleted_at().is_some());

        assert_eq!(delete_count, 2, "Delete Transfer for local + pod");
        let transfers = tr_s.transfers.lock().await;
        assert_eq!(transfers.len(), 2);
        for t in transfers.iter() {
            assert!(t.is_delete(), "All should be Delete kind");
            assert_ne!(t.src(), t.dest(), "No self-transfer");
        }
    }
1489}