1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
5use azure_storage::StorageCredentials;
6use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
7use futures::StreamExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::{RunError, StorageError};
11use crate::{config, ConfigError, FloeResult};
12
13use super::{planner, ObjectRef, StorageClient};
14
15pub struct AdlsClient {
16 account: String,
17 container: String,
18 prefix: String,
19 runtime: Runtime,
20 container_client: ContainerClient,
21}
22
23impl AdlsClient {
24 pub fn new(definition: &config::StorageDefinition) -> FloeResult<Self> {
25 let account = definition.account.clone().ok_or_else(|| {
26 Box::new(StorageError(format!(
27 "storage {} requires account for type adls",
28 definition.name
29 )))
30 })?;
31 let container = definition.container.clone().ok_or_else(|| {
32 Box::new(StorageError(format!(
33 "storage {} requires container for type adls",
34 definition.name
35 )))
36 })?;
37 let prefix = definition.prefix.clone().unwrap_or_default();
38 let runtime = tokio::runtime::Builder::new_current_thread()
39 .enable_all()
40 .build()
41 .map_err(|err| Box::new(StorageError(format!("adls runtime init failed: {err}"))))?;
42 let credential = DefaultAzureCredential::create(TokenCredentialOptions::default())
43 .map_err(|err| Box::new(StorageError(format!("adls credential init failed: {err}"))))?;
44 let storage_credentials = StorageCredentials::token_credential(Arc::new(credential));
45 let service_client = BlobServiceClient::new(account.clone(), storage_credentials);
46 let container_client = service_client.container_client(container.clone());
47 Ok(Self {
48 account,
49 container,
50 prefix,
51 runtime,
52 container_client,
53 })
54 }
55
56 fn base_prefix(&self) -> String {
57 planner::normalize_separators(&self.prefix)
58 }
59
60 fn full_path(&self, path: &str) -> String {
61 let prefix = self.base_prefix();
62 let joined = planner::join_prefix(&prefix, &planner::normalize_separators(path));
63 joined.trim_start_matches('/').to_string()
64 }
65
66 fn format_abfs(&self, path: &str) -> String {
67 format_abfs_uri(&self.container, &self.account, path)
68 }
69}
70
71impl StorageClient for AdlsClient {
72 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>> {
73 let prefix = self.full_path(prefix_or_path);
74 let container = self.container.clone();
75 let account = self.account.clone();
76 let client = self.container_client.clone();
77 self.runtime.block_on(async move {
78 let mut refs = Vec::new();
79 let mut stream = client.list_blobs().prefix(prefix.clone()).into_stream();
80 while let Some(resp) = stream.next().await {
81 let resp = resp.map_err(|err| {
82 Box::new(StorageError(format!("adls list failed: {err}")))
83 as Box<dyn std::error::Error + Send + Sync>
84 })?;
85 for blob in resp.blobs.blobs() {
86 let key = blob.name.clone();
87 let uri = if key.is_empty() {
88 format!("abfs://{}@{}.dfs.core.windows.net", container, account)
89 } else {
90 format!(
91 "abfs://{}@{}.dfs.core.windows.net/{}",
92 container, account, key
93 )
94 };
95 refs.push(ObjectRef {
96 uri,
97 key,
98 last_modified: Some(blob.properties.last_modified.to_string()),
99 size: Some(blob.properties.content_length),
100 });
101 }
102 }
103 Ok(planner::stable_sort_refs(refs))
104 })
105 }
106
107 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
108 let key = uri
109 .split_once(".dfs.core.windows.net/")
110 .map(|(_, tail)| tail)
111 .unwrap_or("")
112 .trim_start_matches('/')
113 .to_string();
114 let key = if key.is_empty() {
115 return Err(Box::new(StorageError(
116 "adls download requires a blob path".to_string(),
117 )));
118 } else {
119 key
120 };
121 let dest = temp_dir.join(
122 Path::new(&key)
123 .file_name()
124 .and_then(|name| name.to_str())
125 .unwrap_or("object"),
126 );
127 let dest_clone = dest.clone();
128 let client = self.container_client.clone();
129 let key_clone = key.clone();
130 self.runtime.block_on(async move {
131 if let Some(parent) = dest_clone.parent() {
132 tokio::fs::create_dir_all(parent).await?;
133 }
134 let blob = client.blob_client(key_clone);
135 let mut stream = blob.get().into_stream();
136 let mut file = tokio::fs::File::create(&dest_clone).await?;
137 while let Some(chunk) = stream.next().await {
138 let resp = chunk.map_err(|err| {
139 Box::new(StorageError(format!("adls download failed: {err}")))
140 as Box<dyn std::error::Error + Send + Sync>
141 })?;
142 let bytes = resp.data.collect().await.map_err(|err| {
143 Box::new(StorageError(format!("adls download read failed: {err}")))
144 as Box<dyn std::error::Error + Send + Sync>
145 })?;
146 tokio::io::AsyncWriteExt::write_all(&mut file, &bytes).await?;
147 }
148 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
149 })?;
150 Ok(dest)
151 }
152
153 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
154 let key = uri
155 .split_once(".dfs.core.windows.net/")
156 .map(|(_, tail)| tail)
157 .unwrap_or("")
158 .trim_start_matches('/')
159 .to_string();
160 if key.is_empty() {
161 return Err(Box::new(StorageError(
162 "adls upload requires a blob path".to_string(),
163 )));
164 }
165 let client = self.container_client.clone();
166 let path = local_path.to_path_buf();
167 self.runtime.block_on(async move {
168 let data = tokio::fs::read(path).await?;
169 let blob = client.blob_client(key);
170 blob.put_block_blob(data)
171 .content_type("application/octet-stream")
172 .into_future()
173 .await
174 .map_err(|err| {
175 Box::new(StorageError(format!("adls upload failed: {err}")))
176 as Box<dyn std::error::Error + Send + Sync>
177 })?;
178 Ok(())
179 })
180 }
181
182 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
183 Ok(self.format_abfs(&self.full_path(path)))
184 }
185
186 fn delete(&self, uri: &str) -> FloeResult<()> {
187 let key = uri
188 .split_once(".dfs.core.windows.net/")
189 .map(|(_, tail)| tail)
190 .unwrap_or("")
191 .trim_start_matches('/')
192 .to_string();
193 if key.is_empty() {
194 return Ok(());
195 }
196 let client = self.container_client.clone();
197 self.runtime.block_on(async move {
198 let blob = client.blob_client(key);
199 blob.delete().into_future().await.map_err(|err| {
200 Box::new(StorageError(format!("adls delete failed: {err}")))
201 as Box<dyn std::error::Error + Send + Sync>
202 })?;
203 Ok(())
204 })
205 }
206}
207
/// The components of an `abfs://<container>@<account>.dfs.core.windows.net/<path>` URI.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AdlsLocation {
    /// Storage account name (the host part before `.dfs.core.windows.net`).
    pub account: String,
    /// Container name (the part before `@`).
    pub container: String,
    /// Path inside the container, without a leading `/`.
    pub path: String,
}
214
215pub fn parse_adls_uri(uri: &str) -> FloeResult<AdlsLocation> {
216 let stripped = uri.strip_prefix("abfs://").ok_or_else(|| {
217 Box::new(ConfigError(format!("expected abfs uri, got {}", uri)))
218 as Box<dyn std::error::Error + Send + Sync>
219 })?;
220 let (container, rest) = stripped.split_once('@').ok_or_else(|| {
221 Box::new(ConfigError(format!(
222 "missing container in abfs uri: {}",
223 uri
224 ))) as Box<dyn std::error::Error + Send + Sync>
225 })?;
226 let (account, path) = rest.split_once(".dfs.core.windows.net").ok_or_else(|| {
227 Box::new(ConfigError(format!("missing account in abfs uri: {}", uri)))
228 as Box<dyn std::error::Error + Send + Sync>
229 })?;
230 let path = path.trim_start_matches('/');
231 Ok(AdlsLocation {
232 account: account.to_string(),
233 container: container.to_string(),
234 path: path.to_string(),
235 })
236}
237
/// Builds the `abfs://` URI for `path` inside `container` of storage `account`.
///
/// Leading slashes on `path` are dropped. When nothing remains, the URI
/// addresses the container root and carries no trailing slash.
pub fn format_abfs_uri(container: &str, account: &str, path: &str) -> String {
    let mut uri = format!("abfs://{container}@{account}.dfs.core.windows.net");
    let relative = path.trim_start_matches('/');
    if !relative.is_empty() {
        uri.push('/');
        uri.push_str(relative);
    }
    uri
}
249
250pub fn build_input_files(
251 client: &dyn StorageClient,
252 container: &str,
253 account: &str,
254 prefix: &str,
255 adapter: &dyn crate::io::format::InputAdapter,
256 temp_dir: &Path,
257 entity: &crate::config::EntityConfig,
258 storage: &str,
259) -> FloeResult<Vec<crate::io::format::InputFile>> {
260 let suffixes = adapter.suffixes()?;
261 let list_refs = client.list(prefix)?;
262 let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
263 let filtered = planner::stable_sort_refs(filtered);
264 if filtered.is_empty() {
265 return Err(Box::new(RunError(format!(
266 "entity.name={} source.storage={} no input objects matched (container={}, account={}, prefix={}, suffixes={})",
267 entity.name,
268 storage,
269 container,
270 account,
271 prefix,
272 suffixes.join(",")
273 ))));
274 }
275 let mut inputs = Vec::with_capacity(filtered.len());
276 for object in filtered {
277 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
278 let source_name = crate::io::storage::s3::file_name_from_key(&object.key)
279 .unwrap_or_else(|| entity.name.clone());
280 let source_stem = crate::io::storage::s3::file_stem_from_name(&source_name)
281 .unwrap_or_else(|| entity.name.clone());
282 let source_uri = object.uri;
283 inputs.push(crate::io::format::InputFile {
284 source_uri,
285 source_local_path: local_path,
286 source_name,
287 source_stem,
288 });
289 }
290 Ok(inputs)
291}