1use crate::data_contract::{
2 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::DateTime;
7use crossbeam_utils::thread;
8use once_cell::sync::Lazy;
9use reqwest::blocking::{Client, Response};
10use reqwest::StatusCode;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::fmt;
16use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use url::Url;
21use uuid::Uuid;
22
/// Value of a remote file's metadata `hash` that marks its body as a JSON shard
/// manifest (written by `upload_sharded_file`, recognised by `open`/`read`).
const MANIFEST_HASH: &str = "manifest:v1";

/// Remote directory under which individual shards of large files are stored.
const SHARD_PREFIX: &str = "/.runmat/shards";
25
26static USER_AGENT: Lazy<String> = Lazy::new(|| {
27 format!(
28 "runmat-remote-fs/{} ({})",
29 env!("CARGO_PKG_VERSION"),
30 std::env::consts::OS
31 )
32});
33
/// Configuration for [`RemoteFsProvider`].
///
/// The `Debug` output redacts `auth_token`, so a config can be logged without
/// leaking the bearer secret.
#[derive(Clone)]
pub struct RemoteFsConfig {
    /// Base URL of the remote filesystem service; API routes are resolved against it.
    pub base_url: String,
    /// Bearer token sent as `Authorization: Bearer <token>` on API requests.
    pub auth_token: Option<String>,
    /// Size of each ranged read / chunked write (raised to at least 64 KiB).
    pub chunk_bytes: usize,
    /// Maximum number of concurrent requests for parallel transfers (at least 1).
    pub parallel_requests: usize,
    /// Files at least this large are read through a service-provided download URL.
    pub direct_read_threshold_bytes: u64,
    /// Writes at least this large use the multipart / upload-session flow.
    pub direct_write_threshold_bytes: u64,
    /// Writes at least this large are split into shards plus a manifest file.
    pub shard_threshold_bytes: u64,
    /// Size of each shard (raised to at least 8 MiB).
    pub shard_size_bytes: u64,
    /// Timeout configured on the underlying HTTP client.
    pub timeout: Duration,
    /// Total attempts for a retried request (at least 1).
    pub retry_max_attempts: usize,
    /// Initial retry backoff; doubled on every further attempt.
    pub retry_base_delay: Duration,
    /// Upper bound on any single retry backoff.
    pub retry_max_delay: Duration,
}

impl fmt::Debug for RemoteFsConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RemoteFsConfig")
            .field("base_url", &self.base_url)
            // Never print the secret itself; only whether one is configured.
            .field(
                "auth_token",
                &self.auth_token.as_ref().map(|_| "<redacted>"),
            )
            .field("chunk_bytes", &self.chunk_bytes)
            .field("parallel_requests", &self.parallel_requests)
            .field(
                "direct_read_threshold_bytes",
                &self.direct_read_threshold_bytes,
            )
            .field(
                "direct_write_threshold_bytes",
                &self.direct_write_threshold_bytes,
            )
            .field("shard_threshold_bytes", &self.shard_threshold_bytes)
            .field("shard_size_bytes", &self.shard_size_bytes)
            .field("timeout", &self.timeout)
            .field("retry_max_attempts", &self.retry_max_attempts)
            .field("retry_base_delay", &self.retry_base_delay)
            .field("retry_max_delay", &self.retry_max_delay)
            .finish()
    }
}

