vdsl_sync/application/sdk_impl/
builder.rs1use std::collections::HashMap;
8use std::sync::{Arc, Mutex as StdMutex};
9
10use super::SdkImpl;
11use crate::application::error::SyncError;
12use crate::application::route::{TransferDirection, TransferRoute};
13use crate::application::topology_scanner::TopologyScanner;
14use crate::application::topology_store::TopologyStore;
15use crate::application::transfer_engine::TransferEngine;
16use crate::domain::config::SyncConfig;
17use crate::domain::graph::{EdgeCost, RouteGraph};
18use crate::domain::location::LocationId;
19use crate::infra::backend::StorageBackend;
20use crate::infra::location::{Location, LocationKind};
21use crate::infra::location_file_store::LocationFileStore;
22use crate::infra::location_scanner::LocationScanner;
23use crate::infra::shell::RemoteShell;
24use crate::infra::topology_file_store::TopologyFileStore;
25use crate::infra::transfer_store::TransferStore;
26
27struct PendingRoute {
32 src: LocationId,
33 dest: LocationId,
34 backend: Box<dyn StorageBackend>,
35 src_shell: Option<Box<dyn RemoteShell>>,
36 direction: TransferDirection,
37}
38
39pub struct SdkImplBuilder {
59 topology_files: Arc<dyn TopologyFileStore>,
60 location_files: Arc<dyn LocationFileStore>,
61 transfer_store: Arc<dyn TransferStore>,
62 locations: Vec<Arc<dyn Location>>,
63 pending_routes: Vec<PendingRoute>,
64 config: Option<SyncConfig>,
65 scan_excludes: Vec<glob::Pattern>,
66 archive_roots: HashMap<LocationId, std::path::PathBuf>,
69}
70
71impl SdkImplBuilder {
72 pub fn new(
74 topology_files: Arc<dyn TopologyFileStore>,
75 location_files: Arc<dyn LocationFileStore>,
76 transfer_store: Arc<dyn TransferStore>,
77 ) -> Self {
78 Self {
79 topology_files,
80 location_files,
81 transfer_store,
82 locations: Vec::new(),
83 pending_routes: Vec::new(),
84 config: None,
85 scan_excludes: Vec::new(),
86 archive_roots: HashMap::new(),
87 }
88 }
89
90 pub fn archive_route_to(mut self, dest: &LocationId, archive_root: std::path::PathBuf) -> Self {
97 self.archive_roots.insert(dest.clone(), archive_root);
98 self
99 }
100
101 pub fn location(mut self, loc: Arc<dyn Location>) -> Self {
106 if !self.locations.iter().any(|l| l.id() == loc.id()) {
107 self.locations.push(loc);
108 }
109 self
110 }
111
112 pub fn connect(
118 mut self,
119 src: &LocationId,
120 dest: &LocationId,
121 backend: Box<dyn StorageBackend>,
122 ) -> Self {
123 self.pending_routes.push(PendingRoute {
124 src: src.clone(),
125 dest: dest.clone(),
126 backend,
127 src_shell: None,
128 direction: TransferDirection::Push,
129 });
130 self
131 }
132
133 pub fn connect_with_shell(
138 mut self,
139 src: &LocationId,
140 dest: &LocationId,
141 backend: Box<dyn StorageBackend>,
142 src_shell: Box<dyn RemoteShell>,
143 ) -> Self {
144 self.pending_routes.push(PendingRoute {
145 src: src.clone(),
146 dest: dest.clone(),
147 backend,
148 src_shell: Some(src_shell),
149 direction: TransferDirection::Push,
150 });
151 self
152 }
153
154 pub fn connect_pull(
159 mut self,
160 src: &LocationId,
161 dest: &LocationId,
162 backend: Box<dyn StorageBackend>,
163 ) -> Self {
164 self.pending_routes.push(PendingRoute {
165 src: src.clone(),
166 dest: dest.clone(),
167 backend,
168 src_shell: None,
169 direction: TransferDirection::Pull,
170 });
171 self
172 }
173
174 pub fn config(mut self, config: SyncConfig) -> Self {
176 self.config = Some(config);
177 self
178 }
179
180 pub fn exclude(mut self, pattern: &str) -> Self {
182 match glob::Pattern::new(pattern) {
183 Ok(p) => self.scan_excludes.push(p),
184 Err(e) => {
185 tracing::warn!(pattern = pattern, error = %e, "invalid exclude glob pattern, skipped");
186 }
187 }
188 self
189 }
190
191 pub fn build(self) -> Result<SdkImpl, SyncError> {
198 let config = self.config.unwrap_or_default();
199
200 let loc_map: HashMap<LocationId, &Arc<dyn Location>> =
202 self.locations.iter().map(|l| (l.id().clone(), l)).collect();
203
204 let scanners: Vec<Arc<dyn LocationScanner>> =
206 self.locations.iter().map(|loc| loc.scanner()).collect();
207
208 let archive_roots = self.archive_roots;
209
210 let routes: Vec<TransferRoute> = self
212 .pending_routes
213 .into_iter()
214 .filter_map(|pr| {
215 let src_loc = loc_map.get(&pr.src)?;
216 let dest_loc = loc_map.get(&pr.dest)?;
217
218 let cost = match estimate_route_cost(src_loc.kind(), dest_loc.kind()) {
219 Ok(c) => c,
220 Err(e) => {
221 tracing::warn!(src = ?src_loc.kind(), dest = ?dest_loc.kind(), error = %e, "skipping route: invalid cost");
222 return None;
223 }
224 };
225
226 let archive_root_for_dest = archive_roots.get(&pr.dest).cloned();
227
228 let mut route = TransferRoute::new(
229 pr.src,
230 pr.dest.clone(),
231 src_loc.file_root().to_path_buf(),
232 dest_loc.file_root().to_path_buf(),
233 pr.backend,
234 )
235 .with_cost(cost.time_per_gb, cost.priority);
236
237 if let Some(archive_root) = archive_root_for_dest {
238 if pr.direction == TransferDirection::Push {
242 route = route.with_archive_root(archive_root);
243 }
244 }
245
246 if pr.direction == TransferDirection::Pull {
247 route = route.direction(TransferDirection::Pull);
248 }
249 if let Some(shell) = pr.src_shell {
250 route = route.with_src_shell(shell);
251 }
252
253 Some(route)
254 })
255 .collect();
256
257 let location_ids: Vec<LocationId> =
259 self.locations.iter().map(|loc| loc.id().clone()).collect();
260
261 let mut graph = RouteGraph::new();
263 for r in &routes {
264 graph.add_with_cost(
265 r.src().clone(),
266 r.dest().clone(),
267 EdgeCost::new(r.time_per_gb(), r.priority())?,
268 );
269 }
270
271 let topology = TopologyStore::new(
272 self.topology_files.clone(),
273 self.location_files.clone(),
274 self.transfer_store.clone(),
275 graph.clone(),
276 location_ids,
277 );
278
279 let engine = TransferEngine::new(graph, routes, config.concurrency);
280
281 let scanner = TopologyScanner::new(
282 self.topology_files.clone(),
283 self.location_files.clone(),
284 scanners,
285 );
286
287 Ok(SdkImpl {
288 scanner,
289 topology,
290 engine,
291 topology_files: self.topology_files,
292 location_files: self.location_files,
293 transfer_store: self.transfer_store,
294 locations: self.locations,
295 config,
296 scan_excludes: self.scan_excludes,
297 progress: StdMutex::new(None),
298 })
299 }
300}
301
302fn estimate_route_cost(
308 src: LocationKind,
309 dest: LocationKind,
310) -> Result<EdgeCost, crate::domain::error::DomainError> {
311 let (time_per_gb, priority) = match (src, dest) {
312 (LocationKind::Local, LocationKind::Remote) => (1.0, 10),
314 (LocationKind::Remote, LocationKind::Local) => (1.0, 10),
315
316 (LocationKind::Remote, LocationKind::Cloud) => (2.0, 50),
318 (LocationKind::Cloud, LocationKind::Remote) => (2.0, 50),
319
320 (LocationKind::Local, LocationKind::Cloud) => (5.0, 100),
322 (LocationKind::Cloud, LocationKind::Local) => (5.0, 100),
323
324 _ => (1.0, 100),
326 };
327 EdgeCost::new(time_per_gb, priority)
328}