1use crate::data_contract::{
2 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::DateTime;
7use crossbeam_utils::thread;
8use once_cell::sync::Lazy;
9use reqwest::blocking::{Client, Response};
10use reqwest::StatusCode;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::fmt;
16use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use url::Url;
21use uuid::Uuid;
22
/// Sentinel "hash" value marking a remote file whose bytes are a JSON shard
/// manifest rather than raw content (readers must reassemble from shards).
const MANIFEST_HASH: &str = "manifest:v1";
/// Remote directory under which shard objects for very large files are stored.
const SHARD_PREFIX: &str = "/.runmat/shards";
25
/// HTTP `User-Agent` sent on every request: crate version plus the host OS.
static USER_AGENT: Lazy<String> = Lazy::new(|| {
    format!(
        "runmat-remote-fs/{} ({})",
        env!("CARGO_PKG_VERSION"),
        std::env::consts::OS
    )
});
33
/// Tuning knobs for the remote filesystem provider.
#[derive(Clone, Debug)]
pub struct RemoteFsConfig {
    /// Root URL of the remote fs API; must be non-empty and parseable.
    pub base_url: String,
    /// Optional bearer token, sent as `Authorization: Bearer <token>`.
    pub auth_token: Option<String>,
    /// Size of each read/write chunk (clamped to >= 64 KiB at build time).
    pub chunk_bytes: usize,
    /// Number of concurrent transfer workers (clamped to >= 1).
    pub parallel_requests: usize,
    /// Reads at or above this size go through a presigned download URL.
    pub direct_read_threshold_bytes: u64,
    /// Writes at or above this size use the multipart/upload-session path.
    pub direct_write_threshold_bytes: u64,
    /// Files at or above this size are split into shard objects.
    pub shard_threshold_bytes: u64,
    /// Size of each shard object (clamped to >= 8 MiB at build time).
    pub shard_size_bytes: u64,
    /// Per-request HTTP timeout.
    pub timeout: Duration,
    /// Maximum attempts for retryable requests (clamped to >= 1).
    pub retry_max_attempts: usize,
    /// Backoff delay before the first retry; doubles with each attempt.
    pub retry_base_delay: Duration,
    /// Upper bound on the exponential backoff delay.
    pub retry_max_delay: Duration,
}
49
impl Default for RemoteFsConfig {
    /// Conservative defaults: 16 MiB chunks, 4 parallel requests, 64 MiB
    /// direct-transfer thresholds, 4 GiB shard threshold with 512 MiB shards,
    /// 120 s timeout, and up to 5 retries backing off from 100 ms to a 2 s
    /// cap. `base_url` is intentionally empty and must be set before use
    /// (`RemoteInner::new` rejects an empty value).
    fn default() -> Self {
        Self {
            base_url: String::new(),
            auth_token: None,
            chunk_bytes: 16 * 1024 * 1024,
            parallel_requests: 4,
            direct_read_threshold_bytes: 64 * 1024 * 1024,
            direct_write_threshold_bytes: 64 * 1024 * 1024,
            shard_threshold_bytes: 4 * 1024 * 1024 * 1024,
            shard_size_bytes: 512 * 1024 * 1024,
            timeout: Duration::from_secs(120),
            retry_max_attempts: 5,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(2),
        }
    }
}
68
/// Shared, immutable HTTP state placed behind the provider's `Arc`.
///
/// Mirrors `RemoteFsConfig` after validation/clamping in `RemoteInner::new`,
/// plus the constructed `reqwest` client and the parsed base URL.
struct RemoteInner {
    client: Client,
    base: Url,
    /// Pre-formatted `Bearer <token>` header value, if auth is configured.
    auth_header: Option<String>,
    chunk_bytes: usize,
    parallel_requests: usize,
    direct_read_threshold_bytes: u64,
    direct_write_threshold_bytes: u64,
    shard_threshold_bytes: u64,
    shard_size_bytes: u64,
    retry_max_attempts: usize,
    retry_base_delay: Duration,
    retry_max_delay: Duration,
}
83
84impl RemoteInner {
    /// Validate `config` and build the shared HTTP state.
    ///
    /// Fails when `base_url` is empty or unparsable, or when the HTTP client
    /// cannot be constructed. Several knobs are clamped to sane minimums
    /// (chunk size, parallelism, shard size, retry attempts) rather than
    /// rejected.
    fn new(config: RemoteFsConfig) -> io::Result<Self> {
        if config.base_url.is_empty() {
            return Err(io::Error::new(
                ErrorKind::InvalidInput,
                "RemoteFsConfig.base_url must be provided",
            ));
        }
        let base = Url::parse(&config.base_url).map_err(map_url_err)?;
        let client = Client::builder()
            .timeout(config.timeout)
            .user_agent(USER_AGENT.clone())
            .build()
            .map_err(map_http_err)?;
        Ok(Self {
            client,
            base,
            // Pre-formatted once so each request only borrows a string.
            auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
            chunk_bytes: config.chunk_bytes.max(64 * 1024),
            parallel_requests: config.parallel_requests.max(1),
            direct_read_threshold_bytes: config.direct_read_threshold_bytes,
            direct_write_threshold_bytes: config.direct_write_threshold_bytes,
            shard_threshold_bytes: config.shard_threshold_bytes,
            shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
            retry_max_attempts: config.retry_max_attempts.max(1),
            retry_base_delay: config.retry_base_delay,
            retry_max_delay: config.retry_max_delay,
        })
    }
113
114 fn should_retry(&self, status: StatusCode) -> bool {
115 matches!(
116 status,
117 StatusCode::TOO_MANY_REQUESTS
118 | StatusCode::INTERNAL_SERVER_ERROR
119 | StatusCode::BAD_GATEWAY
120 | StatusCode::SERVICE_UNAVAILABLE
121 | StatusCode::GATEWAY_TIMEOUT
122 )
123 }
124
125 fn retry_delay(&self, attempt: usize) -> Duration {
126 let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
127 let delay = self.retry_base_delay.as_millis() as u64 * factor;
128 let capped = delay.min(self.retry_max_delay.as_millis() as u64);
129 Duration::from_millis(capped)
130 }
131
    /// Issue a request, retrying transient HTTP statuses with exponential
    /// backoff.
    ///
    /// Transport-level failures (connect errors, timeouts) are returned
    /// immediately and are NOT retried — only retryable status codes loop.
    /// The final attempt's response is returned regardless of its status;
    /// callers run it through `handle_error`.
    fn send_with_retry(
        &self,
        method: reqwest::Method,
        route: &str,
        query: &[(&str, String)],
    ) -> io::Result<Response> {
        for attempt in 0..self.retry_max_attempts {
            let resp = self
                .request(method.clone(), route, query)
                .send()
                .map_err(map_http_err)?;
            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
                return Ok(resp);
            }
            std::thread::sleep(self.retry_delay(attempt));
        }
        // Unreachable in practice: retry_max_attempts is clamped to >= 1.
        Err(io::Error::other("request retries exhausted"))
    }
150
    /// GET an absolute (typically presigned) URL with optional `Range`
    /// header, retrying transient statuses like `send_with_retry`.
    ///
    /// No Authorization header is attached — presigned URLs carry their own
    /// credentials.
    fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
        for attempt in 0..self.retry_max_attempts {
            let mut request = self.client.get(url);
            if let Some(range) = &range {
                request = request.header(reqwest::header::RANGE, range);
            }
            let resp = request.send().map_err(map_http_err)?;
            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
                return Ok(resp);
            }
            std::thread::sleep(self.retry_delay(attempt));
        }
        // Unreachable in practice: retry_max_attempts is clamped to >= 1.
        Err(io::Error::other("request retries exhausted"))
    }
165
166 fn endpoint(&self, route: &str) -> Url {
167 self.base
168 .join(route.trim_start_matches('/'))
169 .expect("failed to join remote route")
170 }
171
    /// Build a request to `route` with query parameters appended and the
    /// optional bearer `Authorization` header applied.
    fn request(
        &self,
        method: reqwest::Method,
        route: &str,
        query: &[(&str, String)],
    ) -> reqwest::blocking::RequestBuilder {
        let mut url = self.endpoint(route);
        {
            // Scoped so the query-pairs borrow ends before `url` is moved.
            let mut pairs = url.query_pairs_mut();
            for (k, v) in query {
                pairs.append_pair(k, v);
            }
        }
        let mut builder = self.client.request(method, url);
        if let Some(auth) = &self.auth_header {
            builder = builder.header("Authorization", auth);
        }
        builder
    }
191
    /// GET `route` (with retry on transient statuses) and deserialize the
    /// JSON response body into `T`.
    fn get_json<T: for<'a> Deserialize<'a>>(
        &self,
        route: &str,
        query: &[(&str, String)],
    ) -> io::Result<T> {
        let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
        handle_error(resp)?.json().map_err(map_http_err)
    }
200
201 fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
202 let resp = self
203 .request(reqwest::Method::POST, route, &[])
204 .json(body)
205 .send()
206 .map_err(map_http_err)?;
207 handle_error(resp)?;
208 Ok(())
209 }
210
    /// POST a JSON `body` to `route` and deserialize the JSON response.
    ///
    /// NOTE(review): unlike the GET/DELETE helpers this does not retry —
    /// POSTs may not be idempotent; confirm before adding retry here.
    fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
        &self,
        route: &str,
        body: &B,
    ) -> io::Result<T> {
        let resp = self
            .request(reqwest::Method::POST, route, &[])
            .json(body)
            .send()
            .map_err(map_http_err)?;
        handle_error(resp)?.json().map_err(map_http_err)
    }