impl Default for RemoteFsConfig {
    /// Defaults: 16 MiB chunks, 4 parallel requests, 64 MiB direct read/write
    /// thresholds, sharding from 4 GiB in 512 MiB shards, 120 s timeout, and up to
    /// 5 attempts with 100 ms..2 s backoff. `base_url` is empty and must be set.
    fn default() -> Self {
        Self {
            base_url: String::new(),
            auth_token: None,
            chunk_bytes: 16 * 1024 * 1024,
            parallel_requests: 4,
            direct_read_threshold_bytes: 64 * 1024 * 1024,
            direct_write_threshold_bytes: 64 * 1024 * 1024,
            shard_threshold_bytes: 4 * 1024 * 1024 * 1024,
            shard_size_bytes: 512 * 1024 * 1024,
            timeout: Duration::from_secs(120),
            retry_max_attempts: 5,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(2),
        }
    }
}
68
69struct RemoteInner {
70 client: Client,
71 base: Url,
72 auth_header: Option<String>,
73 chunk_bytes: usize,
74 parallel_requests: usize,
75 direct_read_threshold_bytes: u64,
76 direct_write_threshold_bytes: u64,
77 shard_threshold_bytes: u64,
78 shard_size_bytes: u64,
79 retry_max_attempts: usize,
80 retry_base_delay: Duration,
81 retry_max_delay: Duration,
82}
83
84impl RemoteInner {
85 fn new(config: RemoteFsConfig) -> io::Result<Self> {
86 if config.base_url.is_empty() {
87 return Err(io::Error::new(
88 ErrorKind::InvalidInput,
89 "RemoteFsConfig.base_url must be provided",
90 ));
91 }
92 let base = Url::parse(&config.base_url).map_err(map_url_err)?;
93 let client = Client::builder()
94 .timeout(config.timeout)
95 .user_agent(USER_AGENT.clone())
96 .build()
97 .map_err(map_http_err)?;
98 Ok(Self {
99 client,
100 base,
101 auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
102 chunk_bytes: config.chunk_bytes.max(64 * 1024),
103 parallel_requests: config.parallel_requests.max(1),
104 direct_read_threshold_bytes: config.direct_read_threshold_bytes,
105 direct_write_threshold_bytes: config.direct_write_threshold_bytes,
106 shard_threshold_bytes: config.shard_threshold_bytes,
107 shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
108 retry_max_attempts: config.retry_max_attempts.max(1),
109 retry_base_delay: config.retry_base_delay,
110 retry_max_delay: config.retry_max_delay,
111 })
112 }
113
114 fn should_retry(&self, status: StatusCode) -> bool {
115 matches!(
116 status,
117 StatusCode::TOO_MANY_REQUESTS
118 | StatusCode::INTERNAL_SERVER_ERROR
119 | StatusCode::BAD_GATEWAY
120 | StatusCode::SERVICE_UNAVAILABLE
121 | StatusCode::GATEWAY_TIMEOUT
122 )
123 }
124
125 fn retry_delay(&self, attempt: usize) -> Duration {
126 let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
127 let delay = self.retry_base_delay.as_millis() as u64 * factor;
128 let capped = delay.min(self.retry_max_delay.as_millis() as u64);
129 Duration::from_millis(capped)
130 }
131
132 fn send_with_retry(
133 &self,
134 method: reqwest::Method,
135 route: &str,
136 query: &[(&str, String)],
137 ) -> io::Result<Response> {
138 for attempt in 0..self.retry_max_attempts {
139 let resp = self
140 .request(method.clone(), route, query)
141 .send()
142 .map_err(map_http_err)?;
143 if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
144 return Ok(resp);
145 }
146 std::thread::sleep(self.retry_delay(attempt));
147 }
148 Err(io::Error::other("request retries exhausted"))
149 }
150
151 fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
152 for attempt in 0..self.retry_max_attempts {
153 let mut request = self.client.get(url);
154 if let Some(range) = &range {
155 request = request.header(reqwest::header::RANGE, range);
156 }
157 let resp = request.send().map_err(map_http_err)?;
158 if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
159 return Ok(resp);
160 }
161 std::thread::sleep(self.retry_delay(attempt));
162 }
163 Err(io::Error::other("request retries exhausted"))
164 }
165
166 fn endpoint(&self, route: &str) -> Url {
167 self.base
168 .join(route.trim_start_matches('/'))
169 .expect("failed to join remote route")
170 }
171
172 fn request(
173 &self,
174 method: reqwest::Method,
175 route: &str,
176 query: &[(&str, String)],
177 ) -> reqwest::blocking::RequestBuilder {
178 let mut url = self.endpoint(route);
179 {
180 let mut pairs = url.query_pairs_mut();
181 for (k, v) in query {
182 pairs.append_pair(k, v);
183 }
184 }
185 let mut builder = self.client.request(method, url);
186 if let Some(auth) = &self.auth_header {
187 builder = builder.header("Authorization", auth);
188 }
189 builder
190 }
191
192 fn get_json<T: for<'a> Deserialize<'a>>(
193 &self,
194 route: &str,
195 query: &[(&str, String)],
196 ) -> io::Result<T> {
197 let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
198 handle_error(resp)?.json().map_err(map_http_err)
199 }
200
201 fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
202 let resp = self
203 .request(reqwest::Method::POST, route, &[])
204 .json(body)
205 .send()
206 .map_err(map_http_err)?;
207 handle_error(resp)?;
208 Ok(())
209 }
210
211 fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
212 &self,
213 route: &str,
214 body: &B,
215 ) -> io::Result<T> {
216 let resp = self
217 .request(reqwest::Method::POST, route, &[])
218 .json(body)
219 .send()
220 .map_err(map_http_err)?;
221 handle_error(resp)?.json().map_err(map_http_err)
222 }
223
224 fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
225 let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
226 handle_error(resp)?;
227 Ok(())
228 }
229
230 fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
231 let resp = self.send_with_retry(
232 reqwest::Method::GET,
233 "/fs/read",
234 &[
235 ("path", path.to_string()),
236 ("offset", offset.to_string()),
237 ("length", length.to_string()),
238 ],
239 )?;
240 let mut body = handle_error(resp)?;
241 if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
242 if content_type
243 .to_str()
244 .map(|value| value.contains("application/json"))
245 .unwrap_or(false)
246 {
247 let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
248 return self.download_range_from_url(&json.download_url, offset, length as u64);
249 }
250 }
251 let mut buf = Vec::with_capacity(length);
252 body.copy_to(&mut buf).map_err(map_http_err)?;
253 Ok(buf)
254 }
255
256 fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
257 if length == 0 {
258 return Ok(Vec::new());
259 }
260 let end = offset + length - 1;
261 let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
262 let mut body = handle_error(resp)?;
263 let mut buf = Vec::with_capacity(length as usize);
264 body.copy_to(&mut buf).map_err(map_http_err)?;
265 Ok(buf)
266 }
267
268 fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
269 self.get_json("/fs/download-url", &[("path", path.to_string())])
270 }
271
272 fn upload_chunk(
273 &self,
274 path: &str,
275 offset: u64,
276 truncate: bool,
277 final_chunk: bool,
278 data: &[u8],
279 hash: Option<&str>,
280 ) -> io::Result<Option<FsWriteSessionResponse>> {
281 let mut query = vec![("path", path.to_string()), ("offset", offset.to_string())];
282 if truncate {
283 query.push(("truncate", "true".to_string()));
284 }
285 if final_chunk {
286 query.push(("final", "true".to_string()));
287 }
288 if let Some(hash) = hash {
289 query.push(("hash", hash.to_string()));
290 }
291 let resp = self
292 .request(reqwest::Method::PUT, "/fs/write", &query)
293 .body(data.to_vec())
294 .send()
295 .map_err(map_http_err)?;
296 if resp.status() == StatusCode::ACCEPTED {
297 let session = handle_error(resp)?.json().map_err(map_http_err)?;
298 return Ok(Some(session));
299 }
300 handle_error(resp)?;
301 Ok(None)
302 }
303
304 fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
305 self.post_json(
306 "/fs/multipart-upload",
307 &MultipartUploadRequest {
308 path: path.to_string(),
309 size_bytes: size_bytes as i64,
310 content_type: None,
311 },
312 )
313 }
314
315 fn upload_session_start(
316 &self,
317 path: &str,
318 size_bytes: u64,
319 content_sha256: &str,
320 ) -> io::Result<UploadSessionStartResponse> {
321 self.post_json(
322 "/fs/upload-session/start",
323 &UploadSessionStartRequest {
324 path: path.to_string(),
325 size_bytes: size_bytes as i64,
326 content_type: None,
327 content_sha256: content_sha256.to_string(),
328 },
329 )
330 }
331
332 fn upload_session_chunks(
333 &self,
334 session_id: &str,
335 blob_key: &str,
336 chunks: &[UploadSessionChunkDescriptor],
337 ) -> io::Result<UploadSessionChunksResponse> {
338 self.post_json(
339 "/fs/upload-session/chunks",
340 &UploadSessionChunksRequest {
341 session_id: session_id.to_string(),
342 blob_key: blob_key.to_string(),
343 chunks: chunks.to_vec(),
344 },
345 )
346 }
347
348 fn upload_session_complete(
349 &self,
350 path: &str,
351 session_id: &str,
352 blob_key: &str,
353 size_bytes: u64,
354 content_sha256: &str,
355 chunk_count: usize,
356 ) -> io::Result<()> {
357 self.post_empty(
358 "/fs/upload-session/complete",
359 &UploadSessionCompleteRequest {
360 path: path.to_string(),
361 session_id: session_id.to_string(),
362 blob_key: blob_key.to_string(),
363 size_bytes: size_bytes as i64,
364 content_sha256: content_sha256.to_string(),
365 chunk_count,
366 hash: None,
367 },
368 )
369 }
370
371 fn multipart_presign_part(
372 &self,
373 session_id: &str,
374 blob_key: &str,
375 upload_id: &str,
376 part_number: i32,
377 size_bytes: u64,
378 ) -> io::Result<String> {
379 let response: MultipartUploadPartResponse = self.post_json(
380 "/fs/multipart-upload/part",
381 &MultipartUploadPartRequest {
382 session_id: session_id.to_string(),
383 blob_key: blob_key.to_string(),
384 upload_id: upload_id.to_string(),
385 part_number,
386 size_bytes: size_bytes as i64,
387 },
388 )?;
389 Ok(response.upload_url)
390 }
391
392 fn multipart_complete(
393 &self,
394 path: &str,
395 session_id: &str,
396 blob_key: &str,
397 upload_id: &str,
398 size_bytes: u64,
399 parts: Vec<MultipartPart>,
400 ) -> io::Result<()> {
401 let resp = self
402 .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
403 .json(&MultipartUploadCompleteRequest {
404 path: path.to_string(),
405 session_id: session_id.to_string(),
406 blob_key: blob_key.to_string(),
407 upload_id: upload_id.to_string(),
408 size_bytes: size_bytes as i64,
409 hash: None,
410 parts,
411 })
412 .send()
413 .map_err(map_http_err)?;
414 handle_error(resp)?;
415 Ok(())
416 }
417
418 fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
419 let resp = self
420 .client
421 .put(url)
422 .body(data.to_vec())
423 .send()
424 .map_err(map_http_err)?;
425 let resp = handle_error(resp)?;
426 let etag = resp
427 .headers()
428 .get(reqwest::header::ETAG)
429 .and_then(|value| value.to_str().ok())
430 .unwrap_or("")
431 .to_string();
432 Ok(etag)
433 }
434
435 fn put_upload_target(
436 &self,
437 method: &str,
438 url: &str,
439 headers: &HashMap<String, String>,
440 data: &[u8],
441 ) -> io::Result<()> {
442 let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
443 io::Error::new(
444 ErrorKind::InvalidInput,
445 format!("invalid upload method `{method}`: {err}"),
446 )
447 })?;
448 let mut request = self.client.request(method, url).body(data.to_vec());
449 for (name, value) in headers {
450 request = request.header(name, value);
451 }
452 let resp = request.send().map_err(map_http_err)?;
453 handle_error(resp)?;
454 Ok(())
455 }
456
457 fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
458 self.get_json("/fs/metadata", &[("path", path.to_string())])
459 }
460
461 fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
462 self.get_json("/fs/dir", &[("path", path.to_string())])
463 }
464
465 fn canonicalize_path(&self, path: &str) -> io::Result<String> {
466 let resp: CanonicalizeResponse =
467 self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
468 Ok(resp.path)
469 }
470}
471
472pub struct RemoteFsProvider {
473 inner: Arc<RemoteInner>,
474}
475
476impl RemoteFsProvider {
477 pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
478 Ok(Self {
479 inner: Arc::new(RemoteInner::new(config)?),
480 })
481 }
482
483 pub fn data_manifest_descriptor(
485 &self,
486 request: &DataManifestRequest,
487 ) -> io::Result<DataManifestDescriptor> {
488 let mut query = vec![("path", request.path.clone())];
489 if let Some(version) = &request.version {
490 query.push(("version", version.clone()));
491 }
492 self.inner.get_json("/data/manifest", &query)
493 }
494
495 pub fn data_chunk_upload_targets(
497 &self,
498 request: &DataChunkUploadRequest,
499 ) -> io::Result<Vec<DataChunkUploadTarget>> {
500 #[derive(Deserialize)]
501 struct DataChunkUploadTargetsResponse {
502 targets: Vec<DataChunkUploadTarget>,
503 }
504 let response: DataChunkUploadTargetsResponse = self
505 .inner
506 .post_json("/data/chunks/upload-targets", request)?;
507 Ok(response.targets)
508 }
509
510 fn normalize(&self, path: &Path) -> String {
511 let mut normalized = PathBuf::new();
512 normalized.push("/");
513 normalized.push(path);
514 normalized.to_string_lossy().replace('\\', "/").to_string()
515 }
516
517 fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
518 if let Some(parent) = path.parent() {
519 self.inner.post_empty(
520 "/fs/mkdir",
521 &CreateDirRequest {
522 path: self.normalize(parent),
523 recursive: true,
524 },
525 )?;
526 }
527 Ok(())
528 }
529
530 fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
531 if len == 0 {
532 return Ok(Vec::new());
533 }
534 if len >= self.inner.direct_read_threshold_bytes {
535 let url = self.inner.fetch_download_url(path)?.download_url;
536 return self.download_entire_file_from_url(&url, len);
537 }
538 let chunk = self.inner.chunk_bytes as u64;
539 let mut tasks = Vec::new();
540 let mut offset = 0;
541 let mut index = 0;
542 while offset < len {
543 let remaining = len - offset;
544 let length = std::cmp::min(chunk, remaining);
545 tasks.push(ChunkTask {
546 offset,
547 length: length as usize,
548 index,
549 });
550 offset += length;
551 index += 1;
552 }
553 let mut buffer = vec![0u8; len as usize];
554 let path = path.to_string();
555 let inner = self.inner.clone();
556 let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
557 let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
558 let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
559 let threads = inner
560 .parallel_requests
561 .min(queue.lock().unwrap().len())
562 .max(1);
563 thread::scope(|scope| {
564 for _ in 0..threads {
565 let queue = queue.clone();
566 let error = error.clone();
567 let results = results.clone();
568 let inner = inner.clone();
569 let path = path.clone();
570 scope.spawn(move |_| loop {
571 let task_opt = {
572 let mut guard = queue.lock().unwrap();
573 guard.pop_front()
574 };
575 let task = match task_opt {
576 Some(task) => task,
577 None => break,
578 };
579 match inner.download_chunk(&path, task.offset, task.length) {
580 Ok(bytes) => {
581 let mut guard = results.lock().unwrap();
582 guard[task.index] = Some(bytes);
583 }
584 Err(err) => {
585 let mut guard = error.lock().unwrap();
586 if guard.is_none() {
587 *guard = Some(err);
588 }
589 break;
590 }
591 }
592 });
593 }
594 })
595 .expect("remote download scope");
596 if let Some(err) = error.lock().unwrap().take() {
597 return Err(err);
598 }
599 let chunks = Arc::try_unwrap(results)
600 .expect("results still in use")
601 .into_inner()
602 .expect("results poisoned");
603 for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
604 let bytes = maybe.expect("missing downloaded chunk for remote file");
605 let start = task.offset as usize;
606 buffer[start..start + bytes.len()].copy_from_slice(&bytes);
607 }
608 Ok(buffer)
609 }
610
611 fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
612 let manifest_bytes = self.download_raw_file(path, len)?;
613 let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
614 .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
615 if manifest.version != 1 {
616 return Err(io::Error::new(
617 ErrorKind::InvalidData,
618 "unsupported manifest",
619 ));
620 }
621 let mut buffer = Vec::with_capacity(manifest.total_size as usize);
622 for shard in manifest.shards {
623 let meta = self.inner.fetch_metadata(&shard.path)?;
624 let bytes = self.download_raw_file(&shard.path, meta.len)?;
625 buffer.extend_from_slice(&bytes);
626 }
627 Ok(buffer)
628 }
629
630 fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
631 let chunk = self.inner.chunk_bytes as u64;
632 let mut tasks = Vec::new();
633 let mut offset = 0;
634 let mut index = 0;
635 while offset < len {
636 let remaining = len - offset;
637 let length = std::cmp::min(chunk, remaining);
638 tasks.push(ChunkTask {
639 offset,
640 length: length as usize,
641 index,
642 });
643 offset += length;
644 index += 1;
645 }
646 let mut buffer = vec![0u8; len as usize];
647 let url = url.to_string();
648 let inner = self.inner.clone();
649 let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
650 let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
651 let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
652 let threads = inner
653 .parallel_requests
654 .min(queue.lock().unwrap().len())
655 .max(1);
656 thread::scope(|scope| {
657 for _ in 0..threads {
658 let queue = queue.clone();
659 let error = error.clone();
660 let results = results.clone();
661 let inner = inner.clone();
662 let url = url.clone();
663 scope.spawn(move |_| loop {
664 let task_opt = {
665 let mut guard = queue.lock().unwrap();
666 guard.pop_front()
667 };
668 let task = match task_opt {
669 Some(task) => task,
670 None => break,
671 };
672 match inner.download_range_from_url(&url, task.offset, task.length as u64) {
673 Ok(bytes) => {
674 let mut guard = results.lock().unwrap();
675 guard[task.index] = Some(bytes);
676 }
677 Err(err) => {
678 let mut guard = error.lock().unwrap();
679 if guard.is_none() {
680 *guard = Some(err);
681 }
682 break;
683 }
684 }
685 });
686 }
687 })
688 .expect("remote download scope");
689 if let Some(err) = error.lock().unwrap().take() {
690 return Err(err);
691 }
692 let chunks = Arc::try_unwrap(results)
693 .expect("results still in use")
694 .into_inner()
695 .expect("results poisoned");
696 for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
697 let bytes = maybe.expect("missing downloaded chunk for remote file");
698 let start = task.offset as usize;
699 buffer[start..start + bytes.len()].copy_from_slice(&bytes);
700 }
701 Ok(buffer)
702 }
703
704 fn upload_entire_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
705 if data.len() as u64 >= self.inner.shard_threshold_bytes {
706 return self.upload_sharded_file(path, data);
707 }
708 self.upload_unsharded_file(path, data, None)
709 }
710
711 fn upload_unsharded_file(&self, path: &str, data: &[u8], hash: Option<&str>) -> io::Result<()> {
712 if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
713 return self.upload_multipart_file(path, data);
714 }
715 if data.is_empty() {
716 self.inner.upload_chunk(path, 0, true, true, data, hash)?;
717 return Ok(());
718 }
719 let chunk = self.inner.chunk_bytes;
720 let mut offset = 0usize;
721 while offset < data.len() {
722 let end = std::cmp::min(offset + chunk, data.len());
723 let slice = &data[offset..end];
724 let truncate = offset == 0;
725 let final_chunk = end == data.len();
726 let result =
727 self.inner
728 .upload_chunk(path, offset as u64, truncate, final_chunk, slice, hash)?;
729 if let Some(session) = result {
730 let expected = offset as u64 + slice.len() as u64;
731 if session.next_offset as u64 != expected {
732 return Err(io::Error::other("unexpected next offset"));
733 }
734 }
735 offset = end;
736 }
737 Ok(())
738 }
739
740 fn upload_sharded_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
741 let shard_size = self.inner.shard_size_bytes as usize;
742 let mut shards = Vec::new();
743 let mut offset = 0usize;
744 while offset < data.len() {
745 let end = std::cmp::min(offset + shard_size, data.len());
746 let slice = &data[offset..end];
747 let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
748 self.upload_unsharded_file(&shard_path, slice, None)?;
749 shards.push(ShardEntry {
750 path: shard_path,
751 size: slice.len() as u64,
752 });
753 offset = end;
754 }
755 let manifest = ShardManifest {
756 version: 1,
757 total_size: data.len() as u64,
758 shard_size: self.inner.shard_size_bytes,
759 shards,
760 };
761 let bytes = serde_json::to_vec(&manifest)
762 .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
763 self.upload_unsharded_file(path, &bytes, Some(MANIFEST_HASH))
764 }
765
766 fn upload_multipart_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
767 if data.is_empty() {
768 self.inner.upload_chunk(path, 0, true, true, data, None)?;
769 return Ok(());
770 }
771 let content_sha256 = sha256_hex(data);
772 let session =
773 match self
774 .inner
775 .upload_session_start(path, data.len() as u64, &content_sha256)
776 {
777 Ok(session) => session,
778 Err(err) if err.kind() == ErrorKind::NotFound => {
779 return self.upload_multipart_file_legacy(path, data);
780 }
781 Err(err) => return Err(err),
782 };
783 let chunk_size = (session.chunk_size_bytes as usize).max(1);
784 let mut tasks = Vec::new();
785 let mut offset = 0usize;
786 let mut index = 0usize;
787 while offset < data.len() {
788 let end = std::cmp::min(offset + chunk_size, data.len());
789 let slice = &data[offset..end];
790 tasks.push(UploadTask {
791 index,
792 offset,
793 length: end - offset,
794 chunk_sha256: sha256_hex(slice),
795 });
796 offset = end;
797 index += 1;
798 }
799 let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
800 .iter()
801 .map(|task| UploadSessionChunkDescriptor {
802 chunk_index: task.index,
803 offset_bytes: task.offset as i64,
804 size_bytes: task.length as i64,
805 chunk_sha256: task.chunk_sha256.clone(),
806 })
807 .collect();
808 let chunk_response = self.inner.upload_session_chunks(
809 &session.session_id,
810 &session.blob_key,
811 &descriptors,
812 )?;
813 let targets = Arc::new(chunk_response.targets);
814 let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
815 let error = Arc::new(Mutex::new(None));
816 let data = Arc::new(data.to_vec());
817 thread::scope(|scope| {
818 for _ in 0..self.inner.parallel_requests {
819 let tasks = Arc::clone(&tasks);
820 let targets = Arc::clone(&targets);
821 let error = Arc::clone(&error);
822 let inner = Arc::clone(&self.inner);
823 let data = Arc::clone(&data);
824 scope.spawn(move |_| loop {
825 let task = {
826 let mut guard = tasks.lock().unwrap();
827 guard.pop_front()
828 };
829 let Some(task) = task else { break };
830 let target = targets
831 .iter()
832 .find(|target| target.chunk_index == task.index)
833 .cloned();
834 let result = (|| {
835 let target = target.ok_or_else(|| {
836 io::Error::other(format!(
837 "missing upload target for chunk {}",
838 task.index
839 ))
840 })?;
841 let slice = &data[task.offset..task.offset + task.length];
842 inner.put_upload_target(
843 &target.method,
844 &target.upload_url,
845 &target.headers,
846 slice,
847 )
848 })();
849 if let Err(err) = result {
850 let mut err_guard = error.lock().unwrap();
851 if err_guard.is_none() {
852 *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
853 }
854 break;
855 }
856 });
857 }
858 })
859 .expect("upload session scope");
860 if let Some(err) = error.lock().unwrap().take() {
861 return Err(err);
862 }
863 self.inner.upload_session_complete(
864 path,
865 &session.session_id,
866 &session.blob_key,
867 data.len() as u64,
868 &content_sha256,
869 descriptors.len(),
870 )?;
871 Ok(())
872 }
873
874 fn upload_multipart_file_legacy(&self, path: &str, data: &[u8]) -> io::Result<()> {
875 let session = self.inner.multipart_create(path, data.len() as u64)?;
876 let part_size = session.part_size_bytes as usize;
877 let mut tasks = std::collections::VecDeque::new();
878 let mut offset = 0usize;
879 let mut part_number = 1;
880 let mut index = 0usize;
881 while offset < data.len() {
882 let end = std::cmp::min(offset + part_size, data.len());
883 let length = end - offset;
884 tasks.push_back(MultipartTask {
885 index,
886 part_number,
887 offset,
888 length,
889 });
890 offset = end;
891 part_number += 1;
892 index += 1;
893 }
894 let tasks = Arc::new(Mutex::new(tasks));
895 let task_len = tasks.lock().unwrap().len();
896 let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
897 for _ in 0..task_len {
898 result_vec.push(None);
899 }
900 let results = Arc::new(Mutex::new(result_vec));
901 let error = Arc::new(Mutex::new(None));
902 let data = Arc::new(data.to_vec());
903 thread::scope(|scope| {
904 for _ in 0..self.inner.parallel_requests {
905 let tasks = Arc::clone(&tasks);
906 let results = Arc::clone(&results);
907 let error = Arc::clone(&error);
908 let inner = Arc::clone(&self.inner);
909 let blob_key = session.blob_key.clone();
910 let upload_id = session.upload_id.clone();
911 let session_id = session.session_id.clone();
912 let data = Arc::clone(&data);
913 scope.spawn(move |_| loop {
914 let task = {
915 let mut guard = tasks.lock().unwrap();
916 guard.pop_front()
917 };
918 let Some(task) = task else { break };
919 let slice = &data[task.offset..task.offset + task.length];
920 let result = (|| {
921 let url = inner.multipart_presign_part(
922 &session_id,
923 &blob_key,
924 &upload_id,
925 task.part_number,
926 task.length as u64,
927 )?;
928 let etag = inner.put_upload_url_with_etag(&url, slice)?;
929 Ok(MultipartPart {
930 part_number: task.part_number,
931 etag,
932 })
933 })();
934 let mut guard = results.lock().unwrap();
935 guard[task.index] = Some(result);
936 if let Some(Err(err)) = guard[task.index].as_ref() {
937 let mut err_guard = error.lock().unwrap();
938 if err_guard.is_none() {
939 *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
940 }
941 break;
942 }
943 });
944 }
945 })
946 .expect("multipart upload scope");
947 if let Some(err) = error.lock().unwrap().take() {
948 return Err(err);
949 }
950 let mut parts = Vec::with_capacity(results.lock().unwrap().len());
951 for maybe in Arc::try_unwrap(results)
952 .expect("results still in use")
953 .into_inner()
954 .expect("results poisoned")
955 {
956 let part = maybe.expect("missing part result")?;
957 if part.etag.is_empty() {
958 return Err(io::Error::other("missing etag"));
959 }
960 parts.push(part);
961 }
962 parts.sort_by_key(|part| part.part_number);
963 self.inner.multipart_complete(
964 path,
965 &session.session_id,
966 &session.blob_key,
967 &session.upload_id,
968 data.len() as u64,
969 parts,
970 )?;
971 Ok(())
972 }
973}
974
/// One ranged download: `length` bytes starting at `offset`; `index` is the slot
/// the downloaded bytes occupy in the result list.
#[derive(Copy, Clone)]
struct ChunkTask {
    index: usize,
    offset: u64,
    length: usize,
}
981
982type MultipartResult = Option<Result<MultipartPart, io::Error>>;
983
/// One part of a legacy multipart upload: bytes `[offset, offset + length)` sent
/// as `part_number`; `index` is its result slot.
#[derive(Copy, Clone)]
struct MultipartTask {
    part_number: i32,
    index: usize,
    offset: usize,
    length: usize,
}
991
/// One upload-session chunk: bytes `[offset, offset + length)` with their
/// hex-encoded SHA-256; `index` is the chunk index declared to the service.
#[derive(Clone)]
struct UploadTask {
    chunk_sha256: String,
    index: usize,
    offset: usize,
    length: usize,
}
999
1000#[async_trait(?Send)]
1001impl FsProvider for RemoteFsProvider {
1002 fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
1003 let normalized = self.normalize(path);
1004 let mut data = Vec::new();
1005 if flags.read {
1006 let meta = self.inner.fetch_metadata(&normalized)?;
1007 if meta.file_type != "file" {
1008 return Err(io::Error::other("remote path is not a file"));
1009 }
1010 data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1011 self.download_sharded_file(&normalized, meta.len)?
1012 } else {
1013 self.download_raw_file(&normalized, meta.len)?
1014 };
1015 }
1016 if flags.truncate {
1017 data.clear();
1018 }
1019 if flags.create {
1020 self.ensure_parent_exists(path)?;
1021 }
1022 let handle = RemoteFileHandle {
1023 provider: self.clone(),
1024 path: normalized,
1025 data,
1026 position: 0,
1027 flags: flags.clone(),
1028 dirty: false,
1029 };
1030 Ok(Box::new(handle))
1031 }
1032
1033 async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
1034 let normalized = self.normalize(path);
1035 let meta = self.inner.fetch_metadata(&normalized)?;
1036 if meta.file_type != "file" {
1037 return Err(io::Error::other("remote path is not a file"));
1038 }
1039 if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1040 self.download_sharded_file(&normalized, meta.len)
1041 } else {
1042 self.download_raw_file(&normalized, meta.len)
1043 }
1044 }
1045
1046 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
1047 let normalized = self.normalize(path);
1048 self.ensure_parent_exists(path)?;
1049 self.upload_entire_file(&normalized, data)
1050 }
1051
1052 async fn remove_file(&self, path: &Path) -> io::Result<()> {
1053 let normalized = self.normalize(path);
1054 self.inner
1055 .delete_empty("/fs/file", &[("path", normalized)])?;
1056 Ok(())
1057 }
1058
1059 async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1060 let normalized = self.normalize(path);
1061 let resp = self.inner.fetch_metadata(&normalized)?;
1062 Ok(resp.into())
1063 }
1064
1065 async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1066 self.metadata(path).await
1067 }
1068
1069 async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
1070 let normalized = self.normalize(path);
1071 let resp = self.inner.fetch_dir(&normalized)?;
1072 Ok(resp
1073 .into_iter()
1074 .map(|entry| DirEntry {
1075 path: PathBuf::from(entry.path),
1076 file_name: entry.file_name.into(),
1077 file_type: entry.file_type.into(),
1078 })
1079 .collect())
1080 }
1081
1082 async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
1083 let normalized = self.normalize(path);
1084 let canonical = self.inner.canonicalize_path(&normalized)?;
1085 Ok(PathBuf::from(canonical))
1086 }
1087
1088 async fn create_dir(&self, path: &Path) -> io::Result<()> {
1089 let normalized = self.normalize(path);
1090 self.inner.post_empty(
1091 "/fs/mkdir",
1092 &CreateDirRequest {
1093 path: normalized,
1094 recursive: false,
1095 },
1096 )
1097 }
1098
1099 async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
1100 let normalized = self.normalize(path);
1101 self.inner.post_empty(
1102 "/fs/mkdir",
1103 &CreateDirRequest {
1104 path: normalized,
1105 recursive: true,
1106 },
1107 )
1108 }
1109
1110 async fn remove_dir(&self, path: &Path) -> io::Result<()> {
1111 let normalized = self.normalize(path);
1112 self.inner.delete_empty(
1113 "/fs/dir",
1114 &[("path", normalized), ("recursive", "false".into())],
1115 )
1116 }
1117
1118 async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
1119 let normalized = self.normalize(path);
1120 self.inner.delete_empty(
1121 "/fs/dir",
1122 &[("path", normalized), ("recursive", "true".into())],
1123 )
1124 }
1125
1126 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
1127 self.inner.post_empty(
1128 "/fs/rename",
1129 &RenameRequest {
1130 from: self.normalize(from),
1131 to: self.normalize(to),
1132 },
1133 )
1134 }
1135
1136 async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
1137 self.inner.post_empty(
1138 "/fs/set-readonly",
1139 &SetReadonlyRequest {
1140 path: self.normalize(path),
1141 readonly,
1142 },
1143 )
1144 }
1145
1146 async fn data_manifest_descriptor(
1147 &self,
1148 request: &DataManifestRequest,
1149 ) -> io::Result<DataManifestDescriptor> {
1150 let mut query = vec![("path", request.path.clone())];
1151 if let Some(version) = &request.version {
1152 query.push(("version", version.clone()));
1153 }
1154 self.inner.get_json("/data/manifest", &query)
1155 }
1156
1157 async fn data_chunk_upload_targets(
1158 &self,
1159 request: &DataChunkUploadRequest,
1160 ) -> io::Result<Vec<DataChunkUploadTarget>> {
1161 #[derive(Deserialize)]
1162 struct DataChunkUploadTargetsResponse {
1163 targets: Vec<DataChunkUploadTarget>,
1164 }
1165 let response: DataChunkUploadTargetsResponse = self
1166 .inner
1167 .post_json("/data/chunks/upload-targets", request)?;
1168 Ok(response.targets)
1169 }
1170
1171 async fn data_upload_chunk(
1172 &self,
1173 target: &DataChunkUploadTarget,
1174 data: &[u8],
1175 ) -> io::Result<()> {
1176 self.inner
1177 .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
1178 }
1179}
1180
1181impl Clone for RemoteFsProvider {
1182 fn clone(&self) -> Self {
1183 Self {
1184 inner: Arc::clone(&self.inner),
1185 }
1186 }
1187}
1188
/// File handle whose contents live in an in-memory buffer; `Read`/`Write`/`Seek` operate on
/// `data`, and modifications are uploaded as a whole file by `flush_remote` or on drop.
struct RemoteFileHandle {
    // Provider used to upload the buffered contents.
    provider: RemoteFsProvider,
    // Remote path passed to `upload_entire_file` (presumably already normalized by the
    // opener, which is not shown here — confirm).
    path: String,
    // File contents held in memory; reads and writes touch only this buffer.
    data: Vec<u8>,
    // Cursor shared by Read/Write/Seek; `seek` does not clamp it, so it may exceed `data.len()`.
    position: usize,
    // Open mode; `write` / `append` gate the `Write` impl.
    flags: OpenFlags,
    // True when `data` has changes that have not been uploaded yet.
    dirty: bool,
}
1197
1198impl RemoteFileHandle {
1199 fn flush_remote(&mut self) -> io::Result<()> {
1200 if !self.dirty {
1201 return Ok(());
1202 }
1203 self.provider.upload_entire_file(&self.path, &self.data)?;
1204 self.dirty = false;
1205 Ok(())
1206 }
1207}
1208
1209impl Read for RemoteFileHandle {
1210 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1211 let remaining = &self.data[self.position..];
1212 let amt = remaining.len().min(buf.len());
1213 buf[..amt].copy_from_slice(&remaining[..amt]);
1214 self.position += amt;
1215 Ok(amt)
1216 }
1217}
1218
1219impl Write for RemoteFileHandle {
1220 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1221 if !self.flags.write && !self.flags.append {
1222 return Err(io::Error::new(
1223 ErrorKind::PermissionDenied,
1224 "remote file not opened for writing",
1225 ));
1226 }
1227 if self.flags.append {
1228 self.position = self.data.len();
1229 }
1230 let required = self.position + buf.len();
1231 if required > self.data.len() {
1232 self.data.resize(required, 0);
1233 }
1234 self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
1235 self.position += buf.len();
1236 self.dirty = true;
1237 Ok(buf.len())
1238 }
1239
1240 fn flush(&mut self) -> io::Result<()> {
1241 self.flush_remote()
1242 }
1243}
1244
1245impl Seek for RemoteFileHandle {
1246 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1247 let new_pos = match pos {
1248 SeekFrom::Start(offset) => offset as i64,
1249 SeekFrom::End(offset) => self.data.len() as i64 + offset,
1250 SeekFrom::Current(offset) => self.position as i64 + offset,
1251 };
1252 if new_pos < 0 {
1253 return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
1254 }
1255 self.position = new_pos as usize;
1256 Ok(self.position as u64)
1257 }
1258}
1259
/// `FileHandle` marker impl: no trait methods are overridden here.
/// NOTE(review): presumably the trait's requirements are met by this type's `Read`, `Write`
/// and `Seek` impls plus the trait's defaults — confirm against the `FileHandle` definition.
#[async_trait(?Send)]
impl FileHandle for RemoteFileHandle {}
1262
1263impl Drop for RemoteFileHandle {
1264 fn drop(&mut self) {
1265 if self.dirty {
1266 if let Err(err) = self.provider.upload_entire_file(&self.path, &self.data) {
1267 eprintln!("remote fs flush failed: {err}");
1268 }
1269 }
1270 }
1271}
1272
1273#[derive(Debug, Deserialize)]
1274struct MetadataResponse {
1275 #[serde(rename = "fileType")]
1276 file_type: String,
1277 len: u64,
1278 #[serde(rename = "modifiedAt")]
1279 modified_at: Option<String>,
1280 readonly: bool,
1281 hash: Option<String>,
1282}
1283
1284impl From<MetadataResponse> for FsMetadata {
1285 fn from(value: MetadataResponse) -> Self {
1286 FsMetadata::new_with_hash(
1287 value.file_type.into(),
1288 value.len,
1289 parse_modified_at(value.modified_at.as_deref()),
1290 value.readonly,
1291 value.hash,
1292 )
1293 }
1294}
1295
1296#[derive(Debug, Deserialize)]
1297struct DirEntryResponse {
1298 path: String,
1299 #[serde(rename = "fileName")]
1300 file_name: String,
1301 #[serde(rename = "fileType")]
1302 file_type: String,
1303}
1304
1305impl From<String> for FsFileType {
1306 fn from(value: String) -> Self {
1307 match value.as_str() {
1308 "dir" => FsFileType::Directory,
1309 "file" => FsFileType::File,
1310 "symlink" => FsFileType::Symlink,
1311 "other" => FsFileType::Other,
1312 _ => FsFileType::Unknown,
1313 }
1314 }
1315}
1316
/// Body of `POST /fs/mkdir`; `recursive` selects create-all-ancestors behaviour.
#[derive(Debug, Serialize, Deserialize)]
struct CreateDirRequest {
    path: String,
    recursive: bool,
}

