1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
/// Base location of a loaded configuration, used to resolve paths written in it.
///
/// Always carries a local directory; additionally carries a remote base when
/// the configuration was loaded from a remote URI.
#[derive(Debug, Clone)]
pub struct ConfigBase {
    /// Directory that relative local paths are joined onto.
    local_dir: PathBuf,
    /// Remote location of the config; `None` for a purely local config.
    remote_base: Option<RemoteConfigBase>,
}
12
13impl ConfigBase {
14 pub fn local_from_path(path: &Path) -> Self {
15 let local_dir = path
16 .parent()
17 .unwrap_or_else(|| Path::new("."))
18 .to_path_buf();
19 Self {
20 local_dir,
21 remote_base: None,
22 }
23 }
24
25 pub fn remote_from_uri(local_dir: PathBuf, uri: &str) -> FloeResult<Self> {
26 Ok(Self {
27 local_dir,
28 remote_base: Some(RemoteConfigBase::from_uri(uri)?),
29 })
30 }
31
32 pub fn local_dir(&self) -> &Path {
33 &self.local_dir
34 }
35
36 pub fn remote_base(&self) -> Option<&RemoteConfigBase> {
37 self.remote_base.as_ref()
38 }
39}
40
/// Remote storage scheme a config URI was recognised as.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RemoteScheme {
    /// `s3://bucket/key` URIs.
    S3,
    /// `gs://bucket/key` URIs.
    Gcs,
    /// `abfs://container@account.dfs.core.windows.net/path` URIs.
    Adls,
}
47
/// Remote "directory" a config file lives in, split into its addressing parts.
///
/// Relative paths on a storage of the same scheme can be joined onto it.
#[derive(Debug, Clone)]
pub struct RemoteConfigBase {
    /// Scheme the config URI was parsed as.
    scheme: RemoteScheme,
    /// Bucket name for S3/GCS; left empty for ADLS.
    bucket: String,
    /// Storage account; only set for ADLS.
    account: Option<String>,
    /// Container name; only set for ADLS.
    container: Option<String>,
    /// Key/path of the config's parent "directory" (everything before the last `/`).
    prefix: String,
}
56
57impl RemoteConfigBase {
58 fn from_uri(uri: &str) -> FloeResult<Self> {
59 if let Some((bucket, key)) = parse_s3_uri(uri) {
60 let prefix = parent_prefix(&key);
61 return Ok(Self {
62 scheme: RemoteScheme::S3,
63 bucket,
64 account: None,
65 container: None,
66 prefix,
67 });
68 }
69 if let Some((bucket, key)) = parse_gcs_uri(uri) {
70 let prefix = parent_prefix(&key);
71 return Ok(Self {
72 scheme: RemoteScheme::Gcs,
73 bucket,
74 account: None,
75 container: None,
76 prefix,
77 });
78 }
79 if let Some((container, account, path)) = parse_adls_uri(uri) {
80 let prefix = parent_prefix(&path);
81 return Ok(Self {
82 scheme: RemoteScheme::Adls,
83 bucket: String::new(),
84 account: Some(account),
85 container: Some(container),
86 prefix,
87 });
88 }
89 Err(Box::new(ConfigError(format!(
90 "unsupported config uri: {}",
91 uri
92 ))))
93 }
94
95 fn matches_storage(&self, storage_type: &str) -> bool {
96 matches!(
97 (self.scheme, storage_type),
98 (RemoteScheme::S3, "s3") | (RemoteScheme::Gcs, "gcs") | (RemoteScheme::Adls, "adls")
99 )
100 }
101
102 fn join(&self, relative: &str) -> String {
103 match self.scheme {
104 RemoteScheme::S3 => {
105 let key = join_s3_key(&self.prefix, relative);
106 format_s3_uri(&self.bucket, &key)
107 }
108 RemoteScheme::Gcs => {
109 let key = join_s3_key(&self.prefix, relative);
110 format_gcs_uri(&self.bucket, &key)
111 }
112 RemoteScheme::Adls => {
113 let combined = join_adls_path(&self.prefix, relative);
114 let container = self.container.as_ref().expect("container");
115 let account = self.account.as_ref().expect("account");
116 format_adls_uri(container, account, &combined)
117 }
118 }
119 }
120}
121
/// Outcome of resolving a configured path against a storage.
#[derive(Debug, Clone)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved on.
    pub storage: String,
    /// Fully resolved URI (`local://…`, `s3://…`, `gs://…` or `abfs://…`).
    pub uri: String,
    /// Filesystem path for local storages; `None` for remote storages.
    pub local_path: Option<PathBuf>,
}
128
/// Resolves paths from the configuration into concrete URIs using the
/// configured storage definitions.
#[derive(Clone)]
pub struct StorageResolver {
    /// Where the configuration was loaded from (local directory, optional remote base).
    config_base: ConfigBase,
    /// Storage used when a path does not name one; `"local"` without a `storages` block.
    default_name: String,
    /// Storage definitions indexed by name (from the config and/or registered later).
    definitions: HashMap<String, StorageDefinition>,
    /// Whether the configuration contained a `storages` block.
    has_config: bool,
}
136
137impl StorageResolver {
138 pub fn config_local_dir(&self) -> &Path {
139 self.config_base.local_dir()
140 }
141
142 pub fn config_is_remote(&self) -> bool {
143 self.config_base.remote_base().is_some()
144 }
145
146 pub fn resolve_local_path(&self, raw_path: &str) -> FloeResult<ResolvedPath> {
147 if is_remote_uri(raw_path) {
148 return Err(Box::new(ConfigError(format!(
149 "entity.state.path must be a local path (got {})",
150 raw_path
151 ))));
152 }
153 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
154 return Err(Box::new(ConfigError(
155 "entity.state.path must be absolute when config is remote".to_string(),
156 )));
157 }
158 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
159 Ok(ResolvedPath {
160 storage: "local".to_string(),
161 uri: local_uri(&resolved),
162 local_path: Some(resolved),
163 })
164 }
165
166 pub fn new(config: &RootConfig, config_base: ConfigBase) -> FloeResult<Self> {
167 if let Some(storages) = &config.storages {
168 let mut definitions = HashMap::new();
169 for definition in &storages.definitions {
170 if definitions
171 .insert(definition.name.clone(), definition.clone())
172 .is_some()
173 {
174 return Err(Box::new(ConfigError(format!(
175 "storages.definitions name={} is duplicated",
176 definition.name
177 ))));
178 }
179 }
180 let default_name = storages
181 .default
182 .clone()
183 .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
184 if !definitions.contains_key(&default_name) {
185 return Err(Box::new(ConfigError(format!(
186 "storages.default={} does not match any definition",
187 default_name
188 ))));
189 }
190 Ok(Self {
191 config_base,
192 default_name,
193 definitions,
194 has_config: true,
195 })
196 } else {
197 Ok(Self {
198 config_base,
199 default_name: "local".to_string(),
200 definitions: HashMap::new(),
201 has_config: false,
202 })
203 }
204 }
205
206 pub fn from_path(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
207 Self::new(config, ConfigBase::local_from_path(config_path))
208 }
209
210 pub fn resolve_path(
211 &self,
212 entity_name: &str,
213 field: &str,
214 storage_name: Option<&str>,
215 raw_path: &str,
216 ) -> FloeResult<ResolvedPath> {
217 let name = storage_name.unwrap_or(self.default_name.as_str());
218 if !self.has_config && name != "local" && !self.definitions.contains_key(name) {
219 return Err(Box::new(ConfigError(format!(
220 "entity.name={} {field} references unknown storage {} (no storages block)",
221 entity_name, name
222 ))));
223 }
224
225 let definition = if self.has_config {
226 self.definitions.get(name).cloned().ok_or_else(|| {
227 Box::new(ConfigError(format!(
228 "entity.name={} {field} references unknown storage {}",
229 entity_name, name
230 )))
231 })?
232 } else {
233 StorageDefinition {
234 name: "local".to_string(),
235 fs_type: "local".to_string(),
236 bucket: None,
237 region: None,
238 account: None,
239 container: None,
240 prefix: None,
241 endpoint: None,
242 path_style_access: None,
243 }
244 };
245
246 let resolved_remote = self
247 .resolve_remote_relative(&definition, raw_path)
248 .unwrap_or_else(|| raw_path.to_string());
249 let raw_path = resolved_remote.as_str();
250
251 match definition.fs_type.as_str() {
252 "local" => {
253 if is_remote_uri(raw_path) {
254 return Err(Box::new(ConfigError(format!(
255 "entity.name={} {field} must be a local path (got {})",
256 entity_name, raw_path
257 ))));
258 }
259 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
260 return Err(Box::new(ConfigError(format!(
261 "entity.name={} {field} must be absolute when config is remote",
262 entity_name
263 ))));
264 }
265 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
266 Ok(ResolvedPath {
267 storage: name.to_string(),
268 uri: local_uri(&resolved),
269 local_path: Some(resolved),
270 })
271 }
272 "s3" => {
273 let uri = resolve_s3_uri(&definition, raw_path)?;
274 Ok(ResolvedPath {
275 storage: name.to_string(),
276 uri,
277 local_path: None,
278 })
279 }
280 "adls" => {
281 let uri = resolve_adls_uri(&definition, raw_path)?;
282 Ok(ResolvedPath {
283 storage: name.to_string(),
284 uri,
285 local_path: None,
286 })
287 }
288 "gcs" => {
289 let uri = resolve_gcs_uri(&definition, raw_path)?;
290 Ok(ResolvedPath {
291 storage: name.to_string(),
292 uri,
293 local_path: None,
294 })
295 }
296 _ => Err(Box::new(ConfigError(format!(
297 "storage type {} is unsupported",
298 definition.fs_type
299 )))),
300 }
301 }
302
303 pub fn resolve_report_path(
304 &self,
305 storage_name: Option<&str>,
306 raw_path: &str,
307 ) -> FloeResult<ResolvedPath> {
308 let name = storage_name.unwrap_or(self.default_name.as_str());
309 if !self.has_config && name != "local" && !self.definitions.contains_key(name) {
310 return Err(Box::new(ConfigError(format!(
311 "report.storage references unknown storage {} (no storages block)",
312 name
313 ))));
314 }
315
316 let definition = if self.has_config {
317 self.definitions.get(name).cloned().ok_or_else(|| {
318 Box::new(ConfigError(format!(
319 "report.storage references unknown storage {}",
320 name
321 )))
322 })?
323 } else {
324 StorageDefinition {
325 name: "local".to_string(),
326 fs_type: "local".to_string(),
327 bucket: None,
328 region: None,
329 account: None,
330 container: None,
331 prefix: None,
332 endpoint: None,
333 path_style_access: None,
334 }
335 };
336
337 let resolved_remote = self
338 .resolve_remote_relative(&definition, raw_path)
339 .unwrap_or_else(|| raw_path.to_string());
340 let raw_path = resolved_remote.as_str();
341
342 match definition.fs_type.as_str() {
343 "local" => {
344 if is_remote_uri(raw_path) {
345 return Err(Box::new(ConfigError(format!(
346 "report.path must be a local path (got {})",
347 raw_path
348 ))));
349 }
350 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
351 return Err(Box::new(ConfigError(
352 "report.path must be absolute when config is remote".to_string(),
353 )));
354 }
355 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
356 Ok(ResolvedPath {
357 storage: name.to_string(),
358 uri: local_uri(&resolved),
359 local_path: Some(resolved),
360 })
361 }
362 "s3" => {
363 let uri = resolve_s3_uri(&definition, raw_path)?;
364 Ok(ResolvedPath {
365 storage: name.to_string(),
366 uri,
367 local_path: None,
368 })
369 }
370 "adls" => {
371 let uri = resolve_adls_uri(&definition, raw_path)?;
372 Ok(ResolvedPath {
373 storage: name.to_string(),
374 uri,
375 local_path: None,
376 })
377 }
378 "gcs" => {
379 let uri = resolve_gcs_uri(&definition, raw_path)?;
380 Ok(ResolvedPath {
381 storage: name.to_string(),
382 uri,
383 local_path: None,
384 })
385 }
386 _ => Err(Box::new(ConfigError(format!(
387 "storage type {} is unsupported",
388 definition.fs_type
389 )))),
390 }
391 }
392
393 pub fn find_definition_name_for_uri(&self, uri: &str) -> Option<String> {
396 for (name, def) in &self.definitions {
397 if uri.starts_with("s3://") && def.fs_type == "s3" {
398 if let Some(b) = &def.bucket {
399 if uri.starts_with(&format!("s3://{b}/")) || uri == format!("s3://{b}") {
400 return Some(name.clone());
401 }
402 }
403 }
404 if uri.starts_with("gs://") && def.fs_type == "gcs" {
405 if let Some(b) = &def.bucket {
406 if uri.starts_with(&format!("gs://{b}/")) || uri == format!("gs://{b}") {
407 return Some(name.clone());
408 }
409 }
410 }
411 if uri.starts_with("abfs://") && def.fs_type == "adls" {
412 if let (Some(c), Some(a)) = (&def.container, &def.account) {
413 if uri.starts_with(&format!("abfs://{c}@{a}.dfs.core.windows.net")) {
414 return Some(name.clone());
415 }
416 }
417 }
418 }
419 None
420 }
421
422 pub fn register_definition(&mut self, definition: StorageDefinition) {
426 self.definitions.insert(definition.name.clone(), definition);
427 }
428
429 pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
430 if self.has_config {
431 self.definitions.get(name).cloned()
432 } else if let Some(def) = self.definitions.get(name) {
433 Some(def.clone())
435 } else if name == "local" {
436 Some(StorageDefinition {
437 name: "local".to_string(),
438 fs_type: "local".to_string(),
439 bucket: None,
440 region: None,
441 account: None,
442 container: None,
443 prefix: None,
444 endpoint: None,
445 path_style_access: None,
446 })
447 } else {
448 None
449 }
450 }
451
452 pub fn default_storage_name(&self) -> &str {
453 self.default_name.as_str()
454 }
455
456 pub fn config_dir(&self) -> &Path {
457 self.config_base.local_dir()
458 }
459
460 fn resolve_remote_relative(
461 &self,
462 definition: &StorageDefinition,
463 raw_path: &str,
464 ) -> Option<String> {
465 if !is_relative_path(raw_path) {
466 return None;
467 }
468 if definition.prefix.is_some() {
469 return None;
470 }
471 let remote = self.config_base.remote_base()?;
472 if !remote.matches_storage(definition.fs_type.as_str()) {
473 return None;
474 }
475 Some(remote.join(raw_path))
476 }
477}
478
479pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
480 let path = Path::new(raw_path);
481 let resolved = if path.is_absolute() {
482 path.to_path_buf()
483 } else {
484 config_dir.join(path)
485 };
486 crate::io::storage::paths::normalize_local_path(&resolved)
487}
488
489fn local_uri(path: &Path) -> String {
490 let normalized = crate::io::storage::paths::normalize_local_path(path);
491 format!("local://{}", normalized.display())
492}
493
494fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
495 let bucket = definition.bucket.as_ref().ok_or_else(|| {
496 Box::new(ConfigError(format!(
497 "storage {} requires bucket for type s3",
498 definition.name
499 )))
500 })?;
501 if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
502 if bucket_in_path != *bucket {
503 return Err(Box::new(ConfigError(format!(
504 "storage {} bucket mismatch: {}",
505 definition.name, bucket_in_path
506 ))));
507 }
508 return Ok(format_s3_uri(bucket, &key));
509 }
510
511 let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
512 Ok(format_s3_uri(bucket, &key))
513}
514
515fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
516 let account = definition.account.as_ref().ok_or_else(|| {
517 Box::new(ConfigError(format!(
518 "storage {} requires account for type adls",
519 definition.name
520 )))
521 })?;
522 let container = definition.container.as_ref().ok_or_else(|| {
523 Box::new(ConfigError(format!(
524 "storage {} requires container for type adls",
525 definition.name
526 )))
527 })?;
528 if let Some((container_in_path, account_in_path, path)) = parse_adls_uri(raw_path) {
529 if container_in_path != *container || account_in_path != *account {
530 return Err(Box::new(ConfigError(format!(
531 "storage {} adls account/container mismatch",
532 definition.name
533 ))));
534 }
535 return Ok(format_adls_uri(container, account, &path));
536 }
537 let prefix = definition.prefix.as_deref().unwrap_or("");
538 let combined = join_adls_path(prefix, raw_path);
539 Ok(format_adls_uri(container, account, &combined))
540}
541
/// Joins an ADLS prefix and a path with a single `/`.
///
/// Slashes around the prefix and leading slashes of the path are dropped;
/// empty parts are skipped, so no stray separator is produced.
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let segments = [
        prefix.trim_matches('/'),
        raw_path.trim_start_matches('/'),
    ];
    segments
        .iter()
        .copied()
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<&str>>()
        .join("/")
}
552
/// Renders an `abfs://container@account.dfs.core.windows.net[/path]` URI.
///
/// The trailing `/path` is omitted when `path` is empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let root = format!("abfs://{container}@{account}.dfs.core.windows.net");
    if path.is_empty() {
        root
    } else {
        format!("{root}/{path}")
    }
}
563
/// Splits an `s3://bucket/key` URI into `(bucket, key)`.
///
/// The key is empty when the URI has no `/` after the bucket. Returns `None`
/// when the scheme is not `s3://` or the bucket is empty.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("s3://")?;
    let (bucket, key) = rest.split_once('/').unwrap_or((rest, ""));
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_owned(), key.to_owned()))
    }
}
574
/// Joins an S3 key prefix and a path with a single `/`.
///
/// Slashes around the prefix and leading slashes of the path are dropped;
/// an empty side yields the other side unchanged.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let base = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    if base.is_empty() {
        return tail.to_string();
    }
    if tail.is_empty() {
        return base.to_string();
    }
    format!("{base}/{tail}")
}
585
/// Renders an `s3://bucket[/key]` URI; the `/key` part is omitted for an
/// empty key.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    let mut uri = String::with_capacity("s3://".len() + bucket.len() + 1 + key.len());
    uri.push_str("s3://");
    uri.push_str(bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
593
594fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
595 let bucket = definition.bucket.as_ref().ok_or_else(|| {
596 Box::new(ConfigError(format!(
597 "storage {} requires bucket for type gcs",
598 definition.name
599 )))
600 })?;
601 if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
602 if bucket_in_path != *bucket {
603 return Err(Box::new(ConfigError(format!(
604 "storage {} bucket mismatch: {}",
605 definition.name, bucket_in_path
606 ))));
607 }
608 return Ok(format_gcs_uri(bucket, &key));
609 }
610
611 let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
612 Ok(format_gcs_uri(bucket, &key))
613}
614
/// Splits a `gs://bucket/key` URI into `(bucket, key)`.
///
/// The key is empty when the URI has no `/` after the bucket. Returns `None`
/// when the scheme is not `gs://` or the bucket is empty.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("gs://")?;
    let (bucket, key) = match rest.find('/') {
        // `/` is a single byte, so `slash + 1` is a valid boundary.
        Some(slash) => (&rest[..slash], &rest[slash + 1..]),
        None => (rest, ""),
    };
    (!bucket.is_empty()).then(|| (bucket.to_string(), key.to_string()))
}
625
/// Joins a GCS key prefix and a path with a single `/`.
///
/// Slashes around the prefix and leading slashes of the path are dropped;
/// the separator is only inserted when both sides are non-empty.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    let mut key = String::from(prefix.trim_matches('/'));
    let rest = raw_path.trim_start_matches('/');
    if !key.is_empty() && !rest.is_empty() {
        key.push('/');
    }
    key.push_str(rest);
    key
}
636
/// Renders a `gs://bucket[/key]` URI; the `/key` part is omitted for an
/// empty key.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("gs://{bucket}"),
        _ => format!("gs://{bucket}/{key}"),
    }
}
644
/// Splits an `abfs://container@account.dfs.core.windows.net/path` URI into
/// `(container, account, path)`.
///
/// The returned path has its leading slashes removed and is empty when the
/// URI stops at the host.
///
/// Returns `None` when:
/// - the scheme is not `abfs://`, or the `@` / DFS host suffix is missing;
/// - the container or account is empty or contains a `/` (which would mean
///   the `@` or the host suffix was found inside the path);
/// - the host suffix is followed by anything other than `/` or the end of the
///   string (e.g. `…windows.net.example.com`), since that is a different host.
fn parse_adls_uri(value: &str) -> Option<(String, String, String)> {
    let stripped = value.strip_prefix("abfs://")?;
    let (container, rest) = stripped.split_once('@')?;
    let (account, tail) = rest.split_once(".dfs.core.windows.net")?;

    let valid_name = |name: &str| !name.is_empty() && !name.contains('/');
    if !valid_name(container) || !valid_name(account) {
        return None;
    }
    // The host must end exactly at the suffix.
    if !(tail.is_empty() || tail.starts_with('/')) {
        return None;
    }

    let path = tail.trim_start_matches('/');
    Some((container.to_string(), account.to_string(), path.to_string()))
}
655
/// Returns everything before the last `/` of `key`, or an empty string when
/// `key` contains no `/`.
fn parent_prefix(key: &str) -> String {
    key.rfind('/')
        .map_or_else(String::new, |slash| key[..slash].to_string())
}
662
/// Whether `value` starts with one of the supported remote schemes
/// (`s3://`, `gs://`, `abfs://`). The check is case-sensitive.
pub(crate) fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES
        .iter()
        .any(|scheme| value.starts_with(*scheme))
}
666
667fn is_relative_path(value: &str) -> bool {
668 !is_remote_uri(value) && Path::new(value).is_relative()
669}