223
224 fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
225 let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
226 handle_error(resp)?;
227 Ok(())
228 }
229
    /// Read `length` bytes at `offset` of `path` via `/fs/read`.
    ///
    /// The endpoint may answer with raw bytes, or (detected by a JSON
    /// Content-Type) with a body containing a presigned download URL, in
    /// which case the byte range is fetched from that URL instead.
    fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
        let resp = self.send_with_retry(
            reqwest::Method::GET,
            "/fs/read",
            &[
                ("path", path.to_string()),
                ("offset", offset.to_string()),
                ("length", length.to_string()),
            ],
        )?;
        let mut body = handle_error(resp)?;
        if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
            if content_type
                .to_str()
                .map(|value| value.contains("application/json"))
                .unwrap_or(false)
            {
                // Redirect-style response: fetch the range from the URL.
                let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
                return self.download_range_from_url(&json.download_url, offset, length as u64);
            }
        }
        let mut buf = Vec::with_capacity(length);
        body.copy_to(&mut buf).map_err(map_http_err)?;
        Ok(buf)
    }
255
    /// Fetch an inclusive byte range (`bytes=offset-end`) from a presigned
    /// URL. A zero-length request short-circuits without any I/O.
    fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
        if length == 0 {
            return Ok(Vec::new());
        }
        // HTTP Range headers are inclusive at both ends.
        let end = offset + length - 1;
        let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
        let mut body = handle_error(resp)?;
        let mut buf = Vec::with_capacity(length as usize);
        body.copy_to(&mut buf).map_err(map_http_err)?;
        Ok(buf)
    }
267
    /// Request a presigned download URL for `path`.
    fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
        self.get_json("/fs/download-url", &[("path", path.to_string())])
    }
271
    /// PUT one chunk of a streamed write to `/fs/write`.
    ///
    /// `truncate` marks the first chunk (replace any existing content) and
    /// `final_chunk` the last. A `202 Accepted` response carries a write
    /// session payload (including the server's expected next offset) which
    /// is returned to the caller; any other success returns `None`.
    /// Not retried: a replayed non-idempotent write could corrupt the stream.
    fn upload_chunk(
        &self,
        path: &str,
        offset: u64,
        truncate: bool,
        final_chunk: bool,
        data: &[u8],
        hash: Option<&str>,
    ) -> io::Result<Option<FsWriteSessionResponse>> {
        let mut query = vec![("path", path.to_string()), ("offset", offset.to_string())];
        if truncate {
            query.push(("truncate", "true".to_string()));
        }
        if final_chunk {
            query.push(("final", "true".to_string()));
        }
        if let Some(hash) = hash {
            query.push(("hash", hash.to_string()));
        }
        let resp = self
            .request(reqwest::Method::PUT, "/fs/write", &query)
            .body(data.to_vec())
            .send()
            .map_err(map_http_err)?;
        if resp.status() == StatusCode::ACCEPTED {
            let session = handle_error(resp)?.json().map_err(map_http_err)?;
            return Ok(Some(session));
        }
        handle_error(resp)?;
        Ok(None)
    }
303
    /// Begin a legacy multipart upload for `path` with the given total size.
    fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
        self.post_json(
            "/fs/multipart-upload",
            &MultipartUploadRequest {
                path: path.to_string(),
                // Wire format uses signed 64-bit sizes.
                size_bytes: size_bytes as i64,
                content_type: None,
            },
        )
    }
314
    /// Start a chunked upload session for `path`, declaring the total size
    /// and whole-file SHA-256 up front.
    fn upload_session_start(
        &self,
        path: &str,
        size_bytes: u64,
        content_sha256: &str,
    ) -> io::Result<UploadSessionStartResponse> {
        self.post_json(
            "/fs/upload-session/start",
            &UploadSessionStartRequest {
                path: path.to_string(),
                size_bytes: size_bytes as i64,
                content_type: None,
                content_sha256: content_sha256.to_string(),
            },
        )
    }
331
    /// Register the per-chunk descriptors for an upload session; the server
    /// answers with presigned upload targets for each chunk.
    fn upload_session_chunks(
        &self,
        session_id: &str,
        blob_key: &str,
        chunks: &[UploadSessionChunkDescriptor],
    ) -> io::Result<UploadSessionChunksResponse> {
        self.post_json(
            "/fs/upload-session/chunks",
            &UploadSessionChunksRequest {
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                chunks: chunks.to_vec(),
            },
        )
    }
347
    /// Finalize an upload session after all chunks were PUT, restating the
    /// size, whole-file SHA-256, and chunk count for server-side validation.
    fn upload_session_complete(
        &self,
        path: &str,
        session_id: &str,
        blob_key: &str,
        size_bytes: u64,
        content_sha256: &str,
        chunk_count: usize,
    ) -> io::Result<()> {
        self.post_empty(
            "/fs/upload-session/complete",
            &UploadSessionCompleteRequest {
                path: path.to_string(),
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                size_bytes: size_bytes as i64,
                content_sha256: content_sha256.to_string(),
                chunk_count,
                hash: None,
            },
        )
    }
370
    /// Obtain a presigned PUT URL for one legacy multipart part
    /// (`part_number` is 1-based per the multipart convention used here).
    fn multipart_presign_part(
        &self,
        session_id: &str,
        blob_key: &str,
        upload_id: &str,
        part_number: i32,
        size_bytes: u64,
    ) -> io::Result<String> {
        let response: MultipartUploadPartResponse = self.post_json(
            "/fs/multipart-upload/part",
            &MultipartUploadPartRequest {
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                upload_id: upload_id.to_string(),
                part_number,
                size_bytes: size_bytes as i64,
            },
        )?;
        Ok(response.upload_url)
    }
391
    /// Complete a legacy multipart upload with the collected part list
    /// (part number + ETag pairs).
    fn multipart_complete(
        &self,
        path: &str,
        session_id: &str,
        blob_key: &str,
        upload_id: &str,
        size_bytes: u64,
        parts: Vec<MultipartPart>,
    ) -> io::Result<()> {
        let resp = self
            .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
            .json(&MultipartUploadCompleteRequest {
                path: path.to_string(),
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                upload_id: upload_id.to_string(),
                size_bytes: size_bytes as i64,
                hash: None,
                parts,
            })
            .send()
            .map_err(map_http_err)?;
        handle_error(resp)?;
        Ok(())
    }
417
418 fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
419 let resp = self
420 .client
421 .put(url)
422 .body(data.to_vec())
423 .send()
424 .map_err(map_http_err)?;
425 let resp = handle_error(resp)?;
426 let etag = resp
427 .headers()
428 .get(reqwest::header::ETAG)
429 .and_then(|value| value.to_str().ok())
430 .unwrap_or("")
431 .to_string();
432 Ok(etag)
433 }
434
    /// Upload bytes to an arbitrary presigned target: the HTTP verb, URL,
    /// and headers all come from the server-provided target descriptor.
    fn put_upload_target(
        &self,
        method: &str,
        url: &str,
        headers: &HashMap<String, String>,
        data: &[u8],
    ) -> io::Result<()> {
        // The verb arrives as a string over the wire; reject anything that
        // is not a valid HTTP method token.
        let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
            io::Error::new(
                ErrorKind::InvalidInput,
                format!("invalid upload method `{method}`: {err}"),
            )
        })?;
        let mut request = self.client.request(method, url).body(data.to_vec());
        for (name, value) in headers {
            request = request.header(name, value);
        }
        let resp = request.send().map_err(map_http_err)?;
        handle_error(resp)?;
        Ok(())
    }
456
    /// Fetch file metadata for a remote path.
    fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
        self.get_json("/fs/metadata", &[("path", path.to_string())])
    }

    /// List the entries of a remote directory.
    fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
        self.get_json("/fs/dir", &[("path", path.to_string())])
    }

    /// Ask the server for the canonical form of `path`.
    fn canonicalize_path(&self, path: &str) -> io::Result<String> {
        let resp: CanonicalizeResponse =
            self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
        Ok(resp.path)
    }
470}
471
/// Blocking HTTP-backed implementation of `FsProvider`.
///
/// All state lives behind an `Arc`, so the provider is cheap to share.
/// NOTE(review): `FsProvider::open` below calls `self.clone()`, so a `Clone`
/// impl must exist elsewhere in this file — confirm before relying on it.
pub struct RemoteFsProvider {
    inner: Arc<RemoteInner>,
}
475
476impl RemoteFsProvider {
477 pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
478 Ok(Self {
479 inner: Arc::new(RemoteInner::new(config)?),
480 })
481 }
482
    /// Blocking counterpart of the async trait method: fetch the data
    /// manifest descriptor for `request.path`, optionally pinned to a
    /// specific version.
    pub fn data_manifest_descriptor(
        &self,
        request: &DataManifestRequest,
    ) -> io::Result<DataManifestDescriptor> {
        let mut query = vec![("path", request.path.clone())];
        if let Some(version) = &request.version {
            query.push(("version", version.clone()));
        }
        self.inner.get_json("/data/manifest", &query)
    }
494
    /// Blocking counterpart of the async trait method: request presigned
    /// upload targets for a batch of data chunks.
    pub fn data_chunk_upload_targets(
        &self,
        request: &DataChunkUploadRequest,
    ) -> io::Result<Vec<DataChunkUploadTarget>> {
        // Local wrapper: the wire response nests the list under `targets`.
        #[derive(Deserialize)]
        struct DataChunkUploadTargetsResponse {
            targets: Vec<DataChunkUploadTarget>,
        }
        let response: DataChunkUploadTargetsResponse = self
            .inner
            .post_json("/data/chunks/upload-targets", request)?;
        Ok(response.targets)
    }
509
510 fn normalize(&self, path: &Path) -> String {
511 let mut normalized = PathBuf::new();
512 normalized.push("/");
513 normalized.push(path);
514 normalized.to_string_lossy().replace('\\', "/").to_string()
515 }
516
    /// Recursively create the parent directory of `path` on the remote so a
    /// subsequent file write cannot fail on a missing directory. A path with
    /// no parent is a no-op.
    fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
        if let Some(parent) = path.parent() {
            self.inner.post_empty(
                "/fs/mkdir",
                &CreateDirRequest {
                    path: self.normalize(parent),
                    recursive: true,
                },
            )?;
        }
        Ok(())
    }
