Skip to main content

runmat_filesystem/remote/
native.rs

1use crate::data_contract::{
2    DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::DateTime;
7use crossbeam_utils::thread;
8use once_cell::sync::Lazy;
9use reqwest::blocking::{Client, Response};
10use reqwest::StatusCode;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::fmt;
16use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use url::Url;
21use uuid::Uuid;
22
/// Hash marker that `upload_sharded_file` passes as the `hash` argument when it
/// writes a shard manifest in place of the file's contents.
const MANIFEST_HASH: &str = "manifest:v1";
/// Remote directory under which `upload_sharded_file` stores the individual shard objects.
const SHARD_PREFIX: &str = "/.runmat/shards";

/// `User-Agent` value (crate version plus host OS), built lazily on first use and
/// installed on the HTTP client by `RemoteInner::new`.
static USER_AGENT: Lazy<String> = Lazy::new(|| {
    format!(
        "runmat-remote-fs/{} ({})",
        env!("CARGO_PKG_VERSION"),
        std::env::consts::OS
    )
});
33
/// User-facing settings for the remote filesystem provider.
///
/// `RemoteInner::new` copies these values and clamps several of them to a
/// minimum (see the individual fields).
#[derive(Clone, Debug)]
pub struct RemoteFsConfig {
    /// Base URL of the remote service. Required: an empty string is rejected.
    pub base_url: String,
    /// Optional token, sent as `Authorization: Bearer <token>` on API requests.
    pub auth_token: Option<String>,
    /// Size of one chunk in chunked transfers (raised to at least 64 KiB).
    pub chunk_bytes: usize,
    /// Maximum number of worker threads per transfer (raised to at least 1).
    pub parallel_requests: usize,
    /// Files of at least this many bytes are read through a download URL.
    pub direct_read_threshold_bytes: u64,
    /// Payloads of at least this many bytes use the multipart upload path.
    pub direct_write_threshold_bytes: u64,
    /// Payloads of at least this many bytes are split into shards plus a manifest.
    pub shard_threshold_bytes: u64,
    /// Size of one shard (raised to at least 8 MiB).
    pub shard_size_bytes: u64,
    /// Timeout applied to the HTTP client.
    pub timeout: Duration,
    /// Total attempts made by the retrying request helpers (raised to at least 1).
    pub retry_max_attempts: usize,
    /// Delay before the first retry; doubled for every further attempt.
    pub retry_base_delay: Duration,
    /// Upper bound for the computed retry delay.
    pub retry_max_delay: Duration,
}

impl Default for RemoteFsConfig {
    /// Defaults: 16 MiB chunks, 4 parallel requests, 64 MiB direct read/write
    /// thresholds, sharding from 4 GiB in 512 MiB shards, a 120 s timeout and
    /// up to 5 attempts with a 100 ms to 2 s backoff. `base_url` is left empty
    /// and must be filled in by the caller.
    fn default() -> Self {
        const MIB: u64 = 1 << 20;
        const GIB: u64 = 1 << 30;
        const CHUNK_MIB: usize = 16;

        let retry_base_delay = Duration::from_millis(100);
        let retry_max_delay = Duration::from_secs(2);
        let timeout = Duration::from_secs(120);

        Self {
            base_url: String::new(),
            auth_token: None,
            chunk_bytes: CHUNK_MIB << 20,
            parallel_requests: 4,
            direct_read_threshold_bytes: 64 * MIB,
            direct_write_threshold_bytes: 64 * MIB,
            shard_threshold_bytes: 4 * GIB,
            shard_size_bytes: 512 * MIB,
            timeout,
            retry_max_attempts: 5,
            retry_base_delay,
            retry_max_delay,
        }
    }
}
68
/// Shared state behind `RemoteFsProvider`: the HTTP client plus the settings
/// copied (and, where noted, clamped) from `RemoteFsConfig` by `RemoteInner::new`.
struct RemoteInner {
    /// Blocking HTTP client built with the configured timeout and `USER_AGENT`.
    client: Client,
    /// Parsed `RemoteFsConfig::base_url`; `endpoint` joins routes onto it.
    base: Url,
    /// Pre-formatted `Bearer <token>` value that `request` sends as `Authorization`.
    auth_header: Option<String>,
    /// Chunk size for chunked transfers; at least 64 KiB.
    chunk_bytes: usize,
    /// Upper bound on worker threads per transfer; at least 1.
    parallel_requests: usize,
    /// Files at or above this length are downloaded via `/fs/download-url`.
    direct_read_threshold_bytes: u64,
    /// Payloads at or above this length take the multipart upload path.
    direct_write_threshold_bytes: u64,
    /// Payloads at or above this length are split into shards plus a manifest.
    shard_threshold_bytes: u64,
    /// Size of each shard; at least 8 MiB.
    shard_size_bytes: u64,
    /// Total attempts made by the retrying request helpers; at least 1.
    retry_max_attempts: usize,
    /// Delay before the first retry; doubled for each later attempt.
    retry_base_delay: Duration,
    /// Ceiling applied to the computed retry delay.
    retry_max_delay: Duration,
}
83
84impl RemoteInner {
85    fn new(config: RemoteFsConfig) -> io::Result<Self> {
86        if config.base_url.is_empty() {
87            return Err(io::Error::new(
88                ErrorKind::InvalidInput,
89                "RemoteFsConfig.base_url must be provided",
90            ));
91        }
92        let base = Url::parse(&config.base_url).map_err(map_url_err)?;
93        let client = Client::builder()
94            .timeout(config.timeout)
95            .user_agent(USER_AGENT.clone())
96            .build()
97            .map_err(map_http_err)?;
98        Ok(Self {
99            client,
100            base,
101            auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
102            chunk_bytes: config.chunk_bytes.max(64 * 1024),
103            parallel_requests: config.parallel_requests.max(1),
104            direct_read_threshold_bytes: config.direct_read_threshold_bytes,
105            direct_write_threshold_bytes: config.direct_write_threshold_bytes,
106            shard_threshold_bytes: config.shard_threshold_bytes,
107            shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
108            retry_max_attempts: config.retry_max_attempts.max(1),
109            retry_base_delay: config.retry_base_delay,
110            retry_max_delay: config.retry_max_delay,
111        })
112    }
113
114    fn should_retry(&self, status: StatusCode) -> bool {
115        matches!(
116            status,
117            StatusCode::TOO_MANY_REQUESTS
118                | StatusCode::INTERNAL_SERVER_ERROR
119                | StatusCode::BAD_GATEWAY
120                | StatusCode::SERVICE_UNAVAILABLE
121                | StatusCode::GATEWAY_TIMEOUT
122        )
123    }
124
125    fn retry_delay(&self, attempt: usize) -> Duration {
126        let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
127        let delay = self.retry_base_delay.as_millis() as u64 * factor;
128        let capped = delay.min(self.retry_max_delay.as_millis() as u64);
129        Duration::from_millis(capped)
130    }
131
132    fn send_with_retry(
133        &self,
134        method: reqwest::Method,
135        route: &str,
136        query: &[(&str, String)],
137    ) -> io::Result<Response> {
138        for attempt in 0..self.retry_max_attempts {
139            let resp = self
140                .request(method.clone(), route, query)
141                .send()
142                .map_err(map_http_err)?;
143            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
144                return Ok(resp);
145            }
146            std::thread::sleep(self.retry_delay(attempt));
147        }
148        Err(io::Error::other("request retries exhausted"))
149    }
150
151    fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
152        for attempt in 0..self.retry_max_attempts {
153            let mut request = self.client.get(url);
154            if let Some(range) = &range {
155                request = request.header(reqwest::header::RANGE, range);
156            }
157            let resp = request.send().map_err(map_http_err)?;
158            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
159                return Ok(resp);
160            }
161            std::thread::sleep(self.retry_delay(attempt));
162        }
163        Err(io::Error::other("request retries exhausted"))
164    }
165
166    fn endpoint(&self, route: &str) -> Url {
167        self.base
168            .join(route.trim_start_matches('/'))
169            .expect("failed to join remote route")
170    }
171
172    fn request(
173        &self,
174        method: reqwest::Method,
175        route: &str,
176        query: &[(&str, String)],
177    ) -> reqwest::blocking::RequestBuilder {
178        let mut url = self.endpoint(route);
179        {
180            let mut pairs = url.query_pairs_mut();
181            for (k, v) in query {
182                pairs.append_pair(k, v);
183            }
184        }
185        let mut builder = self.client.request(method, url);
186        if let Some(auth) = &self.auth_header {
187            builder = builder.header("Authorization", auth);
188        }
189        builder
190    }
191
192    fn get_json<T: for<'a> Deserialize<'a>>(
193        &self,
194        route: &str,
195        query: &[(&str, String)],
196    ) -> io::Result<T> {
197        let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
198        handle_error(resp)?.json().map_err(map_http_err)
199    }
200
201    fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
202        let resp = self
203            .request(reqwest::Method::POST, route, &[])
204            .json(body)
205            .send()
206            .map_err(map_http_err)?;
207        handle_error(resp)?;
208        Ok(())
209    }
210
211    fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
212        &self,
213        route: &str,
214        body: &B,
215    ) -> io::Result<T> {
216        let resp = self
217            .request(reqwest::Method::POST, route, &[])
218            .json(body)
219            .send()
220            .map_err(map_http_err)?;
221        handle_error(resp)?.json().map_err(map_http_err)
222    }
223
224    fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
225        let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
226        handle_error(resp)?;
227        Ok(())
228    }
229
230    fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
231        let resp = self.send_with_retry(
232            reqwest::Method::GET,
233            "/fs/read",
234            &[
235                ("path", path.to_string()),
236                ("offset", offset.to_string()),
237                ("length", length.to_string()),
238            ],
239        )?;
240        let mut body = handle_error(resp)?;
241        if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
242            if content_type
243                .to_str()
244                .map(|value| value.contains("application/json"))
245                .unwrap_or(false)
246            {
247                let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
248                return self.download_range_from_url(&json.download_url, offset, length as u64);
249            }
250        }
251        let mut buf = Vec::with_capacity(length);
252        body.copy_to(&mut buf).map_err(map_http_err)?;
253        Ok(buf)
254    }
255
256    fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
257        if length == 0 {
258            return Ok(Vec::new());
259        }
260        let end = offset + length - 1;
261        let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
262        let mut body = handle_error(resp)?;
263        let mut buf = Vec::with_capacity(length as usize);
264        body.copy_to(&mut buf).map_err(map_http_err)?;
265        Ok(buf)
266    }
267
268    fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
269        self.get_json("/fs/download-url", &[("path", path.to_string())])
270    }
271
272    fn upload_chunk(
273        &self,
274        path: &str,
275        offset: u64,
276        truncate: bool,
277        final_chunk: bool,
278        data: &[u8],
279        hash: Option<&str>,
280    ) -> io::Result<Option<FsWriteSessionResponse>> {
281        let mut query = vec![("path", path.to_string()), ("offset", offset.to_string())];
282        if truncate {
283            query.push(("truncate", "true".to_string()));
284        }
285        if final_chunk {
286            query.push(("final", "true".to_string()));
287        }
288        if let Some(hash) = hash {
289            query.push(("hash", hash.to_string()));
290        }
291        let resp = self
292            .request(reqwest::Method::PUT, "/fs/write", &query)
293            .body(data.to_vec())
294            .send()
295            .map_err(map_http_err)?;
296        if resp.status() == StatusCode::ACCEPTED {
297            let session = handle_error(resp)?.json().map_err(map_http_err)?;
298            return Ok(Some(session));
299        }
300        handle_error(resp)?;
301        Ok(None)
302    }
303
304    fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
305        self.post_json(
306            "/fs/multipart-upload",
307            &MultipartUploadRequest {
308                path: path.to_string(),
309                size_bytes: size_bytes as i64,
310                content_type: None,
311            },
312        )
313    }
314
315    fn upload_session_start(
316        &self,
317        path: &str,
318        size_bytes: u64,
319        content_sha256: &str,
320    ) -> io::Result<UploadSessionStartResponse> {
321        self.post_json(
322            "/fs/upload-session/start",
323            &UploadSessionStartRequest {
324                path: path.to_string(),
325                size_bytes: size_bytes as i64,
326                content_type: None,
327                content_sha256: content_sha256.to_string(),
328            },
329        )
330    }
331
332    fn upload_session_chunks(
333        &self,
334        session_id: &str,
335        blob_key: &str,
336        chunks: &[UploadSessionChunkDescriptor],
337    ) -> io::Result<UploadSessionChunksResponse> {
338        self.post_json(
339            "/fs/upload-session/chunks",
340            &UploadSessionChunksRequest {
341                session_id: session_id.to_string(),
342                blob_key: blob_key.to_string(),
343                chunks: chunks.to_vec(),
344            },
345        )
346    }
347
348    fn upload_session_complete(
349        &self,
350        path: &str,
351        session_id: &str,
352        blob_key: &str,
353        size_bytes: u64,
354        content_sha256: &str,
355        chunk_count: usize,
356    ) -> io::Result<()> {
357        self.post_empty(
358            "/fs/upload-session/complete",
359            &UploadSessionCompleteRequest {
360                path: path.to_string(),
361                session_id: session_id.to_string(),
362                blob_key: blob_key.to_string(),
363                size_bytes: size_bytes as i64,
364                content_sha256: content_sha256.to_string(),
365                chunk_count,
366                hash: None,
367            },
368        )
369    }
370
371    fn multipart_presign_part(
372        &self,
373        session_id: &str,
374        blob_key: &str,
375        upload_id: &str,
376        part_number: i32,
377        size_bytes: u64,
378    ) -> io::Result<String> {
379        let response: MultipartUploadPartResponse = self.post_json(
380            "/fs/multipart-upload/part",
381            &MultipartUploadPartRequest {
382                session_id: session_id.to_string(),
383                blob_key: blob_key.to_string(),
384                upload_id: upload_id.to_string(),
385                part_number,
386                size_bytes: size_bytes as i64,
387            },
388        )?;
389        Ok(response.upload_url)
390    }
391
392    fn multipart_complete(
393        &self,
394        path: &str,
395        session_id: &str,
396        blob_key: &str,
397        upload_id: &str,
398        size_bytes: u64,
399        parts: Vec<MultipartPart>,
400    ) -> io::Result<()> {
401        let resp = self
402            .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
403            .json(&MultipartUploadCompleteRequest {
404                path: path.to_string(),
405                session_id: session_id.to_string(),
406                blob_key: blob_key.to_string(),
407                upload_id: upload_id.to_string(),
408                size_bytes: size_bytes as i64,
409                hash: None,
410                parts,
411            })
412            .send()
413            .map_err(map_http_err)?;
414        handle_error(resp)?;
415        Ok(())
416    }
417
418    fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
419        let resp = self
420            .client
421            .put(url)
422            .body(data.to_vec())
423            .send()
424            .map_err(map_http_err)?;
425        let resp = handle_error(resp)?;
426        let etag = resp
427            .headers()
428            .get(reqwest::header::ETAG)
429            .and_then(|value| value.to_str().ok())
430            .unwrap_or("")
431            .to_string();
432        Ok(etag)
433    }
434
435    fn put_upload_target(
436        &self,
437        method: &str,
438        url: &str,
439        headers: &HashMap<String, String>,
440        data: &[u8],
441    ) -> io::Result<()> {
442        let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
443            io::Error::new(
444                ErrorKind::InvalidInput,
445                format!("invalid upload method `{method}`: {err}"),
446            )
447        })?;
448        let mut request = self.client.request(method, url).body(data.to_vec());
449        for (name, value) in headers {
450            request = request.header(name, value);
451        }
452        let resp = request.send().map_err(map_http_err)?;
453        handle_error(resp)?;
454        Ok(())
455    }
456
457    fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
458        self.get_json("/fs/metadata", &[("path", path.to_string())])
459    }
460
461    fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
462        self.get_json("/fs/dir", &[("path", path.to_string())])
463    }
464
465    fn canonicalize_path(&self, path: &str) -> io::Result<String> {
466        let resp: CanonicalizeResponse =
467            self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
468        Ok(resp.path)
469    }
470}
471
/// Filesystem provider that talks to a remote service over HTTP.
pub struct RemoteFsProvider {
    /// Shared client state; reference-counted so transfer worker threads can use it.
    inner: Arc<RemoteInner>,
}
475
476impl RemoteFsProvider {
477    pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
478        Ok(Self {
479            inner: Arc::new(RemoteInner::new(config)?),
480        })
481    }
482
483    /// Fetch a dataset manifest descriptor through the provider-neutral data contract endpoint.
484    pub fn data_manifest_descriptor(
485        &self,
486        request: &DataManifestRequest,
487    ) -> io::Result<DataManifestDescriptor> {
488        let mut query = vec![("path", request.path.clone())];
489        if let Some(version) = &request.version {
490            query.push(("version", version.clone()));
491        }
492        self.inner.get_json("/data/manifest", &query)
493    }
494
495    /// Request direct upload targets for dataset chunk objects.
496    pub fn data_chunk_upload_targets(
497        &self,
498        request: &DataChunkUploadRequest,
499    ) -> io::Result<Vec<DataChunkUploadTarget>> {
500        #[derive(Deserialize)]
501        struct DataChunkUploadTargetsResponse {
502            targets: Vec<DataChunkUploadTarget>,
503        }
504        let response: DataChunkUploadTargetsResponse = self
505            .inner
506            .post_json("/data/chunks/upload-targets", request)?;
507        Ok(response.targets)
508    }
509
510    fn normalize(&self, path: &Path) -> String {
511        let mut normalized = PathBuf::new();
512        normalized.push("/");
513        normalized.push(path);
514        normalized.to_string_lossy().replace('\\', "/").to_string()
515    }
516
517    fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
518        if let Some(parent) = path.parent() {
519            self.inner.post_empty(
520                "/fs/mkdir",
521                &CreateDirRequest {
522                    path: self.normalize(parent),
523                    recursive: true,
524                },
525            )?;
526        }
527        Ok(())
528    }
529
530    fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
531        if len == 0 {
532            return Ok(Vec::new());
533        }
534        if len >= self.inner.direct_read_threshold_bytes {
535            let url = self.inner.fetch_download_url(path)?.download_url;
536            return self.download_entire_file_from_url(&url, len);
537        }
538        let chunk = self.inner.chunk_bytes as u64;
539        let mut tasks = Vec::new();
540        let mut offset = 0;
541        let mut index = 0;
542        while offset < len {
543            let remaining = len - offset;
544            let length = std::cmp::min(chunk, remaining);
545            tasks.push(ChunkTask {
546                offset,
547                length: length as usize,
548                index,
549            });
550            offset += length;
551            index += 1;
552        }
553        let mut buffer = vec![0u8; len as usize];
554        let path = path.to_string();
555        let inner = self.inner.clone();
556        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
557        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
558        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
559        let threads = inner
560            .parallel_requests
561            .min(queue.lock().unwrap().len())
562            .max(1);
563        thread::scope(|scope| {
564            for _ in 0..threads {
565                let queue = queue.clone();
566                let error = error.clone();
567                let results = results.clone();
568                let inner = inner.clone();
569                let path = path.clone();
570                scope.spawn(move |_| loop {
571                    let task_opt = {
572                        let mut guard = queue.lock().unwrap();
573                        guard.pop_front()
574                    };
575                    let task = match task_opt {
576                        Some(task) => task,
577                        None => break,
578                    };
579                    match inner.download_chunk(&path, task.offset, task.length) {
580                        Ok(bytes) => {
581                            let mut guard = results.lock().unwrap();
582                            guard[task.index] = Some(bytes);
583                        }
584                        Err(err) => {
585                            let mut guard = error.lock().unwrap();
586                            if guard.is_none() {
587                                *guard = Some(err);
588                            }
589                            break;
590                        }
591                    }
592                });
593            }
594        })
595        .expect("remote download scope");
596        if let Some(err) = error.lock().unwrap().take() {
597            return Err(err);
598        }
599        let chunks = Arc::try_unwrap(results)
600            .expect("results still in use")
601            .into_inner()
602            .expect("results poisoned");
603        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
604            let bytes = maybe.expect("missing downloaded chunk for remote file");
605            let start = task.offset as usize;
606            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
607        }
608        Ok(buffer)
609    }
610
611    fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
612        let manifest_bytes = self.download_raw_file(path, len)?;
613        let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
614            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
615        if manifest.version != 1 {
616            return Err(io::Error::new(
617                ErrorKind::InvalidData,
618                "unsupported manifest",
619            ));
620        }
621        let mut buffer = Vec::with_capacity(manifest.total_size as usize);
622        for shard in manifest.shards {
623            let meta = self.inner.fetch_metadata(&shard.path)?;
624            let bytes = self.download_raw_file(&shard.path, meta.len)?;
625            buffer.extend_from_slice(&bytes);
626        }
627        Ok(buffer)
628    }
629
630    fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
631        let chunk = self.inner.chunk_bytes as u64;
632        let mut tasks = Vec::new();
633        let mut offset = 0;
634        let mut index = 0;
635        while offset < len {
636            let remaining = len - offset;
637            let length = std::cmp::min(chunk, remaining);
638            tasks.push(ChunkTask {
639                offset,
640                length: length as usize,
641                index,
642            });
643            offset += length;
644            index += 1;
645        }
646        let mut buffer = vec![0u8; len as usize];
647        let url = url.to_string();
648        let inner = self.inner.clone();
649        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
650        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
651        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
652        let threads = inner
653            .parallel_requests
654            .min(queue.lock().unwrap().len())
655            .max(1);
656        thread::scope(|scope| {
657            for _ in 0..threads {
658                let queue = queue.clone();
659                let error = error.clone();
660                let results = results.clone();
661                let inner = inner.clone();
662                let url = url.clone();
663                scope.spawn(move |_| loop {
664                    let task_opt = {
665                        let mut guard = queue.lock().unwrap();
666                        guard.pop_front()
667                    };
668                    let task = match task_opt {
669                        Some(task) => task,
670                        None => break,
671                    };
672                    match inner.download_range_from_url(&url, task.offset, task.length as u64) {
673                        Ok(bytes) => {
674                            let mut guard = results.lock().unwrap();
675                            guard[task.index] = Some(bytes);
676                        }
677                        Err(err) => {
678                            let mut guard = error.lock().unwrap();
679                            if guard.is_none() {
680                                *guard = Some(err);
681                            }
682                            break;
683                        }
684                    }
685                });
686            }
687        })
688        .expect("remote download scope");
689        if let Some(err) = error.lock().unwrap().take() {
690            return Err(err);
691        }
692        let chunks = Arc::try_unwrap(results)
693            .expect("results still in use")
694            .into_inner()
695            .expect("results poisoned");
696        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
697            let bytes = maybe.expect("missing downloaded chunk for remote file");
698            let start = task.offset as usize;
699            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
700        }
701        Ok(buffer)
702    }
703
704    fn upload_entire_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
705        if data.len() as u64 >= self.inner.shard_threshold_bytes {
706            return self.upload_sharded_file(path, data);
707        }
708        self.upload_unsharded_file(path, data, None)
709    }
710
711    fn upload_unsharded_file(&self, path: &str, data: &[u8], hash: Option<&str>) -> io::Result<()> {
712        if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
713            return self.upload_multipart_file(path, data);
714        }
715        if data.is_empty() {
716            self.inner.upload_chunk(path, 0, true, true, data, hash)?;
717            return Ok(());
718        }
719        let chunk = self.inner.chunk_bytes;
720        let mut offset = 0usize;
721        while offset < data.len() {
722            let end = std::cmp::min(offset + chunk, data.len());
723            let slice = &data[offset..end];
724            let truncate = offset == 0;
725            let final_chunk = end == data.len();
726            let result =
727                self.inner
728                    .upload_chunk(path, offset as u64, truncate, final_chunk, slice, hash)?;
729            if let Some(session) = result {
730                let expected = offset as u64 + slice.len() as u64;
731                if session.next_offset as u64 != expected {
732                    return Err(io::Error::other("unexpected next offset"));
733                }
734            }
735            offset = end;
736        }
737        Ok(())
738    }
739
740    fn upload_sharded_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
741        let shard_size = self.inner.shard_size_bytes as usize;
742        let mut shards = Vec::new();
743        let mut offset = 0usize;
744        while offset < data.len() {
745            let end = std::cmp::min(offset + shard_size, data.len());
746            let slice = &data[offset..end];
747            let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
748            self.upload_unsharded_file(&shard_path, slice, None)?;
749            shards.push(ShardEntry {
750                path: shard_path,
751                size: slice.len() as u64,
752            });
753            offset = end;
754        }
755        let manifest = ShardManifest {
756            version: 1,
757            total_size: data.len() as u64,
758            shard_size: self.inner.shard_size_bytes,
759            shards,
760        };
761        let bytes = serde_json::to_vec(&manifest)
762            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
763        self.upload_unsharded_file(path, &bytes, Some(MANIFEST_HASH))
764    }
765
766    fn upload_multipart_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
767        if data.is_empty() {
768            self.inner.upload_chunk(path, 0, true, true, data, None)?;
769            return Ok(());
770        }
771        let content_sha256 = sha256_hex(data);
772        let session =
773            match self
774                .inner
775                .upload_session_start(path, data.len() as u64, &content_sha256)
776            {
777                Ok(session) => session,
778                Err(err) if err.kind() == ErrorKind::NotFound => {
779                    return self.upload_multipart_file_legacy(path, data);
780                }
781                Err(err) => return Err(err),
782            };
783        let chunk_size = (session.chunk_size_bytes as usize).max(1);
784        let mut tasks = Vec::new();
785        let mut offset = 0usize;
786        let mut index = 0usize;
787        while offset < data.len() {
788            let end = std::cmp::min(offset + chunk_size, data.len());
789            let slice = &data[offset..end];
790            tasks.push(UploadTask {
791                index,
792                offset,
793                length: end - offset,
794                chunk_sha256: sha256_hex(slice),
795            });
796            offset = end;
797            index += 1;
798        }
799        let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
800            .iter()
801            .map(|task| UploadSessionChunkDescriptor {
802                chunk_index: task.index,
803                offset_bytes: task.offset as i64,
804                size_bytes: task.length as i64,
805                chunk_sha256: task.chunk_sha256.clone(),
806            })
807            .collect();
808        let chunk_response = self.inner.upload_session_chunks(
809            &session.session_id,
810            &session.blob_key,
811            &descriptors,
812        )?;
813        let targets = Arc::new(chunk_response.targets);
814        let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
815        let error = Arc::new(Mutex::new(None));
816        let data = Arc::new(data.to_vec());
817        thread::scope(|scope| {
818            for _ in 0..self.inner.parallel_requests {
819                let tasks = Arc::clone(&tasks);
820                let targets = Arc::clone(&targets);
821                let error = Arc::clone(&error);
822                let inner = Arc::clone(&self.inner);
823                let data = Arc::clone(&data);
824                scope.spawn(move |_| loop {
825                    let task = {
826                        let mut guard = tasks.lock().unwrap();
827                        guard.pop_front()
828                    };
829                    let Some(task) = task else { break };
830                    let target = targets
831                        .iter()
832                        .find(|target| target.chunk_index == task.index)
833                        .cloned();
834                    let result = (|| {
835                        let target = target.ok_or_else(|| {
836                            io::Error::other(format!(
837                                "missing upload target for chunk {}",
838                                task.index
839                            ))
840                        })?;
841                        let slice = &data[task.offset..task.offset + task.length];
842                        inner.put_upload_target(
843                            &target.method,
844                            &target.upload_url,
845                            &target.headers,
846                            slice,
847                        )
848                    })();
849                    if let Err(err) = result {
850                        let mut err_guard = error.lock().unwrap();
851                        if err_guard.is_none() {
852                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
853                        }
854                        break;
855                    }
856                });
857            }
858        })
859        .expect("upload session scope");
860        if let Some(err) = error.lock().unwrap().take() {
861            return Err(err);
862        }
863        self.inner.upload_session_complete(
864            path,
865            &session.session_id,
866            &session.blob_key,
867            data.len() as u64,
868            &content_sha256,
869            descriptors.len(),
870        )?;
871        Ok(())
872    }
873
874    fn upload_multipart_file_legacy(&self, path: &str, data: &[u8]) -> io::Result<()> {
875        let session = self.inner.multipart_create(path, data.len() as u64)?;
876        let part_size = session.part_size_bytes as usize;
877        let mut tasks = std::collections::VecDeque::new();
878        let mut offset = 0usize;
879        let mut part_number = 1;
880        let mut index = 0usize;
881        while offset < data.len() {
882            let end = std::cmp::min(offset + part_size, data.len());
883            let length = end - offset;
884            tasks.push_back(MultipartTask {
885                index,
886                part_number,
887                offset,
888                length,
889            });
890            offset = end;
891            part_number += 1;
892            index += 1;
893        }
894        let tasks = Arc::new(Mutex::new(tasks));
895        let task_len = tasks.lock().unwrap().len();
896        let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
897        for _ in 0..task_len {
898            result_vec.push(None);
899        }
900        let results = Arc::new(Mutex::new(result_vec));
901        let error = Arc::new(Mutex::new(None));
902        let data = Arc::new(data.to_vec());
903        thread::scope(|scope| {
904            for _ in 0..self.inner.parallel_requests {
905                let tasks = Arc::clone(&tasks);
906                let results = Arc::clone(&results);
907                let error = Arc::clone(&error);
908                let inner = Arc::clone(&self.inner);
909                let blob_key = session.blob_key.clone();
910                let upload_id = session.upload_id.clone();
911                let session_id = session.session_id.clone();
912                let data = Arc::clone(&data);
913                scope.spawn(move |_| loop {
914                    let task = {
915                        let mut guard = tasks.lock().unwrap();
916                        guard.pop_front()
917                    };
918                    let Some(task) = task else { break };
919                    let slice = &data[task.offset..task.offset + task.length];
920                    let result = (|| {
921                        let url = inner.multipart_presign_part(
922                            &session_id,
923                            &blob_key,
924                            &upload_id,
925                            task.part_number,
926                            task.length as u64,
927                        )?;
928                        let etag = inner.put_upload_url_with_etag(&url, slice)?;
929                        Ok(MultipartPart {
930                            part_number: task.part_number,
931                            etag,
932                        })
933                    })();
934                    let mut guard = results.lock().unwrap();
935                    guard[task.index] = Some(result);
936                    if let Some(Err(err)) = guard[task.index].as_ref() {
937                        let mut err_guard = error.lock().unwrap();
938                        if err_guard.is_none() {
939                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
940                        }
941                        break;
942                    }
943                });
944            }
945        })
946        .expect("multipart upload scope");
947        if let Some(err) = error.lock().unwrap().take() {
948            return Err(err);
949        }
950        let mut parts = Vec::with_capacity(results.lock().unwrap().len());
951        for maybe in Arc::try_unwrap(results)
952            .expect("results still in use")
953            .into_inner()
954            .expect("results poisoned")
955        {
956            let part = maybe.expect("missing part result")?;
957            if part.etag.is_empty() {
958                return Err(io::Error::other("missing etag"));
959            }
960            parts.push(part);
961        }
962        parts.sort_by_key(|part| part.part_number);
963        self.inner.multipart_complete(
964            path,
965            &session.session_id,
966            &session.blob_key,
967            &session.upload_id,
968            data.len() as u64,
969            parts,
970        )?;
971        Ok(())
972    }
973}
974
/// One unit of chunked work: a byte range plus its ordinal position.
///
/// NOTE(review): the code that consumes this type is outside this excerpt;
/// presumably it drives chunked downloads — confirm against the caller.
#[derive(Clone, Copy)]
struct ChunkTask {
    /// Byte offset at which the chunk starts.
    offset: u64,
    /// Number of bytes in the chunk.
    length: usize,
    /// Ordinal position of the chunk.
    index: usize,
}
981
982type MultipartResult = Option<Result<MultipartPart, io::Error>>;
983
/// One part of a legacy multipart upload, queued for a worker thread.
#[derive(Clone, Copy)]
struct MultipartTask {
    /// Zero-based slot in the shared result vector.
    index: usize,
    /// Part number sent to the server; numbering starts at 1.
    part_number: i32,
    /// Byte offset of the part within the payload.
    offset: usize,
    /// Length of the part in bytes.
    length: usize,
}
991
/// One chunk of an upload-session transfer, queued for a worker thread.
#[derive(Clone)]
struct UploadTask {
    /// Chunk index; matched against the `chunk_index` of the upload targets.
    index: usize,
    /// Byte offset of the chunk within the payload.
    offset: usize,
    /// Length of the chunk in bytes.
    length: usize,
    /// Hex-encoded SHA-256 digest of the chunk bytes.
    chunk_sha256: String,
}
999
1000#[async_trait(?Send)]
1001impl FsProvider for RemoteFsProvider {
1002    fn current_dir_override(&self) -> Option<PathBuf> {
1003        Some(PathBuf::from("/"))
1004    }
1005
1006    fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
1007        let normalized = self.normalize(path);
1008        let mut data = Vec::new();
1009        if flags.read {
1010            let meta = self.inner.fetch_metadata(&normalized)?;
1011            if meta.file_type != "file" {
1012                return Err(io::Error::other("remote path is not a file"));
1013            }
1014            data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1015                self.download_sharded_file(&normalized, meta.len)?
1016            } else {
1017                self.download_raw_file(&normalized, meta.len)?
1018            };
1019        }
1020        if flags.truncate {
1021            data.clear();
1022        }
1023        if flags.create {
1024            self.ensure_parent_exists(path)?;
1025        }
1026        let handle = RemoteFileHandle {
1027            provider: self.clone(),
1028            path: normalized,
1029            data,
1030            position: 0,
1031            flags: flags.clone(),
1032            dirty: false,
1033        };
1034        Ok(Box::new(handle))
1035    }
1036
1037    async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
1038        let normalized = self.normalize(path);
1039        let meta = self.inner.fetch_metadata(&normalized)?;
1040        if meta.file_type != "file" {
1041            return Err(io::Error::other("remote path is not a file"));
1042        }
1043        if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1044            self.download_sharded_file(&normalized, meta.len)
1045        } else {
1046            self.download_raw_file(&normalized, meta.len)
1047        }
1048    }
1049
1050    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
1051        let normalized = self.normalize(path);
1052        self.ensure_parent_exists(path)?;
1053        self.upload_entire_file(&normalized, data)
1054    }
1055
1056    async fn remove_file(&self, path: &Path) -> io::Result<()> {
1057        let normalized = self.normalize(path);
1058        self.inner
1059            .delete_empty("/fs/file", &[("path", normalized)])?;
1060        Ok(())
1061    }
1062
1063    async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1064        let normalized = self.normalize(path);
1065        let resp = self.inner.fetch_metadata(&normalized)?;
1066        Ok(resp.into())
1067    }
1068
1069    async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1070        self.metadata(path).await
1071    }
1072
1073    async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
1074        let normalized = self.normalize(path);
1075        let resp = self.inner.fetch_dir(&normalized)?;
1076        Ok(resp
1077            .into_iter()
1078            .map(|entry| DirEntry {
1079                path: PathBuf::from(entry.path),
1080                file_name: entry.file_name.into(),
1081                file_type: entry.file_type.into(),
1082            })
1083            .collect())
1084    }
1085
1086    async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
1087        let normalized = self.normalize(path);
1088        let canonical = self.inner.canonicalize_path(&normalized)?;
1089        Ok(PathBuf::from(canonical))
1090    }
1091
1092    async fn create_dir(&self, path: &Path) -> io::Result<()> {
1093        let normalized = self.normalize(path);
1094        self.inner.post_empty(
1095            "/fs/mkdir",
1096            &CreateDirRequest {
1097                path: normalized,
1098                recursive: false,
1099            },
1100        )
1101    }
1102
1103    async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
1104        let normalized = self.normalize(path);
1105        self.inner.post_empty(
1106            "/fs/mkdir",
1107            &CreateDirRequest {
1108                path: normalized,
1109                recursive: true,
1110            },
1111        )
1112    }
1113
1114    async fn remove_dir(&self, path: &Path) -> io::Result<()> {
1115        let normalized = self.normalize(path);
1116        self.inner.delete_empty(
1117            "/fs/dir",
1118            &[("path", normalized), ("recursive", "false".into())],
1119        )
1120    }
1121
1122    async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
1123        let normalized = self.normalize(path);
1124        self.inner.delete_empty(
1125            "/fs/dir",
1126            &[("path", normalized), ("recursive", "true".into())],
1127        )
1128    }
1129
1130    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
1131        self.inner.post_empty(
1132            "/fs/rename",
1133            &RenameRequest {
1134                from: self.normalize(from),
1135                to: self.normalize(to),
1136            },
1137        )
1138    }
1139
1140    async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
1141        self.inner.post_empty(
1142            "/fs/set-readonly",
1143            &SetReadonlyRequest {
1144                path: self.normalize(path),
1145                readonly,
1146            },
1147        )
1148    }
1149
1150    async fn data_manifest_descriptor(
1151        &self,
1152        request: &DataManifestRequest,
1153    ) -> io::Result<DataManifestDescriptor> {
1154        let mut query = vec![("path", request.path.clone())];
1155        if let Some(version) = &request.version {
1156            query.push(("version", version.clone()));
1157        }
1158        self.inner.get_json("/data/manifest", &query)
1159    }
1160
1161    async fn data_chunk_upload_targets(
1162        &self,
1163        request: &DataChunkUploadRequest,
1164    ) -> io::Result<Vec<DataChunkUploadTarget>> {
1165        #[derive(Deserialize)]
1166        struct DataChunkUploadTargetsResponse {
1167            targets: Vec<DataChunkUploadTarget>,
1168        }
1169        let response: DataChunkUploadTargetsResponse = self
1170            .inner
1171            .post_json("/data/chunks/upload-targets", request)?;
1172        Ok(response.targets)
1173    }
1174
1175    async fn data_upload_chunk(
1176        &self,
1177        target: &DataChunkUploadTarget,
1178        data: &[u8],
1179    ) -> io::Result<()> {
1180        self.inner
1181            .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
1182    }
1183}
1184
1185impl Clone for RemoteFsProvider {
1186    fn clone(&self) -> Self {
1187        Self {
1188            inner: Arc::clone(&self.inner),
1189        }
1190    }
1191}
1192
/// File handle that keeps the whole file in memory and uploads it again when
/// flushed or dropped while dirty.
struct RemoteFileHandle {
    /// Provider used to upload the buffer.
    provider: RemoteFsProvider,
    /// Normalized remote path of the file.
    path: String,
    /// In-memory copy of the file contents.
    data: Vec<u8>,
    /// Current read/write cursor into `data`.
    position: usize,
    /// Flags the handle was opened with; consulted on every write.
    flags: OpenFlags,
    /// Set by writes, cleared by a successful flush.
    dirty: bool,
}
1201
1202impl RemoteFileHandle {
1203    fn flush_remote(&mut self) -> io::Result<()> {
1204        if !self.dirty {
1205            return Ok(());
1206        }
1207        self.provider.upload_entire_file(&self.path, &self.data)?;
1208        self.dirty = false;
1209        Ok(())
1210    }
1211}
1212
1213impl Read for RemoteFileHandle {
1214    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1215        let remaining = &self.data[self.position..];
1216        let amt = remaining.len().min(buf.len());
1217        buf[..amt].copy_from_slice(&remaining[..amt]);
1218        self.position += amt;
1219        Ok(amt)
1220    }
1221}
1222
1223impl Write for RemoteFileHandle {
1224    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1225        if !self.flags.write && !self.flags.append {
1226            return Err(io::Error::new(
1227                ErrorKind::PermissionDenied,
1228                "remote file not opened for writing",
1229            ));
1230        }
1231        if self.flags.append {
1232            self.position = self.data.len();
1233        }
1234        let required = self.position + buf.len();
1235        if required > self.data.len() {
1236            self.data.resize(required, 0);
1237        }
1238        self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
1239        self.position += buf.len();
1240        self.dirty = true;
1241        Ok(buf.len())
1242    }
1243
1244    fn flush(&mut self) -> io::Result<()> {
1245        self.flush_remote()
1246    }
1247}
1248
1249impl Seek for RemoteFileHandle {
1250    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1251        let new_pos = match pos {
1252            SeekFrom::Start(offset) => offset as i64,
1253            SeekFrom::End(offset) => self.data.len() as i64 + offset,
1254            SeekFrom::Current(offset) => self.position as i64 + offset,
1255        };
1256        if new_pos < 0 {
1257            return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
1258        }
1259        self.position = new_pos as usize;
1260        Ok(self.position as u64)
1261    }
1262}
1263
// Empty impl: no `FileHandle` items are defined or overridden here.
#[async_trait(?Send)]
impl FileHandle for RemoteFileHandle {}
1266
1267impl Drop for RemoteFileHandle {
1268    fn drop(&mut self) {
1269        if self.dirty {
1270            if let Err(err) = self.provider.upload_entire_file(&self.path, &self.data) {
1271                eprintln!("remote fs flush failed: {err}");
1272            }
1273        }
1274    }
1275}
1276
1277#[derive(Debug, Deserialize)]
1278struct MetadataResponse {
1279    #[serde(rename = "fileType")]
1280    file_type: String,
1281    len: u64,
1282    #[serde(rename = "modifiedAt")]
1283    modified_at: Option<String>,
1284    readonly: bool,
1285    hash: Option<String>,
1286}
1287
1288impl From<MetadataResponse> for FsMetadata {
1289    fn from(value: MetadataResponse) -> Self {
1290        FsMetadata::new_with_hash(
1291            value.file_type.into(),
1292            value.len,
1293            parse_modified_at(value.modified_at.as_deref()),
1294            value.readonly,
1295            value.hash,
1296        )
1297    }
1298}
1299
1300#[derive(Debug, Deserialize)]
1301struct DirEntryResponse {
1302    path: String,
1303    #[serde(rename = "fileName")]
1304    file_name: String,
1305    #[serde(rename = "fileType")]
1306    file_type: String,
1307}
1308
1309impl From<String> for FsFileType {
1310    fn from(value: String) -> Self {
1311        match value.as_str() {
1312            "dir" => FsFileType::Directory,
1313            "file" => FsFileType::File,
1314            "symlink" => FsFileType::Symlink,
1315            "other" => FsFileType::Other,
1316            _ => FsFileType::Unknown,
1317        }
1318    }
1319}
1320
/// Body of `POST /fs/mkdir`.
#[derive(Debug, Serialize, Deserialize)]
struct CreateDirRequest {
    /// Normalized path of the directory to create.
    path: String,
    /// `true` for `create_dir_all`, `false` for `create_dir`.
    recursive: bool,
}
1326
/// Body of `POST /fs/rename`.
#[derive(Debug, Serialize, Deserialize)]
struct RenameRequest {
    /// Normalized source path.
    from: String,
    /// Normalized destination path.
    to: String,
}
1332
/// Body of `POST /fs/set-readonly`.
#[derive(Debug, Serialize, Deserialize)]
struct SetReadonlyRequest {
    /// Normalized path of the entry to change.
    path: String,
    /// Desired read-only state.
    readonly: bool,
}
1338
/// Response carrying a single path.
///
/// NOTE(review): presumably the reply to the canonicalize request issued by
/// `canonicalize_path` — that code is outside this excerpt; confirm.
#[derive(Debug, Deserialize)]
struct CanonicalizeResponse {
    /// The path returned by the service.
    path: String,
}
1343
1344#[derive(Debug, Deserialize)]
1345struct DownloadUrlResponse {
1346    #[serde(rename = "downloadUrl")]
1347    download_url: String,
1348}
1349
/// Request that opens an upload session (camelCase JSON keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartRequest {
    /// Remote path the upload is destined for.
    path: String,
    /// Total size of the content in bytes.
    size_bytes: i64,
    /// Optional MIME type of the content.
    content_type: Option<String>,
    /// SHA-256 of the complete content.
    /// NOTE(review): presumably hex-encoded like the chunk digests — confirm.
    content_sha256: String,
}
1358
/// Reply to an upload-session start request (camelCase JSON keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartResponse {
    /// Identifier of the new session.
    session_id: String,
    /// Storage key assigned to the upload.
    blob_key: String,
    /// Chunk size, in bytes, announced by the server.
    chunk_size_bytes: i64,
}
1366
/// Describes one chunk of an upload session (camelCase JSON keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunkDescriptor {
    /// Index of the chunk.
    chunk_index: usize,
    /// Byte offset of the chunk within the content.
    offset_bytes: i64,
    /// Size of the chunk in bytes.
    size_bytes: i64,
    /// Hex-encoded SHA-256 digest of the chunk.
    chunk_sha256: String,
}
1375
/// Request for upload targets for a set of chunks (camelCase JSON keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksRequest {
    /// Session the chunks belong to.
    session_id: String,
    /// Storage key of the upload.
    blob_key: String,
    /// Chunks for which targets are requested.
    chunks: Vec<UploadSessionChunkDescriptor>,
}
1383
/// Reply listing the upload targets for the requested chunks.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksResponse {
    /// Upload targets; the client looks each one up by `chunk_index`.
    targets: Vec<UploadChunkTarget>,
}
1389
/// Where and how to upload one chunk (camelCase JSON keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadChunkTarget {
    /// Index of the chunk this target is for.
    chunk_index: usize,
    /// HTTP method to use for the upload.
    method: String,
    /// URL to send the chunk to.
    upload_url: String,
    /// Headers to send with the upload request.
    headers: HashMap<String, String>,
}
1398
/// Request that finalizes an upload session (camelCase JSON keys).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionCompleteRequest {
    /// Remote path of the uploaded file.
    path: String,
    /// Session being completed.
    session_id: String,
    /// Storage key of the upload.
    blob_key: String,
    /// Total size of the content in bytes.
    size_bytes: i64,
    /// SHA-256 of the complete content.
    content_sha256: String,
    /// Number of chunks that were uploaded.
    chunk_count: usize,
    /// Optional hash value to record for the file.
    hash: Option<String>,
}
1410
1411#[derive(Debug, Serialize, Deserialize)]
1412struct MultipartUploadRequest {
1413    path: String,
1414    #[serde(rename = "sizeBytes")]
1415    size_bytes: i64,
1416    #[serde(rename = "contentType")]
1417    content_type: Option<String>,
1418}
1419
1420#[derive(Debug, Deserialize)]
1421struct MultipartUploadResponse {
1422    #[serde(rename = "sessionId")]
1423    session_id: String,
1424    #[serde(rename = "blobKey")]
1425    blob_key: String,
1426    #[serde(rename = "uploadId")]
1427    upload_id: String,
1428    #[serde(rename = "partSizeBytes")]
1429    part_size_bytes: i64,
1430}
1431
1432#[derive(Debug, Serialize, Deserialize)]
1433struct MultipartUploadPartRequest {
1434    #[serde(rename = "sessionId")]
1435    session_id: String,
1436    #[serde(rename = "blobKey")]
1437    blob_key: String,
1438    #[serde(rename = "uploadId")]
1439    upload_id: String,
1440    #[serde(rename = "partNumber")]
1441    part_number: i32,
1442    #[serde(rename = "sizeBytes")]
1443    size_bytes: i64,
1444}
1445
1446#[derive(Debug, Deserialize)]
1447struct MultipartUploadPartResponse {
1448    #[serde(rename = "upload_url")]
1449    upload_url: String,
1450}
1451
1452#[derive(Debug, Serialize, Deserialize)]
1453struct MultipartUploadCompleteRequest {
1454    path: String,
1455    #[serde(rename = "sessionId")]
1456    session_id: String,
1457    #[serde(rename = "blobKey")]
1458    blob_key: String,
1459    #[serde(rename = "uploadId")]
1460    upload_id: String,
1461    #[serde(rename = "sizeBytes")]
1462    size_bytes: i64,
1463    hash: Option<String>,
1464    parts: Vec<MultipartPart>,
1465}
1466
1467#[derive(Debug, Serialize, Deserialize)]
1468struct MultipartPart {
1469    #[serde(rename = "partNumber")]
1470    part_number: i32,
1471    etag: String,
1472}
1473
/// Manifest describing a file stored as several shards.
///
/// NOTE(review): the code that reads and writes manifests is outside this
/// excerpt; field meanings below follow from the names — confirm.
#[derive(Debug, Serialize, Deserialize)]
struct ShardManifest {
    /// Manifest format version.
    version: u32,
    /// Presumably the size of the whole file in bytes.
    total_size: u64,
    /// Presumably the nominal size of one shard in bytes.
    shard_size: u64,
    /// The shards that make up the file.
    shards: Vec<ShardEntry>,
}
1481
/// One shard listed in a [`ShardManifest`].
#[derive(Debug, Serialize, Deserialize)]
struct ShardEntry {
    /// Remote path of the shard.
    path: String,
    /// Size of the shard (presumably in bytes — confirm).
    size: u64,
}
1487
1488#[derive(Debug, Deserialize)]
1489struct FsWriteSessionResponse {
1490    #[serde(rename = "sessionId")]
1491    _session_id: String,
1492    #[serde(rename = "nextOffset")]
1493    next_offset: i64,
1494}
1495
1496fn sha256_hex(data: &[u8]) -> String {
1497    let digest = Sha256::digest(data);
1498    let mut out = String::with_capacity(digest.len() * 2);
1499    for byte in digest {
1500        use std::fmt::Write as _;
1501        let _ = write!(out, "{byte:02x}");
1502    }
1503    out
1504}
1505
/// Wraps a `reqwest` error in an `io::Error` of kind `Other`, keeping the
/// original error as the source.
fn map_http_err(err: reqwest::Error) -> io::Error {
    io::Error::other(err)
}
1509
1510fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1511    let value = value?;
1512    let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1513    let millis = parsed.timestamp_millis();
1514    if millis < 0 {
1515        return None;
1516    }
1517    Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1518}
1519
/// Wraps a URL parse error in an `io::Error` of kind `InvalidInput`.
fn map_url_err(err: url::ParseError) -> io::Error {
    io::Error::new(ErrorKind::InvalidInput, err)
}
1523
1524fn handle_error(resp: Response) -> io::Result<Response> {
1525    let status = resp.status();
1526    if status.is_success() {
1527        return Ok(resp);
1528    }
1529    let text = resp.text().unwrap_or_else(|_| status.to_string());
1530    let kind = match status {
1531        StatusCode::NOT_FOUND => ErrorKind::NotFound,
1532        StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1533        StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1534        StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1535        _ => ErrorKind::Other,
1536    };
1537    Err(io::Error::new(kind, text))
1538}
1539
1540impl fmt::Debug for RemoteFsProvider {
1541    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1542        f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
1543    }
1544}
1545
1546#[cfg(test)]
1547mod tests {
1548    use super::*;
1549    use crate::data_contract::DataChunkDescriptor;
1550    use axum::extract::{Query, State};
1551    use axum::http::{HeaderMap, StatusCode};
1552    use axum::routing::{delete, get, post, put};
1553    use axum::{Json, Router};
1554    use futures::executor;
1555    use serde::Deserialize;
1556    use std::collections::HashMap;
1557    use std::net::TcpListener as StdTcpListener;
1558    use std::sync::Arc;
1559    use tempfile::tempdir;
1560    use tokio::net::TcpListener as TokioTcpListener;
1561    use tokio::runtime::Runtime;
1562
    /// Shared state of the in-process test server, backed by a temporary
    /// directory.
    #[derive(Clone)]
    struct Harness {
        /// Directory that remote paths are resolved against.
        root: Arc<PathBuf>,
        /// Keeps the temporary directory alive as long as any clone exists.
        _keeper: Arc<tempfile::TempDir>,
        /// Base URL of the server; `None` until it has been set.
        base_url: Arc<Mutex<Option<String>>>,
        /// Upload sessions keyed by session id.
        upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
        /// When set, the chunk-target handler leaves out the last target.
        omit_last_chunk_target: bool,
    }
1571
1572    impl Harness {
1573        fn new() -> Self {
1574            Self::with_omit_last_chunk_target(false)
1575        }
1576
1577        fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
1578            let dir = tempdir().expect("tempdir");
1579            let path = dir.path().to_path_buf();
1580            Self {
1581                root: Arc::new(path),
1582                _keeper: Arc::new(dir),
1583                base_url: Arc::new(Mutex::new(None)),
1584                upload_sessions: Arc::new(Mutex::new(HashMap::new())),
1585                omit_last_chunk_target,
1586            }
1587        }
1588
1589        fn resolve(&self, remote_path: &str) -> PathBuf {
1590            let trimmed = remote_path.trim_start_matches('/');
1591            self.root.join(trimmed)
1592        }
1593
1594        fn virtualize(&self, path: &Path) -> String {
1595            let rel = path.strip_prefix(&*self.root).unwrap_or(path);
1596            if rel.as_os_str().is_empty() {
1597                "/".to_string()
1598            } else {
1599                format!("/{}", rel.to_string_lossy().replace('\\', "/"))
1600            }
1601        }
1602    }
1603
    /// Bookkeeping the test server keeps for one upload session.
    #[derive(Clone)]
    struct UploadSessionTestState {
        /// Remote path given when the session was started.
        path: String,
        /// Number of chunks announced by the latest chunk-target request.
        chunk_count: usize,
    }