/// Body of `POST /fs/rename`; both paths are normalized by the caller.
#[derive(Debug, Serialize, Deserialize)]
struct RenameRequest {
    from: String,
    to: String,
}

/// Body of `POST /fs/set-readonly`.
#[derive(Debug, Serialize, Deserialize)]
struct SetReadonlyRequest {
    path: String,
    readonly: bool,
}
1334
1335#[derive(Debug, Deserialize)]
1336struct CanonicalizeResponse {
1337 path: String,
1338}
1339
1340#[derive(Debug, Deserialize)]
1341struct DownloadUrlResponse {
1342 #[serde(rename = "downloadUrl")]
1343 download_url: String,
1344}
1345
/// Body of `POST /fs/upload-session/start` (camelCase keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartRequest {
    path: String,
    size_bytes: i64,
    content_type: Option<String>,
    // Hex SHA-256 of the whole payload (presumably produced by `sha256_hex` — confirm).
    content_sha256: String,
}

/// Session identifiers and server-chosen chunk size returned by the start call.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartResponse {
    session_id: String,
    blob_key: String,
    chunk_size_bytes: i64,
}

/// Describes one chunk of an upload session: its index, byte range and checksum.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunkDescriptor {
    chunk_index: usize,
    offset_bytes: i64,
    size_bytes: i64,
    chunk_sha256: String,
}