529
    /// Download a non-sharded file of known length `len`.
    ///
    /// Files at or above `direct_read_threshold_bytes` are fetched from a
    /// presigned URL via ranged requests; smaller files are read through
    /// `/fs/read` in `chunk_bytes`-sized pieces by a pool of scoped worker
    /// threads, then reassembled into one buffer.
    fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
        if len == 0 {
            return Ok(Vec::new());
        }
        if len >= self.inner.direct_read_threshold_bytes {
            let url = self.inner.fetch_download_url(path)?.download_url;
            return self.download_entire_file_from_url(&url, len);
        }
        // Split the byte range into fixed-size chunk tasks.
        let chunk = self.inner.chunk_bytes as u64;
        let mut tasks = Vec::new();
        let mut offset = 0;
        let mut index = 0;
        while offset < len {
            let remaining = len - offset;
            let length = std::cmp::min(chunk, remaining);
            tasks.push(ChunkTask {
                offset,
                length: length as usize,
                index,
            });
            offset += length;
            index += 1;
        }
        let mut buffer = vec![0u8; len as usize];
        let path = path.to_string();
        let inner = self.inner.clone();
        // Shared work queue plus per-task result slots; the first error wins.
        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
        let threads = inner
            .parallel_requests
            .min(queue.lock().unwrap().len())
            .max(1);
        thread::scope(|scope| {
            for _ in 0..threads {
                let queue = queue.clone();
                let error = error.clone();
                let results = results.clone();
                let inner = inner.clone();
                let path = path.clone();
                scope.spawn(move |_| loop {
                    // Pop the next task; the queue lock is released before
                    // the network request is made.
                    let task_opt = {
                        let mut guard = queue.lock().unwrap();
                        guard.pop_front()
                    };
                    let task = match task_opt {
                        Some(task) => task,
                        None => break,
                    };
                    match inner.download_chunk(&path, task.offset, task.length) {
                        Ok(bytes) => {
                            let mut guard = results.lock().unwrap();
                            guard[task.index] = Some(bytes);
                        }
                        Err(err) => {
                            // Record only the first failure and stop this
                            // worker; siblings drain remaining tasks.
                            let mut guard = error.lock().unwrap();
                            if guard.is_none() {
                                *guard = Some(err);
                            }
                            break;
                        }
                    }
                });
            }
        })
        .expect("remote download scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        let chunks = Arc::try_unwrap(results)
            .expect("results still in use")
            .into_inner()
            .expect("results poisoned");
        // Stitch the chunks back into one contiguous buffer by offset.
        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
            let bytes = maybe.expect("missing downloaded chunk for remote file");
            let start = task.offset as usize;
            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
        }
        Ok(buffer)
    }
610
    /// Download a file stored as shards: `path` holds a JSON `ShardManifest`
    /// (of length `len`), and each listed shard is fetched sequentially and
    /// concatenated in manifest order.
    fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
        let manifest_bytes = self.download_raw_file(path, len)?;
        let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
        // Only manifest version 1 is understood by this reader.
        if manifest.version != 1 {
            return Err(io::Error::new(
                ErrorKind::InvalidData,
                "unsupported manifest",
            ));
        }
        let mut buffer = Vec::with_capacity(manifest.total_size as usize);
        for shard in manifest.shards {
            // Each shard's length is re-queried from the server rather than
            // trusted from the manifest entry.
            let meta = self.inner.fetch_metadata(&shard.path)?;
            let bytes = self.download_raw_file(&shard.path, meta.len)?;
            buffer.extend_from_slice(&bytes);
        }
        Ok(buffer)
    }
629
    /// Download a whole file of known length from a presigned URL using
    /// parallel ranged requests.
    ///
    /// Same worker-pool structure as `download_raw_file`, but every chunk is
    /// a `Range` GET against the single presigned `url` instead of `/fs/read`.
    fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
        // Split the byte range into fixed-size chunk tasks.
        let chunk = self.inner.chunk_bytes as u64;
        let mut tasks = Vec::new();
        let mut offset = 0;
        let mut index = 0;
        while offset < len {
            let remaining = len - offset;
            let length = std::cmp::min(chunk, remaining);
            tasks.push(ChunkTask {
                offset,
                length: length as usize,
                index,
            });
            offset += length;
            index += 1;
        }
        let mut buffer = vec![0u8; len as usize];
        let url = url.to_string();
        let inner = self.inner.clone();
        // Shared work queue plus per-task result slots; the first error wins.
        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
        let threads = inner
            .parallel_requests
            .min(queue.lock().unwrap().len())
            .max(1);
        thread::scope(|scope| {
            for _ in 0..threads {
                let queue = queue.clone();
                let error = error.clone();
                let results = results.clone();
                let inner = inner.clone();
                let url = url.clone();
                scope.spawn(move |_| loop {
                    // Pop outside the request so the lock is held briefly.
                    let task_opt = {
                        let mut guard = queue.lock().unwrap();
                        guard.pop_front()
                    };
                    let task = match task_opt {
                        Some(task) => task,
                        None => break,
                    };
                    match inner.download_range_from_url(&url, task.offset, task.length as u64) {
                        Ok(bytes) => {
                            let mut guard = results.lock().unwrap();
                            guard[task.index] = Some(bytes);
                        }
                        Err(err) => {
                            // Record only the first failure; stop this worker.
                            let mut guard = error.lock().unwrap();
                            if guard.is_none() {
                                *guard = Some(err);
                            }
                            break;
                        }
                    }
                });
            }
        })
        .expect("remote download scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        let chunks = Arc::try_unwrap(results)
            .expect("results still in use")
            .into_inner()
            .expect("results poisoned");
        // Reassemble the chunks into the output buffer by offset.
        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
            let bytes = maybe.expect("missing downloaded chunk for remote file");
            let start = task.offset as usize;
            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
        }
        Ok(buffer)
    }
703
704 fn upload_entire_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
705 if data.len() as u64 >= self.inner.shard_threshold_bytes {
706 return self.upload_sharded_file(path, data);
707 }
708 self.upload_unsharded_file(path, data, None)
709 }
710
    /// Upload a file that fits in a single remote object.
    ///
    /// Payloads at or above `direct_write_threshold_bytes` take the
    /// multipart path; otherwise the data streams through `/fs/write` in
    /// sequential chunks, with `truncate` on the first and `final` on the
    /// last chunk.
    fn upload_unsharded_file(&self, path: &str, data: &[u8], hash: Option<&str>) -> io::Result<()> {
        if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
            return self.upload_multipart_file(path, data);
        }
        if data.is_empty() {
            // A single empty chunk both truncates and finalizes the file.
            self.inner.upload_chunk(path, 0, true, true, data, hash)?;
            return Ok(());
        }
        let chunk = self.inner.chunk_bytes;
        let mut offset = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + chunk, data.len());
            let slice = &data[offset..end];
            let truncate = offset == 0;
            let final_chunk = end == data.len();
            let result =
                self.inner
                    .upload_chunk(path, offset as u64, truncate, final_chunk, slice, hash)?;
            if let Some(session) = result {
                // The server echoes the offset it expects next; a mismatch
                // means the stream is out of sync and cannot safely continue.
                let expected = offset as u64 + slice.len() as u64;
                if session.next_offset as u64 != expected {
                    return Err(io::Error::other("unexpected next offset"));
                }
            }
            offset = end;
        }
        Ok(())
    }
739
    /// Upload a very large file as independent shard objects plus a manifest.
    ///
    /// Each `shard_size_bytes` slice is written to a fresh UUID-named object
    /// under `SHARD_PREFIX`; the JSON manifest written at `path` carries the
    /// sentinel `MANIFEST_HASH` so readers know to reassemble from shards.
    fn upload_sharded_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
        let shard_size = self.inner.shard_size_bytes as usize;
        let mut shards = Vec::new();
        let mut offset = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + shard_size, data.len());
            let slice = &data[offset..end];
            let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
            self.upload_unsharded_file(&shard_path, slice, None)?;
            shards.push(ShardEntry {
                path: shard_path,
                size: slice.len() as u64,
            });
            offset = end;
        }
        let manifest = ShardManifest {
            version: 1,
            total_size: data.len() as u64,
            shard_size: self.inner.shard_size_bytes,
            shards,
        };
        let bytes = serde_json::to_vec(&manifest)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
        self.upload_unsharded_file(path, &bytes, Some(MANIFEST_HASH))
    }
