posemesh_compute_node/storage/
client.rs1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use std::path::PathBuf;
8use std::time::Duration;
9use tokio::fs;
10use url::Url;
11use uuid::Uuid;
/// One domain-data item that has been written to the local filesystem.
///
/// Instances are produced by the download methods of `DomainClient`; every
/// part of a single download shares the same `root` directory.
#[derive(Clone, Debug)]
pub struct DownloadedPart {
    /// Identifier taken from the item's metadata, when available.
    pub id: Option<String>,
    /// Name taken from the item's metadata, when available.
    pub name: Option<String>,
    /// Data type taken from the item's metadata, when available.
    pub data_type: Option<String>,
    /// Domain identifier taken from the item's metadata, when available.
    pub domain_id: Option<String>,
    /// Full path of the file holding the item's bytes.
    pub path: PathBuf,
    /// Root directory created for the download this part belongs to.
    pub root: PathBuf,
    /// `path` expressed relative to `root` (falls back to `path` itself when
    /// it is not located under `root`).
    pub relative_path: PathBuf,
    /// Additional paths associated with this part. The downloader in this
    /// file always leaves the list empty.
    pub extracted_paths: Vec<PathBuf>,
}
26
/// Borrowed description of a single artifact upload.
///
/// All fields borrow from the caller for the lifetime `'a`; nothing is copied
/// until the request is actually sent.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Domain that receives the artifact.
    pub domain_id: &'a str,
    /// Artifact name, used when a new record is created.
    pub name: &'a str,
    /// Artifact data type, used when a new record is created.
    pub data_type: &'a str,
    /// Caller-side logical path of the artifact; the uploader in this file
    /// only includes it in its debug log.
    pub logical_path: &'a str,
    /// Raw payload to upload.
    pub bytes: &'a [u8],
    /// Identifier of an existing record to update; `None` creates a new one.
    pub existing_id: Option<&'a str>,
}
36
37#[derive(Clone)]
39pub struct DomainClient {
40 pub base: Url,
41 pub token: TokenRef,
42 client_id: String,
43}
44impl DomainClient {
45 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
46 let client_id = env_client_id();
47 Ok(Self {
48 base,
49 token,
50 client_id,
51 })
52 }
53
54 pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
55 let client_id = env_client_id();
56 Ok(Self {
57 base,
58 token,
59 client_id,
60 })
61 }
62
63 pub async fn download_uri(
66 &self,
67 uri: &str,
68 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
69 let resolved = resolve_domain_url(&self.base, uri)?;
70 let (domain_id, query) = parse_download_target(&resolved, None)?;
71 self.download_domain_data(&resolved, &domain_id, query)
72 .await
73 }
74
75 pub async fn download_cid(
79 &self,
80 domain_id: &str,
81 cid: &str,
82 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
83 let cid = cid.trim();
84 if cid.is_empty() {
85 return Err(StorageError::Other("empty cid".into()));
86 }
87
88 if cid.contains("://") || cid.starts_with('/') {
89 let resolved = resolve_domain_url(&self.base, cid)?;
90 let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
91 return self
92 .download_domain_data(&resolved, &domain_id, query)
93 .await;
94 }
95
96 let query = posemesh_domain_http::domain_data::DownloadQuery {
97 ids: vec![cid.to_string()],
98 name: None,
99 data_type: None,
100 };
101 self.download_domain_data(&self.base, domain_id, query)
102 .await
103 }
104
105 async fn download_domain_data(
106 &self,
107 url_for_log: &Url,
108 domain_id: &str,
109 query: posemesh_domain_http::domain_data::DownloadQuery,
110 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
111 let domain_id = domain_id.trim();
112 if domain_id.is_empty() {
113 return Err(StorageError::Other("missing domain_id for download".into()));
114 }
115
116 tracing::debug!(
117 target: "posemesh_compute_node::storage::client",
118 method = "GET",
119 %url_for_log,
120 domain_id = domain_id,
121 ids = ?query.ids,
122 name = ?query.name,
123 data_type = ?query.data_type,
124 "Downloading domain data"
125 );
126
127 let base = self.base.as_str().trim_end_matches('/');
128 let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
129 base,
130 self.client_id.as_str(),
131 self.token.get().as_str(),
132 domain_id,
133 &query,
134 )
135 .await
136 .map_err(map_domain_error)?;
137
138 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
139 fs::create_dir_all(&root)
140 .await
141 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
142 let datasets_root = root.join("datasets");
143 fs::create_dir_all(&datasets_root)
144 .await
145 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
146
147 let mut parts = Vec::new();
148
149 while let Some(item) = rx.next().await {
150 let domain_item = item.map_err(map_domain_error)?;
151 let name = domain_item.metadata.name.clone();
152 let data_type = domain_item.metadata.data_type.clone();
153
154 let scan_folder = extract_timestamp(&name)
155 .map(|ts| sanitize_component(&ts))
156 .unwrap_or_else(|| sanitize_component(&name));
157 let scan_dir = datasets_root.join(&scan_folder);
158 fs::create_dir_all(&scan_dir)
159 .await
160 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
161
162 let file_name = map_filename(&data_type, &name);
163 let file_path = scan_dir.join(&file_name);
164 if let Some(parent) = file_path.parent() {
165 fs::create_dir_all(parent)
166 .await
167 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
168 }
169 fs::write(&file_path, &domain_item.data)
170 .await
171 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
172
173 let extracted_paths = Vec::new();
174
175 let relative_path = file_path
176 .strip_prefix(&root)
177 .unwrap_or(&file_path)
178 .to_path_buf();
179
180 parts.push(DownloadedPart {
181 id: Some(domain_item.metadata.id),
182 name: Some(name),
183 data_type: Some(data_type),
184 domain_id: Some(domain_item.metadata.domain_id),
185 path: file_path,
186 root: root.clone(),
187 relative_path,
188 extracted_paths,
189 });
190 }
191
192 if parts.is_empty() {
193 return Err(StorageError::NotFound);
194 }
195
196 Ok(parts)
197 }
198
199 pub async fn upload_artifact(
200 &self,
201 request: UploadRequest<'_>,
202 ) -> std::result::Result<Option<String>, StorageError> {
203 let domain_id = request.domain_id.trim();
204 if domain_id.is_empty() {
205 return Err(StorageError::Other(
206 "missing domain_id for artifact upload".into(),
207 ));
208 }
209 let action = if let Some(id) = request.existing_id {
210 posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
211 } else {
212 posemesh_domain_http::domain_data::DomainAction::Create {
213 name: request.name.to_string(),
214 data_type: request.data_type.to_string(),
215 }
216 };
217
218 let upload = posemesh_domain_http::domain_data::UploadDomainData {
219 action,
220 data: request.bytes.to_vec(),
221 };
222
223 let base = self.base.as_str().trim_end_matches('/');
224 let method = if request.existing_id.is_some() {
225 Method::PUT
226 } else {
227 Method::POST
228 };
229 tracing::debug!(
230 target: "posemesh_compute_node::storage::client",
231 method = %method,
232 url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
233 logical_path = request.logical_path,
234 name = request.name,
235 data_type = request.data_type,
236 has_existing_id = request.existing_id.is_some(),
237 "Sending domain upload request"
238 );
239
240 let items = posemesh_domain_http::domain_data::upload_v1(
241 base,
242 self.token.get().as_str(),
243 domain_id,
244 vec![upload],
245 )
246 .await
247 .map_err(map_domain_error)?;
248
249 Ok(items.into_iter().next().map(|d| d.id))
250 }
251
252 pub async fn find_artifact_id(
253 &self,
254 domain_id: &str,
255 name: &str,
256 data_type: &str,
257 ) -> std::result::Result<Option<String>, StorageError> {
258 let domain_id = domain_id.trim();
259 if domain_id.is_empty() {
260 return Err(StorageError::Other(
261 "missing domain_id for artifact lookup".into(),
262 ));
263 }
264
265 let query = posemesh_domain_http::domain_data::DownloadQuery {
266 ids: Vec::new(),
267 name: Some(name.to_string()),
268 data_type: Some(data_type.to_string()),
269 };
270
271 let base = self.base.as_str().trim_end_matches('/');
272 let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
273 tracing::debug!(
274 target: "posemesh_compute_node::storage::client",
275 method = "GET",
276 %url,
277 artifact_name = name,
278 artifact_type = data_type,
279 "Looking up existing domain artifact"
280 );
281
282 let results = posemesh_domain_http::domain_data::download_metadata_v1(
283 base,
284 self.client_id.as_str(),
285 self.token.get().as_str(),
286 domain_id,
287 &query,
288 )
289 .await;
290
291 let results = match results {
292 Ok(items) => items,
293 Err(err) => {
294 if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
295 if resp.status == reqwest::StatusCode::NOT_FOUND {
296 return Ok(None);
297 }
298 }
299 return Err(map_domain_error(err));
300 }
301 };
302
303 Ok(results
304 .into_iter()
305 .find(|item| item.name == name && item.data_type == data_type)
306 .map(|item| item.id))
307 }
308}
309
310fn map_status(status: reqwest::StatusCode) -> StorageError {
311 match status.as_u16() {
312 400 => StorageError::BadRequest,
313 401 => StorageError::Unauthorized,
314 404 => StorageError::NotFound,
315 409 => StorageError::Conflict,
316 n if (500..=599).contains(&n) => StorageError::Server(n),
317 other => StorageError::Other(format!("unexpected status: {}", other)),
318 }
319}
320
321fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
322 use posemesh_domain_http::errors::{AuthError, DomainError};
323
324 match err {
325 DomainError::AukiErrorResponse(resp) => map_status(resp.status),
326 DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
327 DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
328 other => StorageError::Other(other.to_string()),
329 }
330}
331
332fn env_client_id() -> String {
333 std::env::var("CLIENT_ID")
334 .ok()
335 .map(|v| v.trim().to_string())
336 .filter(|v| !v.is_empty())
337 .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
338}
339
340fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
341 if value.contains("://") {
342 Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
343 } else {
344 base.join(value)
345 .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
346 }
347}
348
349fn parse_download_target(
350 url: &Url,
351 fallback_domain_id: Option<&str>,
352) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
353 let segments: Vec<&str> = url
354 .path_segments()
355 .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
356 .unwrap_or_default();
357
358 let mut domain_id_from_path: Option<&str> = None;
359 let mut data_id_from_path: Option<&str> = None;
360
361 for idx in 0..segments.len() {
362 if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
363 domain_id_from_path = Some(segments[idx + 1]);
364 data_id_from_path = segments.get(idx + 3).copied();
365 break;
366 }
367 }
368
369 let domain_id = domain_id_from_path
370 .or(fallback_domain_id)
371 .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
372 .to_string();
373
374 let mut ids: Vec<String> = Vec::new();
375 let mut name: Option<String> = None;
376 let mut data_type: Option<String> = None;
377
378 for (key, value) in url.query_pairs() {
379 match key.as_ref() {
380 "ids" => ids.extend(
381 value
382 .split(',')
383 .map(|s| s.trim())
384 .filter(|s| !s.is_empty())
385 .map(|s| s.to_string()),
386 ),
387 "name" => {
388 if name.is_none() {
389 name = Some(value.to_string());
390 }
391 }
392 "data_type" => {
393 if data_type.is_none() {
394 data_type = Some(value.to_string());
395 }
396 }
397 _ => {}
398 }
399 }
400
401 if let Some(id) = data_id_from_path {
402 ids = vec![id.to_string()];
403 }
404
405 Ok((
406 domain_id,
407 posemesh_domain_http::domain_data::DownloadQuery {
408 ids,
409 name,
410 data_type,
411 },
412 ))
413}
414
/// Makes `value` safe to use as a single path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is
/// replaced by `_`. An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input character produces exactly one output character, so the
    // result is empty only when the input is.
    if value.is_empty() {
        return "part".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
434
435fn extract_timestamp(name: &str) -> Option<String> {
436 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
437 .ok()
438 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
439}
440
441fn map_filename(data_type: &str, name: &str) -> String {
442 format!(
443 "{}.{}",
444 sanitize_component(name),
445 sanitize_component(data_type)
446 )
447}