// floe_core/io/storage/providers/adls.rs
use std::path::{Path, PathBuf};
use std::sync::Arc;

use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
use futures::StreamExt;
use tokio::io::AsyncWriteExt;
use tokio::runtime::Runtime;

use crate::errors::StorageError;
use crate::io::storage::{planner, uri, validation, ObjectRef, StorageClient};
use crate::{config, FloeResult};
13
14pub struct AdlsClient {
15 account: String,
16 container: String,
17 prefix: String,
18 runtime: Runtime,
19 container_client: ContainerClient,
20}
21
22impl AdlsClient {
23 pub fn new(definition: &config::StorageDefinition) -> FloeResult<Self> {
24 let account =
25 validation::require_field(definition, definition.account.as_ref(), "account", "adls")?;
26 let container = validation::require_field(
27 definition,
28 definition.container.as_ref(),
29 "container",
30 "adls",
31 )?;
32 let prefix = definition.prefix.clone().unwrap_or_default();
33 let runtime = tokio::runtime::Builder::new_current_thread()
34 .enable_all()
35 .build()
36 .map_err(|err| Box::new(StorageError(format!("adls runtime init failed: {err}"))))?;
37 let credential = DefaultAzureCredential::create(TokenCredentialOptions::default())
38 .map_err(|err| Box::new(StorageError(format!("adls credential init failed: {err}"))))?;
39 let storage_credentials = StorageCredentials::token_credential(Arc::new(credential));
40 let service_client = BlobServiceClient::new(account.clone(), storage_credentials);
41 let container_client = service_client.container_client(container.clone());
42 Ok(Self {
43 account,
44 container,
45 prefix,
46 runtime,
47 container_client,
48 })
49 }
50
51 fn base_prefix(&self) -> String {
52 planner::normalize_separators(&self.prefix)
53 }
54
55 fn full_path(&self, path: &str) -> String {
56 let prefix = self.base_prefix();
57 let joined = planner::join_prefix(&prefix, &planner::normalize_separators(path));
58 joined.trim_start_matches('/').to_string()
59 }
60
61 fn format_abfs(&self, path: &str) -> String {
62 format_abfs_uri(&self.container, &self.account, path)
63 }
64}
65
66impl StorageClient for AdlsClient {
67 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
68 let prefix = self.full_path(prefix_or_path);
69 let container = self.container.clone();
70 let account = self.account.clone();
71 let client = self.container_client.clone();
72 self.runtime.block_on(async move {
73 let mut refs = Vec::new();
74 let mut stream = client.list_blobs().prefix(prefix.clone()).into_stream();
75 while let Some(resp) = stream.next().await {
76 let resp = resp.map_err(|err| {
77 Box::new(StorageError(format!("adls list failed: {err}")))
78 as Box<dyn std::error::Error + Send + Sync>
79 })?;
80 for blob in resp.blobs.blobs() {
81 let key = blob.name.clone();
82 let uri = if key.is_empty() {
83 format!("abfs://{}@{}.dfs.core.windows.net", container, account)
84 } else {
85 format!(
86 "abfs://{}@{}.dfs.core.windows.net/{}",
87 container, account, key
88 )
89 };
90 refs.push(planner::object_ref(
91 uri,
92 key,
93 Some(blob.properties.last_modified.to_string()),
94 Some(blob.properties.content_length),
95 ));
96 }
97 }
98 Ok(planner::stable_sort_refs(refs))
99 })
100 }
101
102 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
103 let key = uri
104 .split_once(".dfs.core.windows.net/")
105 .map(|(_, tail)| tail)
106 .unwrap_or("")
107 .trim_start_matches('/')
108 .to_string();
109 let key = if key.is_empty() {
110 return Err(Box::new(StorageError(
111 "adls download requires a blob path".to_string(),
112 )));
113 } else {
114 key
115 };
116 let dest = planner::temp_path_for_key(temp_dir, &key);
117 let dest_clone = dest.clone();
118 let client = self.container_client.clone();
119 let key_clone = key.clone();
120 self.runtime.block_on(async move {
121 if let Some(parent) = dest_clone.parent() {
122 tokio::fs::create_dir_all(parent).await?;
123 }
124 let blob = client.blob_client(key_clone);
125 let mut stream = blob.get().into_stream();
126 let mut file = tokio::fs::File::create(&dest_clone).await?;
127 while let Some(chunk) = stream.next().await {
128 let resp = chunk.map_err(|err| {
129 Box::new(StorageError(format!("adls download failed: {err}")))
130 as Box<dyn std::error::Error + Send + Sync>
131 })?;
132 let bytes = resp.data.collect().await.map_err(|err| {
133 Box::new(StorageError(format!("adls download read failed: {err}")))
134 as Box<dyn std::error::Error + Send + Sync>
135 })?;
136 tokio::io::AsyncWriteExt::write_all(&mut file, &bytes).await?;
137 }
138 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
139 })?;
140 Ok(dest)
141 }
142
143 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
144 let key = uri
145 .split_once(".dfs.core.windows.net/")
146 .map(|(_, tail)| tail)
147 .unwrap_or("")
148 .trim_start_matches('/')
149 .to_string();
150 if key.is_empty() {
151 return Err(Box::new(StorageError(
152 "adls upload requires a blob path".to_string(),
153 )));
154 }
155 let client = self.container_client.clone();
156 let path = local_path.to_path_buf();
157 self.runtime.block_on(async move {
158 let data = tokio::fs::read(path).await?;
159 let blob = client.blob_client(key);
160 blob.put_block_blob(data)
161 .content_type("application/octet-stream")
162 .into_future()
163 .await
164 .map_err(|err| {
165 Box::new(StorageError(format!("adls upload failed: {err}")))
166 as Box<dyn std::error::Error + Send + Sync>
167 })?;
168 Ok(())
169 })
170 }
171
172 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
173 Ok(self.format_abfs(&self.full_path(path)))
174 }
175
176 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
177 planner::copy_via_temp(self, src_uri, dst_uri)
178 }
179
180 fn delete_object(&self, uri: &str) -> FloeResult<()> {
181 let key = uri
182 .split_once(".dfs.core.windows.net/")
183 .map(|(_, tail)| tail)
184 .unwrap_or("")
185 .trim_start_matches('/')
186 .to_string();
187 if key.is_empty() {
188 return Ok(());
189 }
190 let client = self.container_client.clone();
191 self.runtime.block_on(async move {
192 let blob = client.blob_client(key);
193 blob.delete().into_future().await.map_err(|err| {
194 Box::new(StorageError(format!("adls delete failed: {err}")))
195 as Box<dyn std::error::Error + Send + Sync>
196 })?;
197 Ok(())
198 })
199 }
200
201 fn exists(&self, uri: &str) -> FloeResult<bool> {
202 let key = uri
203 .split_once(".dfs.core.windows.net/")
204 .map(|(_, tail)| tail)
205 .unwrap_or("")
206 .trim_start_matches('/')
207 .to_string();
208 planner::exists_by_key(self, &key)
209 }
210}
211
212pub fn parse_adls_uri(uri: &str) -> FloeResult<AdlsLocation> {
213 uri::parse_abfs_uri(uri)
214}
215
216pub fn format_abfs_uri(container: &str, account: &str, path: &str) -> String {
217 uri::format_abfs_uri(container, account, path)
218}
219
220pub type AdlsLocation = uri::AdlsLocation;