1use crate::data_contract::{
2 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::DateTime;
7use crossbeam_utils::thread;
8use once_cell::sync::Lazy;
9use reqwest::blocking::{Client, Response};
10use reqwest::StatusCode;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::fmt;
16use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use url::Url;
21use uuid::Uuid;
22
/// Marker written as the `hash` of a file whose body is a JSON shard manifest
/// (see `upload_sharded_file`); readers compare the remote metadata hash against it to decide
/// whether to reassemble the file from shards.
const MANIFEST_HASH: &str = "manifest:v1";
/// Remote directory under which the individual shards of large files are uploaded, each
/// under a freshly generated UUID name.
const SHARD_PREFIX: &str = "/.runmat/shards";
25
26static USER_AGENT: Lazy<String> = Lazy::new(|| {
27 format!(
28 "runmat-remote-fs/{} ({})",
29 env!("CARGO_PKG_VERSION"),
30 std::env::consts::OS
31 )
32});
33
/// User-facing configuration for [`RemoteFsProvider`].
///
/// Several values are clamped when the provider is built: `chunk_bytes` to at least 64 KiB,
/// `shard_size_bytes` to at least 8 MiB, and `parallel_requests` / `retry_max_attempts` to
/// at least 1.
#[derive(Clone, Debug)]
pub struct RemoteFsConfig {
    /// Base URL of the remote file service; must be non-empty.
    pub base_url: String,
    /// Optional bearer token sent as `Authorization: Bearer <token>`.
    pub auth_token: Option<String>,
    /// Size of each ranged read / chunked write request.
    pub chunk_bytes: usize,
    /// Maximum number of worker threads used for parallel transfers.
    pub parallel_requests: usize,
    /// Files at least this large are read through a direct download URL.
    pub direct_read_threshold_bytes: u64,
    /// Payloads at least this large are written through the multipart / upload-session path.
    pub direct_write_threshold_bytes: u64,
    /// Payloads at least this large are split into shards plus a manifest.
    pub shard_threshold_bytes: u64,
    /// Size of each shard when sharding.
    pub shard_size_bytes: u64,
    /// Per-request timeout of the HTTP client.
    pub timeout: Duration,
    /// Total attempts for retried requests (first try included).
    pub retry_max_attempts: usize,
    /// Initial back-off delay; doubled after each failed attempt.
    pub retry_base_delay: Duration,
    /// Upper bound for a single back-off delay.
    pub retry_max_delay: Duration,
}

impl Default for RemoteFsConfig {
    /// Defaults: 16 MiB chunks, 4 workers, 64 MiB direct thresholds, sharding at 4 GiB in
    /// 512 MiB shards, 120 s timeout, 5 attempts backing off from 100 ms up to 2 s.
    fn default() -> Self {
        let retry_max_attempts = 5;
        Self {
            base_url: String::new(),
            auth_token: None,
            chunk_bytes: 16 << 20,
            parallel_requests: 4,
            direct_read_threshold_bytes: 64 << 20,
            direct_write_threshold_bytes: 64 << 20,
            shard_threshold_bytes: 4 << 30,
            shard_size_bytes: 512 << 20,
            timeout: Duration::from_secs(120),
            retry_max_attempts,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(2),
        }
    }
}
68
/// Shared transport state held by [`RemoteFsProvider`] behind an `Arc`: the blocking HTTP
/// client, the resolved base URL and the (clamped) tunables taken from [`RemoteFsConfig`].
struct RemoteInner {
    /// Blocking HTTP client built with the configured timeout and user agent.
    client: Client,
    /// Base URL that API routes are joined onto.
    base: Url,
    /// Pre-formatted `Authorization` value (`Bearer <token>`), when a token was configured.
    auth_header: Option<String>,
    /// Chunk size for ranged reads and chunked writes (at least 64 KiB).
    chunk_bytes: usize,
    /// Upper bound on worker threads for parallel transfers (at least 1).
    parallel_requests: usize,
    /// Reads of at least this many bytes go through a download URL.
    direct_read_threshold_bytes: u64,
    /// Writes of at least this many bytes go through the multipart / upload-session path.
    direct_write_threshold_bytes: u64,
    /// Writes of at least this many bytes are sharded.
    shard_threshold_bytes: u64,
    /// Shard size used when sharding (at least 8 MiB).
    shard_size_bytes: u64,
    /// Total attempts for retried requests (at least 1).
    retry_max_attempts: usize,
    /// Initial retry back-off.
    retry_base_delay: Duration,
    /// Cap on a single retry back-off.
    retry_max_delay: Duration,
}
83
/// Arguments of one `PUT /fs/write` call issued by `RemoteInner::upload_chunk`.
struct FsWriteChunkRequest<'a> {
    /// Normalized remote path being written.
    path: &'a str,
    /// Byte offset of `data` within the remote file.
    offset: u64,
    /// Sent as `truncate=true`; the callers set it on the first chunk of an upload.
    truncate: bool,
    /// Sent as `final=true`; the callers set it on the last chunk of an upload.
    final_chunk: bool,
    /// Sent as `createNew=true`; presumably makes the server refuse an existing file — confirm.
    create_new: bool,
    /// Bytes of this chunk (the request body).
    data: &'a [u8],
    /// Optional `hash` query value, e.g. `MANIFEST_HASH` for shard manifests.
    hash: Option<&'a str>,
}
93
/// Arguments for `RemoteInner::upload_session_complete`, which finalizes an upload session
/// via `POST /fs/upload-session/complete`.
struct UploadSessionCompleteParams<'a> {
    /// Normalized remote path of the file.
    path: &'a str,
    /// Session id returned by `/fs/upload-session/start`.
    session_id: &'a str,
    /// Blob key returned by `/fs/upload-session/start`.
    blob_key: &'a str,
    /// Total payload size in bytes.
    size_bytes: u64,
    /// Hex SHA-256 of the whole payload.
    content_sha256: &'a str,
    /// Number of chunks that were uploaded.
    chunk_count: usize,
    /// Forwarded `create_new` flag of the original write.
    create_new: bool,
}
103
/// Owned arguments for `RemoteInner::multipart_complete` (legacy multipart protocol).
struct MultipartCompleteParams {
    /// Normalized remote path of the file.
    path: String,
    /// Session id returned by `/fs/multipart-upload`.
    session_id: String,
    /// Blob key returned by `/fs/multipart-upload`.
    blob_key: String,
    /// Multipart upload id returned by `/fs/multipart-upload`.
    upload_id: String,
    /// Total payload size in bytes.
    size_bytes: u64,
    /// Forwarded `create_new` flag of the original write.
    create_new: bool,
    /// Uploaded parts (part number + ETag), sorted by part number by the caller.
    parts: Vec<MultipartPart>,
}
113
114impl RemoteInner {
115 fn new(config: RemoteFsConfig) -> io::Result<Self> {
116 if config.base_url.is_empty() {
117 return Err(io::Error::new(
118 ErrorKind::InvalidInput,
119 "RemoteFsConfig.base_url must be provided",
120 ));
121 }
122 let base = Url::parse(&config.base_url).map_err(map_url_err)?;
123 let client = Client::builder()
124 .timeout(config.timeout)
125 .user_agent(USER_AGENT.clone())
126 .build()
127 .map_err(map_http_err)?;
128 Ok(Self {
129 client,
130 base,
131 auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
132 chunk_bytes: config.chunk_bytes.max(64 * 1024),
133 parallel_requests: config.parallel_requests.max(1),
134 direct_read_threshold_bytes: config.direct_read_threshold_bytes,
135 direct_write_threshold_bytes: config.direct_write_threshold_bytes,
136 shard_threshold_bytes: config.shard_threshold_bytes,
137 shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
138 retry_max_attempts: config.retry_max_attempts.max(1),
139 retry_base_delay: config.retry_base_delay,
140 retry_max_delay: config.retry_max_delay,
141 })
142 }
143
144 fn should_retry(&self, status: StatusCode) -> bool {
145 matches!(
146 status,
147 StatusCode::TOO_MANY_REQUESTS
148 | StatusCode::INTERNAL_SERVER_ERROR
149 | StatusCode::BAD_GATEWAY
150 | StatusCode::SERVICE_UNAVAILABLE
151 | StatusCode::GATEWAY_TIMEOUT
152 )
153 }
154
155 fn retry_delay(&self, attempt: usize) -> Duration {
156 let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
157 let delay = self.retry_base_delay.as_millis() as u64 * factor;
158 let capped = delay.min(self.retry_max_delay.as_millis() as u64);
159 Duration::from_millis(capped)
160 }
161
162 fn send_with_retry(
163 &self,
164 method: reqwest::Method,
165 route: &str,
166 query: &[(&str, String)],
167 ) -> io::Result<Response> {
168 for attempt in 0..self.retry_max_attempts {
169 let resp = self
170 .request(method.clone(), route, query)
171 .send()
172 .map_err(map_http_err)?;
173 if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
174 return Ok(resp);
175 }
176 std::thread::sleep(self.retry_delay(attempt));
177 }
178 Err(io::Error::other("request retries exhausted"))
179 }
180
181 fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
182 for attempt in 0..self.retry_max_attempts {
183 let mut request = self.client.get(url);
184 if let Some(range) = &range {
185 request = request.header(reqwest::header::RANGE, range);
186 }
187 let resp = request.send().map_err(map_http_err)?;
188 if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
189 return Ok(resp);
190 }
191 std::thread::sleep(self.retry_delay(attempt));
192 }
193 Err(io::Error::other("request retries exhausted"))
194 }
195
196 fn endpoint(&self, route: &str) -> Url {
197 self.base
198 .join(route.trim_start_matches('/'))
199 .expect("failed to join remote route")
200 }
201
202 fn request(
203 &self,
204 method: reqwest::Method,
205 route: &str,
206 query: &[(&str, String)],
207 ) -> reqwest::blocking::RequestBuilder {
208 let mut url = self.endpoint(route);
209 {
210 let mut pairs = url.query_pairs_mut();
211 for (k, v) in query {
212 pairs.append_pair(k, v);
213 }
214 }
215 let mut builder = self.client.request(method, url);
216 if let Some(auth) = &self.auth_header {
217 builder = builder.header("Authorization", auth);
218 }
219 builder
220 }
221
222 fn get_json<T: for<'a> Deserialize<'a>>(
223 &self,
224 route: &str,
225 query: &[(&str, String)],
226 ) -> io::Result<T> {
227 let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
228 handle_error(resp)?.json().map_err(map_http_err)
229 }
230
231 fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
232 let resp = self
233 .request(reqwest::Method::POST, route, &[])
234 .json(body)
235 .send()
236 .map_err(map_http_err)?;
237 handle_error(resp)?;
238 Ok(())
239 }
240
241 fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
242 &self,
243 route: &str,
244 body: &B,
245 ) -> io::Result<T> {
246 let resp = self
247 .request(reqwest::Method::POST, route, &[])
248 .json(body)
249 .send()
250 .map_err(map_http_err)?;
251 handle_error(resp)?.json().map_err(map_http_err)
252 }
253
254 fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
255 let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
256 handle_error(resp)?;
257 Ok(())
258 }
259
260 fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
261 let resp = self.send_with_retry(
262 reqwest::Method::GET,
263 "/fs/read",
264 &[
265 ("path", path.to_string()),
266 ("offset", offset.to_string()),
267 ("length", length.to_string()),
268 ],
269 )?;
270 let mut body = handle_error(resp)?;
271 if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
272 if content_type
273 .to_str()
274 .map(|value| value.contains("application/json"))
275 .unwrap_or(false)
276 {
277 let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
278 return self.download_range_from_url(&json.download_url, offset, length as u64);
279 }
280 }
281 let mut buf = Vec::with_capacity(length);
282 body.copy_to(&mut buf).map_err(map_http_err)?;
283 Ok(buf)
284 }
285
286 fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
287 if length == 0 {
288 return Ok(Vec::new());
289 }
290 let end = offset + length - 1;
291 let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
292 let mut body = handle_error(resp)?;
293 let mut buf = Vec::with_capacity(length as usize);
294 body.copy_to(&mut buf).map_err(map_http_err)?;
295 Ok(buf)
296 }
297
298 fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
299 self.get_json("/fs/download-url", &[("path", path.to_string())])
300 }
301
302 fn upload_chunk(
303 &self,
304 request: FsWriteChunkRequest<'_>,
305 ) -> io::Result<Option<FsWriteSessionResponse>> {
306 let mut query = vec![
307 ("path", request.path.to_string()),
308 ("offset", request.offset.to_string()),
309 ];
310 if request.truncate {
311 query.push(("truncate", "true".to_string()));
312 }
313 if request.final_chunk {
314 query.push(("final", "true".to_string()));
315 }
316 if request.create_new {
317 query.push(("createNew", "true".to_string()));
318 }
319 if let Some(hash) = request.hash {
320 query.push(("hash", hash.to_string()));
321 }
322 let resp = self
323 .request(reqwest::Method::PUT, "/fs/write", &query)
324 .body(request.data.to_vec())
325 .send()
326 .map_err(map_http_err)?;
327 if resp.status() == StatusCode::ACCEPTED {
328 let session = handle_error(resp)?.json().map_err(map_http_err)?;
329 return Ok(Some(session));
330 }
331 handle_error(resp)?;
332 Ok(None)
333 }
334
335 fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
336 self.post_json(
337 "/fs/multipart-upload",
338 &MultipartUploadRequest {
339 path: path.to_string(),
340 size_bytes: size_bytes as i64,
341 content_type: None,
342 },
343 )
344 }
345
346 fn upload_session_start(
347 &self,
348 path: &str,
349 size_bytes: u64,
350 content_sha256: &str,
351 ) -> io::Result<UploadSessionStartResponse> {
352 self.post_json(
353 "/fs/upload-session/start",
354 &UploadSessionStartRequest {
355 path: path.to_string(),
356 size_bytes: size_bytes as i64,
357 content_type: None,
358 content_sha256: content_sha256.to_string(),
359 },
360 )
361 }
362
363 fn upload_session_chunks(
364 &self,
365 session_id: &str,
366 blob_key: &str,
367 chunks: &[UploadSessionChunkDescriptor],
368 ) -> io::Result<UploadSessionChunksResponse> {
369 self.post_json(
370 "/fs/upload-session/chunks",
371 &UploadSessionChunksRequest {
372 session_id: session_id.to_string(),
373 blob_key: blob_key.to_string(),
374 chunks: chunks.to_vec(),
375 },
376 )
377 }
378
379 fn upload_session_complete(&self, params: UploadSessionCompleteParams<'_>) -> io::Result<()> {
380 self.post_empty(
381 "/fs/upload-session/complete",
382 &UploadSessionCompleteRequest {
383 path: params.path.to_string(),
384 session_id: params.session_id.to_string(),
385 blob_key: params.blob_key.to_string(),
386 size_bytes: params.size_bytes as i64,
387 content_sha256: params.content_sha256.to_string(),
388 chunk_count: params.chunk_count,
389 create_new: params.create_new,
390 hash: None,
391 },
392 )
393 }
394
395 fn multipart_presign_part(
396 &self,
397 session_id: &str,
398 blob_key: &str,
399 upload_id: &str,
400 part_number: i32,
401 size_bytes: u64,
402 ) -> io::Result<String> {
403 let response: MultipartUploadPartResponse = self.post_json(
404 "/fs/multipart-upload/part",
405 &MultipartUploadPartRequest {
406 session_id: session_id.to_string(),
407 blob_key: blob_key.to_string(),
408 upload_id: upload_id.to_string(),
409 part_number,
410 size_bytes: size_bytes as i64,
411 },
412 )?;
413 Ok(response.upload_url)
414 }
415
416 fn multipart_complete(&self, params: MultipartCompleteParams) -> io::Result<()> {
417 let resp = self
418 .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
419 .json(&MultipartUploadCompleteRequest {
420 path: params.path,
421 session_id: params.session_id,
422 blob_key: params.blob_key,
423 upload_id: params.upload_id,
424 size_bytes: params.size_bytes as i64,
425 create_new: params.create_new,
426 hash: None,
427 parts: params.parts,
428 })
429 .send()
430 .map_err(map_http_err)?;
431 handle_error(resp)?;
432 Ok(())
433 }
434
435 fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
436 let resp = self
437 .client
438 .put(url)
439 .body(data.to_vec())
440 .send()
441 .map_err(map_http_err)?;
442 let resp = handle_error(resp)?;
443 let etag = resp
444 .headers()
445 .get(reqwest::header::ETAG)
446 .and_then(|value| value.to_str().ok())
447 .unwrap_or("")
448 .to_string();
449 Ok(etag)
450 }
451
452 fn put_upload_target(
453 &self,
454 method: &str,
455 url: &str,
456 headers: &HashMap<String, String>,
457 data: &[u8],
458 ) -> io::Result<()> {
459 let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
460 io::Error::new(
461 ErrorKind::InvalidInput,
462 format!("invalid upload method `{method}`: {err}"),
463 )
464 })?;
465 let mut request = self.client.request(method, url).body(data.to_vec());
466 for (name, value) in headers {
467 request = request.header(name, value);
468 }
469 let resp = request.send().map_err(map_http_err)?;
470 handle_error(resp)?;
471 Ok(())
472 }
473
474 fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
475 self.get_json("/fs/metadata", &[("path", path.to_string())])
476 }
477
478 fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
479 self.get_json("/fs/dir", &[("path", path.to_string())])
480 }
481
482 fn canonicalize_path(&self, path: &str) -> io::Result<String> {
483 let resp: CanonicalizeResponse =
484 self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
485 Ok(resp.path)
486 }
487}
488
/// [`FsProvider`] backed by a remote HTTP file service.
///
/// All transport state (HTTP client, base URL, tunables) lives in a shared `Arc<RemoteInner>`.
/// NOTE(review): requests use `reqwest::blocking`, including from the `async` trait methods.
pub struct RemoteFsProvider {
    inner: Arc<RemoteInner>,
}
492
493impl RemoteFsProvider {
494 pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
495 Ok(Self {
496 inner: Arc::new(RemoteInner::new(config)?),
497 })
498 }
499
500 pub fn data_manifest_descriptor(
502 &self,
503 request: &DataManifestRequest,
504 ) -> io::Result<DataManifestDescriptor> {
505 let mut query = vec![("path", request.path.clone())];
506 if let Some(version) = &request.version {
507 query.push(("version", version.clone()));
508 }
509 self.inner.get_json("/data/manifest", &query)
510 }
511
512 pub fn data_chunk_upload_targets(
514 &self,
515 request: &DataChunkUploadRequest,
516 ) -> io::Result<Vec<DataChunkUploadTarget>> {
517 #[derive(Deserialize)]
518 struct DataChunkUploadTargetsResponse {
519 targets: Vec<DataChunkUploadTarget>,
520 }
521 let response: DataChunkUploadTargetsResponse = self
522 .inner
523 .post_json("/data/chunks/upload-targets", request)?;
524 Ok(response.targets)
525 }
526
527 fn normalize(&self, path: &Path) -> String {
528 let mut normalized = PathBuf::new();
529 normalized.push("/");
530 normalized.push(path);
531 normalized.to_string_lossy().replace('\\', "/").to_string()
532 }
533
534 fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
535 if let Some(parent) = path.parent() {
536 self.inner.post_empty(
537 "/fs/mkdir",
538 &CreateDirRequest {
539 path: self.normalize(parent),
540 recursive: true,
541 },
542 )?;
543 }
544 Ok(())
545 }
546
547 fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
548 if len == 0 {
549 return Ok(Vec::new());
550 }
551 if len >= self.inner.direct_read_threshold_bytes {
552 let url = self.inner.fetch_download_url(path)?.download_url;
553 return self.download_entire_file_from_url(&url, len);
554 }
555 let chunk = self.inner.chunk_bytes as u64;
556 let mut tasks = Vec::new();
557 let mut offset = 0;
558 let mut index = 0;
559 while offset < len {
560 let remaining = len - offset;
561 let length = std::cmp::min(chunk, remaining);
562 tasks.push(ChunkTask {
563 offset,
564 length: length as usize,
565 index,
566 });
567 offset += length;
568 index += 1;
569 }
570 let mut buffer = vec![0u8; len as usize];
571 let path = path.to_string();
572 let inner = self.inner.clone();
573 let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
574 let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
575 let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
576 let threads = inner
577 .parallel_requests
578 .min(queue.lock().unwrap().len())
579 .max(1);
580 thread::scope(|scope| {
581 for _ in 0..threads {
582 let queue = queue.clone();
583 let error = error.clone();
584 let results = results.clone();
585 let inner = inner.clone();
586 let path = path.clone();
587 scope.spawn(move |_| loop {
588 let task_opt = {
589 let mut guard = queue.lock().unwrap();
590 guard.pop_front()
591 };
592 let task = match task_opt {
593 Some(task) => task,
594 None => break,
595 };
596 match inner.download_chunk(&path, task.offset, task.length) {
597 Ok(bytes) => {
598 let mut guard = results.lock().unwrap();
599 guard[task.index] = Some(bytes);
600 }
601 Err(err) => {
602 let mut guard = error.lock().unwrap();
603 if guard.is_none() {
604 *guard = Some(err);
605 }
606 break;
607 }
608 }
609 });
610 }
611 })
612 .expect("remote download scope");
613 if let Some(err) = error.lock().unwrap().take() {
614 return Err(err);
615 }
616 let chunks = Arc::try_unwrap(results)
617 .expect("results still in use")
618 .into_inner()
619 .expect("results poisoned");
620 for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
621 let bytes = maybe.expect("missing downloaded chunk for remote file");
622 let start = task.offset as usize;
623 buffer[start..start + bytes.len()].copy_from_slice(&bytes);
624 }
625 Ok(buffer)
626 }
627
628 fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
629 let manifest_bytes = self.download_raw_file(path, len)?;
630 let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
631 .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
632 if manifest.version != 1 {
633 return Err(io::Error::new(
634 ErrorKind::InvalidData,
635 "unsupported manifest",
636 ));
637 }
638 let mut buffer = Vec::with_capacity(manifest.total_size as usize);
639 for shard in manifest.shards {
640 let meta = self.inner.fetch_metadata(&shard.path)?;
641 let bytes = self.download_raw_file(&shard.path, meta.len)?;
642 buffer.extend_from_slice(&bytes);
643 }
644 Ok(buffer)
645 }
646
647 fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
648 let chunk = self.inner.chunk_bytes as u64;
649 let mut tasks = Vec::new();
650 let mut offset = 0;
651 let mut index = 0;
652 while offset < len {
653 let remaining = len - offset;
654 let length = std::cmp::min(chunk, remaining);
655 tasks.push(ChunkTask {
656 offset,
657 length: length as usize,
658 index,
659 });
660 offset += length;
661 index += 1;
662 }
663 let mut buffer = vec![0u8; len as usize];
664 let url = url.to_string();
665 let inner = self.inner.clone();
666 let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
667 let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
668 let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
669 let threads = inner
670 .parallel_requests
671 .min(queue.lock().unwrap().len())
672 .max(1);
673 thread::scope(|scope| {
674 for _ in 0..threads {
675 let queue = queue.clone();
676 let error = error.clone();
677 let results = results.clone();
678 let inner = inner.clone();
679 let url = url.clone();
680 scope.spawn(move |_| loop {
681 let task_opt = {
682 let mut guard = queue.lock().unwrap();
683 guard.pop_front()
684 };
685 let task = match task_opt {
686 Some(task) => task,
687 None => break,
688 };
689 match inner.download_range_from_url(&url, task.offset, task.length as u64) {
690 Ok(bytes) => {
691 let mut guard = results.lock().unwrap();
692 guard[task.index] = Some(bytes);
693 }
694 Err(err) => {
695 let mut guard = error.lock().unwrap();
696 if guard.is_none() {
697 *guard = Some(err);
698 }
699 break;
700 }
701 }
702 });
703 }
704 })
705 .expect("remote download scope");
706 if let Some(err) = error.lock().unwrap().take() {
707 return Err(err);
708 }
709 let chunks = Arc::try_unwrap(results)
710 .expect("results still in use")
711 .into_inner()
712 .expect("results poisoned");
713 for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
714 let bytes = maybe.expect("missing downloaded chunk for remote file");
715 let start = task.offset as usize;
716 buffer[start..start + bytes.len()].copy_from_slice(&bytes);
717 }
718 Ok(buffer)
719 }
720
721 fn upload_entire_file(&self, path: &str, data: &[u8], create_new: bool) -> io::Result<()> {
722 if data.len() as u64 >= self.inner.shard_threshold_bytes {
723 return self.upload_sharded_file(path, data, create_new);
724 }
725 self.upload_unsharded_file(path, data, create_new, None)
726 }
727
728 fn upload_unsharded_file(
729 &self,
730 path: &str,
731 data: &[u8],
732 create_new: bool,
733 hash: Option<&str>,
734 ) -> io::Result<()> {
735 if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
736 return self.upload_multipart_file(path, data, create_new);
737 }
738 if data.is_empty() {
739 self.inner.upload_chunk(FsWriteChunkRequest {
740 path,
741 offset: 0,
742 truncate: true,
743 final_chunk: true,
744 create_new,
745 data,
746 hash,
747 })?;
748 return Ok(());
749 }
750 let chunk = self.inner.chunk_bytes;
751 let mut offset = 0usize;
752 while offset < data.len() {
753 let end = std::cmp::min(offset + chunk, data.len());
754 let slice = &data[offset..end];
755 let truncate = offset == 0;
756 let final_chunk = end == data.len();
757 let result = self.inner.upload_chunk(FsWriteChunkRequest {
758 path,
759 offset: offset as u64,
760 truncate,
761 final_chunk,
762 create_new: create_new && truncate,
763 data: slice,
764 hash,
765 })?;
766 if let Some(session) = result {
767 let expected = offset as u64 + slice.len() as u64;
768 if session.next_offset as u64 != expected {
769 return Err(io::Error::other("unexpected next offset"));
770 }
771 }
772 offset = end;
773 }
774 Ok(())
775 }
776
777 fn upload_sharded_file(&self, path: &str, data: &[u8], create_new: bool) -> io::Result<()> {
778 let shard_size = self.inner.shard_size_bytes as usize;
779 let mut shards = Vec::new();
780 let mut offset = 0usize;
781 while offset < data.len() {
782 let end = std::cmp::min(offset + shard_size, data.len());
783 let slice = &data[offset..end];
784 let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
785 self.upload_unsharded_file(&shard_path, slice, false, None)?;
786 shards.push(ShardEntry {
787 path: shard_path,
788 size: slice.len() as u64,
789 });
790 offset = end;
791 }
792 let manifest = ShardManifest {
793 version: 1,
794 total_size: data.len() as u64,
795 shard_size: self.inner.shard_size_bytes,
796 shards,
797 };
798 let bytes = serde_json::to_vec(&manifest)
799 .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
800 self.upload_unsharded_file(path, &bytes, create_new, Some(MANIFEST_HASH))
801 }
802
803 fn upload_multipart_file(&self, path: &str, data: &[u8], create_new: bool) -> io::Result<()> {
804 if data.is_empty() {
805 self.inner.upload_chunk(FsWriteChunkRequest {
806 path,
807 offset: 0,
808 truncate: true,
809 final_chunk: true,
810 create_new,
811 data,
812 hash: None,
813 })?;
814 return Ok(());
815 }
816 let content_sha256 = sha256_hex(data);
817 let session =
818 match self
819 .inner
820 .upload_session_start(path, data.len() as u64, &content_sha256)
821 {
822 Ok(session) => session,
823 Err(err) if err.kind() == ErrorKind::NotFound => {
824 return self.upload_multipart_file_legacy(path, data, create_new);
825 }
826 Err(err) => return Err(err),
827 };
828 let chunk_size = (session.chunk_size_bytes as usize).max(1);
829 let mut tasks = Vec::new();
830 let mut offset = 0usize;
831 let mut index = 0usize;
832 while offset < data.len() {
833 let end = std::cmp::min(offset + chunk_size, data.len());
834 let slice = &data[offset..end];
835 tasks.push(UploadTask {
836 index,
837 offset,
838 length: end - offset,
839 chunk_sha256: sha256_hex(slice),
840 });
841 offset = end;
842 index += 1;
843 }
844 let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
845 .iter()
846 .map(|task| UploadSessionChunkDescriptor {
847 chunk_index: task.index,
848 offset_bytes: task.offset as i64,
849 size_bytes: task.length as i64,
850 chunk_sha256: task.chunk_sha256.clone(),
851 })
852 .collect();
853 let chunk_response = self.inner.upload_session_chunks(
854 &session.session_id,
855 &session.blob_key,
856 &descriptors,
857 )?;
858 let targets = Arc::new(chunk_response.targets);
859 let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
860 let error = Arc::new(Mutex::new(None));
861 let data = Arc::new(data.to_vec());
862 thread::scope(|scope| {
863 for _ in 0..self.inner.parallel_requests {
864 let tasks = Arc::clone(&tasks);
865 let targets = Arc::clone(&targets);
866 let error = Arc::clone(&error);
867 let inner = Arc::clone(&self.inner);
868 let data = Arc::clone(&data);
869 scope.spawn(move |_| loop {
870 let task = {
871 let mut guard = tasks.lock().unwrap();
872 guard.pop_front()
873 };
874 let Some(task) = task else { break };
875 let target = targets
876 .iter()
877 .find(|target| target.chunk_index == task.index)
878 .cloned();
879 let result = (|| {
880 let target = target.ok_or_else(|| {
881 io::Error::other(format!(
882 "missing upload target for chunk {}",
883 task.index
884 ))
885 })?;
886 let slice = &data[task.offset..task.offset + task.length];
887 inner.put_upload_target(
888 &target.method,
889 &target.upload_url,
890 &target.headers,
891 slice,
892 )
893 })();
894 if let Err(err) = result {
895 let mut err_guard = error.lock().unwrap();
896 if err_guard.is_none() {
897 *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
898 }
899 break;
900 }
901 });
902 }
903 })
904 .expect("upload session scope");
905 if let Some(err) = error.lock().unwrap().take() {
906 return Err(err);
907 }
908 self.inner
909 .upload_session_complete(UploadSessionCompleteParams {
910 path,
911 session_id: &session.session_id,
912 blob_key: &session.blob_key,
913 size_bytes: data.len() as u64,
914 content_sha256: &content_sha256,
915 chunk_count: descriptors.len(),
916 create_new,
917 })?;
918 Ok(())
919 }
920
921 fn upload_multipart_file_legacy(
922 &self,
923 path: &str,
924 data: &[u8],
925 create_new: bool,
926 ) -> io::Result<()> {
927 let session = self.inner.multipart_create(path, data.len() as u64)?;
928 let part_size = session.part_size_bytes as usize;
929 let mut tasks = std::collections::VecDeque::new();
930 let mut offset = 0usize;
931 let mut part_number = 1;
932 let mut index = 0usize;
933 while offset < data.len() {
934 let end = std::cmp::min(offset + part_size, data.len());
935 let length = end - offset;
936 tasks.push_back(MultipartTask {
937 index,
938 part_number,
939 offset,
940 length,
941 });
942 offset = end;
943 part_number += 1;
944 index += 1;
945 }
946 let tasks = Arc::new(Mutex::new(tasks));
947 let task_len = tasks.lock().unwrap().len();
948 let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
949 for _ in 0..task_len {
950 result_vec.push(None);
951 }
952 let results = Arc::new(Mutex::new(result_vec));
953 let error = Arc::new(Mutex::new(None));
954 let data = Arc::new(data.to_vec());
955 thread::scope(|scope| {
956 for _ in 0..self.inner.parallel_requests {
957 let tasks = Arc::clone(&tasks);
958 let results = Arc::clone(&results);
959 let error = Arc::clone(&error);
960 let inner = Arc::clone(&self.inner);
961 let blob_key = session.blob_key.clone();
962 let upload_id = session.upload_id.clone();
963 let session_id = session.session_id.clone();
964 let data = Arc::clone(&data);
965 scope.spawn(move |_| loop {
966 let task = {
967 let mut guard = tasks.lock().unwrap();
968 guard.pop_front()
969 };
970 let Some(task) = task else { break };
971 let slice = &data[task.offset..task.offset + task.length];
972 let result = (|| {
973 let url = inner.multipart_presign_part(
974 &session_id,
975 &blob_key,
976 &upload_id,
977 task.part_number,
978 task.length as u64,
979 )?;
980 let etag = inner.put_upload_url_with_etag(&url, slice)?;
981 Ok(MultipartPart {
982 part_number: task.part_number,
983 etag,
984 })
985 })();
986 let mut guard = results.lock().unwrap();
987 guard[task.index] = Some(result);
988 if let Some(Err(err)) = guard[task.index].as_ref() {
989 let mut err_guard = error.lock().unwrap();
990 if err_guard.is_none() {
991 *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
992 }
993 break;
994 }
995 });
996 }
997 })
998 .expect("multipart upload scope");
999 if let Some(err) = error.lock().unwrap().take() {
1000 return Err(err);
1001 }
1002 let mut parts = Vec::with_capacity(results.lock().unwrap().len());
1003 for maybe in Arc::try_unwrap(results)
1004 .expect("results still in use")
1005 .into_inner()
1006 .expect("results poisoned")
1007 {
1008 let part = maybe.expect("missing part result")?;
1009 if part.etag.is_empty() {
1010 return Err(io::Error::other("missing etag"));
1011 }
1012 parts.push(part);
1013 }
1014 parts.sort_by_key(|part| part.part_number);
1015 self.inner.multipart_complete(MultipartCompleteParams {
1016 path: path.to_string(),
1017 session_id: session.session_id,
1018 blob_key: session.blob_key,
1019 upload_id: session.upload_id,
1020 size_bytes: data.len() as u64,
1021 create_new,
1022 parts,
1023 })?;
1024 Ok(())
1025 }
1026}
1027
/// One byte range of a parallel download.
#[derive(Clone, Copy)]
struct ChunkTask {
    /// Start of the range within the remote file.
    offset: u64,
    /// Number of bytes requested for the range.
    length: usize,
    /// Position in the ordered task list; used to slot the result back in order.
    index: usize,
}
1034
/// Per-part slot of a legacy multipart upload: `None` until a worker handles the part, then
/// the outcome of presigning and PUTting it.
type MultipartResult = Option<Result<MultipartPart, io::Error>>;
1036
/// One part of a legacy multipart upload.
#[derive(Clone, Copy)]
struct MultipartTask {
    /// Zero-based position in the result list.
    index: usize,
    /// One-based part number sent to the server (`index + 1`).
    part_number: i32,
    /// Start of the part within the payload.
    offset: usize,
    /// Length of the part in bytes.
    length: usize,
}
1044
/// One chunk of an upload-session upload.
#[derive(Clone)]
struct UploadTask {
    /// Zero-based chunk index; matched against the server's `chunk_index` of each target.
    index: usize,
    /// Start of the chunk within the payload.
    offset: usize,
    /// Length of the chunk in bytes.
    length: usize,
    /// Hex SHA-256 of the chunk bytes, registered with the session.
    chunk_sha256: String,
}
1052
1053#[async_trait(?Send)]
1054impl FsProvider for RemoteFsProvider {
1055 fn current_dir_override(&self) -> Option<PathBuf> {
1056 Some(PathBuf::from("/"))
1057 }
1058
1059 fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
1060 let normalized = self.normalize(path);
1061 let mut data = Vec::new();
1062 let mut metadata = FsMetadata::new(FsFileType::File, 0, None, false);
1063 let mut content_loaded = false;
1064 let mut metadata_hash_valid = true;
1065 if flags.read || flags.write || flags.append || !flags.create || flags.create_new {
1066 match self.inner.fetch_metadata(&normalized) {
1067 Ok(meta) => {
1068 if meta.file_type != "file" {
1069 return Err(io::Error::other("remote path is not a file"));
1070 }
1071 if flags.create_new {
1072 return Err(io::Error::new(
1073 ErrorKind::AlreadyExists,
1074 format!("File already exists: {}", path.display()),
1075 ));
1076 }
1077 if flags.read || flags.append {
1078 data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1079 self.download_sharded_file(&normalized, meta.len)?
1080 } else {
1081 self.download_raw_file(&normalized, meta.len)?
1082 };
1083 content_loaded = true;
1084 }
1085 metadata = meta.into();
1086 }
1087 Err(err)
1088 if err.kind() == ErrorKind::NotFound && (flags.create || flags.create_new) => {}
1089 Err(err) => return Err(err),
1090 }
1091 }
1092 if flags.truncate {
1093 data.clear();
1094 content_loaded = true;
1095 metadata_hash_valid = false;
1096 }
1097 if flags.create {
1098 self.ensure_parent_exists(path)?;
1099 }
1100 let handle = RemoteFileHandle {
1101 provider: self.clone(),
1102 path: normalized,
1103 data,
1104 metadata,
1105 content_loaded,
1106 metadata_hash_valid,
1107 create_new_pending: flags.create_new,
1108 position: 0,
1109 flags: flags.clone(),
1110 dirty: false,
1111 };
1112 Ok(Box::new(handle))
1113 }
1114
1115 async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
1116 let normalized = self.normalize(path);
1117 let meta = self.inner.fetch_metadata(&normalized)?;
1118 if meta.file_type != "file" {
1119 return Err(io::Error::other("remote path is not a file"));
1120 }
1121 if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1122 self.download_sharded_file(&normalized, meta.len)
1123 } else {
1124 self.download_raw_file(&normalized, meta.len)
1125 }
1126 }
1127
1128 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
1129 let normalized = self.normalize(path);
1130 self.ensure_parent_exists(path)?;
1131 self.upload_entire_file(&normalized, data, false)
1132 }
1133
1134 async fn remove_file(&self, path: &Path) -> io::Result<()> {
1135 let normalized = self.normalize(path);
1136 self.inner
1137 .delete_empty("/fs/file", &[("path", normalized)])?;
1138 Ok(())
1139 }
1140
1141 async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1142 let normalized = self.normalize(path);
1143 let resp = self.inner.fetch_metadata(&normalized)?;
1144 Ok(resp.into())
1145 }
1146
1147 async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1148 self.metadata(path).await
1149 }
1150
1151 async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
1152 let normalized = self.normalize(path);
1153 let resp = self.inner.fetch_dir(&normalized)?;
1154 Ok(resp
1155 .into_iter()
1156 .map(|entry| DirEntry {
1157 path: PathBuf::from(entry.path),
1158 file_name: entry.file_name.into(),
1159 file_type: entry.file_type.into(),
1160 })
1161 .collect())
1162 }
1163
1164 async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
1165 let normalized = self.normalize(path);
1166 let canonical = self.inner.canonicalize_path(&normalized)?;
1167 Ok(PathBuf::from(canonical))
1168 }
1169
1170 async fn create_dir(&self, path: &Path) -> io::Result<()> {
1171 let normalized = self.normalize(path);
1172 self.inner.post_empty(
1173 "/fs/mkdir",
1174 &CreateDirRequest {
1175 path: normalized,
1176 recursive: false,
1177 },
1178 )
1179 }
1180
1181 async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
1182 let normalized = self.normalize(path);
1183 self.inner.post_empty(
1184 "/fs/mkdir",
1185 &CreateDirRequest {
1186 path: normalized,
1187 recursive: true,
1188 },
1189 )
1190 }
1191
1192 async fn remove_dir(&self, path: &Path) -> io::Result<()> {
1193 let normalized = self.normalize(path);
1194 self.inner.delete_empty(
1195 "/fs/dir",
1196 &[("path", normalized), ("recursive", "false".into())],
1197 )
1198 }
1199
1200 async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
1201 let normalized = self.normalize(path);
1202 self.inner.delete_empty(
1203 "/fs/dir",
1204 &[("path", normalized), ("recursive", "true".into())],
1205 )
1206 }
1207
1208 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
1209 self.inner.post_empty(
1210 "/fs/rename",
1211 &RenameRequest {
1212 from: self.normalize(from),
1213 to: self.normalize(to),
1214 },
1215 )
1216 }
1217
1218 async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
1219 self.inner.post_empty(
1220 "/fs/set-readonly",
1221 &SetReadonlyRequest {
1222 path: self.normalize(path),
1223 readonly,
1224 },
1225 )
1226 }
1227
1228 async fn data_manifest_descriptor(
1229 &self,
1230 request: &DataManifestRequest,
1231 ) -> io::Result<DataManifestDescriptor> {
1232 let mut query = vec![("path", request.path.clone())];
1233 if let Some(version) = &request.version {
1234 query.push(("version", version.clone()));
1235 }
1236 self.inner.get_json("/data/manifest", &query)
1237 }
1238
1239 async fn data_chunk_upload_targets(
1240 &self,
1241 request: &DataChunkUploadRequest,
1242 ) -> io::Result<Vec<DataChunkUploadTarget>> {
1243 #[derive(Deserialize)]
1244 struct DataChunkUploadTargetsResponse {
1245 targets: Vec<DataChunkUploadTarget>,
1246 }
1247 let response: DataChunkUploadTargetsResponse = self
1248 .inner
1249 .post_json("/data/chunks/upload-targets", request)?;
1250 Ok(response.targets)
1251 }
1252
1253 async fn data_upload_chunk(
1254 &self,
1255 target: &DataChunkUploadTarget,
1256 data: &[u8],
1257 ) -> io::Result<()> {
1258 self.inner
1259 .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
1260 }
1261}
1262
1263impl Clone for RemoteFsProvider {
1264 fn clone(&self) -> Self {
1265 Self {
1266 inner: Arc::clone(&self.inner),
1267 }
1268 }
1269}
1270
/// File handle returned by `RemoteFsProvider::open`: the file lives in memory and is
/// uploaded as a whole when flushed (or dropped while dirty).
struct RemoteFileHandle {
    /// Provider used to upload the buffer back to the server.
    provider: RemoteFsProvider,
    /// Normalized remote path of the file.
    path: String,
    /// In-memory contents; reads, writes and seeks operate on this buffer.
    data: Vec<u8>,
    /// Metadata snapshot taken when the handle was opened.
    metadata: FsMetadata,
    /// True once `data` reflects the file (downloaded, truncated, or flushed).
    content_loaded: bool,
    /// False once the contents changed, so the opened-with hash is no longer reported.
    metadata_hash_valid: bool,
    /// The next upload must use create-new semantics; cleared after a successful flush.
    create_new_pending: bool,
    /// Current read/write cursor into `data`.
    position: usize,
    /// Flags the handle was opened with.
    flags: OpenFlags,
    /// True when `data` has changes not yet uploaded.
    dirty: bool,
}
1283
1284impl RemoteFileHandle {
1285 fn flush_remote(&mut self) -> io::Result<()> {
1286 if !self.dirty {
1287 return Ok(());
1288 }
1289 self.provider
1290 .upload_entire_file(&self.path, &self.data, self.create_new_pending)?;
1291 self.dirty = false;
1292 self.content_loaded = true;
1293 self.create_new_pending = false;
1294 Ok(())
1295 }
1296
1297 fn metadata_len(&self) -> u64 {
1298 if self.dirty || self.content_loaded {
1299 self.data.len() as u64
1300 } else {
1301 self.metadata.len()
1302 }
1303 }
1304
1305 fn metadata_hash(&self) -> Option<String> {
1306 if self.metadata_hash_valid {
1307 self.metadata.hash().map(str::to_owned)
1308 } else {
1309 None
1310 }
1311 }
1312}
1313
1314impl Read for RemoteFileHandle {
1315 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1316 let remaining = &self.data[self.position..];
1317 let amt = remaining.len().min(buf.len());
1318 buf[..amt].copy_from_slice(&remaining[..amt]);
1319 self.position += amt;
1320 Ok(amt)
1321 }
1322}
1323
1324impl Write for RemoteFileHandle {
1325 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1326 if !self.flags.write && !self.flags.append {
1327 return Err(io::Error::new(
1328 ErrorKind::PermissionDenied,
1329 "remote file not opened for writing",
1330 ));
1331 }
1332 if self.flags.append {
1333 self.position = self.data.len();
1334 }
1335 let required = self.position + buf.len();
1336 if required > self.data.len() {
1337 self.data.resize(required, 0);
1338 }
1339 self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
1340 self.position += buf.len();
1341 self.dirty = true;
1342 self.metadata_hash_valid = false;
1343 Ok(buf.len())
1344 }
1345
1346 fn flush(&mut self) -> io::Result<()> {
1347 self.flush_remote()
1348 }
1349}
1350
1351impl Seek for RemoteFileHandle {
1352 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1353 let new_pos = match pos {
1354 SeekFrom::Start(offset) => offset as i64,
1355 SeekFrom::End(offset) => self.data.len() as i64 + offset,
1356 SeekFrom::Current(offset) => self.position as i64 + offset,
1357 };
1358 if new_pos < 0 {
1359 return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
1360 }
1361 self.position = new_pos as usize;
1362 Ok(self.position as u64)
1363 }
1364}
1365
1366#[async_trait(?Send)]
1367impl FileHandle for RemoteFileHandle {
1368 async fn metadata_async(&self) -> io::Result<FsMetadata> {
1369 Ok(FsMetadata::new_with_hash(
1370 self.metadata.file_type(),
1371 self.metadata_len(),
1372 self.metadata.modified(),
1373 self.metadata.is_readonly(),
1374 self.metadata_hash(),
1375 ))
1376 }
1377}
1378
1379impl Drop for RemoteFileHandle {
1380 fn drop(&mut self) {
1381 if self.dirty {
1382 if let Err(err) = self.flush_remote() {
1383 eprintln!("remote fs flush failed: {err}");
1384 }
1385 }
1386 }
1387}
1388
/// Server-side metadata for one path (JSON keys `fileType`, `len`, `modifiedAt`,
/// `readonly`, `hash`). Converted into [`FsMetadata`] via `From`.
#[derive(Debug, Deserialize)]
struct MetadataResponse {
    /// Type tag such as `"file"` or `"dir"`; `open`/`read` reject anything but `"file"`.
    #[serde(rename = "fileType")]
    file_type: String,
    /// Size in bytes.
    len: u64,
    /// Modification time as RFC 3339 text (parsed by `parse_modified_at`).
    #[serde(rename = "modifiedAt")]
    modified_at: Option<String>,
    readonly: bool,
    /// Content hash; the value `MANIFEST_HASH` marks a sharded file.
    hash: Option<String>,
}
1399
1400impl From<MetadataResponse> for FsMetadata {
1401 fn from(value: MetadataResponse) -> Self {
1402 FsMetadata::new_with_hash(
1403 value.file_type.into(),
1404 value.len,
1405 parse_modified_at(value.modified_at.as_deref()),
1406 value.readonly,
1407 value.hash,
1408 )
1409 }
1410}
1411
/// One directory entry as returned by the directory-listing endpoint
/// (JSON keys `path`, `fileName`, `fileType`).
#[derive(Debug, Deserialize)]
struct DirEntryResponse {
    /// Full remote path of the entry.
    path: String,
    #[serde(rename = "fileName")]
    file_name: String,
    /// Type tag mapped through `From<String> for FsFileType`.
    #[serde(rename = "fileType")]
    file_type: String,
}
1420
1421impl From<String> for FsFileType {
1422 fn from(value: String) -> Self {
1423 match value.as_str() {
1424 "dir" => FsFileType::Directory,
1425 "file" => FsFileType::File,
1426 "symlink" => FsFileType::Symlink,
1427 "other" => FsFileType::Other,
1428 _ => FsFileType::Unknown,
1429 }
1430 }
1431}
1432
/// Body of `POST /fs/mkdir`.
#[derive(Debug, Serialize, Deserialize)]
struct CreateDirRequest {
    path: String,
    /// Also create missing ancestors (`create_dir_all` semantics).
    recursive: bool,
}
1438
/// Body of `POST /fs/rename`; both paths are normalized remote paths.
#[derive(Debug, Serialize, Deserialize)]
struct RenameRequest {
    from: String,
    to: String,
}
1444
/// Body of `POST /fs/set-readonly`.
#[derive(Debug, Serialize, Deserialize)]
struct SetReadonlyRequest {
    path: String,
    readonly: bool,
}
1450
/// Response carrying a canonical remote path (the test harness serves it from
/// `/fs/canonicalize` as `{ "path": ... }`).
#[derive(Debug, Deserialize)]
struct CanonicalizeResponse {
    path: String,
}
1455
/// Response that redirects a read to a separate download URL (JSON key `downloadUrl`).
/// Other keys the server may send (the test harness adds `expiresAt`) are ignored.
#[derive(Debug, Deserialize)]
struct DownloadUrlResponse {
    #[serde(rename = "downloadUrl")]
    download_url: String,
}
1461
/// Body of `POST /fs/upload-session/start` (camelCase JSON keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartRequest {
    /// Destination remote path.
    path: String,
    /// Total size of the upload in bytes.
    size_bytes: i64,
    content_type: Option<String>,
    /// Hex SHA-256 of the full contents.
    content_sha256: String,
}
1470
/// Reply to `POST /fs/upload-session/start` (camelCase JSON keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartResponse {
    /// Identifier echoed in the chunks/complete requests.
    session_id: String,
    blob_key: String,
    /// Chunk size chosen by the server.
    chunk_size_bytes: i64,
}
1478
/// Describes one chunk of an upload session when requesting upload targets.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunkDescriptor {
    /// Position of the chunk; the test server reassembles chunks in index order.
    chunk_index: usize,
    offset_bytes: i64,
    size_bytes: i64,
    chunk_sha256: String,
}
1487
/// Body of `POST /fs/upload-session/chunks`: asks for one upload target per chunk.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksRequest {
    session_id: String,
    blob_key: String,
    chunks: Vec<UploadSessionChunkDescriptor>,
}
1495
/// Reply to `POST /fs/upload-session/chunks`. A misbehaving server may return fewer
/// targets than chunks requested (the test harness can simulate this).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksResponse {
    targets: Vec<UploadChunkTarget>,
}
1501
/// Where and how to send one chunk's bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadChunkTarget {
    chunk_index: usize,
    /// HTTP method to use (the test server issues `"PUT"`).
    method: String,
    upload_url: String,
    /// Extra headers to send with the chunk.
    headers: HashMap<String, String>,
}
1510
/// Body of `POST /fs/upload-session/complete`: asks the server to assemble the chunks.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionCompleteRequest {
    path: String,
    session_id: String,
    blob_key: String,
    size_bytes: i64,
    /// Hex SHA-256 of the assembled contents; the test server rejects a mismatch with 400.
    content_sha256: String,
    chunk_count: usize,
    /// Fail with a conflict if the file exists; omitted from the JSON when false.
    #[serde(default, skip_serializing_if = "is_false")]
    create_new: bool,
    /// Serialized as `null` when absent (no `skip_serializing_if`).
    hash: Option<String>,
}
1524
/// Request to start a multipart upload (keys `path`, `sizeBytes`, `contentType`).
/// NOTE(review): endpoint not visible in this chunk — confirm at the call site.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadRequest {
    path: String,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    #[serde(rename = "contentType")]
    content_type: Option<String>,
}
1533
/// Reply when a multipart upload is started; the ids are echoed in later part/complete
/// requests.
#[derive(Debug, Deserialize)]
struct MultipartUploadResponse {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    /// Part size chosen by the server.
    #[serde(rename = "partSizeBytes")]
    part_size_bytes: i64,
}
1545
/// Request for the upload URL of one multipart part.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadPartRequest {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    /// Part number; presumably 1-based as in S3 multipart uploads — confirm.
    #[serde(rename = "partNumber")]
    part_number: i32,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
}
1559
/// Reply carrying the upload URL for one multipart part.
#[derive(Debug, Deserialize)]
struct MultipartUploadPartResponse {
    // NOTE(review): the wire key is snake_case `upload_url`, unlike the camelCase keys of
    // the sibling multipart DTOs; the rename is a no-op — confirm this matches the server.
    #[serde(rename = "upload_url")]
    upload_url: String,
}
1565
/// Request to finish a multipart upload from its uploaded parts.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadCompleteRequest {
    path: String,
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    /// Fail if the file exists; omitted from the JSON when false.
    #[serde(rename = "createNew", default, skip_serializing_if = "is_false")]
    create_new: bool,
    hash: Option<String>,
    parts: Vec<MultipartPart>,
}
1582
/// One uploaded multipart part, identified by its number and the ETag returned by storage.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartPart {
    #[serde(rename = "partNumber")]
    part_number: i32,
    etag: String,
}
1589
/// Manifest describing a file stored as multiple shards. Presumably the content of files
/// whose metadata hash is `MANIFEST_HASH` — confirm against `download_sharded_file`.
#[derive(Debug, Serialize, Deserialize)]
struct ShardManifest {
    version: u32,
    /// Size of the reassembled file in bytes (assumed).
    total_size: u64,
    /// Nominal size of each shard in bytes (assumed).
    shard_size: u64,
    /// Shards in order.
    shards: Vec<ShardEntry>,
}
1597
/// One shard of a sharded file: its remote path (presumably under `SHARD_PREFIX`) and size.
#[derive(Debug, Serialize, Deserialize)]
struct ShardEntry {
    path: String,
    size: u64,
}
1603
/// Reply from a write-session endpoint; only `nextOffset` is used.
#[derive(Debug, Deserialize)]
struct FsWriteSessionResponse {
    /// Required in the payload but unused (underscore-prefixed to silence dead-code warnings).
    #[serde(rename = "sessionId")]
    _session_id: String,
    /// Offset at which the next write should start (assumed bytes).
    #[serde(rename = "nextOffset")]
    next_offset: i64,
}
1611
1612fn sha256_hex(data: &[u8]) -> String {
1613 let digest = Sha256::digest(data);
1614 let mut out = String::with_capacity(digest.len() * 2);
1615 for byte in digest {
1616 use std::fmt::Write as _;
1617 let _ = write!(out, "{byte:02x}");
1618 }
1619 out
1620}
1621
/// `skip_serializing_if` predicate: true when the flag is unset, so it is omitted from JSON.
fn is_false(flag: &bool) -> bool {
    !flag
}
1625
1626fn map_http_err(err: reqwest::Error) -> io::Error {
1627 io::Error::other(err)
1628}
1629
1630fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1631 let value = value?;
1632 let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1633 let millis = parsed.timestamp_millis();
1634 if millis < 0 {
1635 return None;
1636 }
1637 Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1638}
1639
1640fn map_url_err(err: url::ParseError) -> io::Error {
1641 io::Error::new(ErrorKind::InvalidInput, err)
1642}
1643
1644fn handle_error(resp: Response) -> io::Result<Response> {
1645 let status = resp.status();
1646 if status.is_success() {
1647 return Ok(resp);
1648 }
1649 let text = resp.text().unwrap_or_else(|_| status.to_string());
1650 let kind = match status {
1651 StatusCode::NOT_FOUND => ErrorKind::NotFound,
1652 StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1653 StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1654 StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1655 _ => ErrorKind::Other,
1656 };
1657 Err(io::Error::new(kind, text))
1658}
1659
1660impl fmt::Debug for RemoteFsProvider {
1661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1662 f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
1663 }
1664}
1665
1666#[cfg(test)]
1667mod tests {
1668 use super::*;
1669 use crate::data_contract::DataChunkDescriptor;
1670 use axum::extract::{Query, State};
1671 use axum::http::{HeaderMap, StatusCode};
1672 use axum::routing::{delete, get, post, put};
1673 use axum::{Json, Router};
1674 use futures::executor;
1675 use serde::Deserialize;
1676 use std::collections::HashMap;
1677 use std::net::TcpListener as StdTcpListener;
1678 use std::sync::Arc;
1679 use tempfile::tempdir;
1680 use tokio::net::TcpListener as TokioTcpListener;
1681 use tokio::runtime::Runtime;
1682
    /// Shared state of the in-process test server: a temp directory that stands in for the
    /// remote filesystem, plus bookkeeping for upload sessions.
    #[derive(Clone)]
    struct Harness {
        /// Host directory that remote path `/` maps to.
        root: Arc<PathBuf>,
        /// Keeps the temp directory alive for as long as any clone of the harness exists.
        _keeper: Arc<tempfile::TempDir>,
        /// Server base URL, filled in once the listener is bound; used to build absolute URLs.
        base_url: Arc<Mutex<Option<String>>>,
        /// Upload sessions by session id.
        upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
        /// When set, the chunk-target endpoint drops the last target (failure injection).
        omit_last_chunk_target: bool,
    }
1691
1692 impl Harness {
1693 fn new() -> Self {
1694 Self::with_omit_last_chunk_target(false)
1695 }
1696
1697 fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
1698 let dir = tempdir().expect("tempdir");
1699 let path = dir.path().to_path_buf();
1700 Self {
1701 root: Arc::new(path),
1702 _keeper: Arc::new(dir),
1703 base_url: Arc::new(Mutex::new(None)),
1704 upload_sessions: Arc::new(Mutex::new(HashMap::new())),
1705 omit_last_chunk_target,
1706 }
1707 }
1708
1709 fn resolve(&self, remote_path: &str) -> PathBuf {
1710 let trimmed = remote_path.trim_start_matches('/');
1711 self.root.join(trimmed)
1712 }
1713
1714 fn virtualize(&self, path: &Path) -> String {
1715 let rel = path.strip_prefix(&*self.root).unwrap_or(path);
1716 if rel.as_os_str().is_empty() {
1717 "/".to_string()
1718 } else {
1719 format!("/{}", rel.to_string_lossy().replace('\\', "/"))
1720 }
1721 }
1722 }
1723
    /// Test-server bookkeeping for one upload session.
    #[derive(Clone)]
    struct UploadSessionTestState {
        /// Destination remote path given when the session started.
        path: String,
        /// Number of chunks announced by the last chunks request; the complete handler
        /// concatenates chunk files `0..chunk_count`.
        chunk_count: usize,
    }
