1use crate::data_contract::{
2 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::DateTime;
7use crossbeam_utils::thread;
8use once_cell::sync::Lazy;
9use reqwest::blocking::{Client, Response};
10use reqwest::StatusCode;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::fmt;
16use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use url::Url;
21use uuid::Uuid;
22
/// Hash marker stored on (and compared against) the metadata of a file whose
/// content is a shard manifest rather than the payload itself.
const MANIFEST_HASH: &str = "manifest:v1";
/// Remote directory prefix under which individual shard objects are uploaded.
const SHARD_PREFIX: &str = "/.runmat/shards";

/// `User-Agent` value sent by the HTTP client: crate version plus the host OS name.
static USER_AGENT: Lazy<String> = Lazy::new(|| {
    format!(
        "runmat-remote-fs/{} ({})",
        env!("CARGO_PKG_VERSION"),
        std::env::consts::OS
    )
});
33
/// Tunables for the remote filesystem provider.
#[derive(Clone, Debug)]
pub struct RemoteFsConfig {
    /// Base URL of the remote service; must be non-empty and parseable.
    pub base_url: String,
    /// Optional token sent as `Authorization: Bearer <token>`.
    pub auth_token: Option<String>,
    /// Size of one transfer chunk (clamped to at least 64 KiB when used).
    pub chunk_bytes: usize,
    /// Maximum number of concurrent chunk transfers (clamped to at least 1).
    pub parallel_requests: usize,
    /// Files at least this large are read through a download URL.
    pub direct_read_threshold_bytes: u64,
    /// Payloads at least this large are written through the multipart path.
    pub direct_write_threshold_bytes: u64,
    /// Payloads at least this large are split into shards plus a manifest.
    pub shard_threshold_bytes: u64,
    /// Size of one shard (clamped to at least 8 MiB when used).
    pub shard_size_bytes: u64,
    /// Timeout applied to the HTTP client.
    pub timeout: Duration,
    /// Upper bound on attempts for retried requests (clamped to at least 1).
    pub retry_max_attempts: usize,
    /// Delay before the first retry; doubled on each further attempt.
    pub retry_base_delay: Duration,
    /// Cap on the delay between retries.
    pub retry_max_delay: Duration,
}

impl Default for RemoteFsConfig {
    /// Returns the stock configuration; `base_url` is left empty and must be set by the caller.
    fn default() -> Self {
        const MIB: u64 = 1 << 20;
        const GIB: u64 = 1 << 30;
        RemoteFsConfig {
            base_url: String::new(),
            auth_token: None,
            chunk_bytes: 16 << 20,
            parallel_requests: 4,
            direct_read_threshold_bytes: 64 * MIB,
            direct_write_threshold_bytes: 64 * MIB,
            shard_threshold_bytes: 4 * GIB,
            shard_size_bytes: 512 * MIB,
            timeout: Duration::from_secs(120),
            retry_max_attempts: 5,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(2),
        }
    }
}
68
/// Shared state behind a `RemoteFsProvider`: the HTTP client plus the
/// validated/clamped settings copied from `RemoteFsConfig`.
struct RemoteInner {
    /// Blocking HTTP client built with the configured timeout and user agent.
    client: Client,
    /// Parsed base URL that every route is joined onto.
    base: Url,
    /// Pre-formatted `Bearer …` header value, if a token was configured.
    auth_header: Option<String>,
    /// Transfer chunk size, at least 64 KiB.
    chunk_bytes: usize,
    /// Worker count for parallel transfers, at least 1.
    parallel_requests: usize,
    direct_read_threshold_bytes: u64,
    direct_write_threshold_bytes: u64,
    shard_threshold_bytes: u64,
    /// Shard size, at least 8 MiB.
    shard_size_bytes: u64,
    /// Attempt limit for retried requests, at least 1.
    retry_max_attempts: usize,
    retry_base_delay: Duration,
    retry_max_delay: Duration,
}
83
84impl RemoteInner {
85 fn new(config: RemoteFsConfig) -> io::Result<Self> {
86 if config.base_url.is_empty() {
87 return Err(io::Error::new(
88 ErrorKind::InvalidInput,
89 "RemoteFsConfig.base_url must be provided",
90 ));
91 }
92 let base = Url::parse(&config.base_url).map_err(map_url_err)?;
93 let client = Client::builder()
94 .timeout(config.timeout)
95 .user_agent(USER_AGENT.clone())
96 .build()
97 .map_err(map_http_err)?;
98 Ok(Self {
99 client,
100 base,
101 auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
102 chunk_bytes: config.chunk_bytes.max(64 * 1024),
103 parallel_requests: config.parallel_requests.max(1),
104 direct_read_threshold_bytes: config.direct_read_threshold_bytes,
105 direct_write_threshold_bytes: config.direct_write_threshold_bytes,
106 shard_threshold_bytes: config.shard_threshold_bytes,
107 shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
108 retry_max_attempts: config.retry_max_attempts.max(1),
109 retry_base_delay: config.retry_base_delay,
110 retry_max_delay: config.retry_max_delay,
111 })
112 }
113
114 fn should_retry(&self, status: StatusCode) -> bool {
115 matches!(
116 status,
117 StatusCode::TOO_MANY_REQUESTS
118 | StatusCode::INTERNAL_SERVER_ERROR
119 | StatusCode::BAD_GATEWAY
120 | StatusCode::SERVICE_UNAVAILABLE
121 | StatusCode::GATEWAY_TIMEOUT
122 )
123 }
124
125 fn retry_delay(&self, attempt: usize) -> Duration {
126 let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
127 let delay = self.retry_base_delay.as_millis() as u64 * factor;
128 let capped = delay.min(self.retry_max_delay.as_millis() as u64);
129 Duration::from_millis(capped)
130 }
131
132 fn send_with_retry(
133 &self,
134 method: reqwest::Method,
135 route: &str,
136 query: &[(&str, String)],
137 ) -> io::Result<Response> {
138 for attempt in 0..self.retry_max_attempts {
139 let resp = self
140 .request(method.clone(), route, query)
141 .send()
142 .map_err(map_http_err)?;
143 if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
144 return Ok(resp);
145 }
146 std::thread::sleep(self.retry_delay(attempt));
147 }
148 Err(io::Error::other("request retries exhausted"))
149 }
150
151 fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
152 for attempt in 0..self.retry_max_attempts {
153 let mut request = self.client.get(url);
154 if let Some(range) = &range {
155 request = request.header(reqwest::header::RANGE, range);
156 }
157 let resp = request.send().map_err(map_http_err)?;
158 if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
159 return Ok(resp);
160 }
161 std::thread::sleep(self.retry_delay(attempt));
162 }
163 Err(io::Error::other("request retries exhausted"))
164 }
165
166 fn endpoint(&self, route: &str) -> Url {
167 self.base
168 .join(route.trim_start_matches('/'))
169 .expect("failed to join remote route")
170 }
171
172 fn request(
173 &self,
174 method: reqwest::Method,
175 route: &str,
176 query: &[(&str, String)],
177 ) -> reqwest::blocking::RequestBuilder {
178 let mut url = self.endpoint(route);
179 {
180 let mut pairs = url.query_pairs_mut();
181 for (k, v) in query {
182 pairs.append_pair(k, v);
183 }
184 }
185 let mut builder = self.client.request(method, url);
186 if let Some(auth) = &self.auth_header {
187 builder = builder.header("Authorization", auth);
188 }
189 builder
190 }
191
192 fn get_json<T: for<'a> Deserialize<'a>>(
193 &self,
194 route: &str,
195 query: &[(&str, String)],
196 ) -> io::Result<T> {
197 let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
198 handle_error(resp)?.json().map_err(map_http_err)
199 }
200
201 fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
202 let resp = self
203 .request(reqwest::Method::POST, route, &[])
204 .json(body)
205 .send()
206 .map_err(map_http_err)?;
207 handle_error(resp)?;
208 Ok(())
209 }
210
211 fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
212 &self,
213 route: &str,
214 body: &B,
215 ) -> io::Result<T> {
216 let resp = self
217 .request(reqwest::Method::POST, route, &[])
218 .json(body)
219 .send()
220 .map_err(map_http_err)?;
221 handle_error(resp)?.json().map_err(map_http_err)
222 }
223
224 fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
225 let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
226 handle_error(resp)?;
227 Ok(())
228 }
229
230 fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
231 let resp = self.send_with_retry(
232 reqwest::Method::GET,
233 "/fs/read",
234 &[
235 ("path", path.to_string()),
236 ("offset", offset.to_string()),
237 ("length", length.to_string()),
238 ],
239 )?;
240 let mut body = handle_error(resp)?;
241 if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
242 if content_type
243 .to_str()
244 .map(|value| value.contains("application/json"))
245 .unwrap_or(false)
246 {
247 let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
248 return self.download_range_from_url(&json.download_url, offset, length as u64);
249 }
250 }
251 let mut buf = Vec::with_capacity(length);
252 body.copy_to(&mut buf).map_err(map_http_err)?;
253 Ok(buf)
254 }
255
256 fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
257 if length == 0 {
258 return Ok(Vec::new());
259 }
260 let end = offset + length - 1;
261 let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
262 let mut body = handle_error(resp)?;
263 let mut buf = Vec::with_capacity(length as usize);
264 body.copy_to(&mut buf).map_err(map_http_err)?;
265 Ok(buf)
266 }
267
268 fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
269 self.get_json("/fs/download-url", &[("path", path.to_string())])
270 }
271
272 fn upload_chunk(
273 &self,
274 path: &str,
275 offset: u64,
276 truncate: bool,
277 final_chunk: bool,
278 data: &[u8],
279 hash: Option<&str>,
280 ) -> io::Result<Option<FsWriteSessionResponse>> {
281 let mut query = vec![("path", path.to_string()), ("offset", offset.to_string())];
282 if truncate {
283 query.push(("truncate", "true".to_string()));
284 }
285 if final_chunk {
286 query.push(("final", "true".to_string()));
287 }
288 if let Some(hash) = hash {
289 query.push(("hash", hash.to_string()));
290 }
291 let resp = self
292 .request(reqwest::Method::PUT, "/fs/write", &query)
293 .body(data.to_vec())
294 .send()
295 .map_err(map_http_err)?;
296 if resp.status() == StatusCode::ACCEPTED {
297 let session = handle_error(resp)?.json().map_err(map_http_err)?;
298 return Ok(Some(session));
299 }
300 handle_error(resp)?;
301 Ok(None)
302 }
303
304 fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
305 self.post_json(
306 "/fs/multipart-upload",
307 &MultipartUploadRequest {
308 path: path.to_string(),
309 size_bytes: size_bytes as i64,
310 content_type: None,
311 },
312 )
313 }
314
315 fn upload_session_start(
316 &self,
317 path: &str,
318 size_bytes: u64,
319 content_sha256: &str,
320 ) -> io::Result<UploadSessionStartResponse> {
321 self.post_json(
322 "/fs/upload-session/start",
323 &UploadSessionStartRequest {
324 path: path.to_string(),
325 size_bytes: size_bytes as i64,
326 content_type: None,
327 content_sha256: content_sha256.to_string(),
328 },
329 )
330 }
331
332 fn upload_session_chunks(
333 &self,
334 session_id: &str,
335 blob_key: &str,
336 chunks: &[UploadSessionChunkDescriptor],
337 ) -> io::Result<UploadSessionChunksResponse> {
338 self.post_json(
339 "/fs/upload-session/chunks",
340 &UploadSessionChunksRequest {
341 session_id: session_id.to_string(),
342 blob_key: blob_key.to_string(),
343 chunks: chunks.to_vec(),
344 },
345 )
346 }
347
348 fn upload_session_complete(
349 &self,
350 path: &str,
351 session_id: &str,
352 blob_key: &str,
353 size_bytes: u64,
354 content_sha256: &str,
355 chunk_count: usize,
356 ) -> io::Result<()> {
357 self.post_empty(
358 "/fs/upload-session/complete",
359 &UploadSessionCompleteRequest {
360 path: path.to_string(),
361 session_id: session_id.to_string(),
362 blob_key: blob_key.to_string(),
363 size_bytes: size_bytes as i64,
364 content_sha256: content_sha256.to_string(),
365 chunk_count,
366 hash: None,
367 },
368 )
369 }
370
371 fn multipart_presign_part(
372 &self,
373 session_id: &str,
374 blob_key: &str,
375 upload_id: &str,
376 part_number: i32,
377 size_bytes: u64,
378 ) -> io::Result<String> {
379 let response: MultipartUploadPartResponse = self.post_json(
380 "/fs/multipart-upload/part",
381 &MultipartUploadPartRequest {
382 session_id: session_id.to_string(),
383 blob_key: blob_key.to_string(),
384 upload_id: upload_id.to_string(),
385 part_number,
386 size_bytes: size_bytes as i64,
387 },
388 )?;
389 Ok(response.upload_url)
390 }
391
392 fn multipart_complete(
393 &self,
394 path: &str,
395 session_id: &str,
396 blob_key: &str,
397 upload_id: &str,
398 size_bytes: u64,
399 parts: Vec<MultipartPart>,
400 ) -> io::Result<()> {
401 let resp = self
402 .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
403 .json(&MultipartUploadCompleteRequest {
404 path: path.to_string(),
405 session_id: session_id.to_string(),
406 blob_key: blob_key.to_string(),
407 upload_id: upload_id.to_string(),
408 size_bytes: size_bytes as i64,
409 hash: None,
410 parts,
411 })
412 .send()
413 .map_err(map_http_err)?;
414 handle_error(resp)?;
415 Ok(())
416 }
417
418 fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
419 let resp = self
420 .client
421 .put(url)
422 .body(data.to_vec())
423 .send()
424 .map_err(map_http_err)?;
425 let resp = handle_error(resp)?;
426 let etag = resp
427 .headers()
428 .get(reqwest::header::ETAG)
429 .and_then(|value| value.to_str().ok())
430 .unwrap_or("")
431 .to_string();
432 Ok(etag)
433 }
434
435 fn put_upload_target(
436 &self,
437 method: &str,
438 url: &str,
439 headers: &HashMap<String, String>,
440 data: &[u8],
441 ) -> io::Result<()> {
442 let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
443 io::Error::new(
444 ErrorKind::InvalidInput,
445 format!("invalid upload method `{method}`: {err}"),
446 )
447 })?;
448 let mut request = self.client.request(method, url).body(data.to_vec());
449 for (name, value) in headers {
450 request = request.header(name, value);
451 }
452 let resp = request.send().map_err(map_http_err)?;
453 handle_error(resp)?;
454 Ok(())
455 }
456
457 fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
458 self.get_json("/fs/metadata", &[("path", path.to_string())])
459 }
460
461 fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
462 self.get_json("/fs/dir", &[("path", path.to_string())])
463 }
464
465 fn canonicalize_path(&self, path: &str) -> io::Result<String> {
466 let resp: CanonicalizeResponse =
467 self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
468 Ok(resp.path)
469 }
470}
471
/// Filesystem provider backed by a remote HTTP service.
///
/// All state lives in a shared `RemoteInner`, which is cloned (by reference
/// count) into the worker threads used for parallel transfers.
pub struct RemoteFsProvider {
    inner: Arc<RemoteInner>,
}
475
476impl RemoteFsProvider {
477 pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
478 Ok(Self {
479 inner: Arc::new(RemoteInner::new(config)?),
480 })
481 }
482
483 pub fn data_manifest_descriptor(
485 &self,
486 request: &DataManifestRequest,
487 ) -> io::Result<DataManifestDescriptor> {
488 let mut query = vec![("path", request.path.clone())];
489 if let Some(version) = &request.version {
490 query.push(("version", version.clone()));
491 }
492 self.inner.get_json("/data/manifest", &query)
493 }
494
495 pub fn data_chunk_upload_targets(
497 &self,
498 request: &DataChunkUploadRequest,
499 ) -> io::Result<Vec<DataChunkUploadTarget>> {
500 #[derive(Deserialize)]
501 struct DataChunkUploadTargetsResponse {
502 targets: Vec<DataChunkUploadTarget>,
503 }
504 let response: DataChunkUploadTargetsResponse = self
505 .inner
506 .post_json("/data/chunks/upload-targets", request)?;
507 Ok(response.targets)
508 }
509
510 fn normalize(&self, path: &Path) -> String {
511 let mut normalized = PathBuf::new();
512 normalized.push("/");
513 normalized.push(path);
514 normalized.to_string_lossy().replace('\\', "/").to_string()
515 }
516
517 fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
518 if let Some(parent) = path.parent() {
519 self.inner.post_empty(
520 "/fs/mkdir",
521 &CreateDirRequest {
522 path: self.normalize(parent),
523 recursive: true,
524 },
525 )?;
526 }
527 Ok(())
528 }
529
530 fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
531 if len == 0 {
532 return Ok(Vec::new());
533 }
534 if len >= self.inner.direct_read_threshold_bytes {
535 let url = self.inner.fetch_download_url(path)?.download_url;
536 return self.download_entire_file_from_url(&url, len);
537 }
538 let chunk = self.inner.chunk_bytes as u64;
539 let mut tasks = Vec::new();
540 let mut offset = 0;
541 let mut index = 0;
542 while offset < len {
543 let remaining = len - offset;
544 let length = std::cmp::min(chunk, remaining);
545 tasks.push(ChunkTask {
546 offset,
547 length: length as usize,
548 index,
549 });
550 offset += length;
551 index += 1;
552 }
553 let mut buffer = vec![0u8; len as usize];
554 let path = path.to_string();
555 let inner = self.inner.clone();
556 let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
557 let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
558 let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
559 let threads = inner
560 .parallel_requests
561 .min(queue.lock().unwrap().len())
562 .max(1);
563 thread::scope(|scope| {
564 for _ in 0..threads {
565 let queue = queue.clone();
566 let error = error.clone();
567 let results = results.clone();
568 let inner = inner.clone();
569 let path = path.clone();
570 scope.spawn(move |_| loop {
571 let task_opt = {
572 let mut guard = queue.lock().unwrap();
573 guard.pop_front()
574 };
575 let task = match task_opt {
576 Some(task) => task,
577 None => break,
578 };
579 match inner.download_chunk(&path, task.offset, task.length) {
580 Ok(bytes) => {
581 let mut guard = results.lock().unwrap();
582 guard[task.index] = Some(bytes);
583 }
584 Err(err) => {
585 let mut guard = error.lock().unwrap();
586 if guard.is_none() {
587 *guard = Some(err);
588 }
589 break;
590 }
591 }
592 });
593 }
594 })
595 .expect("remote download scope");
596 if let Some(err) = error.lock().unwrap().take() {
597 return Err(err);
598 }
599 let chunks = Arc::try_unwrap(results)
600 .expect("results still in use")
601 .into_inner()
602 .expect("results poisoned");
603 for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
604 let bytes = maybe.expect("missing downloaded chunk for remote file");
605 let start = task.offset as usize;
606 buffer[start..start + bytes.len()].copy_from_slice(&bytes);
607 }
608 Ok(buffer)
609 }
610
611 fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
612 let manifest_bytes = self.download_raw_file(path, len)?;
613 let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
614 .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
615 if manifest.version != 1 {
616 return Err(io::Error::new(
617 ErrorKind::InvalidData,
618 "unsupported manifest",
619 ));
620 }
621 let mut buffer = Vec::with_capacity(manifest.total_size as usize);
622 for shard in manifest.shards {
623 let meta = self.inner.fetch_metadata(&shard.path)?;
624 let bytes = self.download_raw_file(&shard.path, meta.len)?;
625 buffer.extend_from_slice(&bytes);
626 }
627 Ok(buffer)
628 }
629
630 fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
631 let chunk = self.inner.chunk_bytes as u64;
632 let mut tasks = Vec::new();
633 let mut offset = 0;
634 let mut index = 0;
635 while offset < len {
636 let remaining = len - offset;
637 let length = std::cmp::min(chunk, remaining);
638 tasks.push(ChunkTask {
639 offset,
640 length: length as usize,
641 index,
642 });
643 offset += length;
644 index += 1;
645 }
646 let mut buffer = vec![0u8; len as usize];
647 let url = url.to_string();
648 let inner = self.inner.clone();
649 let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
650 let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
651 let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
652 let threads = inner
653 .parallel_requests
654 .min(queue.lock().unwrap().len())
655 .max(1);
656 thread::scope(|scope| {
657 for _ in 0..threads {
658 let queue = queue.clone();
659 let error = error.clone();
660 let results = results.clone();
661 let inner = inner.clone();
662 let url = url.clone();
663 scope.spawn(move |_| loop {
664 let task_opt = {
665 let mut guard = queue.lock().unwrap();
666 guard.pop_front()
667 };
668 let task = match task_opt {
669 Some(task) => task,
670 None => break,
671 };
672 match inner.download_range_from_url(&url, task.offset, task.length as u64) {
673 Ok(bytes) => {
674 let mut guard = results.lock().unwrap();
675 guard[task.index] = Some(bytes);
676 }
677 Err(err) => {
678 let mut guard = error.lock().unwrap();
679 if guard.is_none() {
680 *guard = Some(err);
681 }
682 break;
683 }
684 }
685 });
686 }
687 })
688 .expect("remote download scope");
689 if let Some(err) = error.lock().unwrap().take() {
690 return Err(err);
691 }
692 let chunks = Arc::try_unwrap(results)
693 .expect("results still in use")
694 .into_inner()
695 .expect("results poisoned");
696 for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
697 let bytes = maybe.expect("missing downloaded chunk for remote file");
698 let start = task.offset as usize;
699 buffer[start..start + bytes.len()].copy_from_slice(&bytes);
700 }
701 Ok(buffer)
702 }
703
704 fn upload_entire_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
705 if data.len() as u64 >= self.inner.shard_threshold_bytes {
706 return self.upload_sharded_file(path, data);
707 }
708 self.upload_unsharded_file(path, data, None)
709 }
710
711 fn upload_unsharded_file(&self, path: &str, data: &[u8], hash: Option<&str>) -> io::Result<()> {
712 if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
713 return self.upload_multipart_file(path, data);
714 }
715 if data.is_empty() {
716 self.inner.upload_chunk(path, 0, true, true, data, hash)?;
717 return Ok(());
718 }
719 let chunk = self.inner.chunk_bytes;
720 let mut offset = 0usize;
721 while offset < data.len() {
722 let end = std::cmp::min(offset + chunk, data.len());
723 let slice = &data[offset..end];
724 let truncate = offset == 0;
725 let final_chunk = end == data.len();
726 let result =
727 self.inner
728 .upload_chunk(path, offset as u64, truncate, final_chunk, slice, hash)?;
729 if let Some(session) = result {
730 let expected = offset as u64 + slice.len() as u64;
731 if session.next_offset as u64 != expected {
732 return Err(io::Error::other("unexpected next offset"));
733 }
734 }
735 offset = end;
736 }
737 Ok(())
738 }
739
740 fn upload_sharded_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
741 let shard_size = self.inner.shard_size_bytes as usize;
742 let mut shards = Vec::new();
743 let mut offset = 0usize;
744 while offset < data.len() {
745 let end = std::cmp::min(offset + shard_size, data.len());
746 let slice = &data[offset..end];
747 let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
748 self.upload_unsharded_file(&shard_path, slice, None)?;
749 shards.push(ShardEntry {
750 path: shard_path,
751 size: slice.len() as u64,
752 });
753 offset = end;
754 }
755 let manifest = ShardManifest {
756 version: 1,
757 total_size: data.len() as u64,
758 shard_size: self.inner.shard_size_bytes,
759 shards,
760 };
761 let bytes = serde_json::to_vec(&manifest)
762 .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
763 self.upload_unsharded_file(path, &bytes, Some(MANIFEST_HASH))
764 }
765
766 fn upload_multipart_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
767 if data.is_empty() {
768 self.inner.upload_chunk(path, 0, true, true, data, None)?;
769 return Ok(());
770 }
771 let content_sha256 = sha256_hex(data);
772 let session =
773 match self
774 .inner
775 .upload_session_start(path, data.len() as u64, &content_sha256)
776 {
777 Ok(session) => session,
778 Err(err) if err.kind() == ErrorKind::NotFound => {
779 return self.upload_multipart_file_legacy(path, data);
780 }
781 Err(err) => return Err(err),
782 };
783 let chunk_size = (session.chunk_size_bytes as usize).max(1);
784 let mut tasks = Vec::new();
785 let mut offset = 0usize;
786 let mut index = 0usize;
787 while offset < data.len() {
788 let end = std::cmp::min(offset + chunk_size, data.len());
789 let slice = &data[offset..end];
790 tasks.push(UploadTask {
791 index,
792 offset,
793 length: end - offset,
794 chunk_sha256: sha256_hex(slice),
795 });
796 offset = end;
797 index += 1;
798 }
799 let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
800 .iter()
801 .map(|task| UploadSessionChunkDescriptor {
802 chunk_index: task.index,
803 offset_bytes: task.offset as i64,
804 size_bytes: task.length as i64,
805 chunk_sha256: task.chunk_sha256.clone(),
806 })
807 .collect();
808 let chunk_response = self.inner.upload_session_chunks(
809 &session.session_id,
810 &session.blob_key,
811 &descriptors,
812 )?;
813 let targets = Arc::new(chunk_response.targets);
814 let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
815 let error = Arc::new(Mutex::new(None));
816 let data = Arc::new(data.to_vec());
817 thread::scope(|scope| {
818 for _ in 0..self.inner.parallel_requests {
819 let tasks = Arc::clone(&tasks);
820 let targets = Arc::clone(&targets);
821 let error = Arc::clone(&error);
822 let inner = Arc::clone(&self.inner);
823 let data = Arc::clone(&data);
824 scope.spawn(move |_| loop {
825 let task = {
826 let mut guard = tasks.lock().unwrap();
827 guard.pop_front()
828 };
829 let Some(task) = task else { break };
830 let target = targets
831 .iter()
832 .find(|target| target.chunk_index == task.index)
833 .cloned();
834 let result = (|| {
835 let target = target.ok_or_else(|| {
836 io::Error::other(format!(
837 "missing upload target for chunk {}",
838 task.index
839 ))
840 })?;
841 let slice = &data[task.offset..task.offset + task.length];
842 inner.put_upload_target(
843 &target.method,
844 &target.upload_url,
845 &target.headers,
846 slice,
847 )
848 })();
849 if let Err(err) = result {
850 let mut err_guard = error.lock().unwrap();
851 if err_guard.is_none() {
852 *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
853 }
854 break;
855 }
856 });
857 }
858 })
859 .expect("upload session scope");
860 if let Some(err) = error.lock().unwrap().take() {
861 return Err(err);
862 }
863 self.inner.upload_session_complete(
864 path,
865 &session.session_id,
866 &session.blob_key,
867 data.len() as u64,
868 &content_sha256,
869 descriptors.len(),
870 )?;
871 Ok(())
872 }
873
874 fn upload_multipart_file_legacy(&self, path: &str, data: &[u8]) -> io::Result<()> {
875 let session = self.inner.multipart_create(path, data.len() as u64)?;
876 let part_size = session.part_size_bytes as usize;
877 let mut tasks = std::collections::VecDeque::new();
878 let mut offset = 0usize;
879 let mut part_number = 1;
880 let mut index = 0usize;
881 while offset < data.len() {
882 let end = std::cmp::min(offset + part_size, data.len());
883 let length = end - offset;
884 tasks.push_back(MultipartTask {
885 index,
886 part_number,
887 offset,
888 length,
889 });
890 offset = end;
891 part_number += 1;
892 index += 1;
893 }
894 let tasks = Arc::new(Mutex::new(tasks));
895 let task_len = tasks.lock().unwrap().len();
896 let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
897 for _ in 0..task_len {
898 result_vec.push(None);
899 }
900 let results = Arc::new(Mutex::new(result_vec));
901 let error = Arc::new(Mutex::new(None));
902 let data = Arc::new(data.to_vec());
903 thread::scope(|scope| {
904 for _ in 0..self.inner.parallel_requests {
905 let tasks = Arc::clone(&tasks);
906 let results = Arc::clone(&results);
907 let error = Arc::clone(&error);
908 let inner = Arc::clone(&self.inner);
909 let blob_key = session.blob_key.clone();
910 let upload_id = session.upload_id.clone();
911 let session_id = session.session_id.clone();
912 let data = Arc::clone(&data);
913 scope.spawn(move |_| loop {
914 let task = {
915 let mut guard = tasks.lock().unwrap();
916 guard.pop_front()
917 };
918 let Some(task) = task else { break };
919 let slice = &data[task.offset..task.offset + task.length];
920 let result = (|| {
921 let url = inner.multipart_presign_part(
922 &session_id,
923 &blob_key,
924 &upload_id,
925 task.part_number,
926 task.length as u64,
927 )?;
928 let etag = inner.put_upload_url_with_etag(&url, slice)?;
929 Ok(MultipartPart {
930 part_number: task.part_number,
931 etag,
932 })
933 })();
934 let mut guard = results.lock().unwrap();
935 guard[task.index] = Some(result);
936 if let Some(Err(err)) = guard[task.index].as_ref() {
937 let mut err_guard = error.lock().unwrap();
938 if err_guard.is_none() {
939 *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
940 }
941 break;
942 }
943 });
944 }
945 })
946 .expect("multipart upload scope");
947 if let Some(err) = error.lock().unwrap().take() {
948 return Err(err);
949 }
950 let mut parts = Vec::with_capacity(results.lock().unwrap().len());
951 for maybe in Arc::try_unwrap(results)
952 .expect("results still in use")
953 .into_inner()
954 .expect("results poisoned")
955 {
956 let part = maybe.expect("missing part result")?;
957 if part.etag.is_empty() {
958 return Err(io::Error::other("missing etag"));
959 }
960 parts.push(part);
961 }
962 parts.sort_by_key(|part| part.part_number);
963 self.inner.multipart_complete(
964 path,
965 &session.session_id,
966 &session.blob_key,
967 &session.upload_id,
968 data.len() as u64,
969 parts,
970 )?;
971 Ok(())
972 }
973}
974
/// One unit of work for a parallel download.
#[derive(Clone, Copy)]
struct ChunkTask {
    /// Byte offset of the chunk within the remote file.
    offset: u64,
    /// Number of bytes to request.
    length: usize,
    /// Position of the chunk in the task list / results vector.
    index: usize,
}
981
/// Result slot for one legacy multipart part: `None` until a worker has processed it.
type MultipartResult = Option<Result<MultipartPart, io::Error>>;
983
/// One part of a legacy multipart upload.
#[derive(Clone, Copy)]
struct MultipartTask {
    /// Position of the part in the results vector (0-based).
    index: usize,
    /// Part number sent to the server (starts at 1).
    part_number: i32,
    /// Byte offset of the part within the payload.
    offset: usize,
    /// Length of the part in bytes.
    length: usize,
}
991
/// One chunk of an upload-session upload.
#[derive(Clone)]
struct UploadTask {
    /// Chunk index, matched against the server-provided target's `chunk_index`.
    index: usize,
    /// Byte offset of the chunk within the payload.
    offset: usize,
    /// Length of the chunk in bytes.
    length: usize,
    /// Hex SHA-256 of the chunk's bytes.
    chunk_sha256: String,
}
999
1000#[async_trait(?Send)]
1001impl FsProvider for RemoteFsProvider {
1002 fn current_dir_override(&self) -> Option<PathBuf> {
1003 Some(PathBuf::from("/"))
1004 }
1005
1006 fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
1007 let normalized = self.normalize(path);
1008 let mut data = Vec::new();
1009 if flags.read {
1010 let meta = self.inner.fetch_metadata(&normalized)?;
1011 if meta.file_type != "file" {
1012 return Err(io::Error::other("remote path is not a file"));
1013 }
1014 data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1015 self.download_sharded_file(&normalized, meta.len)?
1016 } else {
1017 self.download_raw_file(&normalized, meta.len)?
1018 };
1019 }
1020 if flags.truncate {
1021 data.clear();
1022 }
1023 if flags.create {
1024 self.ensure_parent_exists(path)?;
1025 }
1026 let handle = RemoteFileHandle {
1027 provider: self.clone(),
1028 path: normalized,
1029 data,
1030 position: 0,
1031 flags: flags.clone(),
1032 dirty: false,
1033 };
1034 Ok(Box::new(handle))
1035 }
1036
1037 async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
1038 let normalized = self.normalize(path);
1039 let meta = self.inner.fetch_metadata(&normalized)?;
1040 if meta.file_type != "file" {
1041 return Err(io::Error::other("remote path is not a file"));
1042 }
1043 if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1044 self.download_sharded_file(&normalized, meta.len)
1045 } else {
1046 self.download_raw_file(&normalized, meta.len)
1047 }
1048 }
1049
1050 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
1051 let normalized = self.normalize(path);
1052 self.ensure_parent_exists(path)?;
1053 self.upload_entire_file(&normalized, data)
1054 }
1055
1056 async fn remove_file(&self, path: &Path) -> io::Result<()> {
1057 let normalized = self.normalize(path);
1058 self.inner
1059 .delete_empty("/fs/file", &[("path", normalized)])?;
1060 Ok(())
1061 }
1062
1063 async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1064 let normalized = self.normalize(path);
1065 let resp = self.inner.fetch_metadata(&normalized)?;
1066 Ok(resp.into())
1067 }
1068
1069 async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1070 self.metadata(path).await
1071 }
1072
1073 async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
1074 let normalized = self.normalize(path);
1075 let resp = self.inner.fetch_dir(&normalized)?;
1076 Ok(resp
1077 .into_iter()
1078 .map(|entry| DirEntry {
1079 path: PathBuf::from(entry.path),
1080 file_name: entry.file_name.into(),
1081 file_type: entry.file_type.into(),
1082 })
1083 .collect())
1084 }
1085
1086 async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
1087 let normalized = self.normalize(path);
1088 let canonical = self.inner.canonicalize_path(&normalized)?;
1089 Ok(PathBuf::from(canonical))
1090 }
1091
1092 async fn create_dir(&self, path: &Path) -> io::Result<()> {
1093 let normalized = self.normalize(path);
1094 self.inner.post_empty(
1095 "/fs/mkdir",
1096 &CreateDirRequest {
1097 path: normalized,
1098 recursive: false,
1099 },
1100 )
1101 }
1102
1103 async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
1104 let normalized = self.normalize(path);
1105 self.inner.post_empty(
1106 "/fs/mkdir",
1107 &CreateDirRequest {
1108 path: normalized,
1109 recursive: true,
1110 },
1111 )
1112 }
1113
1114 async fn remove_dir(&self, path: &Path) -> io::Result<()> {
1115 let normalized = self.normalize(path);
1116 self.inner.delete_empty(
1117 "/fs/dir",
1118 &[("path", normalized), ("recursive", "false".into())],
1119 )
1120 }
1121
1122 async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
1123 let normalized = self.normalize(path);
1124 self.inner.delete_empty(
1125 "/fs/dir",
1126 &[("path", normalized), ("recursive", "true".into())],
1127 )
1128 }
1129
1130 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
1131 self.inner.post_empty(
1132 "/fs/rename",
1133 &RenameRequest {
1134 from: self.normalize(from),
1135 to: self.normalize(to),
1136 },
1137 )
1138 }
1139
1140 async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
1141 self.inner.post_empty(
1142 "/fs/set-readonly",
1143 &SetReadonlyRequest {
1144 path: self.normalize(path),
1145 readonly,
1146 },
1147 )
1148 }
1149
1150 async fn data_manifest_descriptor(
1151 &self,
1152 request: &DataManifestRequest,
1153 ) -> io::Result<DataManifestDescriptor> {
1154 let mut query = vec![("path", request.path.clone())];
1155 if let Some(version) = &request.version {
1156 query.push(("version", version.clone()));
1157 }
1158 self.inner.get_json("/data/manifest", &query)
1159 }
1160
1161 async fn data_chunk_upload_targets(
1162 &self,
1163 request: &DataChunkUploadRequest,
1164 ) -> io::Result<Vec<DataChunkUploadTarget>> {
1165 #[derive(Deserialize)]
1166 struct DataChunkUploadTargetsResponse {
1167 targets: Vec<DataChunkUploadTarget>,
1168 }
1169 let response: DataChunkUploadTargetsResponse = self
1170 .inner
1171 .post_json("/data/chunks/upload-targets", request)?;
1172 Ok(response.targets)
1173 }
1174
1175 async fn data_upload_chunk(
1176 &self,
1177 target: &DataChunkUploadTarget,
1178 data: &[u8],
1179 ) -> io::Result<()> {
1180 self.inner
1181 .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
1182 }
1183}
1184
1185impl Clone for RemoteFsProvider {
1186 fn clone(&self) -> Self {
1187 Self {
1188 inner: Arc::clone(&self.inner),
1189 }
1190 }
1191}
1192
/// In-memory file handle backed by a remote path.
///
/// Reads, writes and seeks operate on the local `data` buffer; the buffer is
/// uploaded as a whole when the handle is flushed or dropped while dirty.
struct RemoteFileHandle {
    /// Provider used to upload the buffer.
    provider: RemoteFsProvider,
    /// Remote path the buffer is uploaded to.
    path: String,
    /// Local copy of the file contents.
    data: Vec<u8>,
    /// Cursor into `data` shared by the `Read`, `Write` and `Seek` impls.
    position: usize,
    /// Flags the handle was opened with; `write` and `append` gate writes.
    flags: OpenFlags,
    /// Set by writes, cleared after a successful upload.
    dirty: bool,
}
1201
1202impl RemoteFileHandle {
1203 fn flush_remote(&mut self) -> io::Result<()> {
1204 if !self.dirty {
1205 return Ok(());
1206 }
1207 self.provider.upload_entire_file(&self.path, &self.data)?;
1208 self.dirty = false;
1209 Ok(())
1210 }
1211}
1212
1213impl Read for RemoteFileHandle {
1214 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1215 let remaining = &self.data[self.position..];
1216 let amt = remaining.len().min(buf.len());
1217 buf[..amt].copy_from_slice(&remaining[..amt]);
1218 self.position += amt;
1219 Ok(amt)
1220 }
1221}
1222
1223impl Write for RemoteFileHandle {
1224 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1225 if !self.flags.write && !self.flags.append {
1226 return Err(io::Error::new(
1227 ErrorKind::PermissionDenied,
1228 "remote file not opened for writing",
1229 ));
1230 }
1231 if self.flags.append {
1232 self.position = self.data.len();
1233 }
1234 let required = self.position + buf.len();
1235 if required > self.data.len() {
1236 self.data.resize(required, 0);
1237 }
1238 self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
1239 self.position += buf.len();
1240 self.dirty = true;
1241 Ok(buf.len())
1242 }
1243
1244 fn flush(&mut self) -> io::Result<()> {
1245 self.flush_remote()
1246 }
1247}
1248
1249impl Seek for RemoteFileHandle {
1250 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1251 let new_pos = match pos {
1252 SeekFrom::Start(offset) => offset as i64,
1253 SeekFrom::End(offset) => self.data.len() as i64 + offset,
1254 SeekFrom::Current(offset) => self.position as i64 + offset,
1255 };
1256 if new_pos < 0 {
1257 return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
1258 }
1259 self.position = new_pos as usize;
1260 Ok(self.position as u64)
1261 }
1262}
1263
// `RemoteFileHandle` adds no methods of its own here; the impl body is empty.
#[async_trait(?Send)]
impl FileHandle for RemoteFileHandle {}
1266
1267impl Drop for RemoteFileHandle {
1268 fn drop(&mut self) {
1269 if self.dirty {
1270 if let Err(err) = self.provider.upload_entire_file(&self.path, &self.data) {
1271 eprintln!("remote fs flush failed: {err}");
1272 }
1273 }
1274 }
1275}
1276
1277#[derive(Debug, Deserialize)]
1278struct MetadataResponse {
1279 #[serde(rename = "fileType")]
1280 file_type: String,
1281 len: u64,
1282 #[serde(rename = "modifiedAt")]
1283 modified_at: Option<String>,
1284 readonly: bool,
1285 hash: Option<String>,
1286}
1287
1288impl From<MetadataResponse> for FsMetadata {
1289 fn from(value: MetadataResponse) -> Self {
1290 FsMetadata::new_with_hash(
1291 value.file_type.into(),
1292 value.len,
1293 parse_modified_at(value.modified_at.as_deref()),
1294 value.readonly,
1295 value.hash,
1296 )
1297 }
1298}
1299
1300#[derive(Debug, Deserialize)]
1301struct DirEntryResponse {
1302 path: String,
1303 #[serde(rename = "fileName")]
1304 file_name: String,
1305 #[serde(rename = "fileType")]
1306 file_type: String,
1307}
1308
1309impl From<String> for FsFileType {
1310 fn from(value: String) -> Self {
1311 match value.as_str() {
1312 "dir" => FsFileType::Directory,
1313 "file" => FsFileType::File,
1314 "symlink" => FsFileType::Symlink,
1315 "other" => FsFileType::Other,
1316 _ => FsFileType::Unknown,
1317 }
1318 }
1319}
1320
/// JSON body posted to `/fs/mkdir`.
#[derive(Debug, Serialize, Deserialize)]
struct CreateDirRequest {
    /// Normalized remote path of the directory to create.
    path: String,
    /// `true` for `create_dir_all`, `false` for `create_dir`.
    recursive: bool,
}
1326
/// JSON body posted to `/fs/rename`.
#[derive(Debug, Serialize, Deserialize)]
struct RenameRequest {
    /// Normalized source path.
    from: String,
    /// Normalized destination path.
    to: String,
}
1332
/// JSON body posted to `/fs/set-readonly`.
#[derive(Debug, Serialize, Deserialize)]
struct SetReadonlyRequest {
    /// Normalized remote path.
    path: String,
    /// Desired read-only state.
    readonly: bool,
}
1338
/// JSON response carrying a single `path` string.
// NOTE(review): presumably the body of the canonicalize endpoint; the code
// that deserializes it is outside this section.
#[derive(Debug, Deserialize)]
struct CanonicalizeResponse {
    path: String,
}
1343
1344#[derive(Debug, Deserialize)]
1345struct DownloadUrlResponse {
1346 #[serde(rename = "downloadUrl")]
1347 download_url: String,
1348}
1349
1350#[derive(Debug, Serialize, Deserialize)]
1351#[serde(rename_all = "camelCase")]
1352struct UploadSessionStartRequest {
1353 path: String,
1354 size_bytes: i64,
1355 content_type: Option<String>,
1356 content_sha256: String,
1357}
1358
1359#[derive(Debug, Serialize, Deserialize)]
1360#[serde(rename_all = "camelCase")]
1361struct UploadSessionStartResponse {
1362 session_id: String,
1363 blob_key: String,
1364 chunk_size_bytes: i64,
1365}
1366
1367#[derive(Debug, Clone, Serialize, Deserialize)]
1368#[serde(rename_all = "camelCase")]
1369struct UploadSessionChunkDescriptor {
1370 chunk_index: usize,
1371 offset_bytes: i64,
1372 size_bytes: i64,
1373 chunk_sha256: String,
1374}
1375
1376#[derive(Debug, Serialize, Deserialize)]
1377#[serde(rename_all = "camelCase")]
1378struct UploadSessionChunksRequest {
1379 session_id: String,
1380 blob_key: String,
1381 chunks: Vec<UploadSessionChunkDescriptor>,
1382}
1383
1384#[derive(Debug, Serialize, Deserialize)]
1385#[serde(rename_all = "camelCase")]
1386struct UploadSessionChunksResponse {
1387 targets: Vec<UploadChunkTarget>,
1388}
1389
1390#[derive(Debug, Clone, Serialize, Deserialize)]
1391#[serde(rename_all = "camelCase")]
1392struct UploadChunkTarget {
1393 chunk_index: usize,
1394 method: String,
1395 upload_url: String,
1396 headers: HashMap<String, String>,
1397}
1398
1399#[derive(Debug, Serialize, Deserialize)]
1400#[serde(rename_all = "camelCase")]
1401struct UploadSessionCompleteRequest {
1402 path: String,
1403 session_id: String,
1404 blob_key: String,
1405 size_bytes: i64,
1406 content_sha256: String,
1407 chunk_count: usize,
1408 hash: Option<String>,
1409}
1410
1411#[derive(Debug, Serialize, Deserialize)]
1412struct MultipartUploadRequest {
1413 path: String,
1414 #[serde(rename = "sizeBytes")]
1415 size_bytes: i64,
1416 #[serde(rename = "contentType")]
1417 content_type: Option<String>,
1418}
1419
1420#[derive(Debug, Deserialize)]
1421struct MultipartUploadResponse {
1422 #[serde(rename = "sessionId")]
1423 session_id: String,
1424 #[serde(rename = "blobKey")]
1425 blob_key: String,
1426 #[serde(rename = "uploadId")]
1427 upload_id: String,
1428 #[serde(rename = "partSizeBytes")]
1429 part_size_bytes: i64,
1430}
1431
1432#[derive(Debug, Serialize, Deserialize)]
1433struct MultipartUploadPartRequest {
1434 #[serde(rename = "sessionId")]
1435 session_id: String,
1436 #[serde(rename = "blobKey")]
1437 blob_key: String,
1438 #[serde(rename = "uploadId")]
1439 upload_id: String,
1440 #[serde(rename = "partNumber")]
1441 part_number: i32,
1442 #[serde(rename = "sizeBytes")]
1443 size_bytes: i64,
1444}
1445
1446#[derive(Debug, Deserialize)]
1447struct MultipartUploadPartResponse {
1448 #[serde(rename = "upload_url")]
1449 upload_url: String,
1450}
1451
1452#[derive(Debug, Serialize, Deserialize)]
1453struct MultipartUploadCompleteRequest {
1454 path: String,
1455 #[serde(rename = "sessionId")]
1456 session_id: String,
1457 #[serde(rename = "blobKey")]
1458 blob_key: String,
1459 #[serde(rename = "uploadId")]
1460 upload_id: String,
1461 #[serde(rename = "sizeBytes")]
1462 size_bytes: i64,
1463 hash: Option<String>,
1464 parts: Vec<MultipartPart>,
1465}
1466
1467#[derive(Debug, Serialize, Deserialize)]
1468struct MultipartPart {
1469 #[serde(rename = "partNumber")]
1470 part_number: i32,
1471 etag: String,
1472}
1473
/// Serialized manifest listing the shards of a file.
// NOTE(review): presumably the document stored for files whose metadata hash
// equals `MANIFEST_HASH`; the code that reads and writes it is outside this
// section.
#[derive(Debug, Serialize, Deserialize)]
struct ShardManifest {
    version: u32,
    total_size: u64,
    shard_size: u64,
    /// Shard entries.
    shards: Vec<ShardEntry>,
}
1481
/// One shard listed in a [`ShardManifest`]: its path and size.
#[derive(Debug, Serialize, Deserialize)]
struct ShardEntry {
    path: String,
    size: u64,
}
1487
1488#[derive(Debug, Deserialize)]
1489struct FsWriteSessionResponse {
1490 #[serde(rename = "sessionId")]
1491 _session_id: String,
1492 #[serde(rename = "nextOffset")]
1493 next_offset: i64,
1494}
1495
1496fn sha256_hex(data: &[u8]) -> String {
1497 let digest = Sha256::digest(data);
1498 let mut out = String::with_capacity(digest.len() * 2);
1499 for byte in digest {
1500 use std::fmt::Write as _;
1501 let _ = write!(out, "{byte:02x}");
1502 }
1503 out
1504}
1505
1506fn map_http_err(err: reqwest::Error) -> io::Error {
1507 io::Error::other(err)
1508}
1509
1510fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1511 let value = value?;
1512 let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1513 let millis = parsed.timestamp_millis();
1514 if millis < 0 {
1515 return None;
1516 }
1517 Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1518}
1519
1520fn map_url_err(err: url::ParseError) -> io::Error {
1521 io::Error::new(ErrorKind::InvalidInput, err)
1522}
1523
1524fn handle_error(resp: Response) -> io::Result<Response> {
1525 let status = resp.status();
1526 if status.is_success() {
1527 return Ok(resp);
1528 }
1529 let text = resp.text().unwrap_or_else(|_| status.to_string());
1530 let kind = match status {
1531 StatusCode::NOT_FOUND => ErrorKind::NotFound,
1532 StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1533 StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1534 StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1535 _ => ErrorKind::Other,
1536 };
1537 Err(io::Error::new(kind, text))
1538}
1539
1540impl fmt::Debug for RemoteFsProvider {
1541 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1542 f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
1543 }
1544}
1545
1546#[cfg(test)]
1547mod tests {
1548 use super::*;
1549 use crate::data_contract::DataChunkDescriptor;
1550 use axum::extract::{Query, State};
1551 use axum::http::{HeaderMap, StatusCode};
1552 use axum::routing::{delete, get, post, put};
1553 use axum::{Json, Router};
1554 use futures::executor;
1555 use serde::Deserialize;
1556 use std::collections::HashMap;
1557 use std::net::TcpListener as StdTcpListener;
1558 use std::sync::Arc;
1559 use tempfile::tempdir;
1560 use tokio::net::TcpListener as TokioTcpListener;
1561 use tokio::runtime::Runtime;
1562
    /// Shared state of the in-process test server.
    #[derive(Clone)]
    struct Harness {
        /// Directory that remote paths are resolved against.
        root: Arc<PathBuf>,
        /// Holds the `TempDir` that `root` was taken from.
        _keeper: Arc<tempfile::TempDir>,
        /// Base URL of the running server; set once the listener is bound.
        base_url: Arc<Mutex<Option<String>>>,
        /// Upload sessions keyed by session id.
        upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
        /// When set, the chunk-target handler drops the last target.
        omit_last_chunk_target: bool,
    }
1571
1572 impl Harness {
1573 fn new() -> Self {
1574 Self::with_omit_last_chunk_target(false)
1575 }
1576
1577 fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
1578 let dir = tempdir().expect("tempdir");
1579 let path = dir.path().to_path_buf();
1580 Self {
1581 root: Arc::new(path),
1582 _keeper: Arc::new(dir),
1583 base_url: Arc::new(Mutex::new(None)),
1584 upload_sessions: Arc::new(Mutex::new(HashMap::new())),
1585 omit_last_chunk_target,
1586 }
1587 }
1588
1589 fn resolve(&self, remote_path: &str) -> PathBuf {
1590 let trimmed = remote_path.trim_start_matches('/');
1591 self.root.join(trimmed)
1592 }
1593
1594 fn virtualize(&self, path: &Path) -> String {
1595 let rel = path.strip_prefix(&*self.root).unwrap_or(path);
1596 if rel.as_os_str().is_empty() {
1597 "/".to_string()
1598 } else {
1599 format!("/{}", rel.to_string_lossy().replace('\\', "/"))
1600 }
1601 }
1602 }
1603
    /// Per-session bookkeeping kept by the test server.
    #[derive(Clone)]
    struct UploadSessionTestState {
        /// Remote path given in the session start request.
        path: String,
        /// Number of chunks announced in the chunk-target request.
        chunk_count: usize,
    }
