1use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22
23use crate::application::error::SyncError;
24use crate::domain::file_type::FileType;
25use crate::domain::fingerprint::FileFingerprint;
26use crate::domain::graph::RouteGraph;
27use crate::domain::location::LocationId;
28use crate::domain::location_file::{self, LocationFile};
29use crate::domain::plan::{plan_distribution, PlannedTransfer};
30use tracing::{debug, info, trace};
31
32use crate::domain::distribute::distribute_actions;
33use crate::domain::topology_delta::TopologyDelta;
34use crate::domain::topology_file::TopologyFile;
35use crate::domain::transfer::{Transfer, TransferKind};
36use crate::infra::location_file_store::LocationFileStore;
37use crate::infra::topology_file_store::TopologyFileStore;
38use crate::infra::transfer_store::TransferStore;
39
/// Summary returned by `TopologyStore::sync` and `TopologyStore::sync_route`.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct TopologySyncResult {
    /// Scan counter. Both producers in this file always report `0`.
    pub scanned: usize,
    /// Number of deltas passed to `sync`; `sync_route` always reports `0`.
    pub ingested: usize,
    /// Number of actions returned by `distribute_actions`.
    pub distributed: usize,
    /// Number of transfers inserted into the transfer store during the run
    /// (for `sync` this includes delete transfers).
    pub transfers_created: usize,
    /// Conflicts reported by `distribute_actions`, passed through unchanged.
    pub conflicts: Vec<crate::domain::distribute::ConflictEntry>,
}
61
/// Outcome of `TopologyStore::put`.
#[derive(Debug, serde::Serialize)]
pub struct TopologyPutResult {
    /// Id of the topology file that was created or updated.
    pub topology_file_id: String,
    /// `true` when no topology file was registered under the path before the call.
    pub is_new: bool,
    /// Number of transfers inserted into the transfer store by the call.
    pub transfers_created: usize,
}
72
/// Application-level facade over the topology file, location file and transfer
/// stores, together with the route graph used to plan transfers.
pub struct TopologyStore {
    /// Persistence for `TopologyFile` records.
    topology_files: Arc<dyn TopologyFileStore>,
    /// Persistence for per-location `LocationFile` records.
    location_files: Arc<dyn LocationFileStore>,
    /// Persistence for `Transfer` records.
    transfers: Arc<dyn TransferStore>,
    /// Route graph handed to `plan_distribution`.
    graph: RouteGraph,
    /// Locations handed to `distribute_actions`; also searched for a source
    /// location when delete transfers are created.
    locations: Vec<LocationId>,
}
88
89impl TopologyStore {
90 pub fn new(
91 topology_files: Arc<dyn TopologyFileStore>,
92 location_files: Arc<dyn LocationFileStore>,
93 transfers: Arc<dyn TransferStore>,
94 graph: RouteGraph,
95 locations: Vec<LocationId>,
96 ) -> Self {
97 Self {
98 topology_files,
99 location_files,
100 transfers,
101 graph,
102 locations,
103 }
104 }
105
106 pub async fn sync(&self, deltas: &[TopologyDelta]) -> Result<TopologySyncResult, SyncError> {
122 info!(delta_count = deltas.len(), "topology_store::sync: start");
123
124 let ingest_origins = self.apply_ingest(deltas).await?;
126 info!(
127 origins = ingest_origins.len(),
128 "topology_store::sync: phase1 apply done"
129 );
130
131 let active_tfs = self.topology_files.list_active(None, None).await?;
133 let active_tf_refs: Vec<&TopologyFile> = active_tfs.iter().collect();
134 let file_ids: Vec<&str> = active_tfs.iter().map(|tf| tf.id()).collect();
135 debug!(
136 active_files = active_tfs.len(),
137 "topology_store::sync: loaded active topology_files"
138 );
139
140 let lf_map = self.location_files.list_by_files(&file_ids).await?;
141 let lf_ref_map = to_ref_map(&lf_map);
142 debug!(
143 location_file_groups = lf_map.len(),
144 "topology_store::sync: loaded location_files"
145 );
146
147 let dist_result = distribute_actions(
148 &active_tf_refs,
149 &lf_ref_map,
150 &self.locations,
151 &ingest_origins,
152 );
153 info!(
154 actions = dist_result.actions.len(),
155 conflicts = dist_result.conflicts.len(),
156 "topology_store::sync: phase2 distribute done"
157 );
158
159 let deleted_tfs = self.topology_files.list_deleted().await?;
161 trace!(
162 deleted_tfs = deleted_tfs.len(),
163 "topology_store::sync: checking deleted topology_files for delete transfers"
164 );
165 let mut delete_transfers_created = 0;
166 let pending_dests = self.collect_pending_dests().await?;
167 for dtf in &deleted_tfs {
168 let lfs = self.location_files.list_by_file(dtf.id()).await?;
169 let empty = HashSet::new();
170 let pending = pending_dests.get(dtf.id()).unwrap_or(&empty);
171 for lf in &lfs {
172 let dest = lf.location_id().clone();
173 if pending.contains(&dest) {
174 trace!(
175 file_id = %dtf.id(),
176 dest = %dest,
177 "topology_store::sync: delete transfer skipped (pending)"
178 );
179 continue;
180 }
181 let src = self
182 .locations
183 .iter()
184 .find(|l| *l != &dest)
185 .cloned()
186 .unwrap_or_else(|| dest.clone());
187 if src == dest {
188 trace!(
189 file_id = %dtf.id(),
190 dest = %dest,
191 "topology_store::sync: delete transfer skipped (single location)"
192 );
193 continue;
194 }
195 trace!(
196 file_id = %dtf.id(),
197 src = %src,
198 dest = %dest,
199 "topology_store::sync: creating delete transfer"
200 );
201 let transfer = Transfer::new_delete(dtf.id().to_string(), src, dest)?;
202 self.transfers.insert_transfer(&transfer).await?;
203 delete_transfers_created += 1;
204 }
205 }
206 if delete_transfers_created > 0 {
207 debug!(
208 count = delete_transfers_created,
209 "topology_store::sync: delete transfers created"
210 );
211 }
212
213 let distributed = dist_result.actions.len();
214
215 let existing_presences: HashMap<String, HashSet<LocationId>> = lf_map
219 .iter()
220 .map(|(file_id, lfs)| {
221 let locs: HashSet<LocationId> = lfs
222 .iter()
223 .filter(|lf| lf.state().is_source_eligible())
224 .map(|lf| lf.location_id().clone())
225 .collect();
226 (file_id.clone(), locs)
227 })
228 .collect();
229 let planned = plan_distribution(
230 &dist_result.actions,
231 &self.graph,
232 &pending_dests,
233 &existing_presences,
234 );
235 debug!(
236 planned_count = planned.len(),
237 "topology_store::sync: phase3 route planned"
238 );
239
240 let transfers_created = self.create_transfers(&planned).await? + delete_transfers_created;
241 info!(
242 transfers_created = transfers_created,
243 "topology_store::sync: phase3 route done"
244 );
245
246 Ok(TopologySyncResult {
247 scanned: 0, ingested: deltas.len(),
249 distributed,
250 transfers_created,
251 conflicts: dist_result.conflicts,
252 })
253 }
254
255 pub async fn sync_route(
263 &self,
264 src: &LocationId,
265 dest: &LocationId,
266 ) -> Result<TopologySyncResult, SyncError> {
267 let active_tfs = self.topology_files.list_active(None, None).await?;
268 let active_tf_refs: Vec<&TopologyFile> = active_tfs.iter().collect();
269 let file_ids: Vec<&str> = active_tfs.iter().map(|tf| tf.id()).collect();
270 let lf_map = self.location_files.list_by_files(&file_ids).await?;
271 let lf_ref_map = to_ref_map(&lf_map);
272
273 let mut ingest_origins = HashMap::new();
275 for tf in &active_tfs {
276 if let Some(lfs) = lf_map.get(tf.id()) {
278 if lfs
279 .iter()
280 .any(|lf| lf.location_id() == src && lf.state().is_source_eligible())
281 {
282 ingest_origins
283 .entry(tf.id().to_string())
284 .or_insert_with(HashSet::new)
285 .insert(src.clone());
286 }
287 }
288 }
289
290 let dist_result = distribute_actions(
291 &active_tf_refs,
292 &lf_ref_map,
293 std::slice::from_ref(dest),
294 &ingest_origins,
295 );
296
297 let distributed = dist_result.actions.len();
298
299 let pending_dests = self.collect_pending_dests().await?;
301 let transfers: Vec<PlannedTransfer> = dist_result
302 .actions
303 .iter()
304 .filter_map(|action| {
305 let file_id = action.topology_file_id();
306 let empty = HashSet::new();
307 let pending = pending_dests.get(file_id).unwrap_or(&empty);
308 if pending.contains(dest) {
309 return None;
310 }
311 Some(PlannedTransfer {
312 file_id: file_id.to_string(),
313 src: src.clone(),
314 dest: dest.clone(),
315 kind: if action.is_delete() {
316 TransferKind::Delete
317 } else {
318 TransferKind::Sync
319 },
320 depends_on_index: None,
321 })
322 })
323 .collect();
324
325 let transfers_created = self.create_transfers(&transfers).await?;
326
327 Ok(TopologySyncResult {
328 scanned: 0,
329 ingested: 0,
330 distributed,
331 transfers_created,
332 conflicts: dist_result.conflicts,
333 })
334 }
335
336 pub async fn put(
345 &self,
346 relative_path: &str,
347 file_type: FileType,
348 fingerprint: FileFingerprint,
349 origin: &LocationId,
350 embedded_id: Option<String>,
351 ) -> Result<TopologyPutResult, SyncError> {
352 let existing = self.topology_files.get_by_path(relative_path).await?;
354
355 let (tf, is_new) = if let Some(mut tf) = existing {
356 tf.promote_canonical_digest(&fingerprint);
358 self.topology_files.upsert(&tf).await?;
359 (tf, false)
360 } else {
361 let mut tf = TopologyFile::new(relative_path.to_string(), file_type)
363 .map_err(SyncError::Domain)?;
364 tf.promote_canonical_digest(&fingerprint);
365 self.topology_files.upsert(&tf).await?;
366 (tf, true)
367 };
368
369 let existing_lf = self.location_files.get(tf.id(), origin).await?;
371 match existing_lf {
372 Some(mut lf) => {
373 lf.update_fingerprint(fingerprint.clone(), embedded_id);
374 self.location_files.upsert(&lf).await?;
375 }
376 None => {
377 let lf = tf
378 .materialize(
379 origin.clone(),
380 relative_path.to_string(),
381 fingerprint.clone(),
382 embedded_id,
383 )
384 .map_err(SyncError::Domain)?;
385 self.location_files.upsert(&lf).await?;
386 }
387 }
388
389 let mut ingest_origins = HashMap::new();
391 ingest_origins.insert(tf.id().to_string(), HashSet::from([origin.clone()]));
392
393 let lfs = self.location_files.list_by_file(tf.id()).await?;
394 let mut lf_map: HashMap<String, Vec<&LocationFile>> = HashMap::new();
395 lf_map.insert(tf.id().to_string(), lfs.iter().collect());
396
397 let dist_result = distribute_actions(&[&tf], &lf_map, &self.locations, &ingest_origins);
398
399 let pending_dests = self.collect_pending_dests().await?;
400 let sync_actions: Vec<_> = dist_result
401 .actions
402 .iter()
403 .filter(|a| !a.is_delete())
404 .cloned()
405 .collect();
406 let existing_presences: HashMap<String, HashSet<LocationId>> = {
407 let locs: HashSet<LocationId> = lfs.iter().map(|lf| lf.location_id().clone()).collect();
408 let mut m = HashMap::new();
409 m.insert(tf.id().to_string(), locs);
410 m
411 };
412 let planned = plan_distribution(
413 &sync_actions,
414 &self.graph,
415 &pending_dests,
416 &existing_presences,
417 );
418
419 let transfers_created = self.create_transfers(&planned).await?;
420
421 Ok(TopologyPutResult {
422 topology_file_id: tf.id().to_string(),
423 is_new,
424 transfers_created,
425 })
426 }
427
428 pub async fn get(&self, relative_path: &str) -> Result<Option<TopologyFileView>, SyncError> {
430 let tf = match self.topology_files.get_by_path(relative_path).await? {
431 Some(tf) => tf,
432 None => return Ok(None),
433 };
434 let lfs = self.location_files.list_by_file(tf.id()).await?;
435 Ok(Some(TopologyFileView {
436 topology_file: tf,
437 location_files: lfs,
438 }))
439 }
440
441 pub async fn list(
443 &self,
444 file_type: Option<FileType>,
445 limit: Option<usize>,
446 ) -> Result<Vec<TopologyFileView>, SyncError> {
447 let tfs = self.topology_files.list_active(file_type, limit).await?;
448 let file_ids: Vec<&str> = tfs.iter().map(|tf| tf.id()).collect();
449 let lf_map = self.location_files.list_by_files(&file_ids).await?;
450
451 let views = tfs
452 .into_iter()
453 .map(|tf| {
454 let lfs = lf_map.get(tf.id()).cloned().unwrap_or_default();
455 TopologyFileView {
456 topology_file: tf,
457 location_files: lfs,
458 }
459 })
460 .collect();
461 Ok(views)
462 }
463
464 pub async fn delete(&self, relative_path: &str) -> Result<usize, SyncError> {
469 let mut tf = self
470 .topology_files
471 .get_by_path(relative_path)
472 .await?
473 .ok_or_else(|| SyncError::NotRegistered(relative_path.to_string()))?;
474
475 tf.mark_deleted();
476 self.topology_files.upsert(&tf).await?;
477
478 let lfs = self.location_files.list_by_file(tf.id()).await?;
480 let pending_dests = self.collect_pending_dests().await?;
481 let empty = HashSet::new();
482 let pending = pending_dests.get(tf.id()).unwrap_or(&empty);
483
484 let mut created = 0;
485 for lf in &lfs {
486 let dest = lf.location_id().clone();
487 if pending.contains(&dest) {
488 continue;
489 }
490 let src = self
492 .locations
493 .iter()
494 .find(|l| *l != &dest)
495 .cloned()
496 .unwrap_or_else(|| dest.clone());
497 if src == dest {
498 continue;
500 }
501 let transfer = Transfer::new_delete(tf.id().to_string(), src, dest)?;
502 self.transfers.insert_transfer(&transfer).await?;
503 created += 1;
504 }
505
506 Ok(created)
507 }
508
509 pub async fn file_count(&self) -> Result<usize, SyncError> {
515 Ok(self.topology_files.count_active().await?)
516 }
517
518 pub fn locations(&self) -> &[LocationId] {
520 &self.locations
521 }
522
    /// Persists a batch of deltas and reports which locations introduced or
    /// changed each topology file.
    ///
    /// Deltas are processed grouped by kind: `Renamed`, `ContentChanged`,
    /// `Discovered`, then `Vanished`. `sort_by_key` is stable, so the input
    /// order is kept within each kind.
    ///
    /// # Returns
    /// A map from topology file id to the origins seen in `Discovered`,
    /// `ContentChanged` and `Renamed` deltas. `Vanished` deltas add no entry.
    ///
    /// # Errors
    /// Returns `SyncError::Domain` when a topology or location file cannot be
    /// constructed, and propagates store errors. Processing stops at the first
    /// error; deltas already applied stay applied.
    async fn apply_ingest(
        &self,
        deltas: &[TopologyDelta],
    ) -> Result<HashMap<String, HashSet<LocationId>>, SyncError> {
        let mut ingest_origins: HashMap<String, HashSet<LocationId>> = HashMap::new();

        // Order the deltas by kind before applying them.
        let mut sorted_deltas: Vec<&TopologyDelta> = deltas.iter().collect();
        sorted_deltas.sort_by_key(|d| match d {
            TopologyDelta::Renamed(_) => 0,
            TopologyDelta::ContentChanged(_) => 1,
            TopologyDelta::Discovered(_) => 2,
            TopologyDelta::Vanished(_) => 3,
        });

        let total = sorted_deltas.len();
        // Log progress about every tenth of the batch, and at least every delta.
        let log_interval = (total / 10).max(1);

        for (i, delta) in sorted_deltas.iter().enumerate() {
            if i % log_interval == 0 {
                info!(progress = i, total = total, "apply_ingest: processing");
            }
            match delta {
                TopologyDelta::Discovered(d) => {
                    // Reuse the topology file registered under this path, or create one.
                    let existing = self.topology_files.get_by_path(&d.relative_path).await?;
                    let is_new = existing.is_none();
                    let mut tf = if let Some(existing) = existing {
                        trace!(
                            path = %d.relative_path,
                            tf_id = %existing.id(),
                            origin = %d.origin,
                            "apply_ingest: Discovered — reusing existing TopologyFile"
                        );
                        existing
                    } else {
                        trace!(
                            path = %d.relative_path,
                            origin = %d.origin,
                            size = d.fingerprint.size,
                            content_digest = ?d.fingerprint.content_digest,
                            "apply_ingest: Discovered — creating new TopologyFile"
                        );
                        TopologyFile::new(d.relative_path.clone(), d.file_type)
                            .map_err(SyncError::Domain)?
                    };
                    tf.promote_canonical_digest(&d.fingerprint);
                    self.topology_files.upsert(&tf).await?;

                    // A location file is always materialized and upserted here,
                    // without first checking whether the origin already has one.
                    let lf = tf
                        .materialize(
                            d.origin.clone(),
                            d.relative_path.clone(),
                            d.fingerprint.clone(),
                            d.embedded_id.clone(),
                        )
                        .map_err(SyncError::Domain)?;
                    self.location_files.upsert(&lf).await?;

                    if is_new {
                        debug!(
                            path = %d.relative_path,
                            tf_id = %tf.id(),
                            origin = %d.origin,
                            "apply_ingest: NEW file registered"
                        );
                    }

                    ingest_origins
                        .entry(tf.id().to_string())
                        .or_default()
                        .insert(d.origin.clone());
                }
                TopologyDelta::ContentChanged(c) => {
                    trace!(
                        path = %c.relative_path,
                        tf_id = %c.topology_file_id,
                        origin = %c.origin,
                        old_size = c.old_fingerprint.size,
                        new_size = c.new_fingerprint.size,
                        "apply_ingest: ContentChanged"
                    );
                    // An unknown topology file id is tolerated: the digest
                    // promotion is skipped and processing continues.
                    if let Some(mut tf) = self.topology_files.get_by_id(&c.topology_file_id).await?
                    {
                        tf.promote_canonical_digest(&c.new_fingerprint);
                        self.topology_files.upsert(&tf).await?;
                    }

                    // Update the origin's location file, or create it when missing.
                    let existing_lf = self
                        .location_files
                        .get(&c.topology_file_id, &c.origin)
                        .await?;
                    match existing_lf {
                        Some(mut lf) => {
                            lf.update_fingerprint(c.new_fingerprint.clone(), c.embedded_id.clone());
                            self.location_files.upsert(&lf).await?;
                        }
                        None => {
                            // The topology file is fetched again here; nothing
                            // is created when it does not exist.
                            if let Some(tf) =
                                self.topology_files.get_by_id(&c.topology_file_id).await?
                            {
                                let lf = tf
                                    .materialize(
                                        c.origin.clone(),
                                        c.relative_path.clone(),
                                        c.new_fingerprint.clone(),
                                        c.embedded_id.clone(),
                                    )
                                    .map_err(SyncError::Domain)?;
                                self.location_files.upsert(&lf).await?;
                            }
                        }
                    }

                    // Mark every location file selected by `stale_candidates` as stale.
                    // NOTE(review): presumably these are the copies at other locations
                    // that no longer match the new fingerprint — confirm in
                    // `location_file::stale_candidates`.
                    let all_lfs = self
                        .location_files
                        .list_by_file(&c.topology_file_id)
                        .await?;
                    for stale_lf in
                        location_file::stale_candidates(&all_lfs, &c.origin, &c.new_fingerprint)
                    {
                        let mut lf = stale_lf.clone();
                        lf.mark_stale();
                        self.location_files.upsert(&lf).await?;
                    }

                    // The origin is recorded even when the topology file was not found.
                    ingest_origins
                        .entry(c.topology_file_id.clone())
                        .or_default()
                        .insert(c.origin.clone());
                }
                TopologyDelta::Renamed(r) => {
                    trace!(
                        tf_id = %r.topology_file_id,
                        old_path = %r.old_path,
                        new_path = %r.new_path,
                        origin = %r.origin,
                        "apply_ingest: Renamed"
                    );
                    // Move the topology file to the new path and refresh its digest.
                    if let Some(mut tf) = self.topology_files.get_by_id(&r.topology_file_id).await?
                    {
                        tf.update_path(r.new_path.clone());
                        tf.promote_canonical_digest(&r.fingerprint);
                        self.topology_files.upsert(&tf).await?;
                    }

                    let existing_lf = self
                        .location_files
                        .get(&r.topology_file_id, &r.origin)
                        .await?;
                    match existing_lf {
                        Some(mut lf) => {
                            // Only the fingerprint and embedded id are updated on an
                            // existing location file; no path update is issued here.
                            lf.update_fingerprint(r.fingerprint.clone(), r.embedded_id.clone());
                            self.location_files.upsert(&lf).await?;
                        }
                        None => {
                            if let Some(tf) =
                                self.topology_files.get_by_id(&r.topology_file_id).await?
                            {
                                let lf = tf
                                    .materialize(
                                        r.origin.clone(),
                                        r.new_path.clone(),
                                        r.fingerprint.clone(),
                                        r.embedded_id.clone(),
                                    )
                                    .map_err(SyncError::Domain)?;
                                self.location_files.upsert(&lf).await?;
                            }
                        }
                    }

                    ingest_origins
                        .entry(r.topology_file_id.clone())
                        .or_default()
                        .insert(r.origin.clone());
                }
                TopologyDelta::Vanished(v) => {
                    trace!(
                        path = %v.relative_path,
                        tf_id = %v.topology_file_id,
                        origin = %v.origin,
                        "apply_ingest: Vanished"
                    );
                    // Only the origin's location file is marked missing; the
                    // topology file is untouched and no origin is recorded.
                    let existing_lf = self
                        .location_files
                        .get(&v.topology_file_id, &v.origin)
                        .await?;
                    if let Some(mut lf) = existing_lf {
                        lf.mark_missing();
                        self.location_files.upsert(&lf).await?;
                    }
                }
            }
        }

        info!(
            processed = total,
            origins = ingest_origins.len(),
            "apply_ingest: done"
        );
        Ok(ingest_origins)
    }
745
746 async fn create_transfers(&self, planned: &[PlannedTransfer]) -> Result<usize, SyncError> {
752 let mut created = 0;
753 let mut transfer_ids: Vec<String> = Vec::with_capacity(planned.len());
755
756 for pt in planned.iter() {
757 trace!(
758 file_id = %pt.file_id,
759 src = %pt.src,
760 dest = %pt.dest,
761 kind = ?pt.kind,
762 depends_on = ?pt.depends_on_index,
763 "create_transfers: creating transfer"
764 );
765 let transfer = if let Some(dep_idx) = pt.depends_on_index {
766 let dep_id = &transfer_ids[dep_idx];
767 Transfer::with_dependency(
768 pt.file_id.clone(),
769 pt.src.clone(),
770 pt.dest.clone(),
771 pt.kind,
772 dep_id.clone(),
773 )?
774 } else {
775 Transfer::with_kind(pt.file_id.clone(), pt.src.clone(), pt.dest.clone(), pt.kind)?
776 };
777 self.transfers.insert_transfer(&transfer).await?;
778 transfer_ids.push(transfer.id().to_string());
779 created += 1;
780 }
781
782 trace!(created = created, "create_transfers: done");
783 Ok(created)
784 }
785
786 async fn collect_pending_dests(
788 &self,
789 ) -> Result<HashMap<String, HashSet<LocationId>>, SyncError> {
790 let pending = self.transfers.all_pending_transfers().await?;
791 let mut map: HashMap<String, HashSet<LocationId>> = HashMap::new();
792 for t in &pending {
793 map.entry(t.file_id().to_string())
794 .or_default()
795 .insert(t.dest().clone());
796 }
797 Ok(map)
798 }
799}
800
/// A topology file bundled with its location files, as returned by
/// `TopologyStore::get` and `TopologyStore::list`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TopologyFileView {
    /// The topology file record.
    pub topology_file: TopologyFile,
    /// Location files stored for `topology_file`; may be empty.
    pub location_files: Vec<LocationFile>,
}
811
812fn to_ref_map(map: &HashMap<String, Vec<LocationFile>>) -> HashMap<String, Vec<&LocationFile>> {
818 map.iter()
819 .map(|(k, v)| (k.clone(), v.iter().collect()))
820 .collect()
821}
822
823#[cfg(test)]
828mod tests {
829 use super::*;
830 use async_trait::async_trait;
831 use chrono::{DateTime, Utc};
832 use tokio::sync::Mutex;
833
834 use crate::domain::location_file::LocationFileState;
835 use crate::domain::topology_delta::{ContentChangedFile, DiscoveredFile, VanishedFile};
836 use crate::domain::transfer::TransferState;
837 use crate::infra::error::InfraError;
838 use crate::infra::transfer_store::TransferStatRow;
839
    /// In-memory `TopologyFileStore` used by the tests in this module.
    struct MockTopologyFileStore {
        /// Stored topology files; tests lock this directly to inspect state.
        files: Mutex<Vec<TopologyFile>>,
    }

    impl MockTopologyFileStore {
        /// Creates an empty store.
        fn new() -> Self {
            Self {
                files: Mutex::new(Vec::new()),
            }
        }
    }
855
856 #[async_trait]
857 impl TopologyFileStore for MockTopologyFileStore {
858 async fn upsert(&self, file: &TopologyFile) -> Result<(), InfraError> {
859 let mut files = self.files.lock().await;
860 if let Some(pos) = files.iter().position(|f| f.id() == file.id()) {
861 files[pos] = file.clone();
862 } else {
863 files.push(file.clone());
864 }
865 Ok(())
866 }
867
868 async fn get_by_id(&self, id: &str) -> Result<Option<TopologyFile>, InfraError> {
869 Ok(self
870 .files
871 .lock()
872 .await
873 .iter()
874 .find(|f| f.id() == id)
875 .cloned())
876 }
877
878 async fn get_by_path(&self, path: &str) -> Result<Option<TopologyFile>, InfraError> {
879 Ok(self
880 .files
881 .lock()
882 .await
883 .iter()
884 .find(|f| f.relative_path() == path && f.deleted_at().is_none())
885 .cloned())
886 }
887
888 async fn find_by_canonical_hash(
889 &self,
890 hash: &str,
891 ) -> Result<Option<TopologyFile>, InfraError> {
892 Ok(self
893 .files
894 .lock()
895 .await
896 .iter()
897 .find(|f| f.canonical_hash() == Some(hash) && f.deleted_at().is_none())
898 .cloned())
899 }
900
901 async fn list_active(
902 &self,
903 file_type: Option<FileType>,
904 limit: Option<usize>,
905 ) -> Result<Vec<TopologyFile>, InfraError> {
906 let files = self.files.lock().await;
907 let mut result: Vec<_> = files
908 .iter()
909 .filter(|f| f.deleted_at().is_none())
910 .filter(|f| file_type.map_or(true, |ft| f.file_type() == ft))
911 .cloned()
912 .collect();
913 if let Some(n) = limit {
914 result.truncate(n);
915 }
916 Ok(result)
917 }
918
919 async fn list_deleted(&self) -> Result<Vec<TopologyFile>, InfraError> {
920 Ok(self
921 .files
922 .lock()
923 .await
924 .iter()
925 .filter(|f| f.deleted_at().is_some())
926 .cloned()
927 .collect())
928 }
929
930 async fn count_active(&self) -> Result<usize, InfraError> {
931 Ok(self
932 .files
933 .lock()
934 .await
935 .iter()
936 .filter(|f| f.deleted_at().is_none())
937 .count())
938 }
939
940 async fn list_active_paths(&self) -> Result<Vec<String>, InfraError> {
941 Ok(self
942 .files
943 .lock()
944 .await
945 .iter()
946 .filter(|f| f.deleted_at().is_none())
947 .map(|f| f.relative_path().to_string())
948 .collect())
949 }
950 }
951
    /// In-memory `LocationFileStore` used by the tests in this module.
    struct MockLocationFileStore {
        /// Stored location files; tests lock this directly to inspect state.
        files: Mutex<Vec<LocationFile>>,
    }

    impl MockLocationFileStore {
        /// Creates an empty store.
        fn new() -> Self {
            Self {
                files: Mutex::new(Vec::new()),
            }
        }
    }
963
964 #[async_trait]
965 impl LocationFileStore for MockLocationFileStore {
966 async fn upsert(&self, file: &LocationFile) -> Result<(), InfraError> {
967 let mut files = self.files.lock().await;
968 if let Some(pos) = files.iter().position(|f| {
969 f.file_id() == file.file_id() && f.location_id() == file.location_id()
970 }) {
971 files[pos] = file.clone();
972 } else {
973 files.push(file.clone());
974 }
975 Ok(())
976 }
977
978 async fn get(
979 &self,
980 file_id: &str,
981 location_id: &LocationId,
982 ) -> Result<Option<LocationFile>, InfraError> {
983 Ok(self
984 .files
985 .lock()
986 .await
987 .iter()
988 .find(|f| f.file_id() == file_id && f.location_id() == location_id)
989 .cloned())
990 }
991
992 async fn list_by_file(&self, file_id: &str) -> Result<Vec<LocationFile>, InfraError> {
993 Ok(self
994 .files
995 .lock()
996 .await
997 .iter()
998 .filter(|f| f.file_id() == file_id)
999 .cloned()
1000 .collect())
1001 }
1002
1003 async fn list_by_location(
1004 &self,
1005 location_id: &LocationId,
1006 ) -> Result<Vec<LocationFile>, InfraError> {
1007 Ok(self
1008 .files
1009 .lock()
1010 .await
1011 .iter()
1012 .filter(|f| f.location_id() == location_id)
1013 .cloned()
1014 .collect())
1015 }
1016
1017 async fn list_by_files(
1018 &self,
1019 file_ids: &[&str],
1020 ) -> Result<HashMap<String, Vec<LocationFile>>, InfraError> {
1021 let files = self.files.lock().await;
1022 let mut map: HashMap<String, Vec<LocationFile>> = HashMap::new();
1023 for f in files.iter() {
1024 if file_ids.contains(&f.file_id()) {
1025 map.entry(f.file_id().to_string())
1026 .or_default()
1027 .push(f.clone());
1028 }
1029 }
1030 Ok(map)
1031 }
1032
1033 async fn delete(
1034 &self,
1035 file_id: &str,
1036 location_id: &LocationId,
1037 ) -> Result<bool, InfraError> {
1038 let mut files = self.files.lock().await;
1039 let before = files.len();
1040 files.retain(|f| !(f.file_id() == file_id && f.location_id() == location_id));
1041 Ok(files.len() < before)
1042 }
1043
1044 async fn count_by_location(&self, location_id: &LocationId) -> Result<usize, InfraError> {
1045 Ok(self
1046 .files
1047 .lock()
1048 .await
1049 .iter()
1050 .filter(|f| f.location_id() == location_id)
1051 .count())
1052 }
1053 }
1054
    /// In-memory `TransferStore` used by the tests in this module.
    struct MockTransferStore {
        /// Inserted transfers in insertion order; tests lock this directly.
        transfers: Mutex<Vec<Transfer>>,
    }

    impl MockTransferStore {
        /// Creates an empty store.
        fn new() -> Self {
            Self {
                transfers: Mutex::new(Vec::new()),
            }
        }
    }
1066
1067 #[async_trait]
1068 impl TransferStore for MockTransferStore {
1069 async fn insert_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
1070 self.transfers.lock().await.push(transfer.clone());
1071 Ok(())
1072 }
1073
1074 async fn update_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
1075 let mut transfers = self.transfers.lock().await;
1076 if let Some(pos) = transfers.iter().position(|t| t.id() == transfer.id()) {
1077 transfers[pos] = transfer.clone();
1078 }
1079 Ok(())
1080 }
1081
1082 async fn queued_transfers(&self, dest: &LocationId) -> Result<Vec<Transfer>, InfraError> {
1083 Ok(self
1084 .transfers
1085 .lock()
1086 .await
1087 .iter()
1088 .filter(|t| t.dest() == dest && t.state() == TransferState::Queued)
1089 .cloned()
1090 .collect())
1091 }
1092
1093 async fn latest_transfers_by_file(
1094 &self,
1095 file_id: &str,
1096 ) -> Result<Vec<Transfer>, InfraError> {
1097 Ok(self
1098 .transfers
1099 .lock()
1100 .await
1101 .iter()
1102 .filter(|t| t.file_id() == file_id)
1103 .cloned()
1104 .collect())
1105 }
1106
1107 async fn failed_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
1108 Ok(Vec::new())
1109 }
1110
1111 async fn prune_completed(&self, _before: DateTime<Utc>) -> Result<usize, InfraError> {
1112 Ok(0)
1113 }
1114
1115 async fn count_queued(&self) -> Result<usize, InfraError> {
1116 Ok(self
1117 .transfers
1118 .lock()
1119 .await
1120 .iter()
1121 .filter(|t| t.state() == TransferState::Queued)
1122 .count())
1123 }
1124
1125 async fn cancel_orphaned_inflight(&self) -> Result<usize, InfraError> {
1126 Ok(0)
1127 }
1128
1129 async fn unblock_dependents(
1130 &self,
1131 _completed_transfer_id: &str,
1132 ) -> Result<usize, InfraError> {
1133 Ok(0)
1134 }
1135
1136 async fn all_pending_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
1137 Ok(self
1138 .transfers
1139 .lock()
1140 .await
1141 .iter()
1142 .filter(|t| {
1143 t.state() == TransferState::Queued || t.state() == TransferState::Blocked
1144 })
1145 .cloned()
1146 .collect())
1147 }
1148
1149 async fn transfer_stats(&self) -> Result<Vec<TransferStatRow>, InfraError> {
1150 Ok(Vec::new())
1151 }
1152
1153 async fn present_counts_by_location(
1154 &self,
1155 ) -> Result<HashMap<LocationId, usize>, InfraError> {
1156 Ok(HashMap::new())
1157 }
1158 }
1159
1160 fn loc(name: &str) -> LocationId {
1165 LocationId::new(name).expect("valid location name")
1166 }
1167
1168 fn fp(hash: &str, size: u64) -> FileFingerprint {
1169 use crate::domain::digest::{ByteDigest, ContentDigest};
1170 FileFingerprint {
1171 byte_digest: Some(ByteDigest::Djb2(hash.to_string())),
1172 content_digest: Some(ContentDigest(hash.to_string())),
1173 meta_digest: None,
1174 size,
1175 modified_at: None,
1176 }
1177 }
1178
1179 fn three_loc_setup() -> (RouteGraph, Vec<LocationId>) {
1181 let local = loc("local");
1182 let pod = loc("pod");
1183 let cloud = loc("cloud");
1184 let mut g = RouteGraph::new();
1185 g.add(local.clone(), pod.clone());
1186 g.add(pod.clone(), cloud.clone());
1187 g.add(pod.clone(), local.clone());
1188 g.add(cloud.clone(), pod.clone());
1189 (g, vec![local, pod, cloud])
1190 }
1191
1192 fn make_store(
1193 tf: Arc<MockTopologyFileStore>,
1194 lf: Arc<MockLocationFileStore>,
1195 tr: Arc<MockTransferStore>,
1196 ) -> TopologyStore {
1197 let (graph, locations) = three_loc_setup();
1198 TopologyStore::new(tf, lf, tr, graph, locations)
1199 }
1200
1201 fn discovered(path: &str, hash: &str, origin: &str) -> TopologyDelta {
1202 TopologyDelta::Discovered(DiscoveredFile {
1203 relative_path: path.to_string(),
1204 file_type: FileType::Image,
1205 fingerprint: fp(hash, 1024),
1206 origin: loc(origin),
1207 embedded_id: None,
1208 })
1209 }
1210
1211 #[tokio::test]
1219 async fn pipeline_discovered_creates_topology_and_routes_transfers() {
1220 let tf_s = Arc::new(MockTopologyFileStore::new());
1221 let lf_s = Arc::new(MockLocationFileStore::new());
1222 let tr_s = Arc::new(MockTransferStore::new());
1223 let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1224
1225 let result = store
1226 .sync(&[discovered("output/gen-001.png", "abc123", "local")])
1227 .await
1228 .unwrap();
1229
1230 let tfs = tf_s.files.lock().await;
1232 assert_eq!(tfs.len(), 1);
1233 assert_eq!(tfs[0].relative_path(), "output/gen-001.png");
1234 assert!(
1235 tfs[0].canonical_hash().is_some(),
1236 "canonical_hash should be promoted from fingerprint"
1237 );
1238
1239 let lfs = lf_s.files.lock().await;
1240 assert_eq!(lfs.len(), 1);
1241 assert_eq!(lfs[0].location_id(), &loc("local"));
1242 assert_eq!(lfs[0].state(), LocationFileState::Active);
1243
1244 assert_eq!(result.ingested, 1);
1246 assert!(result.distributed >= 2, "pod + cloud へのDistributeAction");
1247
1248 let transfers = tr_s.transfers.lock().await;
1250 assert!(
1251 !transfers.is_empty(),
1252 "Transfers should be created for reachable locations"
1253 );
1254 let tf_id = tfs[0].id();
1255 for t in transfers.iter() {
1256 assert_eq!(t.file_id(), tf_id, "All transfers for same file");
1257 assert_ne!(t.src(), t.dest(), "No self-transfer");
1258 }
1259 }
1260
1261 #[tokio::test]
1269 async fn pipeline_content_changed_stales_others_and_creates_transfers() {
1270 let tf_s = Arc::new(MockTopologyFileStore::new());
1271 let lf_s = Arc::new(MockLocationFileStore::new());
1272 let tr_s = Arc::new(MockTransferStore::new());
1273 let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1274
1275 store
1277 .sync(&[discovered("output/img.png", "v1_hash", "local")])
1278 .await
1279 .unwrap();
1280
1281 let tf_id = tf_s.files.lock().await[0].id().to_string();
1282
1283 let pod_lf = LocationFile::new(
1285 tf_id.clone(),
1286 loc("pod"),
1287 "output/img.png".to_string(),
1288 fp("v1_hash", 1024),
1289 None,
1290 )
1291 .unwrap();
1292 lf_s.upsert(&pod_lf).await.unwrap();
1293
1294 tr_s.transfers.lock().await.clear();
1296
1297 let delta = TopologyDelta::ContentChanged(ContentChangedFile {
1299 topology_file_id: tf_id.clone(),
1300 relative_path: "output/img.png".to_string(),
1301 file_type: FileType::Image,
1302 old_fingerprint: fp("v1_hash", 1024),
1303 new_fingerprint: fp("v2_hash", 2048),
1304 origin: loc("local"),
1305 embedded_id: None,
1306 });
1307
1308 let result = store.sync(&[delta]).await.unwrap();
1309
1310 let pod_lf = lf_s.get(&tf_id, &loc("pod")).await.unwrap().unwrap();
1312 assert_eq!(pod_lf.state(), LocationFileState::Stale);
1313
1314 let local_lf = lf_s.get(&tf_id, &loc("local")).await.unwrap().unwrap();
1316 assert_eq!(local_lf.state(), LocationFileState::Active);
1317
1318 assert!(result.distributed > 0);
1320 assert!(result.transfers_created > 0);
1321 }
1322
1323 #[tokio::test]
1331 async fn pipeline_vanished_marks_location_file_missing() {
1332 let tf_s = Arc::new(MockTopologyFileStore::new());
1333 let lf_s = Arc::new(MockLocationFileStore::new());
1334 let tr_s = Arc::new(MockTransferStore::new());
1335 let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1336
1337 store
1338 .sync(&[discovered("output/gone.png", "gone_hash", "local")])
1339 .await
1340 .unwrap();
1341
1342 let tf_id = tf_s.files.lock().await[0].id().to_string();
1343
1344 let delta = TopologyDelta::Vanished(VanishedFile {
1345 topology_file_id: tf_id.clone(),
1346 relative_path: "output/gone.png".to_string(),
1347 origin: loc("local"),
1348 });
1349
1350 store.sync(&[delta]).await.unwrap();
1351
1352 let lf = lf_s.get(&tf_id, &loc("local")).await.unwrap().unwrap();
1354 assert_eq!(lf.state(), LocationFileState::Missing);
1355
1356 let tf = tf_s.files.lock().await;
1358 assert!(tf[0].deleted_at().is_none());
1359 }
1360
1361 #[tokio::test]
1368 async fn pipeline_batch_discovered_multi_origin() {
1369 let tf_s = Arc::new(MockTopologyFileStore::new());
1370 let lf_s = Arc::new(MockLocationFileStore::new());
1371 let tr_s = Arc::new(MockTransferStore::new());
1372 let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1373
1374 let deltas = vec![
1375 discovered("a.png", "ha", "local"),
1376 discovered("b.png", "hb", "local"),
1377 discovered("c.png", "hc", "pod"), ];
1379
1380 let result = store.sync(&deltas).await.unwrap();
1381
1382 assert_eq!(result.ingested, 3);
1383 assert_eq!(tf_s.files.lock().await.len(), 3);
1384 assert_eq!(lf_s.files.lock().await.len(), 3);
1385
1386 assert!(result.distributed >= 3);
1388 assert!(result.transfers_created >= 3);
1389 }
1390
1391 #[tokio::test]
1398 async fn pipeline_put_then_empty_sync_is_consistent() {
1399 let tf_s = Arc::new(MockTopologyFileStore::new());
1400 let lf_s = Arc::new(MockLocationFileStore::new());
1401 let tr_s = Arc::new(MockTransferStore::new());
1402 let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1403
1404 let put_result = store
1406 .put("x.png", FileType::Image, fp("xh", 100), &loc("local"), None)
1407 .await
1408 .unwrap();
1409
1410 assert!(put_result.is_new);
1411 let initial_transfers = tr_s.transfers.lock().await.len();
1412 assert!(initial_transfers > 0);
1413
1414 let sync_result = store.sync(&[]).await.unwrap();
1416
1417 assert_eq!(sync_result.ingested, 0);
1418 }
1421
1422 #[tokio::test]
1429 async fn pipeline_delete_creates_delete_transfers() {
1430 let tf_s = Arc::new(MockTopologyFileStore::new());
1431 let lf_s = Arc::new(MockLocationFileStore::new());
1432 let tr_s = Arc::new(MockTransferStore::new());
1433 let store = make_store(tf_s.clone(), lf_s.clone(), tr_s.clone());
1434
1435 store
1437 .put(
1438 "del.png",
1439 FileType::Image,
1440 fp("dh", 100),
1441 &loc("local"),
1442 None,
1443 )
1444 .await
1445 .unwrap();
1446
1447 let tf_id = tf_s.files.lock().await[0].id().to_string();
1449 let pod_lf = LocationFile::new(
1450 tf_id.clone(),
1451 loc("pod"),
1452 "del.png".to_string(),
1453 fp("dh", 100),
1454 None,
1455 )
1456 .unwrap();
1457 lf_s.upsert(&pod_lf).await.unwrap();
1458
1459 tr_s.transfers.lock().await.clear();
1461
1462 let delete_count = store.delete("del.png").await.unwrap();
1464
1465 let tf = tf_s.files.lock().await;
1467 assert!(tf[0].deleted_at().is_some());
1468
1469 assert_eq!(delete_count, 2, "Delete Transfer for local + pod");
1472 let transfers = tr_s.transfers.lock().await;
1473 assert_eq!(transfers.len(), 2);
1474 for t in transfers.iter() {
1475 assert!(t.is_delete(), "All should be Delete kind");
1476 assert_ne!(t.src(), t.dest(), "No self-transfer");
1477 }
1478 }
1479}