posemesh_compute_node/storage/
client.rs1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::{Context, Result};
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION};
7use reqwest::{Client, Method, StatusCode};
8use serde::Deserialize;
9use std::path::PathBuf;
10use std::time::Duration;
11use tokio::fs;
12use url::Url;
13use uuid::Uuid;
/// One data item fetched from a domain server and written to local disk.
///
/// Instances are produced by `DomainClient::download_uri`, one per item in the
/// downloaded stream.
#[derive(Clone, Debug)]
pub struct DownloadedPart {
    /// Identifier of the item as reported in the item's metadata.
    pub id: Option<String>,
    /// Name of the item as reported in the item's metadata.
    pub name: Option<String>,
    /// Data type label of the item as reported in the item's metadata.
    pub data_type: Option<String>,
    /// Identifier of the domain the item belongs to, from the item's metadata.
    pub domain_id: Option<String>,
    /// Full path of the file the item's bytes were written to.
    pub path: PathBuf,
    /// Temporary directory that holds every part of the same download.
    pub root: PathBuf,
    /// `path` expressed relative to `root` (falls back to `path` itself when
    /// `path` is not located under `root`).
    pub relative_path: PathBuf,
    /// Paths of files extracted from this part; `download_uri` leaves this empty.
    pub extracted_paths: Vec<PathBuf>,
}
28
/// Borrowed description of a single artifact to upload with
/// `DomainClient::upload_artifact`.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; surrounding whitespace is trimmed and the result must be non-empty.
    pub domain_id: &'a str,
    /// Artifact name, used when a new artifact is created.
    pub name: &'a str,
    /// Artifact data type label, used when a new artifact is created.
    pub data_type: &'a str,
    /// Logical path of the artifact; only written to the debug log by the uploader.
    pub logical_path: &'a str,
    /// Raw artifact content; copied into the upload payload.
    pub bytes: &'a [u8],
    /// When set, the artifact with this id is updated instead of a new one being created.
    pub existing_id: Option<&'a str>,
}
38
39#[derive(Clone)]
41pub struct DomainClient {
42 pub base: Url,
43 pub token: TokenRef,
44 http: Client,
45}
46impl DomainClient {
47 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
48 let http = Client::builder()
49 .use_rustls_tls()
50 .timeout(Duration::from_secs(30))
51 .build()
52 .context("build reqwest client")?;
53 Ok(Self { base, token, http })
54 }
55
56 pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
57 let http = Client::builder()
58 .use_rustls_tls()
59 .timeout(timeout)
60 .build()
61 .context("build reqwest client")?;
62 Ok(Self { base, token, http })
63 }
64
65 fn auth_headers(&self) -> HeaderMap {
66 let mut h = HeaderMap::new();
67 let token = format!("Bearer {}", self.token.get());
68 let mut v = HeaderValue::from_str(&token)
69 .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
70 v.set_sensitive(true);
71 h.insert(AUTHORIZATION, v);
72 h
73 }
74
75 pub async fn download_uri(
78 &self,
79 uri: &str,
80 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
81 let url =
82 Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
83 let url_for_log = url.clone();
84 tracing::debug!(
85 target: "posemesh_compute_node::storage::client",
86 method = "GET",
87 %url_for_log,
88 "Sending domain request"
89 );
90 let client_id = std::env::var("CLIENT_ID")
91 .unwrap_or_else(|_| format!("posemesh-compute-node/{}", uuid::Uuid::new_v4()));
92 let res = posemesh_domain_http::domain_data::request_download_absolute(
93 url.as_str(),
94 &client_id,
95 &self.token.get(),
96 )
97 .await
98 .map_err(|e| StorageError::Network(e.to_string()))?;
99 let status = res.status();
100 tracing::debug!(
101 target: "posemesh_compute_node::storage::client",
102 method = "GET",
103 %url_for_log,
104 status = %status,
105 "Domain response received"
106 );
107 if !status.is_success() {
108 return Err(map_status(status));
109 }
110
111 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
112 fs::create_dir_all(&root)
113 .await
114 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
115 let datasets_root = root.join("datasets");
116 fs::create_dir_all(&datasets_root)
117 .await
118 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
119
120 let mut parts = Vec::new();
121 let mut rx = posemesh_domain_http::domain_data::stream_from_response(res)
122 .await
123 .map_err(|e| StorageError::Other(e.to_string()))?;
124
125 while let Some(item) = rx.next().await {
126 let domain_item = item.map_err(|e| StorageError::Other(e.to_string()))?;
127 let name = domain_item.metadata.name.clone();
128 let data_type = domain_item.metadata.data_type.clone();
129
130 let scan_folder = extract_timestamp(&name)
131 .map(|ts| sanitize_component(&ts))
132 .unwrap_or_else(|| sanitize_component(&name));
133 let scan_dir = datasets_root.join(&scan_folder);
134 fs::create_dir_all(&scan_dir)
135 .await
136 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
137
138 let file_name = map_filename(&data_type, &name);
139 let file_path = scan_dir.join(&file_name);
140 if let Some(parent) = file_path.parent() {
141 fs::create_dir_all(parent)
142 .await
143 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
144 }
145 fs::write(&file_path, &domain_item.data)
146 .await
147 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
148
149 let extracted_paths = Vec::new();
150
151 let relative_path = file_path
152 .strip_prefix(&root)
153 .unwrap_or(&file_path)
154 .to_path_buf();
155
156 parts.push(DownloadedPart {
157 id: Some(domain_item.metadata.id),
158 name: Some(name),
159 data_type: Some(data_type),
160 domain_id: Some(domain_item.metadata.domain_id),
161 path: file_path,
162 root: root.clone(),
163 relative_path,
164 extracted_paths,
165 });
166 }
167
168 if parts.is_empty() {
169 return Err(StorageError::Other(
170 "domain response did not contain any data parts".into(),
171 ));
172 }
173
174 Ok(parts)
175 }
176
177 pub async fn upload_artifact(
178 &self,
179 request: UploadRequest<'_>,
180 ) -> std::result::Result<Option<String>, StorageError> {
181 let domain_id = request.domain_id.trim();
182 if domain_id.is_empty() {
183 return Err(StorageError::Other(
184 "missing domain_id for artifact upload".into(),
185 ));
186 }
187 let action = if let Some(id) = request.existing_id {
188 posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
189 } else {
190 posemesh_domain_http::domain_data::DomainAction::Create {
191 name: request.name.to_string(),
192 data_type: request.data_type.to_string(),
193 }
194 };
195
196 let upload = posemesh_domain_http::domain_data::UploadDomainData {
197 action,
198 data: request.bytes.to_vec(),
199 };
200
201 let base = self.base.as_str().trim_end_matches('/');
202 let method = if matches!(
203 &upload.action,
204 posemesh_domain_http::domain_data::DomainAction::Update { .. }
205 ) {
206 Method::PUT
207 } else {
208 Method::POST
209 };
210 tracing::debug!(
211 target: "posemesh_compute_node::storage::client",
212 method = %method,
213 url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
214 logical_path = request.logical_path,
215 name = request.name,
216 data_type = request.data_type,
217 has_existing_id = request.existing_id.is_some(),
218 "Sending domain upload request"
219 );
220
221 match posemesh_domain_http::domain_data::upload_one(
222 base,
223 &self.token.get(),
224 domain_id,
225 upload,
226 )
227 .await
228 {
229 Ok(mut items) => {
230 let id = items.drain(..).next().map(|d| d.metadata.id);
231 Ok(id)
232 }
233 Err((status, _body)) => Err(map_status(status)),
234 }
235 }
236
237 pub async fn find_artifact_id(
238 &self,
239 domain_id: &str,
240 name: &str,
241 data_type: &str,
242 ) -> std::result::Result<Option<String>, StorageError> {
243 let domain_id = domain_id.trim();
244 if domain_id.is_empty() {
245 return Err(StorageError::Other(
246 "missing domain_id for artifact lookup".into(),
247 ));
248 }
249 let path = format!("api/v1/domains/{}/data", domain_id);
250 let url = self
251 .base
252 .join(&path)
253 .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
254 let mut headers = self.auth_headers();
255 headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
256 tracing::debug!(
257 target: "posemesh_compute_node::storage::client",
258 method = "GET",
259 %url,
260 artifact_name = name,
261 artifact_type = data_type,
262 "Looking up existing domain artifact"
263 );
264 let res = self
265 .http
266 .get(url.clone())
267 .headers(headers)
268 .query(&[("name", name), ("data_type", data_type)])
269 .send()
270 .await
271 .map_err(|e| StorageError::Network(e.to_string()))?;
272 let status = res.status();
273 if status == StatusCode::NOT_FOUND {
274 tracing::debug!(
275 target: "posemesh_compute_node::storage::client",
276 method = "GET",
277 %url,
278 artifact_name = name,
279 artifact_type = data_type,
280 "Artifact lookup returned 404"
281 );
282 return Ok(None);
283 }
284 if !status.is_success() {
285 return Err(map_status(status));
286 }
287 let payload = res
288 .json::<ListDomainDataResponse>()
289 .await
290 .map_err(|e| StorageError::Network(e.to_string()))?;
291 let found = payload
292 .data
293 .into_iter()
294 .find(|item| item.name == name && item.data_type == data_type);
295 Ok(found.map(|item| item.id))
296 }
297}
298
299#[derive(Debug, Deserialize)]
300struct ListDomainDataResponse {
301 #[serde(default)]
302 data: Vec<DomainDataSummary>,
303}
304
305#[derive(Debug, Deserialize)]
306struct DomainDataSummary {
307 #[serde(default)]
308 id: String,
309 #[serde(default)]
310 name: String,
311 #[serde(default)]
312 data_type: String,
313}
314
315fn map_status(status: reqwest::StatusCode) -> StorageError {
316 match status.as_u16() {
317 400 => StorageError::BadRequest,
318 401 => StorageError::Unauthorized,
319 404 => StorageError::NotFound,
320 409 => StorageError::Conflict,
321 n if (500..=599).contains(&n) => StorageError::Server(n),
322 other => StorageError::Other(format!("unexpected status: {}", other)),
323 }
324}
325
/// Turns `value` into a string that is safe to use as a single path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is
/// replaced by `_`. An empty input yields the fallback name `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input character produces exactly one output character, so the result
    // is empty only when the input is.
    if value.is_empty() {
        return "part".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
345
346fn extract_timestamp(name: &str) -> Option<String> {
347 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
348 .ok()
349 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
350}
351
352fn map_filename(data_type: &str, name: &str) -> String {
353 format!(
354 "{}.{}",
355 sanitize_component(name),
356 sanitize_component(data_type)
357 )
358}