765
    /// Upload via the chunked upload-session protocol.
    ///
    /// Flow: start a session (the server picks the chunk size), register
    /// SHA-256 descriptors for every chunk, PUT each chunk to its presigned
    /// target from a worker pool, then complete the session with the
    /// whole-file hash. A 404 from session-start falls back to the legacy
    /// multipart flow for older servers.
    fn upload_multipart_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
        if data.is_empty() {
            // Empty payload: a single truncating/final chunk is sufficient.
            self.inner.upload_chunk(path, 0, true, true, data, None)?;
            return Ok(());
        }
        let content_sha256 = sha256_hex(data);
        let session =
            match self
                .inner
                .upload_session_start(path, data.len() as u64, &content_sha256)
            {
                Ok(session) => session,
                Err(err) if err.kind() == ErrorKind::NotFound => {
                    return self.upload_multipart_file_legacy(path, data);
                }
                Err(err) => return Err(err),
            };
        // Slice the payload using the server-chosen chunk size.
        let chunk_size = (session.chunk_size_bytes as usize).max(1);
        let mut tasks = Vec::new();
        let mut offset = 0usize;
        let mut index = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + chunk_size, data.len());
            let slice = &data[offset..end];
            tasks.push(UploadTask {
                index,
                offset,
                length: end - offset,
                chunk_sha256: sha256_hex(slice),
            });
            offset = end;
            index += 1;
        }
        let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
            .iter()
            .map(|task| UploadSessionChunkDescriptor {
                chunk_index: task.index,
                offset_bytes: task.offset as i64,
                size_bytes: task.length as i64,
                chunk_sha256: task.chunk_sha256.clone(),
            })
            .collect();
        let chunk_response = self.inner.upload_session_chunks(
            &session.session_id,
            &session.blob_key,
            &descriptors,
        )?;
        // Work queue consumed by up to `parallel_requests` scoped threads;
        // the first error is kept and the failing worker stops.
        let targets = Arc::new(chunk_response.targets);
        let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
        let error = Arc::new(Mutex::new(None));
        let data = Arc::new(data.to_vec());
        thread::scope(|scope| {
            for _ in 0..self.inner.parallel_requests {
                let tasks = Arc::clone(&tasks);
                let targets = Arc::clone(&targets);
                let error = Arc::clone(&error);
                let inner = Arc::clone(&self.inner);
                let data = Arc::clone(&data);
                scope.spawn(move |_| loop {
                    let task = {
                        let mut guard = tasks.lock().unwrap();
                        guard.pop_front()
                    };
                    let Some(task) = task else { break };
                    // Match this chunk to its presigned target by index.
                    let target = targets
                        .iter()
                        .find(|target| target.chunk_index == task.index)
                        .cloned();
                    let result = (|| {
                        let target = target.ok_or_else(|| {
                            io::Error::other(format!(
                                "missing upload target for chunk {}",
                                task.index
                            ))
                        })?;
                        let slice = &data[task.offset..task.offset + task.length];
                        inner.put_upload_target(
                            &target.method,
                            &target.upload_url,
                            &target.headers,
                            slice,
                        )
                    })();
                    if let Err(err) = result {
                        // Record the first failure only; stop this worker.
                        let mut err_guard = error.lock().unwrap();
                        if err_guard.is_none() {
                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
                        }
                        break;
                    }
                });
            }
        })
        .expect("upload session scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        self.inner.upload_session_complete(
            path,
            &session.session_id,
            &session.blob_key,
            data.len() as u64,
            &content_sha256,
            descriptors.len(),
        )?;
        Ok(())
    }
873
    /// Legacy S3-style multipart upload fallback.
    ///
    /// Creates a multipart session, then a worker pool presigns and PUTs
    /// each part (collecting its ETag); finally completes the upload with
    /// the parts sorted by 1-based part number.
    fn upload_multipart_file_legacy(&self, path: &str, data: &[u8]) -> io::Result<()> {
        let session = self.inner.multipart_create(path, data.len() as u64)?;
        let part_size = session.part_size_bytes as usize;
        let mut tasks = std::collections::VecDeque::new();
        let mut offset = 0usize;
        let mut part_number = 1;
        let mut index = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + part_size, data.len());
            let length = end - offset;
            tasks.push_back(MultipartTask {
                index,
                part_number,
                offset,
                length,
            });
            offset = end;
            part_number += 1;
            index += 1;
        }
        let tasks = Arc::new(Mutex::new(tasks));
        let task_len = tasks.lock().unwrap().len();
        // Push-initialized: `io::Error` is not `Clone`, so `vec![None; n]`
        // cannot be used for this element type.
        let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
        for _ in 0..task_len {
            result_vec.push(None);
        }
        let results = Arc::new(Mutex::new(result_vec));
        let error = Arc::new(Mutex::new(None));
        let data = Arc::new(data.to_vec());
        thread::scope(|scope| {
            for _ in 0..self.inner.parallel_requests {
                let tasks = Arc::clone(&tasks);
                let results = Arc::clone(&results);
                let error = Arc::clone(&error);
                let inner = Arc::clone(&self.inner);
                let blob_key = session.blob_key.clone();
                let upload_id = session.upload_id.clone();
                let session_id = session.session_id.clone();
                let data = Arc::clone(&data);
                scope.spawn(move |_| loop {
                    let task = {
                        let mut guard = tasks.lock().unwrap();
                        guard.pop_front()
                    };
                    let Some(task) = task else { break };
                    let slice = &data[task.offset..task.offset + task.length];
                    // Presign then PUT one part; the closure funnels both
                    // failure modes into a single Result.
                    let result = (|| {
                        let url = inner.multipart_presign_part(
                            &session_id,
                            &blob_key,
                            &upload_id,
                            task.part_number,
                            task.length as u64,
                        )?;
                        let etag = inner.put_upload_url_with_etag(&url, slice)?;
                        Ok(MultipartPart {
                            part_number: task.part_number,
                            etag,
                        })
                    })();
                    let mut guard = results.lock().unwrap();
                    guard[task.index] = Some(result);
                    // Record the first failure only; stop this worker.
                    if let Some(Err(err)) = guard[task.index].as_ref() {
                        let mut err_guard = error.lock().unwrap();
                        if err_guard.is_none() {
                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
                        }
                        break;
                    }
                });
            }
        })
        .expect("multipart upload scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        let mut parts = Vec::with_capacity(results.lock().unwrap().len());
        for maybe in Arc::try_unwrap(results)
            .expect("results still in use")
            .into_inner()
            .expect("results poisoned")
        {
            let part = maybe.expect("missing part result")?;
            // An empty ETag would make completion fail server-side.
            if part.etag.is_empty() {
                return Err(io::Error::other("missing etag"));
            }
            parts.push(part);
        }
        parts.sort_by_key(|part| part.part_number);
        self.inner.multipart_complete(
            path,
            &session.session_id,
            &session.blob_key,
            &session.upload_id,
            data.len() as u64,
            parts,
        )?;
        Ok(())
    }
973}
974
/// One byte-range download unit (see `download_raw_file` and
/// `download_entire_file_from_url`).
#[derive(Clone, Copy)]
struct ChunkTask {
    offset: u64,
    length: usize,
    index: usize,
}

/// Result slot for a legacy multipart part: `None` until a worker fills it.
type MultipartResult = Option<Result<MultipartPart, io::Error>>;

/// One part of a legacy multipart upload (`part_number` is 1-based).
#[derive(Clone, Copy)]
struct MultipartTask {
    index: usize,
    part_number: i32,
    offset: usize,
    length: usize,
}

/// One chunk of an upload session, with its precomputed content SHA-256.
#[derive(Clone)]
struct UploadTask {
    index: usize,
    offset: usize,
    length: usize,
    chunk_sha256: String,
}
999
1000#[async_trait(?Send)]
1001impl FsProvider for RemoteFsProvider {
    /// Open a remote file as an in-memory handle.
    ///
    /// Read mode downloads the whole file up front (sharded or raw,
    /// depending on the metadata hash sentinel). Writes buffer in the handle
    /// (`RemoteFileHandle` is defined outside this view — presumably it
    /// flushes `data` back on close/flush when `dirty`; confirm there).
    fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
        let normalized = self.normalize(path);
        let mut data = Vec::new();
        if flags.read {
            let meta = self.inner.fetch_metadata(&normalized)?;
            if meta.file_type != "file" {
                return Err(io::Error::other("remote path is not a file"));
            }
            data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
                self.download_sharded_file(&normalized, meta.len)?
            } else {
                self.download_raw_file(&normalized, meta.len)?
            };
        }
        if flags.truncate {
            data.clear();
        }
        if flags.create {
            // Create the parent eagerly so a later flush cannot fail on it.
            self.ensure_parent_exists(path)?;
        }
        let handle = RemoteFileHandle {
            provider: self.clone(),
            path: normalized,
            data,
            position: 0,
            flags: flags.clone(),
            dirty: false,
        };
        Ok(Box::new(handle))
    }
1032
1033 async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
1034 let normalized = self.normalize(path);
1035 let meta = self.inner.fetch_metadata(&normalized)?;
1036 if meta.file_type != "file" {
1037 return Err(io::Error::other("remote path is not a file"));
1038 }
1039 if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1040 self.download_sharded_file(&normalized, meta.len)
1041 } else {
1042 self.download_raw_file(&normalized, meta.len)
1043 }
1044 }
1045
1046 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
1047 let normalized = self.normalize(path);
1048 self.ensure_parent_exists(path)?;
1049 self.upload_entire_file(&normalized, data)
1050 }
1051
1052 async fn remove_file(&self, path: &Path) -> io::Result<()> {
1053 let normalized = self.normalize(path);
1054 self.inner
1055 .delete_empty("/fs/file", &[("path", normalized)])?;
1056 Ok(())
1057 }
1058
1059 async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1060 let normalized = self.normalize(path);
1061 let resp = self.inner.fetch_metadata(&normalized)?;
1062 Ok(resp.into())
1063 }
1064
    /// The remote backend exposes no symlink-specific metadata here, so this
    /// simply delegates to `metadata`.
    async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
        self.metadata(path).await
    }
1068
1069 async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
1070 let normalized = self.normalize(path);
1071 let resp = self.inner.fetch_dir(&normalized)?;
1072 Ok(resp
1073 .into_iter()
1074 .map(|entry| DirEntry {
1075 path: PathBuf::from(entry.path),
1076 file_name: entry.file_name.into(),
1077 file_type: entry.file_type.into(),
1078 })
1079 .collect())
1080 }
1081
1082 async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
1083 let normalized = self.normalize(path);
1084 let canonical = self.inner.canonicalize_path(&normalized)?;
1085 Ok(PathBuf::from(canonical))
1086 }
1087
    /// Create a single directory; fails if the parent does not exist
    /// (`recursive: false`).
    async fn create_dir(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.post_empty(
            "/fs/mkdir",
            &CreateDirRequest {
                path: normalized,
                recursive: false,
            },
        )
    }

    /// Create a directory and any missing ancestors (`mkdir -p` semantics).
    async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.post_empty(
            "/fs/mkdir",
            &CreateDirRequest {
                path: normalized,
                recursive: true,
            },
        )
    }