1609
    /// Query parameters shared by the path-based test handlers; every handler
    /// reads only the fields it needs.
    #[derive(Deserialize)]
    struct PathParams {
        path: String,
        /// Byte offset for read/write requests.
        offset: Option<u64>,
        /// Byte count for read requests.
        length: Option<usize>,
        /// `"true"` makes the write handler start from an empty file.
        truncate: Option<String>,
        /// `"true"` selects recursive directory removal.
        recursive: Option<String>,
    }
1618
1619 #[derive(Deserialize)]
1620 struct UploadChunkQuery {
1621 #[serde(rename = "sessionId")]
1622 session_id: String,
1623 #[serde(rename = "chunkIndex")]
1624 chunk_index: usize,
1625 }
1626
    /// Query parameters of the `/data/manifest` test endpoint.
    #[derive(Deserialize)]
    struct DataManifestQuery {
        path: String,
        /// Optional version selector; the handler special-cases `"v1"`.
        version: Option<String>,
    }
1632
1633 async fn metadata_handler(
1634 State(harness): State<Harness>,
1635 Query(params): Query<PathParams>,
1636 ) -> Result<Json<serde_json::Value>, StatusCode> {
1637 let meta =
1638 std::fs::metadata(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1639 let file_type = if meta.is_dir() {
1640 "dir"
1641 } else if meta.is_file() {
1642 "file"
1643 } else {
1644 "other"
1645 };
1646 Ok(Json(serde_json::json!({
1647 "fileType": file_type,
1648 "len": meta.len(),
1649 "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
1650 "readonly": meta.permissions().readonly()
1651 })))
1652 }
1653
1654 async fn dir_handler(
1655 State(harness): State<Harness>,
1656 Query(params): Query<PathParams>,
1657 ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
1658 let mut entries = Vec::new();
1659 for entry in
1660 std::fs::read_dir(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?
1661 {
1662 let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1663 let file_type = entry
1664 .file_type()
1665 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1666 let kind = if file_type.is_dir() {
1667 "dir"
1668 } else if file_type.is_file() {
1669 "file"
1670 } else {
1671 "other"
1672 };
1673 entries.push(serde_json::json!({
1674 "path": harness.virtualize(&entry.path()),
1675 "fileName": entry.file_name().to_string_lossy(),
1676 "fileType": kind
1677 }));
1678 }
1679 Ok(Json(entries))
1680 }
1681
1682 async fn canonicalize_handler(
1683 State(harness): State<Harness>,
1684 Query(params): Query<PathParams>,
1685 ) -> Result<Json<serde_json::Value>, StatusCode> {
1686 let path = harness.resolve(¶ms.path);
1687 let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
1688 Ok(Json(serde_json::json!({
1689 "path": harness.virtualize(&canonical)
1690 })))
1691 }
1692
1693 async fn read_handler(
1694 State(harness): State<Harness>,
1695 Query(params): Query<PathParams>,
1696 ) -> Result<Vec<u8>, StatusCode> {
1697 let mut data =
1698 std::fs::read(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1699 let offset = params.offset.unwrap_or(0) as usize;
1700 let length = params.length.unwrap_or(data.len().saturating_sub(offset));
1701 let end = std::cmp::min(offset + length, data.len());
1702 if offset < data.len() {
1703 data = data[offset..end].to_vec();
1704 } else {
1705 data.clear();
1706 }
1707 Ok(data)
1708 }
1709
1710 async fn write_handler(
1711 State(harness): State<Harness>,
1712 Query(params): Query<PathParams>,
1713 body: axum::body::Bytes,
1714 ) -> Result<(), StatusCode> {
1715 let path = harness.resolve(¶ms.path);
1716 if let Some(parent) = path.parent() {
1717 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1718 }
1719 let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
1720 Vec::new()
1721 } else {
1722 std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1723 };
1724 let offset = params.offset.unwrap_or(0) as usize;
1725 let required = offset + body.len();
1726 if required > data.len() {
1727 data.resize(required, 0);
1728 }
1729 data[offset..offset + body.len()].copy_from_slice(&body);
1730 std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1731 Ok(())
1732 }
1733
1734 async fn upload_session_start_handler(
1735 State(harness): State<Harness>,
1736 Json(req): Json<UploadSessionStartRequest>,
1737 ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
1738 let session_id = Uuid::new_v4().to_string();
1739 let chunk_size_bytes = 1024i64;
1740 harness
1741 .upload_sessions
1742 .lock()
1743 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1744 .insert(
1745 session_id.clone(),
1746 UploadSessionTestState {
1747 path: req.path,
1748 chunk_count: 0,
1749 },
1750 );
1751 Ok(Json(UploadSessionStartResponse {
1752 session_id: session_id.clone(),
1753 blob_key: format!("test/blob/{session_id}"),
1754 chunk_size_bytes,
1755 }))
1756 }
1757
1758 async fn upload_session_chunks_handler(
1759 State(harness): State<Harness>,
1760 Json(req): Json<UploadSessionChunksRequest>,
1761 ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
1762 let base = harness
1763 .base_url
1764 .lock()
1765 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1766 .clone()
1767 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1768 let mut sessions = harness
1769 .upload_sessions
1770 .lock()
1771 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1772 let state = sessions
1773 .get_mut(&req.session_id)
1774 .ok_or(StatusCode::NOT_FOUND)?;
1775 state.chunk_count = req.chunks.len();
1776 let mut targets = req
1777 .chunks
1778 .iter()
1779 .map(|chunk| UploadChunkTarget {
1780 chunk_index: chunk.chunk_index,
1781 method: "PUT".to_string(),
1782 upload_url: format!(
1783 "{base}/upload-chunk?sessionId={}&chunkIndex={}",
1784 req.session_id, chunk.chunk_index
1785 ),
1786 headers: HashMap::new(),
1787 })
1788 .collect::<Vec<_>>();
1789 if harness.omit_last_chunk_target && !targets.is_empty() {
1790 targets.pop();
1791 }
1792 Ok(Json(UploadSessionChunksResponse { targets }))
1793 }
1794
1795 async fn upload_chunk_handler(
1796 State(harness): State<Harness>,
1797 Query(query): Query<UploadChunkQuery>,
1798 body: axum::body::Bytes,
1799 ) -> Result<(), StatusCode> {
1800 let chunk_path = harness.resolve(&format!(
1801 "/.upload-sessions/{}/{}",
1802 query.session_id, query.chunk_index
1803 ));
1804 if let Some(parent) = chunk_path.parent() {
1805 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1806 }
1807 std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1808 Ok(())
1809 }
1810
1811 async fn data_manifest_handler(
1812 Query(query): Query<DataManifestQuery>,
1813 ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
1814 let updated_at = if query.version.as_deref() == Some("v1") {
1815 "2026-01-01T00:00:00Z"
1816 } else {
1817 "2026-02-02T00:00:00Z"
1818 };
1819 Ok(Json(DataManifestDescriptor {
1820 schema_version: 1,
1821 format: "runmat-data".to_string(),
1822 dataset_id: format!("dataset:{}", query.path),
1823 updated_at: updated_at.to_string(),
1824 txn_sequence: 9,
1825 }))
1826 }
1827
1828 async fn data_chunk_upload_targets_handler(
1829 Json(req): Json<DataChunkUploadRequest>,
1830 ) -> Result<Json<serde_json::Value>, StatusCode> {
1831 let targets = req
1832 .chunks
1833 .iter()
1834 .map(|chunk| {
1835 serde_json::json!({
1836 "key": chunk.key,
1837 "method": "PUT",
1838 "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
1839 "headers": {
1840 "x-runmat-hash": chunk.hash,
1841 "x-runmat-object": chunk.object_id,
1842 }
1843 })
1844 })
1845 .collect::<Vec<_>>();
1846 Ok(Json(serde_json::json!({ "targets": targets })))
1847 }
1848
1849 async fn upload_session_complete_handler(
1850 State(harness): State<Harness>,
1851 Json(req): Json<UploadSessionCompleteRequest>,
1852 ) -> Result<(), StatusCode> {
1853 let state = harness
1854 .upload_sessions
1855 .lock()
1856 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1857 .remove(&req.session_id)
1858 .ok_or(StatusCode::NOT_FOUND)?;
1859 let mut data = Vec::new();
1860 for chunk_index in 0..state.chunk_count {
1861 let chunk_path = harness.resolve(&format!(
1862 "/.upload-sessions/{}/{}",
1863 req.session_id, chunk_index
1864 ));
1865 let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1866 data.extend_from_slice(&chunk);
1867 }
1868 if sha256_hex(&data) != req.content_sha256 {
1869 return Err(StatusCode::BAD_REQUEST);
1870 }
1871 let target_path = harness.resolve(&state.path);
1872 if let Some(parent) = target_path.parent() {
1873 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1874 }
1875 std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1876 Ok(())
1877 }
1878
1879 async fn mkdir_handler(
1880 State(harness): State<Harness>,
1881 Json(req): Json<CreateDirRequest>,
1882 ) -> Result<(), StatusCode> {
1883 let path = harness.resolve(&req.path);
1884 if req.recursive {
1885 std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1886 } else {
1887 std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1888 }
1889 Ok(())
1890 }
1891
1892 async fn delete_file_handler(
1893 State(harness): State<Harness>,
1894 Query(params): Query<PathParams>,
1895 ) -> Result<(), StatusCode> {
1896 std::fs::remove_file(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1897 Ok(())
1898 }
1899
1900 async fn delete_dir_handler(
1901 State(harness): State<Harness>,
1902 Query(params): Query<PathParams>,
1903 ) -> Result<(), StatusCode> {
1904 let recursive = params.recursive.as_deref() == Some("true");
1905 if recursive {
1906 std::fs::remove_dir_all(harness.resolve(¶ms.path))
1907 .map_err(|_| StatusCode::NOT_FOUND)?;
1908 } else {
1909 std::fs::remove_dir(harness.resolve(¶ms.path))
1910 .map_err(|_| StatusCode::NOT_FOUND)?;
1911 }
1912 Ok(())
1913 }
1914
1915 async fn rename_handler(
1916 State(harness): State<Harness>,
1917 Json(req): Json<RenameRequest>,
1918 ) -> Result<(), StatusCode> {
1919 let from = harness.resolve(&req.from);
1920 let to = harness.resolve(&req.to);
1921 if let Some(parent) = to.parent() {
1922 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1923 }
1924 std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
1925 Ok(())
1926 }
1927
1928 async fn set_readonly_handler(
1929 State(harness): State<Harness>,
1930 Json(req): Json<SetReadonlyRequest>,
1931 ) -> Result<(), StatusCode> {
1932 let path = harness.resolve(&req.path);
1933 let mut perms = std::fs::metadata(&path)
1934 .map_err(|_| StatusCode::NOT_FOUND)?
1935 .permissions();
1936 perms.set_readonly(req.readonly);
1937 std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1938 Ok(())
1939 }
1940
1941 async fn read_with_download_handler(
1942 State(harness): State<Harness>,
1943 Query(params): Query<PathParams>,
1944 ) -> Result<Json<serde_json::Value>, StatusCode> {
1945 let base = harness
1946 .base_url
1947 .lock()
1948 .unwrap()
1949 .clone()
1950 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1951 let download_url = format!("{base}/download?path={}", params.path);
1952 Ok(Json(serde_json::json!({
1953 "downloadUrl": download_url,
1954 "expiresAt": 0
1955 })))
1956 }
1957
1958 async fn download_handler(
1959 State(harness): State<Harness>,
1960 Query(params): Query<PathParams>,
1961 headers: HeaderMap,
1962 ) -> Result<Vec<u8>, StatusCode> {
1963 let mut data =
1964 std::fs::read(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1965 if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
1966 if let Some((start, end)) = parse_range(range) {
1967 let start = start.min(data.len());
1968 let end = end.min(data.len().saturating_sub(1));
1969 if start < data.len() {
1970 data = data[start..=end].to_vec();
1971 } else {
1972 data.clear();
1973 }
1974 }
1975 }
1976 Ok(data)
1977 }
1978
1979 fn parse_range(value: &str) -> Option<(usize, usize)> {
1980 let value = value.strip_prefix("bytes=")?;
1981 let (start, end) = value.split_once('-')?;
1982 let start = start.parse::<usize>().ok()?;
1983 let end = end.parse::<usize>().ok()?;
1984 Some((start, end))
1985 }
1986
1987 fn spawn_server() -> (String, Harness, Runtime) {
1988 let harness = Harness::new();
1989 let router = Router::new()
1990 .route("/fs/metadata", get(metadata_handler))
1991 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
1992 .route("/fs/file", delete(delete_file_handler))
1993 .route("/fs/canonicalize", get(canonicalize_handler))
1994 .route("/fs/read", get(read_handler))
1995 .route("/fs/write", put(write_handler))
1996 .route(
1997 "/fs/upload-session/start",
1998 post(upload_session_start_handler),
1999 )
2000 .route(
2001 "/fs/upload-session/chunks",
2002 post(upload_session_chunks_handler),
2003 )
2004 .route(
2005 "/fs/upload-session/complete",
2006 post(upload_session_complete_handler),
2007 )
2008 .route("/fs/mkdir", post(mkdir_handler))
2009 .route("/fs/rename", post(rename_handler))
2010 .route("/fs/set-readonly", post(set_readonly_handler))
2011 .route("/data/manifest", get(data_manifest_handler))
2012 .route(
2013 "/data/chunks/upload-targets",
2014 post(data_chunk_upload_targets_handler),
2015 )
2016 .route("/upload-chunk", put(upload_chunk_handler))
2017 .with_state(harness.clone());
2018 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2019 std_listener.set_nonblocking(true).expect("nonblocking");
2020 let addr = std_listener.local_addr().unwrap();
2021 let service = router.into_make_service();
2022 let rt = Runtime::new().unwrap();
2023 let base = format!("http://{addr}");
2024 *harness.base_url.lock().unwrap() = Some(base.clone());
2025 rt.spawn(async move {
2026 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2027 axum::serve(listener, service).await.unwrap();
2028 });
2029 (base, harness, rt)
2030 }
2031
2032 fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
2033 let harness = Harness::new();
2034 let router = Router::new()
2035 .route("/fs/metadata", get(metadata_handler))
2036 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2037 .route("/fs/file", delete(delete_file_handler))
2038 .route("/fs/canonicalize", get(canonicalize_handler))
2039 .route("/fs/read", get(read_with_download_handler))
2040 .route("/fs/write", put(write_handler))
2041 .route(
2042 "/fs/upload-session/start",
2043 post(upload_session_start_handler),
2044 )
2045 .route(
2046 "/fs/upload-session/chunks",
2047 post(upload_session_chunks_handler),
2048 )
2049 .route(
2050 "/fs/upload-session/complete",
2051 post(upload_session_complete_handler),
2052 )
2053 .route("/fs/mkdir", post(mkdir_handler))
2054 .route("/fs/rename", post(rename_handler))
2055 .route("/fs/set-readonly", post(set_readonly_handler))
2056 .route("/download", get(download_handler))
2057 .route("/data/manifest", get(data_manifest_handler))
2058 .route(
2059 "/data/chunks/upload-targets",
2060 post(data_chunk_upload_targets_handler),
2061 )
2062 .route("/upload-chunk", put(upload_chunk_handler))
2063 .with_state(harness.clone());
2064 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2065 std_listener.set_nonblocking(true).expect("nonblocking");
2066 let addr = std_listener.local_addr().unwrap();
2067 let service = router.into_make_service();
2068 let rt = Runtime::new().unwrap();
2069 let base = format!("http://{addr}");
2070 *harness.base_url.lock().unwrap() = Some(base.clone());
2071 rt.spawn(async move {
2072 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2073 axum::serve(listener, service).await.unwrap();
2074 });
2075 (base, harness, rt)
2076 }
2077
2078 fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
2079 let harness = Harness::with_omit_last_chunk_target(true);
2080 let router = Router::new()
2081 .route("/fs/metadata", get(metadata_handler))
2082 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2083 .route("/fs/file", delete(delete_file_handler))
2084 .route("/fs/canonicalize", get(canonicalize_handler))
2085 .route("/fs/read", get(read_handler))
2086 .route("/fs/write", put(write_handler))
2087 .route(
2088 "/fs/upload-session/start",
2089 post(upload_session_start_handler),
2090 )
2091 .route(
2092 "/fs/upload-session/chunks",
2093 post(upload_session_chunks_handler),
2094 )
2095 .route(
2096 "/fs/upload-session/complete",
2097 post(upload_session_complete_handler),
2098 )
2099 .route("/fs/mkdir", post(mkdir_handler))
2100 .route("/fs/rename", post(rename_handler))
2101 .route("/fs/set-readonly", post(set_readonly_handler))
2102 .route("/data/manifest", get(data_manifest_handler))
2103 .route(
2104 "/data/chunks/upload-targets",
2105 post(data_chunk_upload_targets_handler),
2106 )
2107 .route("/upload-chunk", put(upload_chunk_handler))
2108 .with_state(harness.clone());
2109 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2110 std_listener.set_nonblocking(true).expect("nonblocking");
2111 let addr = std_listener.local_addr().unwrap();
2112 let service = router.into_make_service();
2113 let rt = Runtime::new().unwrap();
2114 let base = format!("http://{addr}");
2115 *harness.base_url.lock().unwrap() = Some(base.clone());
2116 rt.spawn(async move {
2117 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2118 axum::serve(listener, service).await.unwrap();
2119 });
2120 (base, harness, rt)
2121 }
2122
2123 #[test]
2124 fn remote_provider_roundtrip() {
2125 let (base, harness, _rt) = spawn_server();
2126 let provider = RemoteFsProvider::new(RemoteFsConfig {
2127 base_url: base,
2128 auth_token: None,
2129 chunk_bytes: 1024,
2130 parallel_requests: 4,
2131 direct_read_threshold_bytes: u64::MAX,
2132 direct_write_threshold_bytes: 1024,
2133 timeout: Duration::from_secs(30),
2134 ..RemoteFsConfig::default()
2135 })
2136 .expect("provider");
2137
2138 let data = (0..16_384u32)
2139 .flat_map(|v| v.to_le_bytes())
2140 .collect::<Vec<_>>();
2141 executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
2142 let read_back =
2143 executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2144 assert_eq!(data, read_back);
2145 executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
2146 assert!(!harness.resolve("/reports/data.bin").exists());
2147 }
2148
/// Reads a file through the provider against the download-URL server variant.
///
/// The file is seeded straight into the location the harness resolves the
/// remote path to (bypassing the provider), then read back via
/// `provider.read` and compared byte-for-byte.
#[test]
fn remote_provider_download_url_read() {
    // Bind the runtime to a named variable (not `_`) so it is not dropped
    // until the end of the test.
    let (base, harness, _runtime) = spawn_server_with_download_url();
    let config = RemoteFsConfig {
        base_url: base,
        auth_token: None,
        chunk_bytes: 128,
        parallel_requests: 2,
        direct_read_threshold_bytes: u64::MAX,
        timeout: Duration::from_secs(30),
        ..RemoteFsConfig::default()
    };
    let provider = RemoteFsProvider::new(config).expect("provider");

    // 1024 little-endian u32 values -> 4096 bytes of deterministic content.
    let mut payload = Vec::new();
    for value in 0..1024u32 {
        payload.extend_from_slice(&value.to_le_bytes());
    }

    // Seed the file directly on disk via the harness.
    let parent_dir = harness.resolve("/reports");
    std::fs::create_dir_all(parent_dir).expect("mkdir");
    let seeded_file = harness.resolve("/reports/data.bin");
    std::fs::write(seeded_file, &payload).expect("write");

    let remote_path = Path::new("/reports/data.bin");
    let fetched = executor::block_on(provider.read(remote_path)).expect("read");
    assert_eq!(payload, fetched);
}
2173
/// A write must fail with `io::ErrorKind::Other` against the
/// missing-chunk-target server variant.
///
/// The 1024-byte payload exceeds the 128-byte direct-write threshold, so the
/// write presumably goes through the chunk-upload path where the missing
/// target is detected -- confirm against the provider implementation.
#[test]
fn remote_provider_errors_when_chunk_target_is_missing() {
    // Bind the harness and runtime to named variables (not `_`) so they are
    // not dropped until the end of the test.
    let (base, _server_state, _runtime) = spawn_server_with_missing_chunk_target();
    let config = RemoteFsConfig {
        base_url: base,
        auth_token: None,
        chunk_bytes: 128,
        parallel_requests: 2,
        direct_read_threshold_bytes: u64::MAX,
        direct_write_threshold_bytes: 128,
        timeout: Duration::from_secs(30),
        ..RemoteFsConfig::default()
    };
    let provider = RemoteFsProvider::new(config).expect("provider");

    let payload = vec![7u8; 1024];
    let destination = Path::new("/reports/missing-target.bin");
    let outcome = executor::block_on(provider.write(destination, &payload));
    let failure = outcome.expect_err("write should fail when chunk target is missing");

    let observed_kind = failure.kind();
    assert_eq!(observed_kind, io::ErrorKind::Other);
}
2195
/// Fetches a data manifest descriptor from the test server and checks the
/// fields of the returned descriptor against the values the test expects.
#[test]
fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
    // Bind the harness and runtime to named variables (not `_`) so they are
    // not dropped until the end of the test.
    let (base, _server_state, _runtime) = spawn_server();
    let config = RemoteFsConfig {
        base_url: base,
        auth_token: None,
        timeout: Duration::from_secs(30),
        ..RemoteFsConfig::default()
    };
    let provider = RemoteFsProvider::new(config).expect("provider");

    let request = DataManifestRequest {
        path: String::from("/datasets/alpha.data"),
        version: Some(String::from("v1")),
    };
    let manifest = provider
        .data_manifest_descriptor(&request)
        .expect("manifest descriptor");

    assert_eq!(manifest.schema_version, 1);
    assert_eq!(manifest.format, "runmat-data");
    assert_eq!(manifest.dataset_id, "dataset:/datasets/alpha.data");
    assert_eq!(manifest.updated_at, "2026-01-01T00:00:00Z");
    assert_eq!(manifest.txn_sequence, 9);
}
2219
/// Requests upload targets for a single chunk and checks the one returned
/// target: its key, HTTP method, upload URL, and the two expected headers.
#[test]
fn remote_provider_data_chunk_upload_targets_fetches_targets() {
    // Bind the harness and runtime to named variables (not `_`) so they are
    // not dropped until the end of the test.
    let (base, _server_state, _runtime) = spawn_server();
    let config = RemoteFsConfig {
        base_url: base,
        auth_token: None,
        timeout: Duration::from_secs(30),
        ..RemoteFsConfig::default()
    };
    let provider = RemoteFsProvider::new(config).expect("provider");

    let chunk = DataChunkDescriptor {
        key: String::from("X/0-0"),
        object_id: String::from("obj_0"),
        hash: String::from("sha256:abc"),
        bytes_raw: 10,
        bytes_stored: 8,
    };
    let request = DataChunkUploadRequest {
        dataset_path: String::from("/datasets/alpha.data"),
        array: String::from("X"),
        chunks: vec![chunk],
    };
    let targets = provider
        .data_chunk_upload_targets(&request)
        .expect("chunk upload targets");

    // Exactly one target is expected for the single requested chunk; the
    // length is asserted before indexing so a wrong count fails cleanly.
    assert_eq!(targets.len(), 1);
    let target = &targets[0];
    assert_eq!(target.key, "X/0-0");
    assert_eq!(target.method, "PUT");
    assert_eq!(target.upload_url, "https://uploads.example.test/X/obj_0");

    let headers = &target.headers;
    assert_eq!(headers.len(), 2);
    let hash_header = headers.get("x-runmat-hash").map(String::as_str);
    assert_eq!(hash_header, Some("sha256:abc"));
    let object_header = headers.get("x-runmat-object").map(String::as_str);
    assert_eq!(object_header, Some("obj_0"));
}
2263}