Skip to main content

floe_core/io/storage/
adls.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
5use azure_storage::StorageCredentials;
6use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
7use futures::StreamExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::{RunError, StorageError};
11use crate::{config, ConfigError, FloeResult};
12
13use super::{planner, ObjectRef, StorageClient};
14
15pub struct AdlsClient {
16    account: String,
17    container: String,
18    prefix: String,
19    runtime: Runtime,
20    container_client: ContainerClient,
21}
22
23impl AdlsClient {
24    pub fn new(definition: &config::StorageDefinition) -> FloeResult<Self> {
25        let account = definition.account.clone().ok_or_else(|| {
26            Box::new(StorageError(format!(
27                "storage {} requires account for type adls",
28                definition.name
29            )))
30        })?;
31        let container = definition.container.clone().ok_or_else(|| {
32            Box::new(StorageError(format!(
33                "storage {} requires container for type adls",
34                definition.name
35            )))
36        })?;
37        let prefix = definition.prefix.clone().unwrap_or_default();
38        let runtime = tokio::runtime::Builder::new_current_thread()
39            .enable_all()
40            .build()
41            .map_err(|err| Box::new(StorageError(format!("adls runtime init failed: {err}"))))?;
42        let credential = DefaultAzureCredential::create(TokenCredentialOptions::default())
43            .map_err(|err| Box::new(StorageError(format!("adls credential init failed: {err}"))))?;
44        let storage_credentials = StorageCredentials::token_credential(Arc::new(credential));
45        let service_client = BlobServiceClient::new(account.clone(), storage_credentials);
46        let container_client = service_client.container_client(container.clone());
47        Ok(Self {
48            account,
49            container,
50            prefix,
51            runtime,
52            container_client,
53        })
54    }
55
56    fn base_prefix(&self) -> String {
57        planner::normalize_separators(&self.prefix)
58    }
59
60    fn full_path(&self, path: &str) -> String {
61        let prefix = self.base_prefix();
62        let joined = planner::join_prefix(&prefix, &planner::normalize_separators(path));
63        joined.trim_start_matches('/').to_string()
64    }
65
66    fn format_abfs(&self, path: &str) -> String {
67        format_abfs_uri(&self.container, &self.account, path)
68    }
69}
70
71impl StorageClient for AdlsClient {
72    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
73        let prefix = self.full_path(prefix_or_path);
74        let container = self.container.clone();
75        let account = self.account.clone();
76        let client = self.container_client.clone();
77        self.runtime.block_on(async move {
78            let mut refs = Vec::new();
79            let mut stream = client.list_blobs().prefix(prefix.clone()).into_stream();
80            while let Some(resp) = stream.next().await {
81                let resp = resp.map_err(|err| {
82                    Box::new(StorageError(format!("adls list failed: {err}")))
83                        as Box<dyn std::error::Error + Send + Sync>
84                })?;
85                for blob in resp.blobs.blobs() {
86                    let key = blob.name.clone();
87                    let uri = if key.is_empty() {
88                        format!("abfs://{}@{}.dfs.core.windows.net", container, account)
89                    } else {
90                        format!(
91                            "abfs://{}@{}.dfs.core.windows.net/{}",
92                            container, account, key
93                        )
94                    };
95                    refs.push(ObjectRef {
96                        uri,
97                        key,
98                        last_modified: Some(blob.properties.last_modified.to_string()),
99                        size: Some(blob.properties.content_length),
100                    });
101                }
102            }
103            Ok(planner::stable_sort_refs(refs))
104        })
105    }
106
107    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
108        let key = uri
109            .split_once(".dfs.core.windows.net/")
110            .map(|(_, tail)| tail)
111            .unwrap_or("")
112            .trim_start_matches('/')
113            .to_string();
114        let key = if key.is_empty() {
115            return Err(Box::new(StorageError(
116                "adls download requires a blob path".to_string(),
117            )));
118        } else {
119            key
120        };
121        let dest = temp_dir.join(
122            Path::new(&key)
123                .file_name()
124                .and_then(|name| name.to_str())
125                .unwrap_or("object"),
126        );
127        let dest_clone = dest.clone();
128        let client = self.container_client.clone();
129        let key_clone = key.clone();
130        self.runtime.block_on(async move {
131            if let Some(parent) = dest_clone.parent() {
132                tokio::fs::create_dir_all(parent).await?;
133            }
134            let blob = client.blob_client(key_clone);
135            let mut stream = blob.get().into_stream();
136            let mut file = tokio::fs::File::create(&dest_clone).await?;
137            while let Some(chunk) = stream.next().await {
138                let resp = chunk.map_err(|err| {
139                    Box::new(StorageError(format!("adls download failed: {err}")))
140                        as Box<dyn std::error::Error + Send + Sync>
141                })?;
142                let bytes = resp.data.collect().await.map_err(|err| {
143                    Box::new(StorageError(format!("adls download read failed: {err}")))
144                        as Box<dyn std::error::Error + Send + Sync>
145                })?;
146                tokio::io::AsyncWriteExt::write_all(&mut file, &bytes).await?;
147            }
148            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
149        })?;
150        Ok(dest)
151    }
152
153    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
154        let key = uri
155            .split_once(".dfs.core.windows.net/")
156            .map(|(_, tail)| tail)
157            .unwrap_or("")
158            .trim_start_matches('/')
159            .to_string();
160        if key.is_empty() {
161            return Err(Box::new(StorageError(
162                "adls upload requires a blob path".to_string(),
163            )));
164        }
165        let client = self.container_client.clone();
166        let path = local_path.to_path_buf();
167        self.runtime.block_on(async move {
168            let data = tokio::fs::read(path).await?;
169            let blob = client.blob_client(key);
170            blob.put_block_blob(data)
171                .content_type("application/octet-stream")
172                .into_future()
173                .await
174                .map_err(|err| {
175                    Box::new(StorageError(format!("adls upload failed: {err}")))
176                        as Box<dyn std::error::Error + Send + Sync>
177                })?;
178            Ok(())
179        })
180    }
181
182    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
183        Ok(self.format_abfs(&self.full_path(path)))
184    }
185
186    fn delete(&self, uri: &str) -> FloeResult<()> {
187        let key = uri
188            .split_once(".dfs.core.windows.net/")
189            .map(|(_, tail)| tail)
190            .unwrap_or("")
191            .trim_start_matches('/')
192            .to_string();
193        if key.is_empty() {
194            return Ok(());
195        }
196        let client = self.container_client.clone();
197        self.runtime.block_on(async move {
198            let blob = client.blob_client(key);
199            blob.delete().into_future().await.map_err(|err| {
200                Box::new(StorageError(format!("adls delete failed: {err}")))
201                    as Box<dyn std::error::Error + Send + Sync>
202            })?;
203            Ok(())
204        })
205    }
206}
207
208#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct AdlsLocation {
210    pub account: String,
211    pub container: String,
212    pub path: String,
213}
214
215pub fn parse_adls_uri(uri: &str) -> FloeResult<AdlsLocation> {
216    let stripped = uri.strip_prefix("abfs://").ok_or_else(|| {
217        Box::new(ConfigError(format!("expected abfs uri, got {}", uri)))
218            as Box<dyn std::error::Error + Send + Sync>
219    })?;
220    let (container, rest) = stripped.split_once('@').ok_or_else(|| {
221        Box::new(ConfigError(format!(
222            "missing container in abfs uri: {}",
223            uri
224        ))) as Box<dyn std::error::Error + Send + Sync>
225    })?;
226    let (account, path) = rest.split_once(".dfs.core.windows.net").ok_or_else(|| {
227        Box::new(ConfigError(format!("missing account in abfs uri: {}", uri)))
228            as Box<dyn std::error::Error + Send + Sync>
229    })?;
230    let path = path.trim_start_matches('/');
231    Ok(AdlsLocation {
232        account: account.to_string(),
233        container: container.to_string(),
234        path: path.to_string(),
235    })
236}
237
/// Builds an `abfs://{container}@{account}.dfs.core.windows.net[/{path}]` URI.
///
/// Leading slashes in `path` are dropped. An empty or all-slash `path` yields
/// the container-root URI without a trailing slash.
pub fn format_abfs_uri(container: &str, account: &str, path: &str) -> String {
    match path.trim_start_matches('/') {
        "" => format!("abfs://{}@{}.dfs.core.windows.net", container, account),
        blob_path => format!(
            "abfs://{}@{}.dfs.core.windows.net/{}",
            container, account, blob_path
        ),
    }
}
249
250pub fn build_input_files(
251    client: &dyn StorageClient,
252    container: &str,
253    account: &str,
254    prefix: &str,
255    adapter: &dyn crate::io::format::InputAdapter,
256    temp_dir: &Path,
257    entity: &crate::config::EntityConfig,
258    storage: &str,
259) -> FloeResult<Vec<crate::io::format::InputFile>> {
260    let suffixes = adapter.suffixes()?;
261    let list_refs = client.list(prefix)?;
262    let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
263    let filtered = planner::stable_sort_refs(filtered);
264    if filtered.is_empty() {
265        return Err(Box::new(RunError(format!(
266            "entity.name={} source.storage={} no input objects matched (container={}, account={}, prefix={}, suffixes={})",
267            entity.name,
268            storage,
269            container,
270            account,
271            prefix,
272            suffixes.join(",")
273        ))));
274    }
275    let mut inputs = Vec::with_capacity(filtered.len());
276    for object in filtered {
277        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
278        let source_name = crate::io::storage::s3::file_name_from_key(&object.key)
279            .unwrap_or_else(|| entity.name.clone());
280        let source_stem = crate::io::storage::s3::file_stem_from_name(&source_name)
281            .unwrap_or_else(|| entity.name.clone());
282        let source_uri = object.uri;
283        inputs.push(crate::io::format::InputFile {
284            source_uri,
285            source_local_path: local_path,
286            source_name,
287            source_stem,
288        });
289    }
290    Ok(inputs)
291}