// posemesh_compute_node/storage/client.rs
use crate::errors::StorageError;
use crate::storage::token::TokenRef;
use anyhow::{Context, Result};
use futures::StreamExt;
use regex::Regex;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION};
use reqwest::{Client, Method, StatusCode};
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::OnceLock;
use std::time::Duration;
use tokio::fs;
use url::Url;
use uuid::Uuid;

/// One item materialized on disk by [`DomainClient::download_uri`].
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Identifier taken from the item's metadata.
    pub id: Option<String>,
    /// Name taken from the item's metadata.
    pub name: Option<String>,
    /// Data type taken from the item's metadata.
    pub data_type: Option<String>,
    /// Domain identifier taken from the item's metadata.
    pub domain_id: Option<String>,
    /// Full path of the file the item's bytes were written to.
    pub path: PathBuf,
    /// Temporary root directory shared by every part of the same download.
    pub root: PathBuf,
    /// `path` with the `root` prefix stripped (or `path` itself when it is not under `root`).
    pub relative_path: PathBuf,
    /// Paths extracted from this part; `download_uri` leaves this empty.
    pub extracted_paths: Vec<PathBuf>,
}

/// Borrowed description of a single artifact handed to [`DomainClient::upload_artifact`].
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; trimmed before use and rejected when empty.
    pub domain_id: &'a str,
    /// Artifact name, sent when a new record is created.
    pub name: &'a str,
    /// Artifact data type, sent when a new record is created.
    pub data_type: &'a str,
    /// Logical path of the artifact; only emitted in the debug log by `upload_artifact`.
    pub logical_path: &'a str,
    /// Payload bytes; copied into the outgoing upload.
    pub bytes: &'a [u8],
    /// When set, the upload updates this existing record instead of creating a new one.
    pub existing_id: Option<&'a str>,
}