1609
    /// Query parameters shared by the path-based test handlers; each handler
    /// reads only the fields it needs.
    #[derive(Deserialize)]
    struct PathParams {
        /// Remote path the request refers to.
        path: String,
        /// Byte offset for ranged reads and writes.
        offset: Option<u64>,
        /// Number of bytes to read.
        length: Option<usize>,
        /// `"true"` makes the write handler start from an empty file.
        truncate: Option<String>,
        /// Recursion flag (its consumer is outside this excerpt).
        recursive: Option<String>,
    }
1618
1619    #[derive(Deserialize)]
1620    struct UploadChunkQuery {
1621        #[serde(rename = "sessionId")]
1622        session_id: String,
1623        #[serde(rename = "chunkIndex")]
1624        chunk_index: usize,
1625    }
1626
    /// Query parameters of the data manifest endpoint.
    #[derive(Deserialize)]
    struct DataManifestQuery {
        /// Path of the dataset.
        path: String,
        /// Optional manifest version.
        version: Option<String>,
    }
1632
1633    async fn metadata_handler(
1634        State(harness): State<Harness>,
1635        Query(params): Query<PathParams>,
1636    ) -> Result<Json<serde_json::Value>, StatusCode> {
1637        let meta =
1638            std::fs::metadata(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1639        let file_type = if meta.is_dir() {
1640            "dir"
1641        } else if meta.is_file() {
1642            "file"
1643        } else {
1644            "other"
1645        };
1646        Ok(Json(serde_json::json!({
1647            "fileType": file_type,
1648            "len": meta.len(),
1649            "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
1650            "readonly": meta.permissions().readonly()
1651        })))
1652    }
1653
1654    async fn dir_handler(
1655        State(harness): State<Harness>,
1656        Query(params): Query<PathParams>,
1657    ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
1658        let mut entries = Vec::new();
1659        for entry in
1660            std::fs::read_dir(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?
1661        {
1662            let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1663            let file_type = entry
1664                .file_type()
1665                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1666            let kind = if file_type.is_dir() {
1667                "dir"
1668            } else if file_type.is_file() {
1669                "file"
1670            } else {
1671                "other"
1672            };
1673            entries.push(serde_json::json!({
1674                "path": harness.virtualize(&entry.path()),
1675                "fileName": entry.file_name().to_string_lossy(),
1676                "fileType": kind
1677            }));
1678        }
1679        Ok(Json(entries))
1680    }
1681
1682    async fn canonicalize_handler(
1683        State(harness): State<Harness>,
1684        Query(params): Query<PathParams>,
1685    ) -> Result<Json<serde_json::Value>, StatusCode> {
1686        let path = harness.resolve(&params.path);
1687        let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
1688        Ok(Json(serde_json::json!({
1689            "path": harness.virtualize(&canonical)
1690        })))
1691    }
1692
1693    async fn read_handler(
1694        State(harness): State<Harness>,
1695        Query(params): Query<PathParams>,
1696    ) -> Result<Vec<u8>, StatusCode> {
1697        let mut data =
1698            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1699        let offset = params.offset.unwrap_or(0) as usize;
1700        let length = params.length.unwrap_or(data.len().saturating_sub(offset));
1701        let end = std::cmp::min(offset + length, data.len());
1702        if offset < data.len() {
1703            data = data[offset..end].to_vec();
1704        } else {
1705            data.clear();
1706        }
1707        Ok(data)
1708    }
1709
1710    async fn write_handler(
1711        State(harness): State<Harness>,
1712        Query(params): Query<PathParams>,
1713        body: axum::body::Bytes,
1714    ) -> Result<(), StatusCode> {
1715        let path = harness.resolve(&params.path);
1716        if let Some(parent) = path.parent() {
1717            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1718        }
1719        let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
1720            Vec::new()
1721        } else {
1722            std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1723        };
1724        let offset = params.offset.unwrap_or(0) as usize;
1725        let required = offset + body.len();
1726        if required > data.len() {
1727            data.resize(required, 0);
1728        }
1729        data[offset..offset + body.len()].copy_from_slice(&body);
1730        std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1731        Ok(())
1732    }
1733
1734    async fn upload_session_start_handler(
1735        State(harness): State<Harness>,
1736        Json(req): Json<UploadSessionStartRequest>,
1737    ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
1738        let session_id = Uuid::new_v4().to_string();
1739        let chunk_size_bytes = 1024i64;
1740        harness
1741            .upload_sessions
1742            .lock()
1743            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1744            .insert(
1745                session_id.clone(),
1746                UploadSessionTestState {
1747                    path: req.path,
1748                    chunk_count: 0,
1749                },
1750            );
1751        Ok(Json(UploadSessionStartResponse {
1752            session_id: session_id.clone(),
1753            blob_key: format!("test/blob/{session_id}"),
1754            chunk_size_bytes,
1755        }))
1756    }
1757
1758    async fn upload_session_chunks_handler(
1759        State(harness): State<Harness>,
1760        Json(req): Json<UploadSessionChunksRequest>,
1761    ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
1762        let base = harness
1763            .base_url
1764            .lock()
1765            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1766            .clone()
1767            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1768        let mut sessions = harness
1769            .upload_sessions
1770            .lock()
1771            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1772        let state = sessions
1773            .get_mut(&req.session_id)
1774            .ok_or(StatusCode::NOT_FOUND)?;
1775        state.chunk_count = req.chunks.len();
1776        let mut targets = req
1777            .chunks
1778            .iter()
1779            .map(|chunk| UploadChunkTarget {
1780                chunk_index: chunk.chunk_index,
1781                method: "PUT".to_string(),
1782                upload_url: format!(
1783                    "{base}/upload-chunk?sessionId={}&chunkIndex={}",
1784                    req.session_id, chunk.chunk_index
1785                ),
1786                headers: HashMap::new(),
1787            })
1788            .collect::<Vec<_>>();
1789        if harness.omit_last_chunk_target && !targets.is_empty() {
1790            targets.pop();
1791        }
1792        Ok(Json(UploadSessionChunksResponse { targets }))
1793    }
1794
1795    async fn upload_chunk_handler(
1796        State(harness): State<Harness>,
1797        Query(query): Query<UploadChunkQuery>,
1798        body: axum::body::Bytes,
1799    ) -> Result<(), StatusCode> {
1800        let chunk_path = harness.resolve(&format!(
1801            "/.upload-sessions/{}/{}",
1802            query.session_id, query.chunk_index
1803        ));
1804        if let Some(parent) = chunk_path.parent() {
1805            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1806        }
1807        std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1808        Ok(())
1809    }
1810
1811    async fn data_manifest_handler(
1812        Query(query): Query<DataManifestQuery>,
1813    ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
1814        let updated_at = if query.version.as_deref() == Some("v1") {
1815            "2026-01-01T00:00:00Z"
1816        } else {
1817            "2026-02-02T00:00:00Z"
1818        };
1819        Ok(Json(DataManifestDescriptor {
1820            schema_version: 1,
1821            format: "runmat-data".to_string(),
1822            dataset_id: format!("dataset:{}", query.path),
1823            updated_at: updated_at.to_string(),
1824            txn_sequence: 9,
1825        }))
1826    }
1827
1828    async fn data_chunk_upload_targets_handler(
1829        Json(req): Json<DataChunkUploadRequest>,
1830    ) -> Result<Json<serde_json::Value>, StatusCode> {
1831        let targets = req
1832            .chunks
1833            .iter()
1834            .map(|chunk| {
1835                serde_json::json!({
1836                    "key": chunk.key,
1837                    "method": "PUT",
1838                    "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
1839                    "headers": {
1840                        "x-runmat-hash": chunk.hash,
1841                        "x-runmat-object": chunk.object_id,
1842                    }
1843                })
1844            })
1845            .collect::<Vec<_>>();
1846        Ok(Json(serde_json::json!({ "targets": targets })))
1847    }
1848
1849    async fn upload_session_complete_handler(
1850        State(harness): State<Harness>,
1851        Json(req): Json<UploadSessionCompleteRequest>,
1852    ) -> Result<(), StatusCode> {
1853        let state = harness
1854            .upload_sessions
1855            .lock()
1856            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1857            .remove(&req.session_id)
1858            .ok_or(StatusCode::NOT_FOUND)?;
1859        let mut data = Vec::new();
1860        for chunk_index in 0..state.chunk_count {
1861            let chunk_path = harness.resolve(&format!(
1862                "/.upload-sessions/{}/{}",
1863                req.session_id, chunk_index
1864            ));
1865            let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1866            data.extend_from_slice(&chunk);
1867        }
1868        if sha256_hex(&data) != req.content_sha256 {
1869            return Err(StatusCode::BAD_REQUEST);
1870        }
1871        let target_path = harness.resolve(&state.path);
1872        if let Some(parent) = target_path.parent() {
1873            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1874        }
1875        std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1876        Ok(())
1877    }
1878
1879    async fn mkdir_handler(
1880        State(harness): State<Harness>,
1881        Json(req): Json<CreateDirRequest>,
1882    ) -> Result<(), StatusCode> {
1883        let path = harness.resolve(&req.path);
1884        if req.recursive {
1885            std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1886        } else {
1887            std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1888        }
1889        Ok(())
1890    }
1891
1892    async fn delete_file_handler(
1893        State(harness): State<Harness>,
1894        Query(params): Query<PathParams>,
1895    ) -> Result<(), StatusCode> {
1896        std::fs::remove_file(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1897        Ok(())
1898    }
1899
1900    async fn delete_dir_handler(
1901        State(harness): State<Harness>,
1902        Query(params): Query<PathParams>,
1903    ) -> Result<(), StatusCode> {
1904        let recursive = params.recursive.as_deref() == Some("true");
1905        if recursive {
1906            std::fs::remove_dir_all(harness.resolve(&params.path))
1907                .map_err(|_| StatusCode::NOT_FOUND)?;
1908        } else {
1909            std::fs::remove_dir(harness.resolve(&params.path))
1910                .map_err(|_| StatusCode::NOT_FOUND)?;
1911        }
1912        Ok(())
1913    }
1914
1915    async fn rename_handler(
1916        State(harness): State<Harness>,
1917        Json(req): Json<RenameRequest>,
1918    ) -> Result<(), StatusCode> {
1919        let from = harness.resolve(&req.from);
1920        let to = harness.resolve(&req.to);
1921        if let Some(parent) = to.parent() {
1922            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1923        }
1924        std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
1925        Ok(())
1926    }
1927
1928    async fn set_readonly_handler(
1929        State(harness): State<Harness>,
1930        Json(req): Json<SetReadonlyRequest>,
1931    ) -> Result<(), StatusCode> {
1932        let path = harness.resolve(&req.path);
1933        let mut perms = std::fs::metadata(&path)
1934            .map_err(|_| StatusCode::NOT_FOUND)?
1935            .permissions();
1936        perms.set_readonly(req.readonly);
1937        std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1938        Ok(())
1939    }
1940
1941    async fn read_with_download_handler(
1942        State(harness): State<Harness>,
1943        Query(params): Query<PathParams>,
1944    ) -> Result<Json<serde_json::Value>, StatusCode> {
1945        let base = harness
1946            .base_url
1947            .lock()
1948            .unwrap()
1949            .clone()
1950            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1951        let download_url = format!("{base}/download?path={}", params.path);
1952        Ok(Json(serde_json::json!({
1953            "downloadUrl": download_url,
1954            "expiresAt": 0
1955        })))
1956    }
1957
1958    async fn download_handler(
1959        State(harness): State<Harness>,
1960        Query(params): Query<PathParams>,
1961        headers: HeaderMap,
1962    ) -> Result<Vec<u8>, StatusCode> {
1963        let mut data =
1964            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1965        if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
1966            if let Some((start, end)) = parse_range(range) {
1967                let start = start.min(data.len());
1968                let end = end.min(data.len().saturating_sub(1));
1969                if start < data.len() {
1970                    data = data[start..=end].to_vec();
1971                } else {
1972                    data.clear();
1973                }
1974            }
1975        }
1976        Ok(data)
1977    }
1978
1979    fn parse_range(value: &str) -> Option<(usize, usize)> {
1980        let value = value.strip_prefix("bytes=")?;
1981        let (start, end) = value.split_once('-')?;
1982        let start = start.parse::<usize>().ok()?;
1983        let end = end.parse::<usize>().ok()?;
1984        Some((start, end))
1985    }
1986
1987    fn spawn_server() -> (String, Harness, Runtime) {
1988        let harness = Harness::new();
1989        let router = Router::new()
1990            .route("/fs/metadata", get(metadata_handler))
1991            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
1992            .route("/fs/file", delete(delete_file_handler))
1993            .route("/fs/canonicalize", get(canonicalize_handler))
1994            .route("/fs/read", get(read_handler))
1995            .route("/fs/write", put(write_handler))
1996            .route(
1997                "/fs/upload-session/start",
1998                post(upload_session_start_handler),
1999            )
2000            .route(
2001                "/fs/upload-session/chunks",
2002                post(upload_session_chunks_handler),
2003            )
2004            .route(
2005                "/fs/upload-session/complete",
2006                post(upload_session_complete_handler),
2007            )
2008            .route("/fs/mkdir", post(mkdir_handler))
2009            .route("/fs/rename", post(rename_handler))
2010            .route("/fs/set-readonly", post(set_readonly_handler))
2011            .route("/data/manifest", get(data_manifest_handler))
2012            .route(
2013                "/data/chunks/upload-targets",
2014                post(data_chunk_upload_targets_handler),
2015            )
2016            .route("/upload-chunk", put(upload_chunk_handler))
2017            .with_state(harness.clone());
2018        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2019        std_listener.set_nonblocking(true).expect("nonblocking");
2020        let addr = std_listener.local_addr().unwrap();
2021        let service = router.into_make_service();
2022        let rt = Runtime::new().unwrap();
2023        let base = format!("http://{addr}");
2024        *harness.base_url.lock().unwrap() = Some(base.clone());
2025        rt.spawn(async move {
2026            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2027            axum::serve(listener, service).await.unwrap();
2028        });
2029        (base, harness, rt)
2030    }
2031
2032    fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
2033        let harness = Harness::new();
2034        let router = Router::new()
2035            .route("/fs/metadata", get(metadata_handler))
2036            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2037            .route("/fs/file", delete(delete_file_handler))
2038            .route("/fs/canonicalize", get(canonicalize_handler))
2039            .route("/fs/read", get(read_with_download_handler))
2040            .route("/fs/write", put(write_handler))
2041            .route(
2042                "/fs/upload-session/start",
2043                post(upload_session_start_handler),
2044            )
2045            .route(
2046                "/fs/upload-session/chunks",
2047                post(upload_session_chunks_handler),
2048            )
2049            .route(
2050                "/fs/upload-session/complete",
2051                post(upload_session_complete_handler),
2052            )
2053            .route("/fs/mkdir", post(mkdir_handler))
2054            .route("/fs/rename", post(rename_handler))
2055            .route("/fs/set-readonly", post(set_readonly_handler))
2056            .route("/download", get(download_handler))
2057            .route("/data/manifest", get(data_manifest_handler))
2058            .route(
2059                "/data/chunks/upload-targets",
2060                post(data_chunk_upload_targets_handler),
2061            )
2062            .route("/upload-chunk", put(upload_chunk_handler))
2063            .with_state(harness.clone());
2064        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2065        std_listener.set_nonblocking(true).expect("nonblocking");
2066        let addr = std_listener.local_addr().unwrap();
2067        let service = router.into_make_service();
2068        let rt = Runtime::new().unwrap();
2069        let base = format!("http://{addr}");
2070        *harness.base_url.lock().unwrap() = Some(base.clone());
2071        rt.spawn(async move {
2072            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2073            axum::serve(listener, service).await.unwrap();
2074        });
2075        (base, harness, rt)
2076    }
2077
2078    fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
2079        let harness = Harness::with_omit_last_chunk_target(true);
2080        let router = Router::new()
2081            .route("/fs/metadata", get(metadata_handler))
2082            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2083            .route("/fs/file", delete(delete_file_handler))
2084            .route("/fs/canonicalize", get(canonicalize_handler))
2085            .route("/fs/read", get(read_handler))
2086            .route("/fs/write", put(write_handler))
2087            .route(
2088                "/fs/upload-session/start",
2089                post(upload_session_start_handler),
2090            )
2091            .route(
2092                "/fs/upload-session/chunks",
2093                post(upload_session_chunks_handler),
2094            )
2095            .route(
2096                "/fs/upload-session/complete",
2097                post(upload_session_complete_handler),
2098            )
2099            .route("/fs/mkdir", post(mkdir_handler))
2100            .route("/fs/rename", post(rename_handler))
2101            .route("/fs/set-readonly", post(set_readonly_handler))
2102            .route("/data/manifest", get(data_manifest_handler))
2103            .route(
2104                "/data/chunks/upload-targets",
2105                post(data_chunk_upload_targets_handler),
2106            )
2107            .route("/upload-chunk", put(upload_chunk_handler))
2108            .with_state(harness.clone());
2109        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2110        std_listener.set_nonblocking(true).expect("nonblocking");
2111        let addr = std_listener.local_addr().unwrap();
2112        let service = router.into_make_service();
2113        let rt = Runtime::new().unwrap();
2114        let base = format!("http://{addr}");
2115        *harness.base_url.lock().unwrap() = Some(base.clone());
2116        rt.spawn(async move {
2117            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2118            axum::serve(listener, service).await.unwrap();
2119        });
2120        (base, harness, rt)
2121    }
2122
2123    #[test]
2124    fn remote_provider_roundtrip() {
2125        let (base, harness, _rt) = spawn_server();
2126        let provider = RemoteFsProvider::new(RemoteFsConfig {
2127            base_url: base,
2128            auth_token: None,
2129            chunk_bytes: 1024,
2130            parallel_requests: 4,
2131            direct_read_threshold_bytes: u64::MAX,
2132            direct_write_threshold_bytes: 1024,
2133            timeout: Duration::from_secs(30),
2134            ..RemoteFsConfig::default()
2135        })
2136        .expect("provider");
2137
2138        let data = (0..16_384u32)
2139            .flat_map(|v| v.to_le_bytes())
2140            .collect::<Vec<_>>();
2141        executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
2142        let read_back =
2143            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2144        assert_eq!(data, read_back);
2145        executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
2146        assert!(!harness.resolve("/reports/data.bin").exists());
2147    }
2148
2149    #[test]
2150    fn remote_provider_download_url_read() {
2151        let (base, harness, _rt) = spawn_server_with_download_url();
2152        let provider = RemoteFsProvider::new(RemoteFsConfig {
2153            base_url: base,
2154            auth_token: None,
2155            chunk_bytes: 128,
2156            parallel_requests: 2,
2157            direct_read_threshold_bytes: u64::MAX,
2158            timeout: Duration::from_secs(30),
2159            ..RemoteFsConfig::default()
2160        })
2161        .expect("provider");
2162
2163        let data = (0..1024u32)
2164            .flat_map(|v| v.to_le_bytes())
2165            .collect::<Vec<_>>();
2166        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2167        std::fs::write(harness.resolve("/reports/data.bin"), &data).expect("write");
2168
2169        let read_back =
2170            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2171        assert_eq!(data, read_back);
2172    }
2173
2174    #[test]
2175    fn remote_provider_errors_when_chunk_target_is_missing() {
2176        let (base, _harness, _rt) = spawn_server_with_missing_chunk_target();
2177        let provider = RemoteFsProvider::new(RemoteFsConfig {
2178            base_url: base,
2179            auth_token: None,
2180            chunk_bytes: 128,
2181            parallel_requests: 2,
2182            direct_read_threshold_bytes: u64::MAX,
2183            direct_write_threshold_bytes: 128,
2184            timeout: Duration::from_secs(30),
2185            ..RemoteFsConfig::default()
2186        })
2187        .expect("provider");
2188
2189        let data = vec![7u8; 1024];
2190        let err =
2191            executor::block_on(provider.write(Path::new("/reports/missing-target.bin"), &data))
2192                .expect_err("write should fail when chunk target is missing");
2193        assert_eq!(err.kind(), io::ErrorKind::Other);
2194    }
2195
2196    #[test]
2197    fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
2198        let (base, _harness, _rt) = spawn_server();
2199        let provider = RemoteFsProvider::new(RemoteFsConfig {
2200            base_url: base,
2201            auth_token: None,
2202            timeout: Duration::from_secs(30),
2203            ..RemoteFsConfig::default()
2204        })
2205        .expect("provider");
2206
2207        let descriptor = provider
2208            .data_manifest_descriptor(&DataManifestRequest {
2209                path: "/datasets/alpha.data".to_string(),
2210                version: Some("v1".to_string()),
2211            })
2212            .expect("manifest descriptor");
2213        assert_eq!(descriptor.schema_version, 1);
2214        assert_eq!(descriptor.format, "runmat-data");
2215        assert_eq!(descriptor.dataset_id, "dataset:/datasets/alpha.data");
2216        assert_eq!(descriptor.updated_at, "2026-01-01T00:00:00Z");
2217        assert_eq!(descriptor.txn_sequence, 9);
2218    }
2219
2220    #[test]
2221    fn remote_provider_data_chunk_upload_targets_fetches_targets() {
2222        let (base, _harness, _rt) = spawn_server();
2223        let provider = RemoteFsProvider::new(RemoteFsConfig {
2224            base_url: base,
2225            auth_token: None,
2226            timeout: Duration::from_secs(30),
2227            ..RemoteFsConfig::default()
2228        })
2229        .expect("provider");
2230
2231        let targets = provider
2232            .data_chunk_upload_targets(&DataChunkUploadRequest {
2233                dataset_path: "/datasets/alpha.data".to_string(),
2234                array: "X".to_string(),
2235                chunks: vec![DataChunkDescriptor {
2236                    key: "X/0-0".to_string(),
2237                    object_id: "obj_0".to_string(),
2238                    hash: "sha256:abc".to_string(),
2239                    bytes_raw: 10,
2240                    bytes_stored: 8,
2241                }],
2242            })
2243            .expect("chunk upload targets");
2244
2245        assert_eq!(targets.len(), 1);
2246        assert_eq!(targets[0].key, "X/0-0");
2247        assert_eq!(targets[0].method, "PUT");
2248        assert_eq!(
2249            targets[0].upload_url,
2250            "https://uploads.example.test/X/obj_0"
2251        );
2252        let headers = &targets[0].headers;
2253        assert_eq!(headers.len(), 2);
2254        assert_eq!(
2255            headers.get("x-runmat-hash").map(String::as_str),
2256            Some("sha256:abc")
2257        );
2258        assert_eq!(
2259            headers.get("x-runmat-object").map(String::as_str),
2260            Some("obj_0")
2261        );
2262    }
2263}