/// Body of `POST /fs/upload-session/chunks`: asks for upload targets for `chunks`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksRequest {
    session_id: String,
    blob_key: String,
    chunks: Vec<UploadSessionChunkDescriptor>,
}

/// Upload targets returned for a chunks request; the server may return fewer targets than
/// requested (the test mock can omit one to exercise that path).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksResponse {
    targets: Vec<UploadChunkTarget>,
}

/// Where and how to upload one chunk: HTTP method, URL (key `uploadUrl`) and extra headers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadChunkTarget {
    chunk_index: usize,
    method: String,
    upload_url: String,
    headers: HashMap<String, String>,
}

/// Body of `POST /fs/upload-session/complete`; the test mock reassembles the chunks and
/// rejects the request with 400 when `content_sha256` does not match.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionCompleteRequest {
    path: String,
    session_id: String,
    blob_key: String,
    size_bytes: i64,
    content_sha256: String,
    chunk_count: usize,
    hash: Option<String>,
}
1406
1407#[derive(Debug, Serialize, Deserialize)]
1408struct MultipartUploadRequest {
1409 path: String,
1410 #[serde(rename = "sizeBytes")]
1411 size_bytes: i64,
1412 #[serde(rename = "contentType")]
1413 content_type: Option<String>,
1414}
1415
1416#[derive(Debug, Deserialize)]
1417struct MultipartUploadResponse {
1418 #[serde(rename = "sessionId")]
1419 session_id: String,
1420 #[serde(rename = "blobKey")]
1421 blob_key: String,
1422 #[serde(rename = "uploadId")]
1423 upload_id: String,
1424 #[serde(rename = "partSizeBytes")]
1425 part_size_bytes: i64,
1426}
1427
1428#[derive(Debug, Serialize, Deserialize)]
1429struct MultipartUploadPartRequest {
1430 #[serde(rename = "sessionId")]
1431 session_id: String,
1432 #[serde(rename = "blobKey")]
1433 blob_key: String,
1434 #[serde(rename = "uploadId")]
1435 upload_id: String,
1436 #[serde(rename = "partNumber")]
1437 part_number: i32,
1438 #[serde(rename = "sizeBytes")]
1439 size_bytes: i64,
1440}
1441
1442#[derive(Debug, Deserialize)]
1443struct MultipartUploadPartResponse {
1444 #[serde(rename = "upload_url")]
1445 upload_url: String,
1446}
1447
1448#[derive(Debug, Serialize, Deserialize)]
1449struct MultipartUploadCompleteRequest {
1450 path: String,
1451 #[serde(rename = "sessionId")]
1452 session_id: String,
1453 #[serde(rename = "blobKey")]
1454 blob_key: String,
1455 #[serde(rename = "uploadId")]
1456 upload_id: String,
1457 #[serde(rename = "sizeBytes")]
1458 size_bytes: i64,
1459 hash: Option<String>,
1460 parts: Vec<MultipartPart>,
1461}
1462
1463#[derive(Debug, Serialize, Deserialize)]
1464struct MultipartPart {
1465 #[serde(rename = "partNumber")]
1466 part_number: i32,
1467 etag: String,
1468}
1469
/// Manifest describing a file split into shards; serialized with snake_case keys (there is
/// no rename attribute). NOTE(review): presumably stored for files whose hash is
/// `MANIFEST_HASH`, with shard paths under `SHARD_PREFIX` — confirm.
#[derive(Debug, Serialize, Deserialize)]
struct ShardManifest {
    version: u32,
    total_size: u64,
    shard_size: u64,
    shards: Vec<ShardEntry>,
}