39#[derive(Clone)]
41pub struct DomainClient {
42 pub base: Url,
43 pub token: TokenRef,
44 http: Client,
45}
46impl DomainClient {
47 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
48 let http = Client::builder()
49 .use_rustls_tls()
50 .timeout(Duration::from_secs(30))
51 .build()
52 .context("build reqwest client")?;
53 Ok(Self { base, token, http })
54 }
55
56 pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
57 let http = Client::builder()
58 .use_rustls_tls()
59 .timeout(timeout)
60 .build()
61 .context("build reqwest client")?;
62 Ok(Self { base, token, http })
63 }
64
65 fn auth_headers(&self) -> HeaderMap {
66 let mut h = HeaderMap::new();
67 let token = format!("Bearer {}", self.token.get());
68 let mut v = HeaderValue::from_str(&token)
69 .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
70 v.set_sensitive(true);
71 h.insert(AUTHORIZATION, v);
72 h
73 }
74
75 pub async fn download_uri(
78 &self,
79 uri: &str,
80 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
81 let url =
82 Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
83 let url_for_log = url.clone();
84 tracing::debug!(
85 target: "posemesh_compute_node::storage::client",
86 method = "GET",
87 %url_for_log,
88 "Sending domain request"
89 );
90 let client_id = std::env::var("CLIENT_ID")
91 .unwrap_or_else(|_| format!("posemesh-compute-node/{}", uuid::Uuid::new_v4()));
92 let res = posemesh_domain_http::domain_data::request_download_absolute(
93 url.as_str(),
94 &client_id,
95 &self.token.get(),
96 )
97 .await
98 .map_err(|e| StorageError::Network(e.to_string()))?;
99 let status = res.status();
100 tracing::debug!(
101 target: "posemesh_compute_node::storage::client",
102 method = "GET",
103 %url_for_log,
104 status = %status,
105 "Domain response received"
106 );
107 if !status.is_success() {
108 return Err(map_status(status));
109 }
110
111 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
112 fs::create_dir_all(&root)
113 .await
114 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
115 let datasets_root = root.join("datasets");
116 fs::create_dir_all(&datasets_root)
117 .await
118 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
119
120 let mut parts = Vec::new();
121 let mut rx = posemesh_domain_http::domain_data::stream_from_response(res)
122 .await
123 .map_err(|e| StorageError::Other(e.to_string()))?;
124
125 while let Some(item) = rx.next().await {
126 let domain_item = item.map_err(|e| StorageError::Other(e.to_string()))?;
127 let name = domain_item.metadata.name.clone();
128 let data_type = domain_item.metadata.data_type.clone();
129
130 let scan_folder = extract_timestamp(&name)
131 .map(|ts| sanitize_component(&ts))
132 .unwrap_or_else(|| sanitize_component(&name));
133 let scan_dir = datasets_root.join(&scan_folder);
134 fs::create_dir_all(&scan_dir)
135 .await
136 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
137
138 let file_name = map_filename(&data_type, &name);
139 let file_path = scan_dir.join(&file_name);
140 if let Some(parent) = file_path.parent() {
141 fs::create_dir_all(parent)
142 .await
143 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
144 }
145 fs::write(&file_path, &domain_item.data)
146 .await
147 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
148
149 let extracted_paths = Vec::new();
150
151 let relative_path = file_path
152 .strip_prefix(&root)
153 .unwrap_or(&file_path)
154 .to_path_buf();
155
156 parts.push(DownloadedPart {
157 id: Some(domain_item.metadata.id),
158 name: Some(name),
159 data_type: Some(data_type),
160 domain_id: Some(domain_item.metadata.domain_id),
161 path: file_path,
162 root: root.clone(),
163 relative_path,
164 extracted_paths,
165 });
166 }
167
168 if parts.is_empty() {
169 return Err(StorageError::Other(
170 "domain response did not contain any data parts".into(),
171 ));
172 }
173
174 Ok(parts)
175 }
176
177 pub async fn upload_artifact(
178 &self,
179 request: UploadRequest<'_>,
180 ) -> std::result::Result<Option<String>, StorageError> {
181 let domain_id = request.domain_id.trim();
182 if domain_id.is_empty() {
183 return Err(StorageError::Other(
184 "missing domain_id for artifact upload".into(),
185 ));
186 }
187 let action = if let Some(id) = request.existing_id {
188 posemesh_domain_http::domain_data::DomainAction::Update(
189 posemesh_domain_http::domain_data::UpdateDomainData { id: id.to_string() },
190 )
191 } else {
192 posemesh_domain_http::domain_data::DomainAction::Create(
193 posemesh_domain_http::domain_data::CreateDomainData {
194 name: request.name.to_string(),
195 data_type: request.data_type.to_string(),
196 },
197 )
198 };
199
200 let upload = posemesh_domain_http::domain_data::UploadDomainData {
201 action,
202 data: request.bytes.to_vec(),
203 };
204
205 let base = self.base.as_str().trim_end_matches('/');
206 let method = if matches!(
207 upload.action,
208 posemesh_domain_http::domain_data::DomainAction::Update(_)
209 ) {
210 Method::PUT
211 } else {
212 Method::POST
213 };
214 tracing::debug!(
215 target: "posemesh_compute_node::storage::client",
216 method = %method,
217 url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
218 logical_path = request.logical_path,
219 name = request.name,
220 data_type = request.data_type,
221 has_existing_id = request.existing_id.is_some(),
222 "Sending domain upload request"
223 );
224
225 match posemesh_domain_http::domain_data::upload_one(
226 base,
227 &self.token.get(),
228 domain_id,
229 upload,
230 )
231 .await
232 {
233 Ok(mut items) => {
234 let id = items.drain(..).next().map(|d| d.metadata.id);
235 Ok(id)
236 }
237 Err((status, _body)) => Err(map_status(status)),
238 }
239 }
240
241 pub async fn find_artifact_id(
242 &self,
243 domain_id: &str,
244 name: &str,
245 data_type: &str,
246 ) -> std::result::Result<Option<String>, StorageError> {
247 let domain_id = domain_id.trim();
248 if domain_id.is_empty() {
249 return Err(StorageError::Other(
250 "missing domain_id for artifact lookup".into(),
251 ));
252 }
253 let path = format!("api/v1/domains/{}/data", domain_id);
254 let url = self
255 .base
256 .join(&path)
257 .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
258 let mut headers = self.auth_headers();
259 headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
260 tracing::debug!(
261 target: "posemesh_compute_node::storage::client",
262 method = "GET",
263 %url,
264 artifact_name = name,
265 artifact_type = data_type,
266 "Looking up existing domain artifact"
267 );
268 let res = self
269 .http
270 .get(url.clone())
271 .headers(headers)
272 .query(&[("name", name), ("data_type", data_type)])
273 .send()
274 .await
275 .map_err(|e| StorageError::Network(e.to_string()))?;
276 let status = res.status();
277 if status == StatusCode::NOT_FOUND {
278 tracing::debug!(
279 target: "posemesh_compute_node::storage::client",
280 method = "GET",
281 %url,
282 artifact_name = name,
283 artifact_type = data_type,
284 "Artifact lookup returned 404"
285 );
286 return Ok(None);
287 }
288 if !status.is_success() {
289 return Err(map_status(status));
290 }
291 let payload = res
292 .json::<ListDomainDataResponse>()
293 .await
294 .map_err(|e| StorageError::Network(e.to_string()))?;
295 let found = payload
296 .data
297 .into_iter()
298 .find(|item| item.name == name && item.data_type == data_type);
299 Ok(found.map(|item| item.id))
300 }
301}
302
303#[derive(Debug, Deserialize)]
304struct ListDomainDataResponse {
305 #[serde(default)]
306 data: Vec<DomainDataSummary>,
307}
308
309#[derive(Debug, Deserialize)]
310struct DomainDataSummary {
311 #[serde(default)]
312 id: String,
313 #[serde(default)]
314 name: String,
315 #[serde(default)]
316 data_type: String,
317}
318
319fn map_status(status: reqwest::StatusCode) -> StorageError {
320 match status.as_u16() {
321 400 => StorageError::BadRequest,
322 401 => StorageError::Unauthorized,
323 404 => StorageError::NotFound,
324 409 => StorageError::Conflict,
325 n if (500..=599).contains(&n) => StorageError::Server(n),
326 other => StorageError::Other(format!("unexpected status: {}", other)),
327 }
328}
329
/// Turns `value` into a string that is safe to use as a single path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is
/// replaced by `_`. An empty input yields the placeholder `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input character produces exactly one output character, so the
    // result is empty only when the input is.
    if value.is_empty() {
        return "part".into();
    }
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}

350fn extract_timestamp(name: &str) -> Option<String> {
351 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
352 .ok()
353 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
354}
355
356fn map_filename(data_type: &str, name: &str) -> String {
357 format!(
358 "{}.{}",
359 sanitize_component(name),
360 sanitize_component(data_type)
361 )
362}