1109
    /// Remove an empty directory; the server enforces the "empty" constraint
    /// because `recursive` is sent as `"false"`.
    async fn remove_dir(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.delete_empty(
            "/fs/dir",
            &[("path", normalized), ("recursive", "false".into())],
        )
    }

    /// Remove a directory tree recursively (`recursive=true`).
    async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.delete_empty(
            "/fs/dir",
            &[("path", normalized), ("recursive", "true".into())],
        )
    }
1125
    /// Rename/move a remote entry; both endpoints are normalized before the
    /// request so relative inputs behave like the rest of the provider.
    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        self.inner.post_empty(
            "/fs/rename",
            &RenameRequest {
                from: self.normalize(from),
                to: self.normalize(to),
            },
        )
    }

    /// Toggle the remote read-only flag for a path.
    async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
        self.inner.post_empty(
            "/fs/set-readonly",
            &SetReadonlyRequest {
                path: self.normalize(path),
                readonly,
            },
        )
    }
1145
    /// Fetch the dataset manifest descriptor via `GET /data/manifest`.
    /// `version` is an optional query parameter; omitted means "latest"
    /// (per the server contract — TODO confirm).
    async fn data_manifest_descriptor(
        &self,
        request: &DataManifestRequest,
    ) -> io::Result<DataManifestDescriptor> {
        let mut query = vec![("path", request.path.clone())];
        if let Some(version) = &request.version {
            query.push(("version", version.clone()));
        }
        self.inner.get_json("/data/manifest", &query)
    }
1156
    /// Request presigned upload targets for a batch of data chunks.
    /// The wire response wraps the targets in a `{"targets": [...]}` object,
    /// hence the local response struct.
    async fn data_chunk_upload_targets(
        &self,
        request: &DataChunkUploadRequest,
    ) -> io::Result<Vec<DataChunkUploadTarget>> {
        #[derive(Deserialize)]
        struct DataChunkUploadTargetsResponse {
            targets: Vec<DataChunkUploadTarget>,
        }
        let response: DataChunkUploadTargetsResponse = self
            .inner
            .post_json("/data/chunks/upload-targets", request)?;
        Ok(response.targets)
    }
1170
    /// Upload one chunk directly to its presigned target URL, using the
    /// method and headers the server handed back in the target descriptor.
    async fn data_upload_chunk(
        &self,
        target: &DataChunkUploadTarget,
        data: &[u8],
    ) -> io::Result<()> {
        self.inner
            .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
    }
1179}
1180
/// Cloning a provider is cheap: it only bumps the `Arc` refcount, so every
/// clone shares the same HTTP client and configuration.
impl Clone for RemoteFsProvider {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
1188
/// In-memory handle for an open remote file: the full contents are
/// materialized into `data`, edited locally, and written back on flush/drop
/// while `dirty` is set. Not suitable for files larger than memory.
struct RemoteFileHandle {
    provider: RemoteFsProvider,
    // Normalized remote path used for the write-back upload.
    path: String,
    // Complete file contents; Read/Write operate on this buffer.
    data: Vec<u8>,
    // Cursor in bytes from the start of `data`.
    position: usize,
    // Open flags; gate writes and select append semantics.
    flags: OpenFlags,
    // True once the local buffer diverges from the remote copy.
    dirty: bool,
}
1197
impl RemoteFileHandle {
    /// Upload the buffered contents back to the remote path if (and only if)
    /// there are unflushed local modifications; clears `dirty` on success.
    fn flush_remote(&mut self) -> io::Result<()> {
        if !self.dirty {
            return Ok(());
        }
        self.provider.upload_entire_file(&self.path, &self.data)?;
        self.dirty = false;
        Ok(())
    }
}
1208
1209impl Read for RemoteFileHandle {
1210 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1211 let remaining = &self.data[self.position..];
1212 let amt = remaining.len().min(buf.len());
1213 buf[..amt].copy_from_slice(&remaining[..amt]);
1214 self.position += amt;
1215 Ok(amt)
1216 }
1217}
1218
impl Write for RemoteFileHandle {
    /// Write into the in-memory buffer at the cursor, growing it as needed.
    /// Changes are only pushed to the server on `flush` (or drop).
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if !self.flags.write && !self.flags.append {
            return Err(io::Error::new(
                ErrorKind::PermissionDenied,
                "remote file not opened for writing",
            ));
        }
        // O_APPEND semantics: every write lands at the current end of file,
        // regardless of any seek the caller performed.
        if self.flags.append {
            self.position = self.data.len();
        }
        // A seek past EOF followed by a write zero-fills the gap.
        let required = self.position + buf.len();
        if required > self.data.len() {
            self.data.resize(required, 0);
        }
        self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
        self.position += buf.len();
        self.dirty = true;
        Ok(buf.len())
    }

    /// Push buffered modifications to the server (no-op when clean).
    fn flush(&mut self) -> io::Result<()> {
        self.flush_remote()
    }
}
1244
1245impl Seek for RemoteFileHandle {
1246 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1247 let new_pos = match pos {
1248 SeekFrom::Start(offset) => offset as i64,
1249 SeekFrom::End(offset) => self.data.len() as i64 + offset,
1250 SeekFrom::Current(offset) => self.position as i64 + offset,
1251 };
1252 if new_pos < 0 {
1253 return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
1254 }
1255 self.position = new_pos as usize;
1256 Ok(self.position as u64)
1257 }
1258}
1259
1260impl Drop for RemoteFileHandle {
1261 fn drop(&mut self) {
1262 if self.dirty {
1263 if let Err(err) = self.provider.upload_entire_file(&self.path, &self.data) {
1264 eprintln!("remote fs flush failed: {err}");
1265 }
1266 }
1267 }
1268}
1269
/// Wire shape of `GET /fs/metadata` responses.
#[derive(Debug, Deserialize)]
struct MetadataResponse {
    #[serde(rename = "fileType")]
    file_type: String,
    len: u64,
    // RFC3339 timestamp string; absent/unparseable values become `None`.
    #[serde(rename = "modifiedAt")]
    modified_at: Option<String>,
    readonly: bool,
    // Sentinel `MANIFEST_HASH` marks sharded files (see read path above).
    hash: Option<String>,
}

impl From<MetadataResponse> for FsMetadata {
    fn from(value: MetadataResponse) -> Self {
        FsMetadata::new_with_hash(
            value.file_type.into(),
            value.len,
            parse_modified_at(value.modified_at.as_deref()),
            value.readonly,
            value.hash,
        )
    }
}
1292
/// Wire shape of a single `GET /fs/dir` listing entry.
#[derive(Debug, Deserialize)]
struct DirEntryResponse {
    path: String,
    #[serde(rename = "fileName")]
    file_name: String,
    #[serde(rename = "fileType")]
    file_type: String,
}

/// Map the server's file-type strings onto `FsFileType`.
/// Any unrecognized string degrades to `Unknown` rather than erroring.
impl From<String> for FsFileType {
    fn from(value: String) -> Self {
        match value.as_str() {
            "dir" => FsFileType::Directory,
            "file" => FsFileType::File,
            "symlink" => FsFileType::Symlink,
            "other" => FsFileType::Other,
            _ => FsFileType::Unknown,
        }
    }
}
1313
/// Body for `POST /fs/mkdir`; `recursive` selects `mkdir -p` behavior.
#[derive(Debug, Serialize, Deserialize)]
struct CreateDirRequest {
    path: String,
    recursive: bool,
}

/// Body for `POST /fs/rename`.
#[derive(Debug, Serialize, Deserialize)]
struct RenameRequest {
    from: String,
    to: String,
}

/// Body for `POST /fs/set-readonly`.
#[derive(Debug, Serialize, Deserialize)]
struct SetReadonlyRequest {
    path: String,
    readonly: bool,
}

/// Response of `GET /fs/canonicalize`.
#[derive(Debug, Deserialize)]
struct CanonicalizeResponse {
    path: String,
}

/// Response of read endpoints that redirect to a presigned download URL.
#[derive(Debug, Deserialize)]
struct DownloadUrlResponse {
    #[serde(rename = "downloadUrl")]
    download_url: String,
}
1342
/// `POST /fs/upload-session/start` body: announces the target path, total
/// size, and whole-file SHA-256 before any chunk is transferred.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartRequest {
    path: String,
    size_bytes: i64,
    content_type: Option<String>,
    content_sha256: String,
}

/// Server reply to session start; `chunk_size_bytes` dictates how the client
/// must split the payload.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartResponse {
    session_id: String,
    blob_key: String,
    chunk_size_bytes: i64,
}

/// Describes one chunk the client intends to upload (index, byte range, and
/// per-chunk SHA-256).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunkDescriptor {
    chunk_index: usize,
    offset_bytes: i64,
    size_bytes: i64,
    chunk_sha256: String,
}

/// `POST /fs/upload-session/chunks` body: asks for upload targets for a
/// batch of chunk descriptors.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksRequest {
    session_id: String,
    blob_key: String,
    chunks: Vec<UploadSessionChunkDescriptor>,
}

/// Server reply: one presigned target per requested chunk (the client must
/// verify the count — see the missing-target test below).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksResponse {
    targets: Vec<UploadChunkTarget>,
}

/// Presigned destination for a single chunk upload.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadChunkTarget {
    chunk_index: usize,
    method: String,
    upload_url: String,
    headers: HashMap<String, String>,
}