/// One shard: its remote path and size in bytes.
#[derive(Debug, Serialize, Deserialize)]
struct ShardEntry {
    path: String,
    size: u64,
}

/// Response of a write-session call; reports where the next write should begin.
#[derive(Debug, Deserialize)]
struct FsWriteSessionResponse {
    // Leading underscore exempts the field from dead-code warnings; presumably not read.
    #[serde(rename = "sessionId")]
    _session_id: String,
    #[serde(rename = "nextOffset")]
    next_offset: i64,
}
1491
1492fn sha256_hex(data: &[u8]) -> String {
1493 let digest = Sha256::digest(data);
1494 let mut out = String::with_capacity(digest.len() * 2);
1495 for byte in digest {
1496 use std::fmt::Write as _;
1497 let _ = write!(out, "{byte:02x}");
1498 }
1499 out
1500}
1501
/// Wraps a `reqwest` error as an `io::Error` of kind `Other`, keeping it as the source.
fn map_http_err(err: reqwest::Error) -> io::Error {
    io::Error::other(err)
}
1505
1506fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1507 let value = value?;
1508 let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1509 let millis = parsed.timestamp_millis();
1510 if millis < 0 {
1511 return None;
1512 }
1513 Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1514}
1515
/// Converts a URL parse failure into an `InvalidInput` I/O error.
fn map_url_err(err: url::ParseError) -> io::Error {
    io::Error::new(ErrorKind::InvalidInput, err)
}
1519
1520fn handle_error(resp: Response) -> io::Result<Response> {
1521 let status = resp.status();
1522 if status.is_success() {
1523 return Ok(resp);
1524 }
1525 let text = resp.text().unwrap_or_else(|_| status.to_string());
1526 let kind = match status {
1527 StatusCode::NOT_FOUND => ErrorKind::NotFound,
1528 StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1529 StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1530 StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1531 _ => ErrorKind::Other,
1532 };
1533 Err(io::Error::new(kind, text))
1534}
1535
1536impl fmt::Debug for RemoteFsProvider {
1537 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1538 f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
1539 }
1540}
1541
1542#[cfg(test)]
1543mod tests {
1544 use super::*;
1545 use crate::data_contract::DataChunkDescriptor;
1546 use axum::extract::{Query, State};
1547 use axum::http::{HeaderMap, StatusCode};
1548 use axum::routing::{delete, get, post, put};
1549 use axum::{Json, Router};
1550 use futures::executor;
1551 use serde::Deserialize;
1552 use std::collections::HashMap;
1553 use std::net::TcpListener as StdTcpListener;
1554 use std::sync::Arc;
1555 use tempfile::tempdir;
1556 use tokio::net::TcpListener as TokioTcpListener;
1557 use tokio::runtime::Runtime;
1558
1559 #[derive(Clone)]
1560 struct Harness {
1561 root: Arc<PathBuf>,
1562 _keeper: Arc<tempfile::TempDir>,
1563 base_url: Arc<Mutex<Option<String>>>,
1564 upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
1565 omit_last_chunk_target: bool,
1566 }
1567
1568 impl Harness {
1569 fn new() -> Self {
1570 Self::with_omit_last_chunk_target(false)
1571 }
1572
1573 fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
1574 let dir = tempdir().expect("tempdir");
1575 let path = dir.path().to_path_buf();
1576 Self {
1577 root: Arc::new(path),
1578 _keeper: Arc::new(dir),
1579 base_url: Arc::new(Mutex::new(None)),
1580 upload_sessions: Arc::new(Mutex::new(HashMap::new())),
1581 omit_last_chunk_target,
1582 }
1583 }
1584
1585 fn resolve(&self, remote_path: &str) -> PathBuf {
1586 let trimmed = remote_path.trim_start_matches('/');
1587 self.root.join(trimmed)
1588 }
1589 }
1590
    /// Mock-side record of an upload session: destination path and expected chunk count.
    #[derive(Clone)]
    struct UploadSessionTestState {
        path: String,
        chunk_count: usize,
    }

    /// Query string shared by the `/fs/*` mock handlers; each handler reads only the
    /// fields it needs.
    #[derive(Deserialize)]
    struct PathParams {
        path: String,
        offset: Option<u64>,
        length: Option<usize>,
        truncate: Option<String>,
        recursive: Option<String>,
    }

    /// Query string of the mock `/upload-chunk` URL (`sessionId`, `chunkIndex`).
    #[derive(Deserialize)]
    struct UploadChunkQuery {
        #[serde(rename = "sessionId")]
        session_id: String,
        #[serde(rename = "chunkIndex")]
        chunk_index: usize,
    }

    /// Query string of the mock `/data/manifest` endpoint.
    #[derive(Deserialize)]
    struct DataManifestQuery {
        path: String,
        version: Option<String>,
    }