1729
    /// Query parameters shared by the test `/fs/*` and `/download` handlers.
    #[derive(Deserialize)]
    struct PathParams {
        path: String,
        offset: Option<u64>,
        length: Option<usize>,
        /// Compared against the literal string `"true"`.
        truncate: Option<String>,
        /// Compared against the literal string `"true"`.
        #[serde(rename = "createNew")]
        create_new: Option<String>,
        /// Compared against the literal string `"true"`.
        recursive: Option<String>,
    }
1740
    /// Query of `PUT /upload-chunk`, matching the URLs issued by the chunks handler.
    #[derive(Deserialize)]
    struct UploadChunkQuery {
        #[serde(rename = "sessionId")]
        session_id: String,
        #[serde(rename = "chunkIndex")]
        chunk_index: usize,
    }
1748
    /// Query of `GET /data/manifest`.
    #[derive(Deserialize)]
    struct DataManifestQuery {
        path: String,
        /// Only `"v1"` changes the reply (older `updated_at`).
        version: Option<String>,
    }
1754
1755 async fn metadata_handler(
1756 State(harness): State<Harness>,
1757 Query(params): Query<PathParams>,
1758 ) -> Result<Json<serde_json::Value>, StatusCode> {
1759 let meta =
1760 std::fs::metadata(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1761 let file_type = if meta.is_dir() {
1762 "dir"
1763 } else if meta.is_file() {
1764 "file"
1765 } else {
1766 "other"
1767 };
1768 Ok(Json(serde_json::json!({
1769 "fileType": file_type,
1770 "len": meta.len(),
1771 "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
1772 "readonly": meta.permissions().readonly()
1773 })))
1774 }
1775
1776 async fn dir_handler(
1777 State(harness): State<Harness>,
1778 Query(params): Query<PathParams>,
1779 ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
1780 let mut entries = Vec::new();
1781 for entry in
1782 std::fs::read_dir(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?
1783 {
1784 let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1785 let file_type = entry
1786 .file_type()
1787 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1788 let kind = if file_type.is_dir() {
1789 "dir"
1790 } else if file_type.is_file() {
1791 "file"
1792 } else {
1793 "other"
1794 };
1795 entries.push(serde_json::json!({
1796 "path": harness.virtualize(&entry.path()),
1797 "fileName": entry.file_name().to_string_lossy(),
1798 "fileType": kind
1799 }));
1800 }
1801 Ok(Json(entries))
1802 }
1803
1804 async fn canonicalize_handler(
1805 State(harness): State<Harness>,
1806 Query(params): Query<PathParams>,
1807 ) -> Result<Json<serde_json::Value>, StatusCode> {
1808 let path = harness.resolve(¶ms.path);
1809 let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
1810 Ok(Json(serde_json::json!({
1811 "path": harness.virtualize(&canonical)
1812 })))
1813 }
1814
1815 async fn read_handler(
1816 State(harness): State<Harness>,
1817 Query(params): Query<PathParams>,
1818 ) -> Result<Vec<u8>, StatusCode> {
1819 let mut data =
1820 std::fs::read(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1821 let offset = params.offset.unwrap_or(0) as usize;
1822 let length = params.length.unwrap_or(data.len().saturating_sub(offset));
1823 let end = std::cmp::min(offset + length, data.len());
1824 if offset < data.len() {
1825 data = data[offset..end].to_vec();
1826 } else {
1827 data.clear();
1828 }
1829 Ok(data)
1830 }
1831
1832 async fn write_handler(
1833 State(harness): State<Harness>,
1834 Query(params): Query<PathParams>,
1835 body: axum::body::Bytes,
1836 ) -> Result<(), StatusCode> {
1837 let path = harness.resolve(¶ms.path);
1838 if let Some(parent) = path.parent() {
1839 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1840 }
1841 if params.create_new.as_deref() == Some("true") {
1842 if params.offset.unwrap_or(0) != 0 {
1843 return Err(StatusCode::BAD_REQUEST);
1844 }
1845 let mut file = std::fs::OpenOptions::new()
1846 .write(true)
1847 .create_new(true)
1848 .open(path)
1849 .map_err(|err| {
1850 if err.kind() == ErrorKind::AlreadyExists {
1851 StatusCode::CONFLICT
1852 } else {
1853 StatusCode::INTERNAL_SERVER_ERROR
1854 }
1855 })?;
1856 std::io::Write::write_all(&mut file, &body)
1857 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1858 return Ok(());
1859 }
1860 let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
1861 Vec::new()
1862 } else {
1863 std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1864 };
1865 let offset = params.offset.unwrap_or(0) as usize;
1866 let required = offset + body.len();
1867 if required > data.len() {
1868 data.resize(required, 0);
1869 }
1870 data[offset..offset + body.len()].copy_from_slice(&body);
1871 std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1872 Ok(())
1873 }
1874
1875 async fn upload_session_start_handler(
1876 State(harness): State<Harness>,
1877 Json(req): Json<UploadSessionStartRequest>,
1878 ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
1879 let session_id = Uuid::new_v4().to_string();
1880 let chunk_size_bytes = 1024i64;
1881 harness
1882 .upload_sessions
1883 .lock()
1884 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1885 .insert(
1886 session_id.clone(),
1887 UploadSessionTestState {
1888 path: req.path,
1889 chunk_count: 0,
1890 },
1891 );
1892 Ok(Json(UploadSessionStartResponse {
1893 session_id: session_id.clone(),
1894 blob_key: format!("test/blob/{session_id}"),
1895 chunk_size_bytes,
1896 }))
1897 }
1898
1899 async fn upload_session_chunks_handler(
1900 State(harness): State<Harness>,
1901 Json(req): Json<UploadSessionChunksRequest>,
1902 ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
1903 let base = harness
1904 .base_url
1905 .lock()
1906 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1907 .clone()
1908 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1909 let mut sessions = harness
1910 .upload_sessions
1911 .lock()
1912 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1913 let state = sessions
1914 .get_mut(&req.session_id)
1915 .ok_or(StatusCode::NOT_FOUND)?;
1916 state.chunk_count = req.chunks.len();
1917 let mut targets = req
1918 .chunks
1919 .iter()
1920 .map(|chunk| UploadChunkTarget {
1921 chunk_index: chunk.chunk_index,
1922 method: "PUT".to_string(),
1923 upload_url: format!(
1924 "{base}/upload-chunk?sessionId={}&chunkIndex={}",
1925 req.session_id, chunk.chunk_index
1926 ),
1927 headers: HashMap::new(),
1928 })
1929 .collect::<Vec<_>>();
1930 if harness.omit_last_chunk_target && !targets.is_empty() {
1931 targets.pop();
1932 }
1933 Ok(Json(UploadSessionChunksResponse { targets }))
1934 }
1935
1936 async fn upload_chunk_handler(
1937 State(harness): State<Harness>,
1938 Query(query): Query<UploadChunkQuery>,
1939 body: axum::body::Bytes,
1940 ) -> Result<(), StatusCode> {
1941 let chunk_path = harness.resolve(&format!(
1942 "/.upload-sessions/{}/{}",
1943 query.session_id, query.chunk_index
1944 ));
1945 if let Some(parent) = chunk_path.parent() {
1946 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1947 }
1948 std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1949 Ok(())
1950 }
1951
1952 async fn data_manifest_handler(
1953 Query(query): Query<DataManifestQuery>,
1954 ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
1955 let updated_at = if query.version.as_deref() == Some("v1") {
1956 "2026-01-01T00:00:00Z"
1957 } else {
1958 "2026-02-02T00:00:00Z"
1959 };
1960 Ok(Json(DataManifestDescriptor {
1961 schema_version: 1,
1962 format: "runmat-data".to_string(),
1963 dataset_id: format!("dataset:{}", query.path),
1964 updated_at: updated_at.to_string(),
1965 txn_sequence: 9,
1966 }))
1967 }
1968
1969 async fn data_chunk_upload_targets_handler(
1970 Json(req): Json<DataChunkUploadRequest>,
1971 ) -> Result<Json<serde_json::Value>, StatusCode> {
1972 let targets = req
1973 .chunks
1974 .iter()
1975 .map(|chunk| {
1976 serde_json::json!({
1977 "key": chunk.key,
1978 "method": "PUT",
1979 "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
1980 "headers": {
1981 "x-runmat-hash": chunk.hash,
1982 "x-runmat-object": chunk.object_id,
1983 }
1984 })
1985 })
1986 .collect::<Vec<_>>();
1987 Ok(Json(serde_json::json!({ "targets": targets })))
1988 }
1989
1990 async fn upload_session_complete_handler(
1991 State(harness): State<Harness>,
1992 Json(req): Json<UploadSessionCompleteRequest>,
1993 ) -> Result<(), StatusCode> {
1994 let state = harness
1995 .upload_sessions
1996 .lock()
1997 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1998 .remove(&req.session_id)
1999 .ok_or(StatusCode::NOT_FOUND)?;
2000 let mut data = Vec::new();
2001 for chunk_index in 0..state.chunk_count {
2002 let chunk_path = harness.resolve(&format!(
2003 "/.upload-sessions/{}/{}",
2004 req.session_id, chunk_index
2005 ));
2006 let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2007 data.extend_from_slice(&chunk);
2008 }
2009 if sha256_hex(&data) != req.content_sha256 {
2010 return Err(StatusCode::BAD_REQUEST);
2011 }
2012 let target_path = harness.resolve(&state.path);
2013 if let Some(parent) = target_path.parent() {
2014 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2015 }
2016 if req.create_new {
2017 let mut file = std::fs::OpenOptions::new()
2018 .write(true)
2019 .create_new(true)
2020 .open(target_path)
2021 .map_err(|err| {
2022 if err.kind() == ErrorKind::AlreadyExists {
2023 StatusCode::CONFLICT
2024 } else {
2025 StatusCode::INTERNAL_SERVER_ERROR
2026 }
2027 })?;
2028 std::io::Write::write_all(&mut file, &data)
2029 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2030 } else {
2031 std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2032 }
2033 Ok(())
2034 }
2035
2036 async fn mkdir_handler(
2037 State(harness): State<Harness>,
2038 Json(req): Json<CreateDirRequest>,
2039 ) -> Result<(), StatusCode> {
2040 let path = harness.resolve(&req.path);
2041 if req.recursive {
2042 std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2043 } else {
2044 std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2045 }
2046 Ok(())
2047 }
2048
2049 async fn delete_file_handler(
2050 State(harness): State<Harness>,
2051 Query(params): Query<PathParams>,
2052 ) -> Result<(), StatusCode> {
2053 std::fs::remove_file(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
2054 Ok(())
2055 }
2056
2057 async fn delete_dir_handler(
2058 State(harness): State<Harness>,
2059 Query(params): Query<PathParams>,
2060 ) -> Result<(), StatusCode> {
2061 let recursive = params.recursive.as_deref() == Some("true");
2062 if recursive {
2063 std::fs::remove_dir_all(harness.resolve(¶ms.path))
2064 .map_err(|_| StatusCode::NOT_FOUND)?;
2065 } else {
2066 std::fs::remove_dir(harness.resolve(¶ms.path))
2067 .map_err(|_| StatusCode::NOT_FOUND)?;
2068 }
2069 Ok(())
2070 }
2071
2072 async fn rename_handler(
2073 State(harness): State<Harness>,
2074 Json(req): Json<RenameRequest>,
2075 ) -> Result<(), StatusCode> {
2076 let from = harness.resolve(&req.from);
2077 let to = harness.resolve(&req.to);
2078 if let Some(parent) = to.parent() {
2079 std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2080 }
2081 std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
2082 Ok(())
2083 }
2084
2085 async fn set_readonly_handler(
2086 State(harness): State<Harness>,
2087 Json(req): Json<SetReadonlyRequest>,
2088 ) -> Result<(), StatusCode> {
2089 let path = harness.resolve(&req.path);
2090 let mut perms = std::fs::metadata(&path)
2091 .map_err(|_| StatusCode::NOT_FOUND)?
2092 .permissions();
2093 perms.set_readonly(req.readonly);
2094 std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2095 Ok(())
2096 }
2097
2098 async fn read_with_download_handler(
2099 State(harness): State<Harness>,
2100 Query(params): Query<PathParams>,
2101 ) -> Result<Json<serde_json::Value>, StatusCode> {
2102 let base = harness
2103 .base_url
2104 .lock()
2105 .unwrap()
2106 .clone()
2107 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
2108 let download_url = format!("{base}/download?path={}", params.path);
2109 Ok(Json(serde_json::json!({
2110 "downloadUrl": download_url,
2111 "expiresAt": 0
2112 })))
2113 }
2114
2115 async fn download_handler(
2116 State(harness): State<Harness>,
2117 Query(params): Query<PathParams>,
2118 headers: HeaderMap,
2119 ) -> Result<Vec<u8>, StatusCode> {
2120 let mut data =
2121 std::fs::read(harness.resolve(¶ms.path)).map_err(|_| StatusCode::NOT_FOUND)?;
2122 if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
2123 if let Some((start, end)) = parse_range(range) {
2124 let start = start.min(data.len());
2125 let end = end.min(data.len().saturating_sub(1));
2126 if start < data.len() {
2127 data = data[start..=end].to_vec();
2128 } else {
2129 data.clear();
2130 }
2131 }
2132 }
2133 Ok(data)
2134 }
2135
2136 fn parse_range(value: &str) -> Option<(usize, usize)> {
2137 let value = value.strip_prefix("bytes=")?;
2138 let (start, end) = value.split_once('-')?;
2139 let start = start.parse::<usize>().ok()?;
2140 let end = end.parse::<usize>().ok()?;
2141 Some((start, end))
2142 }
2143
    /// Starts the standard in-process test server on an ephemeral `127.0.0.1` port.
    ///
    /// Returns the base URL, the shared harness, and the Tokio runtime that drives the server
    /// task. The caller must keep the runtime alive while the server is in use.
    fn spawn_server() -> (String, Harness, Runtime) {
        let harness = Harness::new();
        let router = Router::new()
            .route("/fs/metadata", get(metadata_handler))
            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
            .route("/fs/file", delete(delete_file_handler))
            .route("/fs/canonicalize", get(canonicalize_handler))
            // Reads return file bytes directly (compare `spawn_server_with_download_url`).
            .route("/fs/read", get(read_handler))
            .route("/fs/write", put(write_handler))
            .route(
                "/fs/upload-session/start",
                post(upload_session_start_handler),
            )
            .route(
                "/fs/upload-session/chunks",
                post(upload_session_chunks_handler),
            )
            .route(
                "/fs/upload-session/complete",
                post(upload_session_complete_handler),
            )
            .route("/fs/mkdir", post(mkdir_handler))
            .route("/fs/rename", post(rename_handler))
            .route("/fs/set-readonly", post(set_readonly_handler))
            .route("/data/manifest", get(data_manifest_handler))
            .route(
                "/data/chunks/upload-targets",
                post(data_chunk_upload_targets_handler),
            )
            .route("/upload-chunk", put(upload_chunk_handler))
            .with_state(harness.clone());
        // Port 0 lets the OS pick a free port; the socket must be non-blocking before it is
        // handed to Tokio.
        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
        std_listener.set_nonblocking(true).expect("nonblocking");
        let addr = std_listener.local_addr().unwrap();
        let service = router.into_make_service();
        let rt = Runtime::new().unwrap();
        let base = format!("http://{addr}");
        // Published before serving so handlers can build absolute URLs (e.g. chunk targets).
        *harness.base_url.lock().unwrap() = Some(base.clone());
        rt.spawn(async move {
            let listener = TokioTcpListener::from_std(std_listener).unwrap();
            axum::serve(listener, service).await.unwrap();
        });
        (base, harness, rt)
    }
2188
    /// Like `spawn_server`, but `/fs/read` answers with a `downloadUrl` and the bytes are
    /// served (with `Range` support) from `/download`.
    ///
    /// Returns the base URL, the shared harness, and the Tokio runtime that drives the server
    /// task. The caller must keep the runtime alive while the server is in use.
    fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
        let harness = Harness::new();
        let router = Router::new()
            .route("/fs/metadata", get(metadata_handler))
            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
            .route("/fs/file", delete(delete_file_handler))
            .route("/fs/canonicalize", get(canonicalize_handler))
            // Reads are redirected to `/download` via a `downloadUrl` payload.
            .route("/fs/read", get(read_with_download_handler))
            .route("/fs/write", put(write_handler))
            .route(
                "/fs/upload-session/start",
                post(upload_session_start_handler),
            )
            .route(
                "/fs/upload-session/chunks",
                post(upload_session_chunks_handler),
            )
            .route(
                "/fs/upload-session/complete",
                post(upload_session_complete_handler),
            )
            .route("/fs/mkdir", post(mkdir_handler))
            .route("/fs/rename", post(rename_handler))
            .route("/fs/set-readonly", post(set_readonly_handler))
            .route("/download", get(download_handler))
            .route("/data/manifest", get(data_manifest_handler))
            .route(
                "/data/chunks/upload-targets",
                post(data_chunk_upload_targets_handler),
            )
            .route("/upload-chunk", put(upload_chunk_handler))
            .with_state(harness.clone());
        // Port 0 lets the OS pick a free port; the socket must be non-blocking before it is
        // handed to Tokio.
        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
        std_listener.set_nonblocking(true).expect("nonblocking");
        let addr = std_listener.local_addr().unwrap();
        let service = router.into_make_service();
        let rt = Runtime::new().unwrap();
        let base = format!("http://{addr}");
        // `read_with_download_handler` needs the base URL to build `downloadUrl`.
        *harness.base_url.lock().unwrap() = Some(base.clone());
        rt.spawn(async move {
            let listener = TokioTcpListener::from_std(std_listener).unwrap();
            axum::serve(listener, service).await.unwrap();
        });
        (base, harness, rt)
    }
2234
2235 fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
2236 let harness = Harness::with_omit_last_chunk_target(true);
2237 let router = Router::new()
2238 .route("/fs/metadata", get(metadata_handler))
2239 .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2240 .route("/fs/file", delete(delete_file_handler))
2241 .route("/fs/canonicalize", get(canonicalize_handler))
2242 .route("/fs/read", get(read_handler))
2243 .route("/fs/write", put(write_handler))
2244 .route(
2245 "/fs/upload-session/start",
2246 post(upload_session_start_handler),
2247 )
2248 .route(
2249 "/fs/upload-session/chunks",
2250 post(upload_session_chunks_handler),
2251 )
2252 .route(
2253 "/fs/upload-session/complete",
2254 post(upload_session_complete_handler),
2255 )
2256 .route("/fs/mkdir", post(mkdir_handler))
2257 .route("/fs/rename", post(rename_handler))
2258 .route("/fs/set-readonly", post(set_readonly_handler))
2259 .route("/data/manifest", get(data_manifest_handler))
2260 .route(
2261 "/data/chunks/upload-targets",
2262 post(data_chunk_upload_targets_handler),
2263 )
2264 .route("/upload-chunk", put(upload_chunk_handler))
2265 .with_state(harness.clone());
2266 let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2267 std_listener.set_nonblocking(true).expect("nonblocking");
2268 let addr = std_listener.local_addr().unwrap();
2269 let service = router.into_make_service();
2270 let rt = Runtime::new().unwrap();
2271 let base = format!("http://{addr}");
2272 *harness.base_url.lock().unwrap() = Some(base.clone());
2273 rt.spawn(async move {
2274 let listener = TokioTcpListener::from_std(std_listener).unwrap();
2275 axum::serve(listener, service).await.unwrap();
2276 });
2277 (base, harness, rt)
2278 }
2279
2280 #[test]
2281 fn remote_provider_roundtrip() {
2282 let (base, harness, _rt) = spawn_server();
2283 let provider = RemoteFsProvider::new(RemoteFsConfig {
2284 base_url: base,
2285 auth_token: None,
2286 chunk_bytes: 1024,
2287 parallel_requests: 4,
2288 direct_read_threshold_bytes: u64::MAX,
2289 direct_write_threshold_bytes: 1024,
2290 timeout: Duration::from_secs(30),
2291 ..RemoteFsConfig::default()
2292 })
2293 .expect("provider");
2294
2295 let data = (0..16_384u32)
2296 .flat_map(|v| v.to_le_bytes())
2297 .collect::<Vec<_>>();
2298 executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
2299 let read_back =
2300 executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2301 assert_eq!(data, read_back);
2302 executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
2303 assert!(!harness.resolve("/reports/data.bin").exists());
2304 }
2305
2306 #[test]
2307 fn remote_provider_write_only_handle_metadata_uses_remote_len_until_dirty() {
2308 let (base, harness, _rt) = spawn_server();
2309 let provider = RemoteFsProvider::new(RemoteFsConfig {
2310 base_url: base,
2311 auth_token: None,
2312 chunk_bytes: 1024,
2313 parallel_requests: 4,
2314 direct_read_threshold_bytes: u64::MAX,
2315 direct_write_threshold_bytes: 1024,
2316 timeout: Duration::from_secs(30),
2317 ..RemoteFsConfig::default()
2318 })
2319 .expect("provider");
2320 std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2321 std::fs::write(harness.resolve("/reports/data.bin"), b"remote bytes").expect("write");
2322
2323 let flags = OpenFlags {
2324 write: true,
2325 ..OpenFlags::default()
2326 };
2327 let mut handle = provider
2328 .open(Path::new("/reports/data.bin"), &flags)
2329 .expect("open write-only");
2330 let metadata = executor::block_on(handle.metadata_async()).expect("metadata");
2331 assert_eq!(metadata.len(), b"remote bytes".len() as u64);
2332
2333 handle.write_all(b"new").expect("write handle");
2334 let metadata = executor::block_on(handle.metadata_async()).expect("metadata");
2335 assert_eq!(metadata.len(), 3);
2336 assert_eq!(metadata.hash(), None);
2337 }
2338
2339 #[test]
2340 fn remote_provider_create_new_rejects_existing_target_at_direct_write() {
2341 let (base, harness, _rt) = spawn_server();
2342 let provider = RemoteFsProvider::new(RemoteFsConfig {
2343 base_url: base,
2344 auth_token: None,
2345 chunk_bytes: 1024,
2346 parallel_requests: 4,
2347 direct_read_threshold_bytes: u64::MAX,
2348 direct_write_threshold_bytes: 1024,
2349 timeout: Duration::from_secs(30),
2350 ..RemoteFsConfig::default()
2351 })
2352 .expect("provider");
2353
2354 let flags = OpenFlags {
2355 write: true,
2356 create_new: true,
2357 ..OpenFlags::default()
2358 };
2359 let mut handle = provider
2360 .open(Path::new("/reports/new.bin"), &flags)
2361 .expect("open create_new");
2362 std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2363 std::fs::write(harness.resolve("/reports/new.bin"), b"winner").expect("write winner");
2364
2365 handle.write_all(b"loser").expect("write handle");
2366 let err = executor::block_on(handle.flush_async()).expect_err("create_new conflict");
2367 assert_eq!(err.kind(), ErrorKind::AlreadyExists);
2368 assert_eq!(
2369 std::fs::read(harness.resolve("/reports/new.bin")).expect("read winner"),
2370 b"winner"
2371 );
2372 }
2373
2374 #[test]
2375 fn remote_provider_create_new_rejects_existing_target_at_upload_complete() {
2376 let (base, harness, _rt) = spawn_server();
2377 let provider = RemoteFsProvider::new(RemoteFsConfig {
2378 base_url: base,
2379 auth_token: None,
2380 chunk_bytes: 1024,
2381 parallel_requests: 2,
2382 direct_read_threshold_bytes: u64::MAX,
2383 direct_write_threshold_bytes: 1024,
2384 timeout: Duration::from_secs(30),
2385 ..RemoteFsConfig::default()
2386 })
2387 .expect("provider");
2388
2389 let flags = OpenFlags {
2390 write: true,
2391 create_new: true,
2392 ..OpenFlags::default()
2393 };
2394 let mut handle = provider
2395 .open(Path::new("/reports/session.bin"), &flags)
2396 .expect("open create_new");
2397 std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2398 std::fs::write(harness.resolve("/reports/session.bin"), b"winner").expect("write winner");
2399
2400 let loser = vec![7u8; 4096];
2401 handle.write_all(&loser).expect("write handle");
2402 let err = executor::block_on(handle.flush_async()).expect_err("create_new conflict");
2403 assert_eq!(err.kind(), ErrorKind::AlreadyExists);
2404 assert_eq!(
2405 std::fs::read(harness.resolve("/reports/session.bin")).expect("read winner"),
2406 b"winner"
2407 );
2408 }
2409
2410 #[test]
2411 fn remote_provider_download_url_read() {
2412 let (base, harness, _rt) = spawn_server_with_download_url();
2413 let provider = RemoteFsProvider::new(RemoteFsConfig {
2414 base_url: base,
2415 auth_token: None,
2416 chunk_bytes: 128,
2417 parallel_requests: 2,
2418 direct_read_threshold_bytes: u64::MAX,
2419 timeout: Duration::from_secs(30),
2420 ..RemoteFsConfig::default()
2421 })
2422 .expect("provider");
2423
2424 let data = (0..1024u32)
2425 .flat_map(|v| v.to_le_bytes())
2426 .collect::<Vec<_>>();
2427 std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2428 std::fs::write(harness.resolve("/reports/data.bin"), &data).expect("write");
2429
2430 let read_back =
2431 executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2432 assert_eq!(data, read_back);
2433 }
2434
2435 #[test]
2436 fn remote_provider_errors_when_chunk_target_is_missing() {
2437 let (base, _harness, _rt) = spawn_server_with_missing_chunk_target();
2438 let provider = RemoteFsProvider::new(RemoteFsConfig {
2439 base_url: base,
2440 auth_token: None,
2441 chunk_bytes: 128,
2442 parallel_requests: 2,
2443 direct_read_threshold_bytes: u64::MAX,
2444 direct_write_threshold_bytes: 128,
2445 timeout: Duration::from_secs(30),
2446 ..RemoteFsConfig::default()
2447 })
2448 .expect("provider");
2449
2450 let data = vec![7u8; 1024];
2451 let err =
2452 executor::block_on(provider.write(Path::new("/reports/missing-target.bin"), &data))
2453 .expect_err("write should fail when chunk target is missing");
2454 assert_eq!(err.kind(), io::ErrorKind::Other);
2455 }
2456
2457 #[test]
2458 fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
2459 let (base, _harness, _rt) = spawn_server();
2460 let provider = RemoteFsProvider::new(RemoteFsConfig {
2461 base_url: base,
2462 auth_token: None,
2463 timeout: Duration::from_secs(30),
2464 ..RemoteFsConfig::default()
2465 })
2466 .expect("provider");
2467
2468 let descriptor = provider
2469 .data_manifest_descriptor(&DataManifestRequest {
2470 path: "/datasets/alpha.data".to_string(),
2471 version: Some("v1".to_string()),
2472 })
2473 .expect("manifest descriptor");
2474 assert_eq!(descriptor.schema_version, 1);
2475 assert_eq!(descriptor.format, "runmat-data");
2476 assert_eq!(descriptor.dataset_id, "dataset:/datasets/alpha.data");
2477 assert_eq!(descriptor.updated_at, "2026-01-01T00:00:00Z");
2478 assert_eq!(descriptor.txn_sequence, 9);
2479 }
2480
2481 #[test]
2482 fn remote_provider_data_chunk_upload_targets_fetches_targets() {
2483 let (base, _harness, _rt) = spawn_server();
2484 let provider = RemoteFsProvider::new(RemoteFsConfig {
2485 base_url: base,
2486 auth_token: None,
2487 timeout: Duration::from_secs(30),
2488 ..RemoteFsConfig::default()
2489 })
2490 .expect("provider");
2491
2492 let targets = provider
2493 .data_chunk_upload_targets(&DataChunkUploadRequest {
2494 dataset_path: "/datasets/alpha.data".to_string(),
2495 array: "X".to_string(),
2496 chunks: vec![DataChunkDescriptor {
2497 key: "X/0-0".to_string(),
2498 object_id: "obj_0".to_string(),
2499 hash: "sha256:abc".to_string(),
2500 bytes_raw: 10,
2501 bytes_stored: 8,
2502 }],
2503 })
2504 .expect("chunk upload targets");
2505
2506 assert_eq!(targets.len(), 1);
2507 assert_eq!(targets[0].key, "X/0-0");
2508 assert_eq!(targets[0].method, "PUT");
2509 assert_eq!(
2510 targets[0].upload_url,
2511 "https://uploads.example.test/X/obj_0"
2512 );
2513 let headers = &targets[0].headers;
2514 assert_eq!(headers.len(), 2);
2515 assert_eq!(
2516 headers.get("x-runmat-hash").map(String::as_str),
2517 Some("sha256:abc")
2518 );
2519 assert_eq!(
2520 headers.get("x-runmat-object").map(String::as_str),
2521 Some("obj_0")
2522 );
2523 }
2524}