/// `POST /fs/upload-session/complete` body: finalizes the session; the
/// server re-verifies size, chunk count, and the whole-file SHA-256.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionCompleteRequest {
    path: String,
    session_id: String,
    blob_key: String,
    size_bytes: i64,
    content_sha256: String,
    chunk_count: usize,
    hash: Option<String>,
}
1403
/// Body initiating an S3-style multipart upload for large files.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadRequest {
    path: String,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    #[serde(rename = "contentType")]
    content_type: Option<String>,
}

/// Server reply to multipart initiation; `part_size_bytes` dictates the
/// split, and ids must be echoed back on every part/complete request.
#[derive(Debug, Deserialize)]
struct MultipartUploadResponse {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    #[serde(rename = "partSizeBytes")]
    part_size_bytes: i64,
}

/// Body requesting a presigned URL for one multipart part (1-based
/// `part_number`, S3 convention).
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadPartRequest {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    #[serde(rename = "partNumber")]
    part_number: i32,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
}
1438
1439#[derive(Debug, Deserialize)]
1440struct MultipartUploadPartResponse {
1441 #[serde(rename = "upload_url")]
1442 upload_url: String,
1443}
1444
/// Body finalizing a multipart upload: echoes the session ids and lists the
/// ETag returned by storage for every uploaded part.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadCompleteRequest {
    path: String,
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    hash: Option<String>,
    parts: Vec<MultipartPart>,
}

/// One completed part: its 1-based number and the storage-assigned ETag.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartPart {
    #[serde(rename = "partNumber")]
    part_number: i32,
    etag: String,
}
1466
/// Manifest describing a file stored as multiple shards (used when metadata
/// reports the `MANIFEST_HASH` sentinel); shards concatenate in order to
/// reconstruct the original `total_size` bytes.
#[derive(Debug, Serialize, Deserialize)]
struct ShardManifest {
    version: u32,
    total_size: u64,
    shard_size: u64,
    shards: Vec<ShardEntry>,
}

/// One shard: its remote path and size in bytes.
#[derive(Debug, Serialize, Deserialize)]
struct ShardEntry {
    path: String,
    size: u64,
}

/// Reply from the streaming write-session endpoint; only `next_offset` is
/// consumed — the session id is kept for deserialization completeness.
#[derive(Debug, Deserialize)]
struct FsWriteSessionResponse {
    #[serde(rename = "sessionId")]
    _session_id: String,
    #[serde(rename = "nextOffset")]
    next_offset: i64,
}
1488
1489fn sha256_hex(data: &[u8]) -> String {
1490 let digest = Sha256::digest(data);
1491 let mut out = String::with_capacity(digest.len() * 2);
1492 for byte in digest {
1493 use std::fmt::Write as _;
1494 let _ = write!(out, "{byte:02x}");
1495 }
1496 out
1497}
1498
/// Wrap a transport-level reqwest error as a generic `io::Error`
/// (status-code mapping happens separately in `handle_error`).
fn map_http_err(err: reqwest::Error) -> io::Error {
    io::Error::other(err)
}
1502
1503fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1504 let value = value?;
1505 let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1506 let millis = parsed.timestamp_millis();
1507 if millis < 0 {
1508 return None;
1509 }
1510 Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1511}
1512
/// URL parse failures come from caller-supplied base URLs/paths, so they are
/// reported as `InvalidInput` rather than a generic error.
fn map_url_err(err: url::ParseError) -> io::Error {
    io::Error::new(ErrorKind::InvalidInput, err)
}
1516
1517fn handle_error(resp: Response) -> io::Result<Response> {
1518 let status = resp.status();
1519 if status.is_success() {
1520 return Ok(resp);
1521 }
1522 let text = resp.text().unwrap_or_else(|_| status.to_string());
1523 let kind = match status {
1524 StatusCode::NOT_FOUND => ErrorKind::NotFound,
1525 StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1526 StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1527 StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1528 _ => ErrorKind::Other,
1529 };
1530 Err(io::Error::new(kind, text))
1531}
1532
/// Manual Debug that deliberately hides the inner state (it holds the auth
/// token and HTTP client); prints `RemoteFsProvider { .. }`.
impl fmt::Debug for RemoteFsProvider {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
    }
}
1538
1539#[cfg(test)]
1540mod tests {
1541 use super::*;
1542 use crate::data_contract::DataChunkDescriptor;
1543 use axum::extract::{Query, State};
1544 use axum::http::{HeaderMap, StatusCode};
1545 use axum::routing::{delete, get, post, put};
1546 use axum::{Json, Router};
1547 use futures::executor;
1548 use serde::Deserialize;
1549 use std::collections::HashMap;
1550 use std::net::TcpListener as StdTcpListener;
1551 use std::sync::Arc;
1552 use tempfile::tempdir;
1553 use tokio::net::TcpListener as TokioTcpListener;
1554 use tokio::runtime::Runtime;
1555
    /// Shared state for the in-process test server: a temp directory acting
    /// as the remote filesystem root plus bookkeeping for upload sessions.
    #[derive(Clone)]
    struct Harness {
        // Root directory all remote paths resolve under.
        root: Arc<PathBuf>,
        // Keeps the TempDir alive (it deletes itself on drop).
        _keeper: Arc<tempfile::TempDir>,
        // Filled in once the server is bound; handlers mint absolute URLs from it.
        base_url: Arc<Mutex<Option<String>>>,
        upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
        // When true, the chunks endpoint drops the last target (failure injection).
        omit_last_chunk_target: bool,
    }
1564
    impl Harness {
        /// Default harness: all endpoints behave normally.
        fn new() -> Self {
            Self::with_omit_last_chunk_target(false)
        }

        /// Harness with configurable chunk-target failure injection.
        fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
            let dir = tempdir().expect("tempdir");
            let path = dir.path().to_path_buf();
            Self {
                root: Arc::new(path),
                _keeper: Arc::new(dir),
                base_url: Arc::new(Mutex::new(None)),
                upload_sessions: Arc::new(Mutex::new(HashMap::new())),
                omit_last_chunk_target,
            }
        }

        /// Map a remote path ("/a/b") onto the temp root.
        /// Test-only: no `..` traversal protection is attempted here.
        fn resolve(&self, remote_path: &str) -> PathBuf {
            let trimmed = remote_path.trim_start_matches('/');
            self.root.join(trimmed)
        }
    }
1587
    /// Per-session bookkeeping: target path and how many chunks were declared.
    #[derive(Clone)]
    struct UploadSessionTestState {
        path: String,
        chunk_count: usize,
    }

    /// Catch-all query parameters shared by the /fs handlers; each handler
    /// reads only the fields relevant to it.
    #[derive(Deserialize)]
    struct PathParams {
        path: String,
        offset: Option<u64>,
        length: Option<usize>,
        // Booleans arrive as the strings "true"/"false".
        truncate: Option<String>,
        recursive: Option<String>,
    }

    /// Query parameters of the /upload-chunk endpoint.
    #[derive(Deserialize)]
    struct UploadChunkQuery {
        #[serde(rename = "sessionId")]
        session_id: String,
        #[serde(rename = "chunkIndex")]
        chunk_index: usize,
    }

    /// Query parameters of /data/manifest.
    #[derive(Deserialize)]
    struct DataManifestQuery {
        path: String,
        version: Option<String>,
    }
1616
    /// GET /fs/metadata — stat a path under the harness root.
    ///
    /// NOTE(review): the reply carries a `modified` seconds field, but the
    /// client's `MetadataResponse` deserializes `modifiedAt` (an RFC3339
    /// string), so tests always observe `modified_at == None`. It also never
    /// sets `hash`, so the sharded-download path is never exercised here.
    async fn metadata_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let meta =
            std::fs::metadata(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        let file_type = if meta.is_dir() {
            "dir"
        } else if meta.is_file() {
            "file"
        } else {
            "other"
        };
        Ok(Json(serde_json::json!({
            "fileType": file_type,
            "len": meta.len(),
            "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
            "readonly": meta.permissions().readonly()
        })))
    }
1637
    /// GET /fs/dir — list a directory, returning paths relative to the
    /// harness root (prefixed with `/` to look like absolute remote paths).
    async fn dir_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
        let mut entries = Vec::new();
        for entry in
            std::fs::read_dir(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?
        {
            let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            let file_type = entry
                .file_type()
                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            let kind = if file_type.is_dir() {
                "dir"
            } else if file_type.is_file() {
                "file"
            } else {
                "other"
            };
            entries.push(serde_json::json!({
                "path": format!("/{}", entry.path().strip_prefix(&*harness.root).unwrap().display()),
                "fileName": entry.file_name().to_string_lossy(),
                "fileType": kind
            }));
        }
        Ok(Json(entries))
    }
1665
    /// GET /fs/canonicalize — canonicalize on disk, then re-relativize the
    /// result against the harness root so the client sees a remote path.
    async fn canonicalize_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let path = harness.resolve(&params.path);
        let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
        // Falls back to the absolute path if it escaped the root (test-only).
        let rel = canonical.strip_prefix(&*harness.root).unwrap_or(&canonical);
        Ok(Json(serde_json::json!({
            "path": format!("/{}", rel.display())
        })))
    }
1677
    /// GET /fs/read — return file bytes inline, honoring optional
    /// `offset`/`length` query parameters; out-of-range offsets yield an
    /// empty body rather than an error.
    async fn read_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Vec<u8>, StatusCode> {
        let mut data =
            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        let offset = params.offset.unwrap_or(0) as usize;
        let length = params.length.unwrap_or(data.len().saturating_sub(offset));
        let end = std::cmp::min(offset + length, data.len());
        if offset < data.len() {
            data = data[offset..end].to_vec();
        } else {
            data.clear();
        }
        Ok(data)
    }