1619
1620 async fn metadata_handler(
1621 State(harness): State<Harness>,
1622 Query(params): Query<PathParams>,
1623 ) -> Result<Json<serde_json::Value>, StatusCode> {
1624 let meta =
1625 std::fs::metadata(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1626 let file_type = if meta.is_dir() {
1627 "dir"
1628 } else if meta.is_file() {
1629 "file"
1630 } else {
1631 "other"
1632 };
1633 Ok(Json(serde_json::json!({
1634 "fileType": file_type,
1635 "len": meta.len(),
1636 "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
1637 "readonly": meta.permissions().readonly()
1638 })))
1639 }
1640
1641 async fn dir_handler(
1642 State(harness): State<Harness>,
1643 Query(params): Query<PathParams>,
1644 ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
1645 let mut entries = Vec::new();
1646 for entry in
1647 std::fs::read_dir(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?
1648 {
1649 let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1650 let file_type = entry
1651 .file_type()
1652 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1653 let kind = if file_type.is_dir() {
1654 "dir"
1655 } else if file_type.is_file() {
1656 "file"
1657 } else {
1658 "other"
1659 };
1660 entries.push(serde_json::json!({
1661 "path": format!("/{}", entry.path().strip_prefix(&*harness.root).unwrap().display()),
1662 "fileName": entry.file_name().to_string_lossy(),
1663 "fileType": kind
1664 }));
1665 }
1666 Ok(Json(entries))
1667 }
1668
1669 async fn canonicalize_handler(
1670 State(harness): State<Harness>,
1671 Query(params): Query<PathParams>,
1672 ) -> Result<Json<serde_json::Value>, StatusCode> {
1673 let path = harness.resolve(¶ms.path);
1674 let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
1675 let rel = canonical.strip_prefix(&*harness.root).unwrap_or(&canonical);
1676 Ok(Json(serde_json::json!({
1677 "path": format!("/{}", rel.display())
1678 })))
1679 }
1680
1681 async fn read_handler(
1682 State(harness): State<Harness>,
1683 Query(params): Query<PathParams>,
1684 ) -> Result<Vec<u8>, StatusCode> {
1685 let mut data =
1686 std::fs::read(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1687 let offset = params.offset.unwrap_or(0) as usize;
1688 let length = params.length.unwrap_or(data.len().saturating_sub(offset));
1689 let end = std::cmp::min(offset + length, data.len());
1690 if offset < data.len() {
1691 data = data[offset..end].to_vec();
1692 } else {
1693 data.clear();
1694 }
1695 Ok(data)
1696 }
1697
1698 async fn write_handler(
1699 State(harness): State<Harness>,
1700 Query(params): Query<PathParams>,
1701 body: axum::body::Bytes,
1702 ) -> Result<(), StatusCode> {
1703 let path = harness.resolve(¶ms.path);
1704 if let Some(parent) = path.parent() {
1705 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1706 }
1707 let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
1708 Vec::new()
1709 } else {
1710 std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1711 };
1712 let offset = params.offset.unwrap_or(0) as usize;
1713 let required = offset + body.len();
1714 if required > data.len() {
1715 data.resize(required, 0);
1716 }
1717 data[offset..offset + body.len()].copy_from_slice(&body);
1718 std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1719 Ok(())
1720 }
1721
1722 async fn upload_session_start_handler(
1723 State(harness): State<Harness>,
1724 Json(req): Json<UploadSessionStartRequest>,
1725 ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
1726 let session_id = Uuid::new_v4().to_string();
1727 let chunk_size_bytes = 1024i64;
1728 harness
1729 .upload_sessions
1730 .lock()
1731 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1732 .insert(
1733 session_id.clone(),
1734 UploadSessionTestState {
1735 path: req.path,
1736 chunk_count: 0,
1737 },
1738 );
1739 Ok(Json(UploadSessionStartResponse {
1740 session_id: session_id.clone(),
1741 blob_key: format!("test/blob/{session_id}"),
1742 chunk_size_bytes,
1743 }))
1744 }
1745
1746 async fn upload_session_chunks_handler(
1747 State(harness): State<Harness>,
1748 Json(req): Json<UploadSessionChunksRequest>,
1749 ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
1750 let base = harness
1751 .base_url
1752 .lock()
1753 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1754 .clone()
1755 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1756 let mut sessions = harness
1757 .upload_sessions
1758 .lock()
1759 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1760 let state = sessions
1761 .get_mut(&req.session_id)
1762 .ok_or(StatusCode::NOT_FOUND)?;
1763 state.chunk_count = req.chunks.len();
1764 let mut targets = req
1765 .chunks
1766 .iter()
1767 .map(|chunk| UploadChunkTarget {
1768 chunk_index: chunk.chunk_index,
1769 method: "PUT".to_string(),
1770 upload_url: format!(
1771 "{base}/upload-chunk?sessionId={}&chunkIndex={}",
1772 req.session_id, chunk.chunk_index
1773 ),
1774 headers: HashMap::new(),
1775 })
1776 .collect::<Vec<_>>();
1777 if harness.omit_last_chunk_target && !targets.is_empty() {
1778 targets.pop();
1779 }
1780 Ok(Json(UploadSessionChunksResponse { targets }))
1781 }
1782
1783 async fn upload_chunk_handler(
1784 State(harness): State<Harness>,
1785 Query(query): Query<UploadChunkQuery>,
1786 body: axum::body::Bytes,
1787 ) -> Result<(), StatusCode> {
1788 let chunk_path = harness.resolve(&format!(
1789 "/.upload-sessions/{}/{}",
1790 query.session_id, query.chunk_index
1791 ));
1792 if let Some(parent) = chunk_path.parent() {
1793 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1794 }
1795 std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1796 Ok(())
1797 }
1798
1799 async fn data_manifest_handler(
1800 Query(query): Query<DataManifestQuery>,
1801 ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
1802 let updated_at = if query.version.as_deref() == Some("v1") {
1803 "2026-01-01T00:00:00Z"
1804 } else {
1805 "2026-02-02T00:00:00Z"
1806 };
1807 Ok(Json(DataManifestDescriptor {
1808 schema_version: 1,
1809 format: "runmat-data".to_string(),
1810 dataset_id: format!("dataset:{}", query.path),
1811 updated_at: updated_at.to_string(),
1812 txn_sequence: 9,
1813 }))
1814 }
1815
1816 async fn data_chunk_upload_targets_handler(
1817 Json(req): Json<DataChunkUploadRequest>,
1818 ) -> Result<Json<serde_json::Value>, StatusCode> {
1819 let targets = req
1820 .chunks
1821 .iter()
1822 .map(|chunk| {
1823 serde_json::json!({
1824 "key": chunk.key,
1825 "method": "PUT",
1826 "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
1827 "headers": {
1828 "x-runmat-hash": chunk.hash,
1829 "x-runmat-object": chunk.object_id,
1830 }
1831 })
1832 })
1833 .collect::<Vec<_>>();
1834 Ok(Json(serde_json::json!({ "targets": targets })))
1835 }
1836
1837 async fn upload_session_complete_handler(
1838 State(harness): State<Harness>,
1839 Json(req): Json<UploadSessionCompleteRequest>,
1840 ) -> Result<(), StatusCode> {
1841 let state = harness
1842 .upload_sessions
1843 .lock()
1844 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1845 .remove(&req.session_id)
1846 .ok_or(StatusCode::NOT_FOUND)?;
1847 let mut data = Vec::new();
1848 for chunk_index in 0..state.chunk_count {
1849 let chunk_path = harness.resolve(&format!(
1850 "/.upload-sessions/{}/{}",
1851 req.session_id, chunk_index
1852 ));
1853 let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1854 data.extend_from_slice(&chunk);
1855 }
1856 if sha256_hex(&data) != req.content_sha256 {
1857 return Err(StatusCode::BAD_REQUEST);
1858 }
1859 let target_path = harness.resolve(&state.path);
1860 if let Some(parent) = target_path.parent() {
1861 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1862 }
1863 std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1864 Ok(())
1865 }
1866
1867 async fn mkdir_handler(
1868 State(harness): State<Harness>,
1869 Json(req): Json<CreateDirRequest>,
1870 ) -> Result<(), StatusCode> {
1871 let path = harness.resolve(&req.path);
1872 if req.recursive {
1873 std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1874 } else {
1875 std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1876 }
1877 Ok(())
1878 }
1879
1880 async fn delete_file_handler(
1881 State(harness): State<Harness>,
1882 Query(params): Query<PathParams>,
1883 ) -> Result<(), StatusCode> {
1884 std::fs::remove_file(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1885 Ok(())
1886 }
1887
1888 async fn delete_dir_handler(
1889 State(harness): State<Harness>,
1890 Query(params): Query<PathParams>,
1891 ) -> Result<(), StatusCode> {
1892 let recursive = params.recursive.as_deref() == Some("true");
1893 if recursive {
1894 std::fs::remove_dir_all(harness.resolve(¶ms.path))
1895 .map_err(|_| StatusCode::NOT_FOUND)?;
1896 } else {
1897 std::fs::remove_dir(harness.resolve(¶ms.path))
1898 .map_err(|_| StatusCode::NOT_FOUND)?;
1899 }
1900 Ok(())
1901 }
1902
1903 async fn rename_handler(
1904 State(harness): State<Harness>,
1905 Json(req): Json<RenameRequest>,
1906 ) -> Result<(), StatusCode> {
1907 let from = harness.resolve(&req.from);
1908 let to = harness.resolve(&req.to);
1909 if let Some(parent) = to.parent() {
1910 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1911 }
1912 std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
1913 Ok(())
1914 }
1915
1916 async fn set_readonly_handler(
1917 State(harness): State<Harness>,
1918 Json(req): Json<SetReadonlyRequest>,
1919 ) -> Result<(), StatusCode> {
1920 let path = harness.resolve(&req.path);
1921 let mut perms = std::fs::metadata(&path)
1922 .map_err(|_| StatusCode::NOT_FOUND)?
1923 .permissions();
1924 perms.set_readonly(req.readonly);
1925 std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1926 Ok(())
1927 }
1928
1929 async fn read_with_download_handler(
1930 State(harness): State<Harness>,
1931 Query(params): Query<PathParams>,
1932 ) -> Result<Json<serde_json::Value>, StatusCode> {
1933 let base = harness
1934 .base_url
1935 .lock()
1936 .unwrap()
1937 .clone()
1938 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1939 let download_url = format!("{base}/download?path={}", params.path);
1940 Ok(Json(serde_json::json!({
1941 "downloadUrl": download_url,
1942 "expiresAt": 0
1943 })))
1944 }
1945
1946 async fn download_handler(
1947 State(harness): State<Harness>,
1948 Query(params): Query<PathParams>,
1949 headers: HeaderMap,
1950 ) -> Result<Vec<u8>, StatusCode> {
1951 let mut data =
1952 std::fs::read(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1953 if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
1954 if let Some((start, end)) = parse_range(range) {
1955 let start = start.min(data.len());
1956 let end = end.min(data.len().saturating_sub(1));
1957 if start < data.len() {
1958 data = data[start..=end].to_vec();
1959 } else {
1960 data.clear();
1961 }
1962 }
1963 }
1964 Ok(data)
1965 }
1966
1967 fn parse_range(value: &str) -> Option<(usize, usize)> {
1968 let value = value.strip_prefix("bytes=")?;
1969 let (start, end) = value.split_once('-')?;
1970 let start = start.parse::<usize>().ok()?;
1971 let end = end.parse::<usize>().ok()?;
1972 Some((start, end))
1973 }
1974
1975 fn spawn_server() -> (String, Harness, Runtime) {
1976 let harness = Harness::new();
1977 let router = Router::new()
1978 .route("/fs/metadata", get(metadata_handler))
1979 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
1980 .route("/fs/file", delete(delete_file_handler))
1981 .route("/fs/canonicalize", get(canonicalize_handler))
1982 .route("/fs/read", get(read_handler))
1983 .route("/fs/write", put(write_handler))
1984 .route(
1985 "/fs/upload-session/start",
1986 post(upload_session_start_handler),
1987 )
1988 .route(
1989 "/fs/upload-session/chunks",
1990 post(upload_session_chunks_handler),
1991 )
1992 .route(
1993 "/fs/upload-session/complete",
1994 post(upload_session_complete_handler),
1995 )
1996 .route("/fs/mkdir", post(mkdir_handler))
1997 .route("/fs/rename", post(rename_handler))
1998 .route("/fs/set-readonly", post(set_readonly_handler))
1999 .route("/data/manifest", get(data_manifest_handler))
2000 .route(
2001 "/data/chunks/upload-targets",
2002 post(data_chunk_upload_targets_handler),
2003 )
2004 .route("/upload-chunk", put(upload_chunk_handler))
2005 .with_state(harness.clone());
2006 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2007 std_listener.set_nonblocking(true).expect("nonblocking");
2008 let addr = std_listener.local_addr().unwrap();
2009 let service = router.into_make_service();
2010 let rt = Runtime::new().unwrap();
2011 let base = format!("http://{addr}");
2012 *harness.base_url.lock().unwrap() = Some(base.clone());
2013 rt.spawn(async move {
2014 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2015 axum::serve(listener, service).await.unwrap();
2016 });
2017 (base, harness, rt)
2018 }
2019
2020 fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
2021 let harness = Harness::new();
2022 let router = Router::new()
2023 .route("/fs/metadata", get(metadata_handler))
2024 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2025 .route("/fs/file", delete(delete_file_handler))
2026 .route("/fs/canonicalize", get(canonicalize_handler))
2027 .route("/fs/read", get(read_with_download_handler))
2028 .route("/fs/write", put(write_handler))
2029 .route(
2030 "/fs/upload-session/start",
2031 post(upload_session_start_handler),
2032 )
2033 .route(
2034 "/fs/upload-session/chunks",
2035 post(upload_session_chunks_handler),
2036 )
2037 .route(
2038 "/fs/upload-session/complete",
2039 post(upload_session_complete_handler),
2040 )
2041 .route("/fs/mkdir", post(mkdir_handler))
2042 .route("/fs/rename", post(rename_handler))
2043 .route("/fs/set-readonly", post(set_readonly_handler))
2044 .route("/download", get(download_handler))
2045 .route("/data/manifest", get(data_manifest_handler))
2046 .route(
2047 "/data/chunks/upload-targets",
2048 post(data_chunk_upload_targets_handler),
2049 )
2050 .route("/upload-chunk", put(upload_chunk_handler))
2051 .with_state(harness.clone());
2052 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2053 std_listener.set_nonblocking(true).expect("nonblocking");
2054 let addr = std_listener.local_addr().unwrap();
2055 let service = router.into_make_service();
2056 let rt = Runtime::new().unwrap();
2057 let base = format!("http://{addr}");
2058 *harness.base_url.lock().unwrap() = Some(base.clone());
2059 rt.spawn(async move {
2060 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2061 axum::serve(listener, service).await.unwrap();
2062 });
2063 (base, harness, rt)
2064 }
2065
2066 fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
2067 let harness = Harness::with_omit_last_chunk_target(true);
2068 let router = Router::new()
2069 .route("/fs/metadata", get(metadata_handler))
2070 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2071 .route("/fs/file", delete(delete_file_handler))
2072 .route("/fs/canonicalize", get(canonicalize_handler))
2073 .route("/fs/read", get(read_handler))
2074 .route("/fs/write", put(write_handler))
2075 .route(
2076 "/fs/upload-session/start",
2077 post(upload_session_start_handler),
2078 )
2079 .route(
2080 "/fs/upload-session/chunks",
2081 post(upload_session_chunks_handler),
2082 )
2083 .route(
2084 "/fs/upload-session/complete",
2085 post(upload_session_complete_handler),
2086 )
2087 .route("/fs/mkdir", post(mkdir_handler))
2088 .route("/fs/rename", post(rename_handler))
2089 .route("/fs/set-readonly", post(set_readonly_handler))
2090 .route("/data/manifest", get(data_manifest_handler))
2091 .route(
2092 "/data/chunks/upload-targets",
2093 post(data_chunk_upload_targets_handler),
2094 )
2095 .route("/upload-chunk", put(upload_chunk_handler))
2096 .with_state(harness.clone());
2097 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2098 std_listener.set_nonblocking(true).expect("nonblocking");
2099 let addr = std_listener.local_addr().unwrap();
2100 let service = router.into_make_service();
2101 let rt = Runtime::new().unwrap();
2102 let base = format!("http://{addr}");
2103 *harness.base_url.lock().unwrap() = Some(base.clone());
2104 rt.spawn(async move {
2105 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2106 axum::serve(listener, service).await.unwrap();
2107 });
2108 (base, harness, rt)
2109 }
2110
2111 #[test]
2112 fn remote_provider_roundtrip() {
2113 let (base, harness, _rt) = spawn_server();
2114 let provider = RemoteFsProvider::new(RemoteFsConfig {
2115 base_url: base,
2116 auth_token: None,
2117 chunk_bytes: 1024,
2118 parallel_requests: 4,
2119 direct_read_threshold_bytes: u64::MAX,
2120 direct_write_threshold_bytes: 1024,
2121 timeout: Duration::from_secs(30),
2122 ..RemoteFsConfig::default()
2123 })
2124 .expect("provider");
2125
2126 let data = (0..16_384u32)
2127 .flat_map(|v| v.to_le_bytes())
2128 .collect::<Vec<_>>();
2129 executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
2130 let read_back =
2131 executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2132 assert_eq!(data, read_back);
2133 executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
2134 assert!(!harness.resolve("/reports/data.bin").exists());
2135 }
2136
2137 #[test]
2138 fn remote_provider_download_url_read() {
2139 let (base, harness, _rt) = spawn_server_with_download_url();
2140 let provider = RemoteFsProvider::new(RemoteFsConfig {
2141 base_url: base,
2142 auth_token: None,
2143 chunk_bytes: 128,
2144 parallel_requests: 2,
2145 direct_read_threshold_bytes: u64::MAX,
2146 timeout: Duration::from_secs(30),
2147 ..RemoteFsConfig::default()
2148 })
2149 .expect("provider");
2150
2151 let data = (0..1024u32)
2152 .flat_map(|v| v.to_le_bytes())
2153 .collect::<Vec<_>>();
2154 std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2155 std::fs::write(harness.resolve("/reports/data.bin"), &data).expect("write");
2156
2157 let read_back =
2158 executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2159 assert_eq!(data, read_back);
2160 }
2161
2162 #[test]
2163 fn remote_provider_errors_when_chunk_target_is_missing() {
2164 let (base, _harness, _rt) = spawn_server_with_missing_chunk_target();
2165 let provider = RemoteFsProvider::new(RemoteFsConfig {
2166 base_url: base,
2167 auth_token: None,
2168 chunk_bytes: 128,
2169 parallel_requests: 2,
2170 direct_read_threshold_bytes: u64::MAX,
2171 direct_write_threshold_bytes: 128,
2172 timeout: Duration::from_secs(30),
2173 ..RemoteFsConfig::default()
2174 })
2175 .expect("provider");
2176
2177 let data = vec![7u8; 1024];
2178 let err =
2179 executor::block_on(provider.write(Path::new("/reports/missing-target.bin"), &data))
2180 .expect_err("write should fail when chunk target is missing");
2181 assert_eq!(err.kind(), io::ErrorKind::Other);
2182 }
2183
2184 #[test]
2185 fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
2186 let (base, _harness, _rt) = spawn_server();
2187 let provider = RemoteFsProvider::new(RemoteFsConfig {
2188 base_url: base,
2189 auth_token: None,
2190 timeout: Duration::from_secs(30),
2191 ..RemoteFsConfig::default()
2192 })
2193 .expect("provider");
2194
2195 let descriptor = provider
2196 .data_manifest_descriptor(&DataManifestRequest {
2197 path: "/datasets/alpha.data".to_string(),
2198 version: Some("v1".to_string()),
2199 })
2200 .expect("manifest descriptor");
2201 assert_eq!(descriptor.schema_version, 1);
2202 assert_eq!(descriptor.format, "runmat-data");
2203 assert_eq!(descriptor.dataset_id, "dataset:/datasets/alpha.data");
2204 assert_eq!(descriptor.updated_at, "2026-01-01T00:00:00Z");
2205 assert_eq!(descriptor.txn_sequence, 9);
2206 }
2207
2208 #[test]
2209 fn remote_provider_data_chunk_upload_targets_fetches_targets() {
2210 let (base, _harness, _rt) = spawn_server();
2211 let provider = RemoteFsProvider::new(RemoteFsConfig {
2212 base_url: base,
2213 auth_token: None,
2214 timeout: Duration::from_secs(30),
2215 ..RemoteFsConfig::default()
2216 })
2217 .expect("provider");
2218
2219 let targets = provider
2220 .data_chunk_upload_targets(&DataChunkUploadRequest {
2221 dataset_path: "/datasets/alpha.data".to_string(),
2222 array: "X".to_string(),
2223 chunks: vec![DataChunkDescriptor {
2224 key: "X/0-0".to_string(),
2225 object_id: "obj_0".to_string(),
2226 hash: "sha256:abc".to_string(),
2227 bytes_raw: 10,
2228 bytes_stored: 8,
2229 }],
2230 })
2231 .expect("chunk upload targets");
2232
2233 assert_eq!(targets.len(), 1);
2234 assert_eq!(targets[0].key, "X/0-0");
2235 assert_eq!(targets[0].method, "PUT");
2236 assert_eq!(
2237 targets[0].upload_url,
2238 "https://uploads.example.test/X/obj_0"
2239 );
2240 let headers = &targets[0].headers;
2241 assert_eq!(headers.len(), 2);
2242 assert_eq!(
2243 headers.get("x-runmat-hash").map(String::as_str),
2244 Some("sha256:abc")
2245 );
2246 assert_eq!(
2247 headers.get("x-runmat-object").map(String::as_str),
2248 Some("obj_0")
2249 );
2250 }
2251}