Skip to main content

floe_core/io/storage/
target.rs

1use crate::{config, io, ConfigError, FloeResult};
2
3use super::paths;
4
/// A storage destination, split into the parts each backend needs.
///
/// Every variant keeps the `storage` name and the full `uri` it was built
/// from, alongside the backend-specific location components.
#[derive(Clone, Debug)]
pub enum Target {
    /// A destination on the local filesystem.
    Local {
        /// Storage name this target was resolved for.
        storage: String,
        /// Full URI of the target.
        uri: String,
        /// Base filesystem path, rendered as a string.
        base_path: String,
    },
    /// A destination addressed by an `s3://` URI.
    S3 {
        /// Storage name this target was resolved for.
        storage: String,
        /// Full URI of the target.
        uri: String,
        /// Bucket component of the URI.
        bucket: String,
        /// Key component of the URI, used as the base for relative joins.
        base_key: String,
    },
    /// A destination addressed by an `abfs://` URI.
    Adls {
        /// Storage name this target was resolved for.
        storage: String,
        /// Full URI of the target.
        uri: String,
        /// Account component of the URI.
        account: String,
        /// Container component of the URI.
        container: String,
        /// Path component of the URI, used as the base for relative joins.
        base_path: String,
    },
    /// A destination addressed by a `gs://` URI.
    Gcs {
        /// Storage name this target was resolved for.
        storage: String,
        /// Full URI of the target.
        uri: String,
        /// Bucket component of the URI.
        bucket: String,
        /// Key component of the URI, used as the base for relative joins.
        base_key: String,
    },
}
32
33impl Target {
34    pub fn from_resolved(resolved: &config::ResolvedPath) -> FloeResult<Self> {
35        if let Some(path) = &resolved.local_path {
36            return Ok(Target::Local {
37                storage: resolved.storage.clone(),
38                uri: resolved.uri.clone(),
39                base_path: path.display().to_string(),
40            });
41        }
42        if resolved.uri.starts_with("s3://") {
43            let location = io::storage::s3::parse_s3_uri(&resolved.uri)?;
44            return Ok(Target::S3 {
45                storage: resolved.storage.clone(),
46                uri: resolved.uri.clone(),
47                bucket: location.bucket,
48                base_key: location.key,
49            });
50        }
51        if resolved.uri.starts_with("abfs://") {
52            let location = io::storage::adls::parse_adls_uri(&resolved.uri)?;
53            return Ok(Target::Adls {
54                storage: resolved.storage.clone(),
55                uri: resolved.uri.clone(),
56                account: location.account,
57                container: location.container,
58                base_path: location.path,
59            });
60        }
61        if resolved.uri.starts_with("gs://") {
62            let location = io::storage::gcs::parse_gcs_uri(&resolved.uri)?;
63            return Ok(Target::Gcs {
64                storage: resolved.storage.clone(),
65                uri: resolved.uri.clone(),
66                bucket: location.bucket,
67                base_key: location.key,
68            });
69        }
70        Err(Box::new(ConfigError(format!(
71            "unsupported storage uri: {}",
72            resolved.uri
73        ))))
74    }
75
76    pub fn storage(&self) -> &str {
77        match self {
78            Target::Local { storage, .. }
79            | Target::S3 { storage, .. }
80            | Target::Adls { storage, .. }
81            | Target::Gcs { storage, .. } => storage.as_str(),
82        }
83    }
84
85    pub fn target_uri(&self) -> &str {
86        match self {
87            Target::Local { uri, .. }
88            | Target::S3 { uri, .. }
89            | Target::Adls { uri, .. }
90            | Target::Gcs { uri, .. } => uri.as_str(),
91        }
92    }
93
94    pub fn join_relative(&self, relative: &str) -> String {
95        match self {
96            Target::Local { base_path, .. } => paths::resolve_output_dir_path(base_path, relative)
97                .display()
98                .to_string(),
99            Target::S3 {
100                bucket, base_key, ..
101            } => {
102                let key = paths::resolve_output_dir_key(base_key, relative);
103                io::storage::s3::format_s3_uri(bucket, &key)
104            }
105            Target::Adls {
106                account,
107                container,
108                base_path,
109                ..
110            } => {
111                let key = paths::resolve_output_dir_key(base_path, relative);
112                io::storage::adls::format_abfs_uri(container, account, &key)
113            }
114            Target::Gcs {
115                bucket, base_key, ..
116            } => {
117                let key = paths::resolve_output_dir_key(base_key, relative);
118                io::storage::gcs::format_gcs_uri(bucket, &key)
119            }
120        }
121    }
122
123    pub fn s3_parts(&self) -> Option<(&str, &str)> {
124        match self {
125            Target::S3 {
126                bucket, base_key, ..
127            } => Some((bucket.as_str(), base_key.as_str())),
128            Target::Local { .. } => None,
129            Target::Adls { .. } | Target::Gcs { .. } => None,
130        }
131    }
132
133    pub fn gcs_parts(&self) -> Option<(&str, &str)> {
134        match self {
135            Target::Gcs {
136                bucket, base_key, ..
137            } => Some((bucket.as_str(), base_key.as_str())),
138            Target::Local { .. } | Target::S3 { .. } | Target::Adls { .. } => None,
139        }
140    }
141
142    pub fn adls_parts(&self) -> Option<(&str, &str, &str)> {
143        match self {
144            Target::Adls {
145                container,
146                account,
147                base_path,
148                ..
149            } => Some((container.as_str(), account.as_str(), base_path.as_str())),
150            Target::Local { .. } | Target::S3 { .. } | Target::Gcs { .. } => None,
151        }
152    }
153}