1694
    /// PUT /fs/write — write `body` at `offset`, creating parents as needed.
    /// With `truncate=true` (or a new file) it starts from empty; otherwise
    /// it patches the existing contents, zero-filling any gap.
    async fn write_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
        body: axum::body::Bytes,
    ) -> Result<(), StatusCode> {
        let path = harness.resolve(&params.path);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
            Vec::new()
        } else {
            std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        };
        let offset = params.offset.unwrap_or(0) as usize;
        let required = offset + body.len();
        if required > data.len() {
            data.resize(required, 0);
        }
        data[offset..offset + body.len()].copy_from_slice(&body);
        std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }
1718
    /// POST /fs/upload-session/start — register a new session under a fresh
    /// UUID and hand back a fixed 1 KiB chunk size (small on purpose so tests
    /// exercise multi-chunk uploads with tiny payloads).
    async fn upload_session_start_handler(
        State(harness): State<Harness>,
        Json(req): Json<UploadSessionStartRequest>,
    ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
        let session_id = Uuid::new_v4().to_string();
        let chunk_size_bytes = 1024i64;
        harness
            .upload_sessions
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
            .insert(
                session_id.clone(),
                UploadSessionTestState {
                    path: req.path,
                    chunk_count: 0,
                },
            );
        Ok(Json(UploadSessionStartResponse {
            session_id: session_id.clone(),
            blob_key: format!("test/blob/{session_id}"),
            chunk_size_bytes,
        }))
    }
1742
    /// POST /fs/upload-session/chunks — record the declared chunk count and
    /// mint one /upload-chunk target per descriptor, pointing back at this
    /// server. With `omit_last_chunk_target` set, the final target is dropped
    /// to simulate a misbehaving server.
    async fn upload_session_chunks_handler(
        State(harness): State<Harness>,
        Json(req): Json<UploadSessionChunksRequest>,
    ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
        let base = harness
            .base_url
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
            .clone()
            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
        let mut sessions = harness
            .upload_sessions
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        let state = sessions
            .get_mut(&req.session_id)
            .ok_or(StatusCode::NOT_FOUND)?;
        state.chunk_count = req.chunks.len();
        let mut targets = req
            .chunks
            .iter()
            .map(|chunk| UploadChunkTarget {
                chunk_index: chunk.chunk_index,
                method: "PUT".to_string(),
                upload_url: format!(
                    "{base}/upload-chunk?sessionId={}&chunkIndex={}",
                    req.session_id, chunk.chunk_index
                ),
                headers: HashMap::new(),
            })
            .collect::<Vec<_>>();
        if harness.omit_last_chunk_target && !targets.is_empty() {
            targets.pop();
        }
        Ok(Json(UploadSessionChunksResponse { targets }))
    }
1779
    /// PUT /upload-chunk — persist one chunk body under
    /// `.upload-sessions/<session>/<index>` for later reassembly.
    async fn upload_chunk_handler(
        State(harness): State<Harness>,
        Query(query): Query<UploadChunkQuery>,
        body: axum::body::Bytes,
    ) -> Result<(), StatusCode> {
        let chunk_path = harness.resolve(&format!(
            "/.upload-sessions/{}/{}",
            query.session_id, query.chunk_index
        ));
        if let Some(parent) = chunk_path.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }
1795
    /// GET /data/manifest — canned descriptor; `version=v1` yields a distinct
    /// `updated_at` so tests can tell versioned from latest lookups.
    async fn data_manifest_handler(
        Query(query): Query<DataManifestQuery>,
    ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
        let updated_at = if query.version.as_deref() == Some("v1") {
            "2026-01-01T00:00:00Z"
        } else {
            "2026-02-02T00:00:00Z"
        };
        Ok(Json(DataManifestDescriptor {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: format!("dataset:{}", query.path),
            updated_at: updated_at.to_string(),
            txn_sequence: 9,
        }))
    }
1812
    /// POST /data/chunks/upload-targets — echo each requested chunk back as a
    /// synthetic target (URLs point at a fake host; tests only inspect the
    /// returned descriptors, they never PUT to these URLs).
    async fn data_chunk_upload_targets_handler(
        Json(req): Json<DataChunkUploadRequest>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let targets = req
            .chunks
            .iter()
            .map(|chunk| {
                serde_json::json!({
                    "key": chunk.key,
                    "method": "PUT",
                    "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
                    "headers": {
                        "x-runmat-hash": chunk.hash,
                        "x-runmat-object": chunk.object_id,
                    }
                })
            })
            .collect::<Vec<_>>();
        Ok(Json(serde_json::json!({ "targets": targets })))
    }
1833
    /// POST /fs/upload-session/complete — reassemble the stored chunks in
    /// index order, verify the whole-file SHA-256 against the client's claim,
    /// and materialize the file at the session's target path. The session is
    /// consumed (removed) even if verification later fails.
    async fn upload_session_complete_handler(
        State(harness): State<Harness>,
        Json(req): Json<UploadSessionCompleteRequest>,
    ) -> Result<(), StatusCode> {
        let state = harness
            .upload_sessions
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
            .remove(&req.session_id)
            .ok_or(StatusCode::NOT_FOUND)?;
        let mut data = Vec::new();
        for chunk_index in 0..state.chunk_count {
            let chunk_path = harness.resolve(&format!(
                "/.upload-sessions/{}/{}",
                req.session_id, chunk_index
            ));
            let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            data.extend_from_slice(&chunk);
        }
        if sha256_hex(&data) != req.content_sha256 {
            return Err(StatusCode::BAD_REQUEST);
        }
        let target_path = harness.resolve(&state.path);
        if let Some(parent) = target_path.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }
1863
    /// POST /fs/mkdir — create a directory, recursively when requested.
    async fn mkdir_handler(
        State(harness): State<Harness>,
        Json(req): Json<CreateDirRequest>,
    ) -> Result<(), StatusCode> {
        let path = harness.resolve(&req.path);
        if req.recursive {
            std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        } else {
            std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        Ok(())
    }
1876
    /// DELETE /fs/file — remove a single file (any failure maps to 404,
    /// which is what the client's error mapping expects for missing paths).
    async fn delete_file_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<(), StatusCode> {
        std::fs::remove_file(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        Ok(())
    }

    /// DELETE /fs/dir — remove a directory, recursively when
    /// `recursive=true` is passed in the query string.
    async fn delete_dir_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<(), StatusCode> {
        let recursive = params.recursive.as_deref() == Some("true");
        if recursive {
            std::fs::remove_dir_all(harness.resolve(&params.path))
                .map_err(|_| StatusCode::NOT_FOUND)?;
        } else {
            std::fs::remove_dir(harness.resolve(&params.path))
                .map_err(|_| StatusCode::NOT_FOUND)?;
        }
        Ok(())
    }
1899
    /// POST /fs/rename — move an entry, creating the destination's parent
    /// directories first so cross-directory renames succeed.
    async fn rename_handler(
        State(harness): State<Harness>,
        Json(req): Json<RenameRequest>,
    ) -> Result<(), StatusCode> {
        let from = harness.resolve(&req.from);
        let to = harness.resolve(&req.to);
        if let Some(parent) = to.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
        Ok(())
    }

    /// POST /fs/set-readonly — flip the filesystem read-only permission bit.
    async fn set_readonly_handler(
        State(harness): State<Harness>,
        Json(req): Json<SetReadonlyRequest>,
    ) -> Result<(), StatusCode> {
        let path = harness.resolve(&req.path);
        let mut perms = std::fs::metadata(&path)
            .map_err(|_| StatusCode::NOT_FOUND)?
            .permissions();
        perms.set_readonly(req.readonly);
        std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }
1925
    /// GET /fs/read variant that returns a presigned-style `/download` URL
    /// instead of inline bytes, exercising the client's redirect read path.
    /// Test-only: the path is interpolated without URL-encoding.
    async fn read_with_download_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let base = harness
            .base_url
            .lock()
            .unwrap()
            .clone()
            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
        let download_url = format!("{base}/download?path={}", params.path);
        Ok(Json(serde_json::json!({
            "downloadUrl": download_url,
            "expiresAt": 0
        })))
    }
1942
    /// GET /download — serve file bytes, honoring a simple inclusive
    /// `Range: bytes=START-END` header (END is clamped to the last byte;
    /// a start at/after EOF yields an empty body).
    async fn download_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
        headers: HeaderMap,
    ) -> Result<Vec<u8>, StatusCode> {
        let mut data =
            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
            if let Some((start, end)) = parse_range(range) {
                let start = start.min(data.len());
                let end = end.min(data.len().saturating_sub(1));
                if start < data.len() {
                    data = data[start..=end].to_vec();
                } else {
                    data.clear();
                }
            }
        }
        Ok(data)
    }
