Skip to main content

floe_core/io/storage/providers/
adls.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
5use azure_storage::StorageCredentials;
6use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
7use futures::StreamExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::StorageError;
11use crate::io::storage::{planner, uri, validation, ObjectRef, StorageClient};
12use crate::{config, FloeResult};
13
14pub struct AdlsClient {
15    account: String,
16    container: String,
17    prefix: String,
18    runtime: Runtime,
19    container_client: ContainerClient,
20}
21
22impl AdlsClient {
23    pub fn new(definition: &config::StorageDefinition) -> FloeResult<Self> {
24        let account =
25            validation::require_field(definition, definition.account.as_ref(), "account", "adls")?;
26        let container = validation::require_field(
27            definition,
28            definition.container.as_ref(),
29            "container",
30            "adls",
31        )?;
32        let prefix = definition.prefix.clone().unwrap_or_default();
33        let runtime = tokio::runtime::Builder::new_current_thread()
34            .enable_all()
35            .build()
36            .map_err(|err| Box::new(StorageError(format!("adls runtime init failed: {err}"))))?;
37        let credential = DefaultAzureCredential::create(TokenCredentialOptions::default())
38            .map_err(|err| Box::new(StorageError(format!("adls credential init failed: {err}"))))?;
39        let storage_credentials = StorageCredentials::token_credential(Arc::new(credential));
40        let service_client = BlobServiceClient::new(account.clone(), storage_credentials);
41        let container_client = service_client.container_client(container.clone());
42        Ok(Self {
43            account,
44            container,
45            prefix,
46            runtime,
47            container_client,
48        })
49    }
50
51    fn base_prefix(&self) -> String {
52        planner::normalize_separators(&self.prefix)
53    }
54
55    fn full_path(&self, path: &str) -> String {
56        let prefix = self.base_prefix();
57        let joined = planner::join_prefix(&prefix, &planner::normalize_separators(path));
58        joined.trim_start_matches('/').to_string()
59    }
60
61    fn format_abfs(&self, path: &str) -> String {
62        format_abfs_uri(&self.container, &self.account, path)
63    }
64}
65
66impl StorageClient for AdlsClient {
67    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
68        let prefix = self.full_path(prefix_or_path);
69        let container = self.container.clone();
70        let account = self.account.clone();
71        let client = self.container_client.clone();
72        self.runtime.block_on(async move {
73            let mut refs = Vec::new();
74            let mut stream = client.list_blobs().prefix(prefix.clone()).into_stream();
75            while let Some(resp) = stream.next().await {
76                let resp = resp.map_err(|err| {
77                    Box::new(StorageError(format!("adls list failed: {err}")))
78                        as Box<dyn std::error::Error + Send + Sync>
79                })?;
80                for blob in resp.blobs.blobs() {
81                    let key = blob.name.clone();
82                    let uri = if key.is_empty() {
83                        format!("abfs://{}@{}.dfs.core.windows.net", container, account)
84                    } else {
85                        format!(
86                            "abfs://{}@{}.dfs.core.windows.net/{}",
87                            container, account, key
88                        )
89                    };
90                    refs.push(planner::object_ref(
91                        uri,
92                        key,
93                        Some(blob.properties.last_modified.to_string()),
94                        Some(blob.properties.content_length),
95                    ));
96                }
97            }
98            Ok(planner::stable_sort_refs(refs))
99        })
100    }
101
102    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
103        let key = uri
104            .split_once(".dfs.core.windows.net/")
105            .map(|(_, tail)| tail)
106            .unwrap_or("")
107            .trim_start_matches('/')
108            .to_string();
109        let key = if key.is_empty() {
110            return Err(Box::new(StorageError(
111                "adls download requires a blob path".to_string(),
112            )));
113        } else {
114            key
115        };
116        let dest = planner::temp_path_for_key(temp_dir, &key);
117        let dest_clone = dest.clone();
118        let client = self.container_client.clone();
119        let key_clone = key.clone();
120        self.runtime.block_on(async move {
121            if let Some(parent) = dest_clone.parent() {
122                tokio::fs::create_dir_all(parent).await?;
123            }
124            let blob = client.blob_client(key_clone);
125            let mut stream = blob.get().into_stream();
126            let mut file = tokio::fs::File::create(&dest_clone).await?;
127            while let Some(chunk) = stream.next().await {
128                let resp = chunk.map_err(|err| {
129                    Box::new(StorageError(format!("adls download failed: {err}")))
130                        as Box<dyn std::error::Error + Send + Sync>
131                })?;
132                let bytes = resp.data.collect().await.map_err(|err| {
133                    Box::new(StorageError(format!("adls download read failed: {err}")))
134                        as Box<dyn std::error::Error + Send + Sync>
135                })?;
136                tokio::io::AsyncWriteExt::write_all(&mut file, &bytes).await?;
137            }
138            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
139        })?;
140        Ok(dest)
141    }
142
143    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
144        let key = uri
145            .split_once(".dfs.core.windows.net/")
146            .map(|(_, tail)| tail)
147            .unwrap_or("")
148            .trim_start_matches('/')
149            .to_string();
150        if key.is_empty() {
151            return Err(Box::new(StorageError(
152                "adls upload requires a blob path".to_string(),
153            )));
154        }
155        let client = self.container_client.clone();
156        let path = local_path.to_path_buf();
157        self.runtime.block_on(async move {
158            let data = tokio::fs::read(path).await?;
159            let blob = client.blob_client(key);
160            blob.put_block_blob(data)
161                .content_type("application/octet-stream")
162                .into_future()
163                .await
164                .map_err(|err| {
165                    Box::new(StorageError(format!("adls upload failed: {err}")))
166                        as Box<dyn std::error::Error + Send + Sync>
167                })?;
168            Ok(())
169        })
170    }
171
172    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
173        Ok(self.format_abfs(&self.full_path(path)))
174    }
175
176    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
177        planner::copy_via_temp(self, src_uri, dst_uri)
178    }
179
180    fn delete_object(&self, uri: &str) -> FloeResult<()> {
181        let key = uri
182            .split_once(".dfs.core.windows.net/")
183            .map(|(_, tail)| tail)
184            .unwrap_or("")
185            .trim_start_matches('/')
186            .to_string();
187        if key.is_empty() {
188            return Ok(());
189        }
190        let client = self.container_client.clone();
191        self.runtime.block_on(async move {
192            let blob = client.blob_client(key);
193            blob.delete().into_future().await.map_err(|err| {
194                Box::new(StorageError(format!("adls delete failed: {err}")))
195                    as Box<dyn std::error::Error + Send + Sync>
196            })?;
197            Ok(())
198        })
199    }
200
201    fn exists(&self, uri: &str) -> FloeResult<bool> {
202        let key = uri
203            .split_once(".dfs.core.windows.net/")
204            .map(|(_, tail)| tail)
205            .unwrap_or("")
206            .trim_start_matches('/')
207            .to_string();
208        planner::exists_by_key(self, &key)
209    }
210}
211
212pub fn parse_adls_uri(uri: &str) -> FloeResult<AdlsLocation> {
213    uri::parse_abfs_uri(uri)
214}
215
216pub fn format_abfs_uri(container: &str, account: &str, path: &str) -> String {
217    uri::format_abfs_uri(container, account, path)
218}
219
220pub type AdlsLocation = uri::AdlsLocation;