1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
7#[derive(Debug, Clone)]
8pub struct ConfigBase {
9 local_dir: PathBuf,
10 remote_base: Option<RemoteConfigBase>,
11}
12
13impl ConfigBase {
14 pub fn local_from_path(path: &Path) -> Self {
15 let local_dir = path
16 .parent()
17 .unwrap_or_else(|| Path::new("."))
18 .to_path_buf();
19 Self {
20 local_dir,
21 remote_base: None,
22 }
23 }
24
25 pub fn remote_from_uri(local_dir: PathBuf, uri: &str) -> FloeResult<Self> {
26 Ok(Self {
27 local_dir,
28 remote_base: Some(RemoteConfigBase::from_uri(uri)?),
29 })
30 }
31
32 pub fn local_dir(&self) -> &Path {
33 &self.local_dir
34 }
35
36 pub fn remote_base(&self) -> Option<&RemoteConfigBase> {
37 self.remote_base.as_ref()
38 }
39}
40
/// Kind of object store a remote config location lives in.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RemoteScheme {
    /// Amazon S3 (`s3://`).
    S3,
    /// Google Cloud Storage (`gs://`).
    Gcs,
    /// Azure Data Lake Storage (`abfs://`).
    Adls,
}
47
48#[derive(Debug, Clone)]
49pub struct RemoteConfigBase {
50 scheme: RemoteScheme,
51 bucket: String,
52 account: Option<String>,
53 container: Option<String>,
54 prefix: String,
55}
56
57impl RemoteConfigBase {
58 fn from_uri(uri: &str) -> FloeResult<Self> {
59 if let Some((bucket, key)) = parse_s3_uri(uri) {
60 let prefix = parent_prefix(&key);
61 return Ok(Self {
62 scheme: RemoteScheme::S3,
63 bucket,
64 account: None,
65 container: None,
66 prefix,
67 });
68 }
69 if let Some((bucket, key)) = parse_gcs_uri(uri) {
70 let prefix = parent_prefix(&key);
71 return Ok(Self {
72 scheme: RemoteScheme::Gcs,
73 bucket,
74 account: None,
75 container: None,
76 prefix,
77 });
78 }
79 if let Some((container, account, path)) = parse_adls_uri(uri) {
80 let prefix = parent_prefix(&path);
81 return Ok(Self {
82 scheme: RemoteScheme::Adls,
83 bucket: String::new(),
84 account: Some(account),
85 container: Some(container),
86 prefix,
87 });
88 }
89 Err(Box::new(ConfigError(format!(
90 "unsupported config uri: {}",
91 uri
92 ))))
93 }
94
95 fn matches_storage(&self, storage_type: &str) -> bool {
96 matches!(
97 (self.scheme, storage_type),
98 (RemoteScheme::S3, "s3") | (RemoteScheme::Gcs, "gcs") | (RemoteScheme::Adls, "adls")
99 )
100 }
101
102 fn join(&self, relative: &str) -> String {
103 match self.scheme {
104 RemoteScheme::S3 => {
105 let key = join_s3_key(&self.prefix, relative);
106 format_s3_uri(&self.bucket, &key)
107 }
108 RemoteScheme::Gcs => {
109 let key = join_s3_key(&self.prefix, relative);
110 format_gcs_uri(&self.bucket, &key)
111 }
112 RemoteScheme::Adls => {
113 let combined = join_adls_path(&self.prefix, relative);
114 let container = self.container.as_ref().expect("container");
115 let account = self.account.as_ref().expect("account");
116 format_adls_uri(container, account, &combined)
117 }
118 }
119 }
120}
121
/// Result of resolving a configured path against a named storage.
#[derive(Clone, Debug)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved against.
    pub storage: String,
    /// Fully qualified URI (`local://`, `s3://`, `gs://` or `abfs://`).
    pub uri: String,
    /// Filesystem path; only set for local storages.
    pub local_path: Option<PathBuf>,
}
128
129pub struct StorageResolver {
130 config_base: ConfigBase,
131 default_name: String,
132 definitions: HashMap<String, StorageDefinition>,
133 has_config: bool,
134}
135
136impl StorageResolver {
137 pub fn config_local_dir(&self) -> &Path {
138 self.config_base.local_dir()
139 }
140
141 pub fn config_is_remote(&self) -> bool {
142 self.config_base.remote_base().is_some()
143 }
144
145 pub fn resolve_local_path(&self, raw_path: &str) -> FloeResult<ResolvedPath> {
146 if is_remote_uri(raw_path) {
147 return Err(Box::new(ConfigError(format!(
148 "entity.state.path must be a local path (got {})",
149 raw_path
150 ))));
151 }
152 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
153 return Err(Box::new(ConfigError(
154 "entity.state.path must be absolute when config is remote".to_string(),
155 )));
156 }
157 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
158 Ok(ResolvedPath {
159 storage: "local".to_string(),
160 uri: local_uri(&resolved),
161 local_path: Some(resolved),
162 })
163 }
164
165 pub fn new(config: &RootConfig, config_base: ConfigBase) -> FloeResult<Self> {
166 if let Some(storages) = &config.storages {
167 let mut definitions = HashMap::new();
168 for definition in &storages.definitions {
169 if definitions
170 .insert(definition.name.clone(), definition.clone())
171 .is_some()
172 {
173 return Err(Box::new(ConfigError(format!(
174 "storages.definitions name={} is duplicated",
175 definition.name
176 ))));
177 }
178 }
179 let default_name = storages
180 .default
181 .clone()
182 .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
183 if !definitions.contains_key(&default_name) {
184 return Err(Box::new(ConfigError(format!(
185 "storages.default={} does not match any definition",
186 default_name
187 ))));
188 }
189 Ok(Self {
190 config_base,
191 default_name,
192 definitions,
193 has_config: true,
194 })
195 } else {
196 Ok(Self {
197 config_base,
198 default_name: "local".to_string(),
199 definitions: HashMap::new(),
200 has_config: false,
201 })
202 }
203 }
204
205 pub fn from_path(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
206 Self::new(config, ConfigBase::local_from_path(config_path))
207 }
208
209 pub fn resolve_path(
210 &self,
211 entity_name: &str,
212 field: &str,
213 storage_name: Option<&str>,
214 raw_path: &str,
215 ) -> FloeResult<ResolvedPath> {
216 let name = storage_name.unwrap_or(self.default_name.as_str());
217 if !self.has_config && name != "local" {
218 return Err(Box::new(ConfigError(format!(
219 "entity.name={} {field} references unknown storage {} (no storages block)",
220 entity_name, name
221 ))));
222 }
223
224 let definition = if self.has_config {
225 self.definitions.get(name).cloned().ok_or_else(|| {
226 Box::new(ConfigError(format!(
227 "entity.name={} {field} references unknown storage {}",
228 entity_name, name
229 )))
230 })?
231 } else {
232 StorageDefinition {
233 name: "local".to_string(),
234 fs_type: "local".to_string(),
235 bucket: None,
236 region: None,
237 account: None,
238 container: None,
239 prefix: None,
240 }
241 };
242
243 let resolved_remote = self
244 .resolve_remote_relative(&definition, raw_path)
245 .unwrap_or_else(|| raw_path.to_string());
246 let raw_path = resolved_remote.as_str();
247
248 match definition.fs_type.as_str() {
249 "local" => {
250 if is_remote_uri(raw_path) {
251 return Err(Box::new(ConfigError(format!(
252 "entity.name={} {field} must be a local path (got {})",
253 entity_name, raw_path
254 ))));
255 }
256 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
257 return Err(Box::new(ConfigError(format!(
258 "entity.name={} {field} must be absolute when config is remote",
259 entity_name
260 ))));
261 }
262 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
263 Ok(ResolvedPath {
264 storage: name.to_string(),
265 uri: local_uri(&resolved),
266 local_path: Some(resolved),
267 })
268 }
269 "s3" => {
270 let uri = resolve_s3_uri(&definition, raw_path)?;
271 Ok(ResolvedPath {
272 storage: name.to_string(),
273 uri,
274 local_path: None,
275 })
276 }
277 "adls" => {
278 let uri = resolve_adls_uri(&definition, raw_path)?;
279 Ok(ResolvedPath {
280 storage: name.to_string(),
281 uri,
282 local_path: None,
283 })
284 }
285 "gcs" => {
286 let uri = resolve_gcs_uri(&definition, raw_path)?;
287 Ok(ResolvedPath {
288 storage: name.to_string(),
289 uri,
290 local_path: None,
291 })
292 }
293 _ => Err(Box::new(ConfigError(format!(
294 "storage type {} is unsupported",
295 definition.fs_type
296 )))),
297 }
298 }
299
300 pub fn resolve_report_path(
301 &self,
302 storage_name: Option<&str>,
303 raw_path: &str,
304 ) -> FloeResult<ResolvedPath> {
305 let name = storage_name.unwrap_or(self.default_name.as_str());
306 if !self.has_config && name != "local" {
307 return Err(Box::new(ConfigError(format!(
308 "report.storage references unknown storage {} (no storages block)",
309 name
310 ))));
311 }
312
313 let definition = if self.has_config {
314 self.definitions.get(name).cloned().ok_or_else(|| {
315 Box::new(ConfigError(format!(
316 "report.storage references unknown storage {}",
317 name
318 )))
319 })?
320 } else {
321 StorageDefinition {
322 name: "local".to_string(),
323 fs_type: "local".to_string(),
324 bucket: None,
325 region: None,
326 account: None,
327 container: None,
328 prefix: None,
329 }
330 };
331
332 let resolved_remote = self
333 .resolve_remote_relative(&definition, raw_path)
334 .unwrap_or_else(|| raw_path.to_string());
335 let raw_path = resolved_remote.as_str();
336
337 match definition.fs_type.as_str() {
338 "local" => {
339 if is_remote_uri(raw_path) {
340 return Err(Box::new(ConfigError(format!(
341 "report.path must be a local path (got {})",
342 raw_path
343 ))));
344 }
345 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
346 return Err(Box::new(ConfigError(
347 "report.path must be absolute when config is remote".to_string(),
348 )));
349 }
350 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
351 Ok(ResolvedPath {
352 storage: name.to_string(),
353 uri: local_uri(&resolved),
354 local_path: Some(resolved),
355 })
356 }
357 "s3" => {
358 let uri = resolve_s3_uri(&definition, raw_path)?;
359 Ok(ResolvedPath {
360 storage: name.to_string(),
361 uri,
362 local_path: None,
363 })
364 }
365 "adls" => {
366 let uri = resolve_adls_uri(&definition, raw_path)?;
367 Ok(ResolvedPath {
368 storage: name.to_string(),
369 uri,
370 local_path: None,
371 })
372 }
373 "gcs" => {
374 let uri = resolve_gcs_uri(&definition, raw_path)?;
375 Ok(ResolvedPath {
376 storage: name.to_string(),
377 uri,
378 local_path: None,
379 })
380 }
381 _ => Err(Box::new(ConfigError(format!(
382 "storage type {} is unsupported",
383 definition.fs_type
384 )))),
385 }
386 }
387
388 pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
389 if self.has_config {
390 self.definitions.get(name).cloned()
391 } else if name == "local" {
392 Some(StorageDefinition {
393 name: "local".to_string(),
394 fs_type: "local".to_string(),
395 bucket: None,
396 region: None,
397 account: None,
398 container: None,
399 prefix: None,
400 })
401 } else {
402 None
403 }
404 }
405
406 pub fn default_storage_name(&self) -> &str {
407 self.default_name.as_str()
408 }
409
410 pub fn config_dir(&self) -> &Path {
411 self.config_base.local_dir()
412 }
413
414 fn resolve_remote_relative(
415 &self,
416 definition: &StorageDefinition,
417 raw_path: &str,
418 ) -> Option<String> {
419 if !is_relative_path(raw_path) {
420 return None;
421 }
422 if definition.prefix.is_some() {
423 return None;
424 }
425 let remote = self.config_base.remote_base()?;
426 if !remote.matches_storage(definition.fs_type.as_str()) {
427 return None;
428 }
429 Some(remote.join(raw_path))
430 }
431}
432
433pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
434 let path = Path::new(raw_path);
435 let resolved = if path.is_absolute() {
436 path.to_path_buf()
437 } else {
438 config_dir.join(path)
439 };
440 crate::io::storage::paths::normalize_local_path(&resolved)
441}
442
443fn local_uri(path: &Path) -> String {
444 let normalized = crate::io::storage::paths::normalize_local_path(path);
445 format!("local://{}", normalized.display())
446}
447
448fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
449 let bucket = definition.bucket.as_ref().ok_or_else(|| {
450 Box::new(ConfigError(format!(
451 "storage {} requires bucket for type s3",
452 definition.name
453 )))
454 })?;
455 if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
456 if bucket_in_path != *bucket {
457 return Err(Box::new(ConfigError(format!(
458 "storage {} bucket mismatch: {}",
459 definition.name, bucket_in_path
460 ))));
461 }
462 return Ok(format_s3_uri(bucket, &key));
463 }
464
465 let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
466 Ok(format_s3_uri(bucket, &key))
467}
468
469fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
470 let account = definition.account.as_ref().ok_or_else(|| {
471 Box::new(ConfigError(format!(
472 "storage {} requires account for type adls",
473 definition.name
474 )))
475 })?;
476 let container = definition.container.as_ref().ok_or_else(|| {
477 Box::new(ConfigError(format!(
478 "storage {} requires container for type adls",
479 definition.name
480 )))
481 })?;
482 if let Some((container_in_path, account_in_path, path)) = parse_adls_uri(raw_path) {
483 if container_in_path != *container || account_in_path != *account {
484 return Err(Box::new(ConfigError(format!(
485 "storage {} adls account/container mismatch",
486 definition.name
487 ))));
488 }
489 return Ok(format_adls_uri(container, account, &path));
490 }
491 let prefix = definition.prefix.as_deref().unwrap_or("");
492 let combined = join_adls_path(prefix, raw_path);
493 Ok(format_adls_uri(container, account, &combined))
494}
495
/// Joins an ADLS `prefix` (slashes trimmed on both ends) with `raw_path` (leading slashes
/// trimmed), inserting a single `/` only when both parts are non-empty.
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let base = prefix.trim_matches('/');
    let rest = raw_path.trim_start_matches('/');
    let mut joined = String::with_capacity(base.len() + rest.len() + 1);
    joined.push_str(base);
    if !base.is_empty() && !rest.is_empty() {
        joined.push('/');
    }
    joined.push_str(rest);
    joined
}
506
/// Formats an `abfs://` URI for `container` in `account`, appending `path` when non-empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let root = format!("abfs://{}@{}.dfs.core.windows.net", container, account);
    match path {
        "" => root,
        _ => format!("{}/{}", root, path),
    }
}
517
/// Splits `s3://bucket/key` into `(bucket, key)`.
///
/// The key may be empty. Returns `None` for non-`s3://` input or an empty bucket.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("s3://")?;
    let (bucket, key) = match rest.split_once('/') {
        Some((bucket, key)) => (bucket, key),
        None => (rest, ""),
    };
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_owned(), key.to_owned()))
    }
}
528
/// Joins an S3 key `prefix` (slashes trimmed on both ends) with `raw_path` (leading slashes
/// trimmed), inserting a single `/` only when both parts are non-empty.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    if head.is_empty() {
        tail.to_string()
    } else if tail.is_empty() {
        head.to_string()
    } else {
        format!("{}/{}", head, tail)
    }
}
539
/// Formats an `s3://` URI for `bucket`, appending `key` when non-empty.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    let mut uri = format!("s3://{}", bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
547
548fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
549 let bucket = definition.bucket.as_ref().ok_or_else(|| {
550 Box::new(ConfigError(format!(
551 "storage {} requires bucket for type gcs",
552 definition.name
553 )))
554 })?;
555 if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
556 if bucket_in_path != *bucket {
557 return Err(Box::new(ConfigError(format!(
558 "storage {} bucket mismatch: {}",
559 definition.name, bucket_in_path
560 ))));
561 }
562 return Ok(format_gcs_uri(bucket, &key));
563 }
564
565 let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
566 Ok(format_gcs_uri(bucket, &key))
567}
568
/// Splits `gs://bucket/key` into `(bucket, key)`.
///
/// The key may be empty. Returns `None` for non-`gs://` input or an empty bucket.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("gs://")?;
    let slash = rest.find('/');
    // '/' is ASCII, so slicing at its byte index is always on a char boundary.
    let bucket = slash.map_or(rest, |idx| &rest[..idx]);
    let key = slash.map_or("", |idx| &rest[idx + 1..]);
    (!bucket.is_empty()).then(|| (bucket.to_string(), key.to_string()))
}
579
/// Joins a GCS object `prefix` (slashes trimmed on both ends) with `raw_path` (leading
/// slashes trimmed), skipping whichever part is empty.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    [prefix.trim_matches('/'), raw_path.trim_start_matches('/')]
        .iter()
        .copied()
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<&str>>()
        .join("/")
}
590
/// Formats a `gs://` URI for `bucket`, appending `key` when non-empty.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("gs://{}", bucket),
        _ => format!("gs://{}/{}", bucket, key),
    }
}
598
/// Splits `abfs://container@account.dfs.core.windows.net/path` into
/// `(container, account, path)`, with leading slashes stripped from `path`.
///
/// Returns `None` when the scheme, `@` or host suffix is missing, or when the container or
/// account is empty.
fn parse_adls_uri(value: &str) -> Option<(String, String, String)> {
    let (container, rest) = value.strip_prefix("abfs://")?.split_once('@')?;
    let (account, path) = rest.split_once(".dfs.core.windows.net")?;
    if container.is_empty() || account.is_empty() {
        return None;
    }
    Some((
        container.to_owned(),
        account.to_owned(),
        path.trim_start_matches('/').to_owned(),
    ))
}
609
/// Returns everything before the last `/` in `key`, or an empty string if there is none.
fn parent_prefix(key: &str) -> String {
    key.rfind('/')
        .map(|slash| key[..slash].to_string())
        .unwrap_or_default()
}
616
/// Returns `true` when `value` starts with one of the supported remote schemes
/// (`s3://`, `gs://`, `abfs://`).
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES.iter().any(|&scheme| value.starts_with(scheme))
}
620
621fn is_relative_path(value: &str) -> bool {
622 !is_remote_uri(value) && Path::new(value).is_relative()
623}