1963
1964 fn parse_range(value: &str) -> Option<(usize, usize)> {
1965 let value = value.strip_prefix("bytes=")?;
1966 let (start, end) = value.split_once('-')?;
1967 let start = start.parse::<usize>().ok()?;
1968 let end = end.parse::<usize>().ok()?;
1969 Some((start, end))
1970 }
1971
1972 fn spawn_server() -> (String, Harness, Runtime) {
1973 let harness = Harness::new();
1974 let router = Router::new()
1975 .route("/fs/metadata", get(metadata_handler))
1976 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
1977 .route("/fs/file", delete(delete_file_handler))
1978 .route("/fs/canonicalize", get(canonicalize_handler))
1979 .route("/fs/read", get(read_handler))
1980 .route("/fs/write", put(write_handler))
1981 .route(
1982 "/fs/upload-session/start",
1983 post(upload_session_start_handler),
1984 )
1985 .route(
1986 "/fs/upload-session/chunks",
1987 post(upload_session_chunks_handler),
1988 )
1989 .route(
1990 "/fs/upload-session/complete",
1991 post(upload_session_complete_handler),
1992 )
1993 .route("/fs/mkdir", post(mkdir_handler))
1994 .route("/fs/rename", post(rename_handler))
1995 .route("/fs/set-readonly", post(set_readonly_handler))
1996 .route("/data/manifest", get(data_manifest_handler))
1997 .route(
1998 "/data/chunks/upload-targets",
1999 post(data_chunk_upload_targets_handler),
2000 )
2001 .route("/upload-chunk", put(upload_chunk_handler))
2002 .with_state(harness.clone());
2003 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2004 std_listener.set_nonblocking(true).expect("nonblocking");
2005 let addr = std_listener.local_addr().unwrap();
2006 let service = router.into_make_service();
2007 let rt = Runtime::new().unwrap();
2008 let base = format!("http://{addr}");
2009 *harness.base_url.lock().unwrap() = Some(base.clone());
2010 rt.spawn(async move {
2011 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2012 axum::serve(listener, service).await.unwrap();
2013 });
2014 (base, harness, rt)
2015 }
2016
2017 fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
2018 let harness = Harness::new();
2019 let router = Router::new()
2020 .route("/fs/metadata", get(metadata_handler))
2021 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2022 .route("/fs/file", delete(delete_file_handler))
2023 .route("/fs/canonicalize", get(canonicalize_handler))
2024 .route("/fs/read", get(read_with_download_handler))
2025 .route("/fs/write", put(write_handler))
2026 .route(
2027 "/fs/upload-session/start",
2028 post(upload_session_start_handler),
2029 )
2030 .route(
2031 "/fs/upload-session/chunks",
2032 post(upload_session_chunks_handler),
2033 )
2034 .route(
2035 "/fs/upload-session/complete",
2036 post(upload_session_complete_handler),
2037 )
2038 .route("/fs/mkdir", post(mkdir_handler))
2039 .route("/fs/rename", post(rename_handler))
2040 .route("/fs/set-readonly", post(set_readonly_handler))
2041 .route("/download", get(download_handler))
2042 .route("/data/manifest", get(data_manifest_handler))
2043 .route(
2044 "/data/chunks/upload-targets",
2045 post(data_chunk_upload_targets_handler),
2046 )
2047 .route("/upload-chunk", put(upload_chunk_handler))
2048 .with_state(harness.clone());
2049 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2050 std_listener.set_nonblocking(true).expect("nonblocking");
2051 let addr = std_listener.local_addr().unwrap();
2052 let service = router.into_make_service();
2053 let rt = Runtime::new().unwrap();
2054 let base = format!("http://{addr}");
2055 *harness.base_url.lock().unwrap() = Some(base.clone());
2056 rt.spawn(async move {
2057 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2058 axum::serve(listener, service).await.unwrap();
2059 });
2060 (base, harness, rt)
2061 }
2062
2063 fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
2064 let harness = Harness::with_omit_last_chunk_target(true);
2065 let router = Router::new()
2066 .route("/fs/metadata", get(metadata_handler))
2067 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2068 .route("/fs/file", delete(delete_file_handler))
2069 .route("/fs/canonicalize", get(canonicalize_handler))
2070 .route("/fs/read", get(read_handler))
2071 .route("/fs/write", put(write_handler))
2072 .route(
2073 "/fs/upload-session/start",
2074 post(upload_session_start_handler),
2075 )
2076 .route(
2077 "/fs/upload-session/chunks",
2078 post(upload_session_chunks_handler),
2079 )
2080 .route(
2081 "/fs/upload-session/complete",
2082 post(upload_session_complete_handler),
2083 )
2084 .route("/fs/mkdir", post(mkdir_handler))
2085 .route("/fs/rename", post(rename_handler))
2086 .route("/fs/set-readonly", post(set_readonly_handler))
2087 .route("/data/manifest", get(data_manifest_handler))
2088 .route(
2089 "/data/chunks/upload-targets",
2090 post(data_chunk_upload_targets_handler),
2091 )
2092 .route("/upload-chunk", put(upload_chunk_handler))
2093 .with_state(harness.clone());
2094 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2095 std_listener.set_nonblocking(true).expect("nonblocking");
2096 let addr = std_listener.local_addr().unwrap();
2097 let service = router.into_make_service();
2098 let rt = Runtime::new().unwrap();
2099 let base = format!("http://{addr}");
2100 *harness.base_url.lock().unwrap() = Some(base.clone());
2101 rt.spawn(async move {
2102 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2103 axum::serve(listener, service).await.unwrap();
2104 });
2105 (base, harness, rt)
2106 }
2107
    #[test]
    fn remote_provider_roundtrip() {
        // direct_read_threshold_bytes = MAX forces the chunked read path;
        // direct_write_threshold_bytes = 1024 forces the upload-session path.
        let (base, harness, _rt) = spawn_server();
        let provider = RemoteFsProvider::new(RemoteFsConfig {
            base_url: base,
            auth_token: None,
            chunk_bytes: 1024,
            parallel_requests: 4,
            direct_read_threshold_bytes: u64::MAX,
            direct_write_threshold_bytes: 1024,
            timeout: Duration::from_secs(30),
            ..RemoteFsConfig::default()
        })
        .expect("provider");

        // 64 KiB payload: large enough to span many 1 KiB chunks.
        let data = (0..16_384u32)
            .flat_map(|v| v.to_le_bytes())
            .collect::<Vec<_>>();
        executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
        let read_back =
            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
        assert_eq!(data, read_back);
        executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
        assert!(!harness.resolve("/reports/data.bin").exists());
    }
2133
    #[test]
    fn remote_provider_download_url_read() {
        // Server answers /fs/read with a download URL; the file is seeded
        // directly on disk and fetched back through ranged /download GETs.
        let (base, harness, _rt) = spawn_server_with_download_url();
        let provider = RemoteFsProvider::new(RemoteFsConfig {
            base_url: base,
            auth_token: None,
            chunk_bytes: 128,
            parallel_requests: 2,
            direct_read_threshold_bytes: u64::MAX,
            timeout: Duration::from_secs(30),
            ..RemoteFsConfig::default()
        })
        .expect("provider");

        let data = (0..1024u32)
            .flat_map(|v| v.to_le_bytes())
            .collect::<Vec<_>>();
        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
        std::fs::write(harness.resolve("/reports/data.bin"), &data).expect("write");

        let read_back =
            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
        assert_eq!(data, read_back);
    }
2158
2159 #[test]
2160 fn remote_provider_errors_when_chunk_target_is_missing() {
2161 let (base, _harness, _rt) = spawn_server_with_missing_chunk_target();
2162 let provider = RemoteFsProvider::new(RemoteFsConfig {
2163 base_url: base,
2164 auth_token: None,
2165 chunk_bytes: 128,
2166 parallel_requests: 2,
2167 direct_read_threshold_bytes: u64::MAX,
2168 direct_write_threshold_bytes: 128,
2169 timeout: Duration::from_secs(30),
2170 ..RemoteFsConfig::default()
2171 })
2172 .expect("provider");
2173
2174 let data = vec![7u8; 1024];
2175 let err =
2176 executor::block_on(provider.write(Path::new("/reports/missing-target.bin"), &data))
2177 .expect_err("write should fail when chunk target is missing");
2178 assert_eq!(err.kind(), io::ErrorKind::Other);
2179 }
2180
2181 #[test]
2182 fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
2183 let (base, _harness, _rt) = spawn_server();
2184 let provider = RemoteFsProvider::new(RemoteFsConfig {
2185 base_url: base,
2186 auth_token: None,
2187 timeout: Duration::from_secs(30),
2188 ..RemoteFsConfig::default()
2189 })
2190 .expect("provider");
2191
2192 let descriptor = provider
2193 .data_manifest_descriptor(&DataManifestRequest {
2194 path: "/datasets/alpha.data".to_string(),
2195 version: Some("v1".to_string()),
2196 })
2197 .expect("manifest descriptor");
2198 assert_eq!(descriptor.schema_version, 1);
2199 assert_eq!(descriptor.format, "runmat-data");
2200 assert_eq!(descriptor.dataset_id, "dataset:/datasets/alpha.data");
2201 assert_eq!(descriptor.updated_at, "2026-01-01T00:00:00Z");
2202 assert_eq!(descriptor.txn_sequence, 9);
2203 }
2204
2205 #[test]
2206 fn remote_provider_data_chunk_upload_targets_fetches_targets() {
2207 let (base, _harness, _rt) = spawn_server();
2208 let provider = RemoteFsProvider::new(RemoteFsConfig {
2209 base_url: base,
2210 auth_token: None,
2211 timeout: Duration::from_secs(30),
2212 ..RemoteFsConfig::default()
2213 })
2214 .expect("provider");
2215
2216 let targets = provider
2217 .data_chunk_upload_targets(&DataChunkUploadRequest {
2218 dataset_path: "/datasets/alpha.data".to_string(),
2219 array: "X".to_string(),
2220 chunks: vec![DataChunkDescriptor {
2221 key: "X/0-0".to_string(),
2222 object_id: "obj_0".to_string(),
2223 hash: "sha256:abc".to_string(),
2224 bytes_raw: 10,
2225 bytes_stored: 8,
2226 }],
2227 })
2228 .expect("chunk upload targets");
2229
2230 assert_eq!(targets.len(), 1);
2231 assert_eq!(targets[0].key, "X/0-0");
2232 assert_eq!(targets[0].method, "PUT");
2233 assert_eq!(
2234 targets[0].upload_url,
2235 "https://uploads.example.test/X/obj_0"
2236 );
2237 let headers = &targets[0].headers;
2238 assert_eq!(headers.len(), 2);
2239 assert_eq!(
2240 headers.get("x-runmat-hash").map(String::as_str),
2241 Some("sha256:abc")
2242 );
2243 assert_eq!(
2244 headers.get("x-runmat-object").map(String::as_str),
2245 Some("obj_0")
2246 );
2247 }
2248}