Skip to main content

runmat_filesystem/remote/
native.rs

1use crate::data_contract::{
2    DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::DateTime;
7use crossbeam_utils::thread;
8use once_cell::sync::Lazy;
9use reqwest::blocking::{Client, Response};
10use reqwest::StatusCode;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::fmt;
16use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use url::Url;
21use uuid::Uuid;
22
/// `hash` query value sent with shard-manifest uploads (see `upload_sharded_file`).
/// NOTE(review): presumably lets the server recognise the object as a shard manifest rather
/// than raw data — confirm against the server's `/fs/write` handling.
const MANIFEST_HASH: &str = "manifest:v1";
/// Remote directory that holds individual shard objects; `upload_sharded_file` writes each
/// shard to `{SHARD_PREFIX}/{uuid}`.
const SHARD_PREFIX: &str = "/.runmat/shards";
25
26static USER_AGENT: Lazy<String> = Lazy::new(|| {
27    format!(
28        "runmat-remote-fs/{} ({})",
29        env!("CARGO_PKG_VERSION"),
30        std::env::consts::OS
31    )
32});
33
/// User-facing configuration for the remote filesystem provider.
///
/// Start from [`Default::default`] and override `base_url` (required) plus any tunables.
#[derive(Clone, Debug)]
pub struct RemoteFsConfig {
    /// Root URL of the remote file service. Must be non-empty.
    pub base_url: String,
    /// Optional bearer token; sent as `Authorization: Bearer <token>`.
    pub auth_token: Option<String>,
    /// Size of each chunked read/write request, in bytes.
    pub chunk_bytes: usize,
    /// Maximum number of concurrent transfer requests.
    pub parallel_requests: usize,
    /// Files at least this large are read through a download URL with ranged GETs.
    pub direct_read_threshold_bytes: u64,
    /// Uploads at least this large use the upload-session (multipart) path.
    pub direct_write_threshold_bytes: u64,
    /// Uploads at least this large are split into shards plus a manifest.
    pub shard_threshold_bytes: u64,
    /// Size of each shard when sharding, in bytes.
    pub shard_size_bytes: u64,
    /// Per-request timeout for the HTTP client.
    pub timeout: Duration,
    /// Maximum attempts for requests that are retried.
    pub retry_max_attempts: usize,
    /// Initial backoff delay; doubled on each further attempt.
    pub retry_base_delay: Duration,
    /// Upper bound on a single backoff delay.
    pub retry_max_delay: Duration,
}

impl Default for RemoteFsConfig {
    fn default() -> Self {
        const MIB_USIZE: usize = 1 << 20;
        const MIB: u64 = 1 << 20;
        const GIB: u64 = 1 << 30;
        Self {
            base_url: String::new(),
            auth_token: None,
            parallel_requests: 4,
            chunk_bytes: 16 * MIB_USIZE,
            direct_read_threshold_bytes: 64 * MIB,
            direct_write_threshold_bytes: 64 * MIB,
            shard_threshold_bytes: 4 * GIB,
            shard_size_bytes: 512 * MIB,
            timeout: Duration::from_secs(120),
            retry_max_attempts: 5,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(2),
        }
    }
}
68
69struct RemoteInner {
70    client: Client,
71    base: Url,
72    auth_header: Option<String>,
73    chunk_bytes: usize,
74    parallel_requests: usize,
75    direct_read_threshold_bytes: u64,
76    direct_write_threshold_bytes: u64,
77    shard_threshold_bytes: u64,
78    shard_size_bytes: u64,
79    retry_max_attempts: usize,
80    retry_base_delay: Duration,
81    retry_max_delay: Duration,
82}
83
84impl RemoteInner {
85    fn new(config: RemoteFsConfig) -> io::Result<Self> {
86        if config.base_url.is_empty() {
87            return Err(io::Error::new(
88                ErrorKind::InvalidInput,
89                "RemoteFsConfig.base_url must be provided",
90            ));
91        }
92        let base = Url::parse(&config.base_url).map_err(map_url_err)?;
93        let client = Client::builder()
94            .timeout(config.timeout)
95            .user_agent(USER_AGENT.clone())
96            .build()
97            .map_err(map_http_err)?;
98        Ok(Self {
99            client,
100            base,
101            auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
102            chunk_bytes: config.chunk_bytes.max(64 * 1024),
103            parallel_requests: config.parallel_requests.max(1),
104            direct_read_threshold_bytes: config.direct_read_threshold_bytes,
105            direct_write_threshold_bytes: config.direct_write_threshold_bytes,
106            shard_threshold_bytes: config.shard_threshold_bytes,
107            shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
108            retry_max_attempts: config.retry_max_attempts.max(1),
109            retry_base_delay: config.retry_base_delay,
110            retry_max_delay: config.retry_max_delay,
111        })
112    }
113
114    fn should_retry(&self, status: StatusCode) -> bool {
115        matches!(
116            status,
117            StatusCode::TOO_MANY_REQUESTS
118                | StatusCode::INTERNAL_SERVER_ERROR
119                | StatusCode::BAD_GATEWAY
120                | StatusCode::SERVICE_UNAVAILABLE
121                | StatusCode::GATEWAY_TIMEOUT
122        )
123    }
124
125    fn retry_delay(&self, attempt: usize) -> Duration {
126        let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
127        let delay = self.retry_base_delay.as_millis() as u64 * factor;
128        let capped = delay.min(self.retry_max_delay.as_millis() as u64);
129        Duration::from_millis(capped)
130    }
131
132    fn send_with_retry(
133        &self,
134        method: reqwest::Method,
135        route: &str,
136        query: &[(&str, String)],
137    ) -> io::Result<Response> {
138        for attempt in 0..self.retry_max_attempts {
139            let resp = self
140                .request(method.clone(), route, query)
141                .send()
142                .map_err(map_http_err)?;
143            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
144                return Ok(resp);
145            }
146            std::thread::sleep(self.retry_delay(attempt));
147        }
148        Err(io::Error::other("request retries exhausted"))
149    }
150
151    fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
152        for attempt in 0..self.retry_max_attempts {
153            let mut request = self.client.get(url);
154            if let Some(range) = &range {
155                request = request.header(reqwest::header::RANGE, range);
156            }
157            let resp = request.send().map_err(map_http_err)?;
158            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
159                return Ok(resp);
160            }
161            std::thread::sleep(self.retry_delay(attempt));
162        }
163        Err(io::Error::other("request retries exhausted"))
164    }
165
166    fn endpoint(&self, route: &str) -> Url {
167        self.base
168            .join(route.trim_start_matches('/'))
169            .expect("failed to join remote route")
170    }
171
172    fn request(
173        &self,
174        method: reqwest::Method,
175        route: &str,
176        query: &[(&str, String)],
177    ) -> reqwest::blocking::RequestBuilder {
178        let mut url = self.endpoint(route);
179        {
180            let mut pairs = url.query_pairs_mut();
181            for (k, v) in query {
182                pairs.append_pair(k, v);
183            }
184        }
185        let mut builder = self.client.request(method, url);
186        if let Some(auth) = &self.auth_header {
187            builder = builder.header("Authorization", auth);
188        }
189        builder
190    }
191
192    fn get_json<T: for<'a> Deserialize<'a>>(
193        &self,
194        route: &str,
195        query: &[(&str, String)],
196    ) -> io::Result<T> {
197        let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
198        handle_error(resp)?.json().map_err(map_http_err)
199    }
200
201    fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
202        let resp = self
203            .request(reqwest::Method::POST, route, &[])
204            .json(body)
205            .send()
206            .map_err(map_http_err)?;
207        handle_error(resp)?;
208        Ok(())
209    }
210
211    fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
212        &self,
213        route: &str,
214        body: &B,
215    ) -> io::Result<T> {
216        let resp = self
217            .request(reqwest::Method::POST, route, &[])
218            .json(body)
219            .send()
220            .map_err(map_http_err)?;
221        handle_error(resp)?.json().map_err(map_http_err)
222    }
223
224    fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
225        let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
226        handle_error(resp)?;
227        Ok(())
228    }
229
230    fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
231        let resp = self.send_with_retry(
232            reqwest::Method::GET,
233            "/fs/read",
234            &[
235                ("path", path.to_string()),
236                ("offset", offset.to_string()),
237                ("length", length.to_string()),
238            ],
239        )?;
240        let mut body = handle_error(resp)?;
241        if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
242            if content_type
243                .to_str()
244                .map(|value| value.contains("application/json"))
245                .unwrap_or(false)
246            {
247                let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
248                return self.download_range_from_url(&json.download_url, offset, length as u64);
249            }
250        }
251        let mut buf = Vec::with_capacity(length);
252        body.copy_to(&mut buf).map_err(map_http_err)?;
253        Ok(buf)
254    }
255
256    fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
257        if length == 0 {
258            return Ok(Vec::new());
259        }
260        let end = offset + length - 1;
261        let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
262        let mut body = handle_error(resp)?;
263        let mut buf = Vec::with_capacity(length as usize);
264        body.copy_to(&mut buf).map_err(map_http_err)?;
265        Ok(buf)
266    }
267
268    fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
269        self.get_json("/fs/download-url", &[("path", path.to_string())])
270    }
271
272    fn upload_chunk(
273        &self,
274        path: &str,
275        offset: u64,
276        truncate: bool,
277        final_chunk: bool,
278        data: &[u8],
279        hash: Option<&str>,
280    ) -> io::Result<Option<FsWriteSessionResponse>> {
281        let mut query = vec![("path", path.to_string()), ("offset", offset.to_string())];
282        if truncate {
283            query.push(("truncate", "true".to_string()));
284        }
285        if final_chunk {
286            query.push(("final", "true".to_string()));
287        }
288        if let Some(hash) = hash {
289            query.push(("hash", hash.to_string()));
290        }
291        let resp = self
292            .request(reqwest::Method::PUT, "/fs/write", &query)
293            .body(data.to_vec())
294            .send()
295            .map_err(map_http_err)?;
296        if resp.status() == StatusCode::ACCEPTED {
297            let session = handle_error(resp)?.json().map_err(map_http_err)?;
298            return Ok(Some(session));
299        }
300        handle_error(resp)?;
301        Ok(None)
302    }
303
304    fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
305        self.post_json(
306            "/fs/multipart-upload",
307            &MultipartUploadRequest {
308                path: path.to_string(),
309                size_bytes: size_bytes as i64,
310                content_type: None,
311            },
312        )
313    }
314
315    fn upload_session_start(
316        &self,
317        path: &str,
318        size_bytes: u64,
319        content_sha256: &str,
320    ) -> io::Result<UploadSessionStartResponse> {
321        self.post_json(
322            "/fs/upload-session/start",
323            &UploadSessionStartRequest {
324                path: path.to_string(),
325                size_bytes: size_bytes as i64,
326                content_type: None,
327                content_sha256: content_sha256.to_string(),
328            },
329        )
330    }
331
332    fn upload_session_chunks(
333        &self,
334        session_id: &str,
335        blob_key: &str,
336        chunks: &[UploadSessionChunkDescriptor],
337    ) -> io::Result<UploadSessionChunksResponse> {
338        self.post_json(
339            "/fs/upload-session/chunks",
340            &UploadSessionChunksRequest {
341                session_id: session_id.to_string(),
342                blob_key: blob_key.to_string(),
343                chunks: chunks.to_vec(),
344            },
345        )
346    }
347
348    fn upload_session_complete(
349        &self,
350        path: &str,
351        session_id: &str,
352        blob_key: &str,
353        size_bytes: u64,
354        content_sha256: &str,
355        chunk_count: usize,
356    ) -> io::Result<()> {
357        self.post_empty(
358            "/fs/upload-session/complete",
359            &UploadSessionCompleteRequest {
360                path: path.to_string(),
361                session_id: session_id.to_string(),
362                blob_key: blob_key.to_string(),
363                size_bytes: size_bytes as i64,
364                content_sha256: content_sha256.to_string(),
365                chunk_count,
366                hash: None,
367            },
368        )
369    }
370
371    fn multipart_presign_part(
372        &self,
373        session_id: &str,
374        blob_key: &str,
375        upload_id: &str,
376        part_number: i32,
377        size_bytes: u64,
378    ) -> io::Result<String> {
379        let response: MultipartUploadPartResponse = self.post_json(
380            "/fs/multipart-upload/part",
381            &MultipartUploadPartRequest {
382                session_id: session_id.to_string(),
383                blob_key: blob_key.to_string(),
384                upload_id: upload_id.to_string(),
385                part_number,
386                size_bytes: size_bytes as i64,
387            },
388        )?;
389        Ok(response.upload_url)
390    }
391
392    fn multipart_complete(
393        &self,
394        path: &str,
395        session_id: &str,
396        blob_key: &str,
397        upload_id: &str,
398        size_bytes: u64,
399        parts: Vec<MultipartPart>,
400    ) -> io::Result<()> {
401        let resp = self
402            .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
403            .json(&MultipartUploadCompleteRequest {
404                path: path.to_string(),
405                session_id: session_id.to_string(),
406                blob_key: blob_key.to_string(),
407                upload_id: upload_id.to_string(),
408                size_bytes: size_bytes as i64,
409                hash: None,
410                parts,
411            })
412            .send()
413            .map_err(map_http_err)?;
414        handle_error(resp)?;
415        Ok(())
416    }
417
418    fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
419        let resp = self
420            .client
421            .put(url)
422            .body(data.to_vec())
423            .send()
424            .map_err(map_http_err)?;
425        let resp = handle_error(resp)?;
426        let etag = resp
427            .headers()
428            .get(reqwest::header::ETAG)
429            .and_then(|value| value.to_str().ok())
430            .unwrap_or("")
431            .to_string();
432        Ok(etag)
433    }
434
435    fn put_upload_target(
436        &self,
437        method: &str,
438        url: &str,
439        headers: &HashMap<String, String>,
440        data: &[u8],
441    ) -> io::Result<()> {
442        let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
443            io::Error::new(
444                ErrorKind::InvalidInput,
445                format!("invalid upload method `{method}`: {err}"),
446            )
447        })?;
448        let mut request = self.client.request(method, url).body(data.to_vec());
449        for (name, value) in headers {
450            request = request.header(name, value);
451        }
452        let resp = request.send().map_err(map_http_err)?;
453        handle_error(resp)?;
454        Ok(())
455    }
456
457    fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
458        self.get_json("/fs/metadata", &[("path", path.to_string())])
459    }
460
461    fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
462        self.get_json("/fs/dir", &[("path", path.to_string())])
463    }
464
465    fn canonicalize_path(&self, path: &str) -> io::Result<String> {
466        let resp: CanonicalizeResponse =
467            self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
468        Ok(resp.path)
469    }
470}
471
472pub struct RemoteFsProvider {
473    inner: Arc<RemoteInner>,
474}
475
476impl RemoteFsProvider {
477    pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
478        Ok(Self {
479            inner: Arc::new(RemoteInner::new(config)?),
480        })
481    }
482
483    /// Fetch a dataset manifest descriptor through the provider-neutral data contract endpoint.
484    pub fn data_manifest_descriptor(
485        &self,
486        request: &DataManifestRequest,
487    ) -> io::Result<DataManifestDescriptor> {
488        let mut query = vec![("path", request.path.clone())];
489        if let Some(version) = &request.version {
490            query.push(("version", version.clone()));
491        }
492        self.inner.get_json("/data/manifest", &query)
493    }
494
495    /// Request direct upload targets for dataset chunk objects.
496    pub fn data_chunk_upload_targets(
497        &self,
498        request: &DataChunkUploadRequest,
499    ) -> io::Result<Vec<DataChunkUploadTarget>> {
500        #[derive(Deserialize)]
501        struct DataChunkUploadTargetsResponse {
502            targets: Vec<DataChunkUploadTarget>,
503        }
504        let response: DataChunkUploadTargetsResponse = self
505            .inner
506            .post_json("/data/chunks/upload-targets", request)?;
507        Ok(response.targets)
508    }
509
510    fn normalize(&self, path: &Path) -> String {
511        let mut normalized = PathBuf::new();
512        normalized.push("/");
513        normalized.push(path);
514        normalized.to_string_lossy().replace('\\', "/").to_string()
515    }
516
517    fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
518        if let Some(parent) = path.parent() {
519            self.inner.post_empty(
520                "/fs/mkdir",
521                &CreateDirRequest {
522                    path: self.normalize(parent),
523                    recursive: true,
524                },
525            )?;
526        }
527        Ok(())
528    }
529
530    fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
531        if len == 0 {
532            return Ok(Vec::new());
533        }
534        if len >= self.inner.direct_read_threshold_bytes {
535            let url = self.inner.fetch_download_url(path)?.download_url;
536            return self.download_entire_file_from_url(&url, len);
537        }
538        let chunk = self.inner.chunk_bytes as u64;
539        let mut tasks = Vec::new();
540        let mut offset = 0;
541        let mut index = 0;
542        while offset < len {
543            let remaining = len - offset;
544            let length = std::cmp::min(chunk, remaining);
545            tasks.push(ChunkTask {
546                offset,
547                length: length as usize,
548                index,
549            });
550            offset += length;
551            index += 1;
552        }
553        let mut buffer = vec![0u8; len as usize];
554        let path = path.to_string();
555        let inner = self.inner.clone();
556        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
557        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
558        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
559        let threads = inner
560            .parallel_requests
561            .min(queue.lock().unwrap().len())
562            .max(1);
563        thread::scope(|scope| {
564            for _ in 0..threads {
565                let queue = queue.clone();
566                let error = error.clone();
567                let results = results.clone();
568                let inner = inner.clone();
569                let path = path.clone();
570                scope.spawn(move |_| loop {
571                    let task_opt = {
572                        let mut guard = queue.lock().unwrap();
573                        guard.pop_front()
574                    };
575                    let task = match task_opt {
576                        Some(task) => task,
577                        None => break,
578                    };
579                    match inner.download_chunk(&path, task.offset, task.length) {
580                        Ok(bytes) => {
581                            let mut guard = results.lock().unwrap();
582                            guard[task.index] = Some(bytes);
583                        }
584                        Err(err) => {
585                            let mut guard = error.lock().unwrap();
586                            if guard.is_none() {
587                                *guard = Some(err);
588                            }
589                            break;
590                        }
591                    }
592                });
593            }
594        })
595        .expect("remote download scope");
596        if let Some(err) = error.lock().unwrap().take() {
597            return Err(err);
598        }
599        let chunks = Arc::try_unwrap(results)
600            .expect("results still in use")
601            .into_inner()
602            .expect("results poisoned");
603        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
604            let bytes = maybe.expect("missing downloaded chunk for remote file");
605            let start = task.offset as usize;
606            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
607        }
608        Ok(buffer)
609    }
610
611    fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
612        let manifest_bytes = self.download_raw_file(path, len)?;
613        let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
614            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
615        if manifest.version != 1 {
616            return Err(io::Error::new(
617                ErrorKind::InvalidData,
618                "unsupported manifest",
619            ));
620        }
621        let mut buffer = Vec::with_capacity(manifest.total_size as usize);
622        for shard in manifest.shards {
623            let meta = self.inner.fetch_metadata(&shard.path)?;
624            let bytes = self.download_raw_file(&shard.path, meta.len)?;
625            buffer.extend_from_slice(&bytes);
626        }
627        Ok(buffer)
628    }
629
630    fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
631        let chunk = self.inner.chunk_bytes as u64;
632        let mut tasks = Vec::new();
633        let mut offset = 0;
634        let mut index = 0;
635        while offset < len {
636            let remaining = len - offset;
637            let length = std::cmp::min(chunk, remaining);
638            tasks.push(ChunkTask {
639                offset,
640                length: length as usize,
641                index,
642            });
643            offset += length;
644            index += 1;
645        }
646        let mut buffer = vec![0u8; len as usize];
647        let url = url.to_string();
648        let inner = self.inner.clone();
649        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
650        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
651        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
652        let threads = inner
653            .parallel_requests
654            .min(queue.lock().unwrap().len())
655            .max(1);
656        thread::scope(|scope| {
657            for _ in 0..threads {
658                let queue = queue.clone();
659                let error = error.clone();
660                let results = results.clone();
661                let inner = inner.clone();
662                let url = url.clone();
663                scope.spawn(move |_| loop {
664                    let task_opt = {
665                        let mut guard = queue.lock().unwrap();
666                        guard.pop_front()
667                    };
668                    let task = match task_opt {
669                        Some(task) => task,
670                        None => break,
671                    };
672                    match inner.download_range_from_url(&url, task.offset, task.length as u64) {
673                        Ok(bytes) => {
674                            let mut guard = results.lock().unwrap();
675                            guard[task.index] = Some(bytes);
676                        }
677                        Err(err) => {
678                            let mut guard = error.lock().unwrap();
679                            if guard.is_none() {
680                                *guard = Some(err);
681                            }
682                            break;
683                        }
684                    }
685                });
686            }
687        })
688        .expect("remote download scope");
689        if let Some(err) = error.lock().unwrap().take() {
690            return Err(err);
691        }
692        let chunks = Arc::try_unwrap(results)
693            .expect("results still in use")
694            .into_inner()
695            .expect("results poisoned");
696        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
697            let bytes = maybe.expect("missing downloaded chunk for remote file");
698            let start = task.offset as usize;
699            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
700        }
701        Ok(buffer)
702    }
703
704    fn upload_entire_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
705        if data.len() as u64 >= self.inner.shard_threshold_bytes {
706            return self.upload_sharded_file(path, data);
707        }
708        self.upload_unsharded_file(path, data, None)
709    }
710
711    fn upload_unsharded_file(&self, path: &str, data: &[u8], hash: Option<&str>) -> io::Result<()> {
712        if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
713            return self.upload_multipart_file(path, data);
714        }
715        if data.is_empty() {
716            self.inner.upload_chunk(path, 0, true, true, data, hash)?;
717            return Ok(());
718        }
719        let chunk = self.inner.chunk_bytes;
720        let mut offset = 0usize;
721        while offset < data.len() {
722            let end = std::cmp::min(offset + chunk, data.len());
723            let slice = &data[offset..end];
724            let truncate = offset == 0;
725            let final_chunk = end == data.len();
726            let result =
727                self.inner
728                    .upload_chunk(path, offset as u64, truncate, final_chunk, slice, hash)?;
729            if let Some(session) = result {
730                let expected = offset as u64 + slice.len() as u64;
731                if session.next_offset as u64 != expected {
732                    return Err(io::Error::other("unexpected next offset"));
733                }
734            }
735            offset = end;
736        }
737        Ok(())
738    }
739
740    fn upload_sharded_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
741        let shard_size = self.inner.shard_size_bytes as usize;
742        let mut shards = Vec::new();
743        let mut offset = 0usize;
744        while offset < data.len() {
745            let end = std::cmp::min(offset + shard_size, data.len());
746            let slice = &data[offset..end];
747            let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
748            self.upload_unsharded_file(&shard_path, slice, None)?;
749            shards.push(ShardEntry {
750                path: shard_path,
751                size: slice.len() as u64,
752            });
753            offset = end;
754        }
755        let manifest = ShardManifest {
756            version: 1,
757            total_size: data.len() as u64,
758            shard_size: self.inner.shard_size_bytes,
759            shards,
760        };
761        let bytes = serde_json::to_vec(&manifest)
762            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
763        self.upload_unsharded_file(path, &bytes, Some(MANIFEST_HASH))
764    }
765
766    fn upload_multipart_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
767        if data.is_empty() {
768            self.inner.upload_chunk(path, 0, true, true, data, None)?;
769            return Ok(());
770        }
771        let content_sha256 = sha256_hex(data);
772        let session =
773            match self
774                .inner
775                .upload_session_start(path, data.len() as u64, &content_sha256)
776            {
777                Ok(session) => session,
778                Err(err) if err.kind() == ErrorKind::NotFound => {
779                    return self.upload_multipart_file_legacy(path, data);
780                }
781                Err(err) => return Err(err),
782            };
783        let chunk_size = (session.chunk_size_bytes as usize).max(1);
784        let mut tasks = Vec::new();
785        let mut offset = 0usize;
786        let mut index = 0usize;
787        while offset < data.len() {
788            let end = std::cmp::min(offset + chunk_size, data.len());
789            let slice = &data[offset..end];
790            tasks.push(UploadTask {
791                index,
792                offset,
793                length: end - offset,
794                chunk_sha256: sha256_hex(slice),
795            });
796            offset = end;
797            index += 1;
798        }
799        let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
800            .iter()
801            .map(|task| UploadSessionChunkDescriptor {
802                chunk_index: task.index,
803                offset_bytes: task.offset as i64,
804                size_bytes: task.length as i64,
805                chunk_sha256: task.chunk_sha256.clone(),
806            })
807            .collect();
808        let chunk_response = self.inner.upload_session_chunks(
809            &session.session_id,
810            &session.blob_key,
811            &descriptors,
812        )?;
813        let targets = Arc::new(chunk_response.targets);
814        let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
815        let error = Arc::new(Mutex::new(None));
816        let data = Arc::new(data.to_vec());
817        thread::scope(|scope| {
818            for _ in 0..self.inner.parallel_requests {
819                let tasks = Arc::clone(&tasks);
820                let targets = Arc::clone(&targets);
821                let error = Arc::clone(&error);
822                let inner = Arc::clone(&self.inner);
823                let data = Arc::clone(&data);
824                scope.spawn(move |_| loop {
825                    let task = {
826                        let mut guard = tasks.lock().unwrap();
827                        guard.pop_front()
828                    };
829                    let Some(task) = task else { break };
830                    let target = targets
831                        .iter()
832                        .find(|target| target.chunk_index == task.index)
833                        .cloned();
834                    let result = (|| {
835                        let target = target.ok_or_else(|| {
836                            io::Error::other(format!(
837                                "missing upload target for chunk {}",
838                                task.index
839                            ))
840                        })?;
841                        let slice = &data[task.offset..task.offset + task.length];
842                        inner.put_upload_target(
843                            &target.method,
844                            &target.upload_url,
845                            &target.headers,
846                            slice,
847                        )
848                    })();
849                    if let Err(err) = result {
850                        let mut err_guard = error.lock().unwrap();
851                        if err_guard.is_none() {
852                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
853                        }
854                        break;
855                    }
856                });
857            }
858        })
859        .expect("upload session scope");
860        if let Some(err) = error.lock().unwrap().take() {
861            return Err(err);
862        }
863        self.inner.upload_session_complete(
864            path,
865            &session.session_id,
866            &session.blob_key,
867            data.len() as u64,
868            &content_sha256,
869            descriptors.len(),
870        )?;
871        Ok(())
872    }
873
874    fn upload_multipart_file_legacy(&self, path: &str, data: &[u8]) -> io::Result<()> {
875        let session = self.inner.multipart_create(path, data.len() as u64)?;
876        let part_size = session.part_size_bytes as usize;
877        let mut tasks = std::collections::VecDeque::new();
878        let mut offset = 0usize;
879        let mut part_number = 1;
880        let mut index = 0usize;
881        while offset < data.len() {
882            let end = std::cmp::min(offset + part_size, data.len());
883            let length = end - offset;
884            tasks.push_back(MultipartTask {
885                index,
886                part_number,
887                offset,
888                length,
889            });
890            offset = end;
891            part_number += 1;
892            index += 1;
893        }
894        let tasks = Arc::new(Mutex::new(tasks));
895        let task_len = tasks.lock().unwrap().len();
896        let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
897        for _ in 0..task_len {
898            result_vec.push(None);
899        }
900        let results = Arc::new(Mutex::new(result_vec));
901        let error = Arc::new(Mutex::new(None));
902        let data = Arc::new(data.to_vec());
903        thread::scope(|scope| {
904            for _ in 0..self.inner.parallel_requests {
905                let tasks = Arc::clone(&tasks);
906                let results = Arc::clone(&results);
907                let error = Arc::clone(&error);
908                let inner = Arc::clone(&self.inner);
909                let blob_key = session.blob_key.clone();
910                let upload_id = session.upload_id.clone();
911                let session_id = session.session_id.clone();
912                let data = Arc::clone(&data);
913                scope.spawn(move |_| loop {
914                    let task = {
915                        let mut guard = tasks.lock().unwrap();
916                        guard.pop_front()
917                    };
918                    let Some(task) = task else { break };
919                    let slice = &data[task.offset..task.offset + task.length];
920                    let result = (|| {
921                        let url = inner.multipart_presign_part(
922                            &session_id,
923                            &blob_key,
924                            &upload_id,
925                            task.part_number,
926                            task.length as u64,
927                        )?;
928                        let etag = inner.put_upload_url_with_etag(&url, slice)?;
929                        Ok(MultipartPart {
930                            part_number: task.part_number,
931                            etag,
932                        })
933                    })();
934                    let mut guard = results.lock().unwrap();
935                    guard[task.index] = Some(result);
936                    if let Some(Err(err)) = guard[task.index].as_ref() {
937                        let mut err_guard = error.lock().unwrap();
938                        if err_guard.is_none() {
939                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
940                        }
941                        break;
942                    }
943                });
944            }
945        })
946        .expect("multipart upload scope");
947        if let Some(err) = error.lock().unwrap().take() {
948            return Err(err);
949        }
950        let mut parts = Vec::with_capacity(results.lock().unwrap().len());
951        for maybe in Arc::try_unwrap(results)
952            .expect("results still in use")
953            .into_inner()
954            .expect("results poisoned")
955        {
956            let part = maybe.expect("missing part result")?;
957            if part.etag.is_empty() {
958                return Err(io::Error::other("missing etag"));
959            }
960            parts.push(part);
961        }
962        parts.sort_by_key(|part| part.part_number);
963        self.inner.multipart_complete(
964            path,
965            &session.session_id,
966            &session.blob_key,
967            &session.upload_id,
968            data.len() as u64,
969            parts,
970        )?;
971        Ok(())
972    }
973}
974
/// A byte range of a remote file handled as one unit of work.
/// NOTE(review): not used in this part of the file; presumably consumed by the
/// chunked download code elsewhere (offset is `u64` like remote file offsets) — confirm.
#[derive(Clone, Copy)]
struct ChunkTask {
    offset: u64,
    length: usize,
    index: usize,
}

/// Per-part result slot for the legacy multipart upload: `None` until a worker
/// records the outcome for that part.
type MultipartResult = Option<Result<MultipartPart, io::Error>>;

/// One part of a legacy multipart upload.
#[derive(Clone, Copy)]
struct MultipartTask {
    /// Position of this part's slot in the results vector.
    index: usize,
    /// 1-based part number sent to the server.
    part_number: i32,
    /// Byte offset of the part within the upload buffer.
    offset: usize,
    /// Number of bytes in the part.
    length: usize,
}

/// One chunk of an upload-session upload.
#[derive(Clone)]
struct UploadTask {
    /// Chunk index; matched against `UploadChunkTarget::chunk_index`.
    index: usize,
    /// Byte offset of the chunk within the upload buffer.
    offset: usize,
    /// Number of bytes in the chunk.
    length: usize,
    /// Lowercase hex SHA-256 of the chunk bytes (from `sha256_hex`).
    chunk_sha256: String,
}
999
1000#[async_trait(?Send)]
1001impl FsProvider for RemoteFsProvider {
1002    fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
1003        let normalized = self.normalize(path);
1004        let mut data = Vec::new();
1005        if flags.read {
1006            let meta = self.inner.fetch_metadata(&normalized)?;
1007            if meta.file_type != "file" {
1008                return Err(io::Error::other("remote path is not a file"));
1009            }
1010            data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1011                self.download_sharded_file(&normalized, meta.len)?
1012            } else {
1013                self.download_raw_file(&normalized, meta.len)?
1014            };
1015        }
1016        if flags.truncate {
1017            data.clear();
1018        }
1019        if flags.create {
1020            self.ensure_parent_exists(path)?;
1021        }
1022        let handle = RemoteFileHandle {
1023            provider: self.clone(),
1024            path: normalized,
1025            data,
1026            position: 0,
1027            flags: flags.clone(),
1028            dirty: false,
1029        };
1030        Ok(Box::new(handle))
1031    }
1032
1033    async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
1034        let normalized = self.normalize(path);
1035        let meta = self.inner.fetch_metadata(&normalized)?;
1036        if meta.file_type != "file" {
1037            return Err(io::Error::other("remote path is not a file"));
1038        }
1039        if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1040            self.download_sharded_file(&normalized, meta.len)
1041        } else {
1042            self.download_raw_file(&normalized, meta.len)
1043        }
1044    }
1045
1046    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
1047        let normalized = self.normalize(path);
1048        self.ensure_parent_exists(path)?;
1049        self.upload_entire_file(&normalized, data)
1050    }
1051
1052    async fn remove_file(&self, path: &Path) -> io::Result<()> {
1053        let normalized = self.normalize(path);
1054        self.inner
1055            .delete_empty("/fs/file", &[("path", normalized)])?;
1056        Ok(())
1057    }
1058
1059    async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1060        let normalized = self.normalize(path);
1061        let resp = self.inner.fetch_metadata(&normalized)?;
1062        Ok(resp.into())
1063    }
1064
1065    async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1066        self.metadata(path).await
1067    }
1068
1069    async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
1070        let normalized = self.normalize(path);
1071        let resp = self.inner.fetch_dir(&normalized)?;
1072        Ok(resp
1073            .into_iter()
1074            .map(|entry| DirEntry {
1075                path: PathBuf::from(entry.path),
1076                file_name: entry.file_name.into(),
1077                file_type: entry.file_type.into(),
1078            })
1079            .collect())
1080    }
1081
1082    async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
1083        let normalized = self.normalize(path);
1084        let canonical = self.inner.canonicalize_path(&normalized)?;
1085        Ok(PathBuf::from(canonical))
1086    }
1087
1088    async fn create_dir(&self, path: &Path) -> io::Result<()> {
1089        let normalized = self.normalize(path);
1090        self.inner.post_empty(
1091            "/fs/mkdir",
1092            &CreateDirRequest {
1093                path: normalized,
1094                recursive: false,
1095            },
1096        )
1097    }
1098
1099    async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
1100        let normalized = self.normalize(path);
1101        self.inner.post_empty(
1102            "/fs/mkdir",
1103            &CreateDirRequest {
1104                path: normalized,
1105                recursive: true,
1106            },
1107        )
1108    }
1109
1110    async fn remove_dir(&self, path: &Path) -> io::Result<()> {
1111        let normalized = self.normalize(path);
1112        self.inner.delete_empty(
1113            "/fs/dir",
1114            &[("path", normalized), ("recursive", "false".into())],
1115        )
1116    }
1117
1118    async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
1119        let normalized = self.normalize(path);
1120        self.inner.delete_empty(
1121            "/fs/dir",
1122            &[("path", normalized), ("recursive", "true".into())],
1123        )
1124    }
1125
1126    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
1127        self.inner.post_empty(
1128            "/fs/rename",
1129            &RenameRequest {
1130                from: self.normalize(from),
1131                to: self.normalize(to),
1132            },
1133        )
1134    }
1135
1136    async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
1137        self.inner.post_empty(
1138            "/fs/set-readonly",
1139            &SetReadonlyRequest {
1140                path: self.normalize(path),
1141                readonly,
1142            },
1143        )
1144    }
1145
1146    async fn data_manifest_descriptor(
1147        &self,
1148        request: &DataManifestRequest,
1149    ) -> io::Result<DataManifestDescriptor> {
1150        let mut query = vec![("path", request.path.clone())];
1151        if let Some(version) = &request.version {
1152            query.push(("version", version.clone()));
1153        }
1154        self.inner.get_json("/data/manifest", &query)
1155    }
1156
1157    async fn data_chunk_upload_targets(
1158        &self,
1159        request: &DataChunkUploadRequest,
1160    ) -> io::Result<Vec<DataChunkUploadTarget>> {
1161        #[derive(Deserialize)]
1162        struct DataChunkUploadTargetsResponse {
1163            targets: Vec<DataChunkUploadTarget>,
1164        }
1165        let response: DataChunkUploadTargetsResponse = self
1166            .inner
1167            .post_json("/data/chunks/upload-targets", request)?;
1168        Ok(response.targets)
1169    }
1170
1171    async fn data_upload_chunk(
1172        &self,
1173        target: &DataChunkUploadTarget,
1174        data: &[u8],
1175    ) -> io::Result<()> {
1176        self.inner
1177            .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
1178    }
1179}
1180
1181impl Clone for RemoteFsProvider {
1182    fn clone(&self) -> Self {
1183        Self {
1184            inner: Arc::clone(&self.inner),
1185        }
1186    }
1187}
1188
/// File handle over an in-memory copy of a remote file.
///
/// Reads, writes and seeks operate on `data`. Pending writes are uploaded as the
/// whole file by `flush_remote`, either on `flush` or best-effort on drop.
struct RemoteFileHandle {
    /// Provider used to upload the buffer back to the server.
    provider: RemoteFsProvider,
    /// Normalized remote path this handle was opened on.
    path: String,
    /// Entire file contents held in memory.
    data: Vec<u8>,
    /// Cursor for read/write/seek; may exceed `data.len()` after a seek past the end.
    position: usize,
    /// Flags the handle was opened with; `write`/`append` gate writing.
    flags: OpenFlags,
    /// Set by writes; cleared after a successful `flush_remote`.
    dirty: bool,
}
1197
1198impl RemoteFileHandle {
1199    fn flush_remote(&mut self) -> io::Result<()> {
1200        if !self.dirty {
1201            return Ok(());
1202        }
1203        self.provider.upload_entire_file(&self.path, &self.data)?;
1204        self.dirty = false;
1205        Ok(())
1206    }
1207}
1208
1209impl Read for RemoteFileHandle {
1210    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1211        let remaining = &self.data[self.position..];
1212        let amt = remaining.len().min(buf.len());
1213        buf[..amt].copy_from_slice(&remaining[..amt]);
1214        self.position += amt;
1215        Ok(amt)
1216    }
1217}
1218
1219impl Write for RemoteFileHandle {
1220    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1221        if !self.flags.write && !self.flags.append {
1222            return Err(io::Error::new(
1223                ErrorKind::PermissionDenied,
1224                "remote file not opened for writing",
1225            ));
1226        }
1227        if self.flags.append {
1228            self.position = self.data.len();
1229        }
1230        let required = self.position + buf.len();
1231        if required > self.data.len() {
1232            self.data.resize(required, 0);
1233        }
1234        self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
1235        self.position += buf.len();
1236        self.dirty = true;
1237        Ok(buf.len())
1238    }
1239
1240    fn flush(&mut self) -> io::Result<()> {
1241        self.flush_remote()
1242    }
1243}
1244
1245impl Seek for RemoteFileHandle {
1246    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1247        let new_pos = match pos {
1248            SeekFrom::Start(offset) => offset as i64,
1249            SeekFrom::End(offset) => self.data.len() as i64 + offset,
1250            SeekFrom::Current(offset) => self.position as i64 + offset,
1251        };
1252        if new_pos < 0 {
1253            return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
1254        }
1255        self.position = new_pos as usize;
1256        Ok(self.position as u64)
1257    }
1258}
1259
// Marker impl: `RemoteFileHandle` relies on the trait's provided methods together
// with its own `Read`/`Write`/`Seek` impls.
#[async_trait(?Send)]
impl FileHandle for RemoteFileHandle {}
1262
1263impl Drop for RemoteFileHandle {
1264    fn drop(&mut self) {
1265        if self.dirty {
1266            if let Err(err) = self.provider.upload_entire_file(&self.path, &self.data) {
1267                eprintln!("remote fs flush failed: {err}");
1268            }
1269        }
1270    }
1271}
1272
/// Server metadata for a remote path, converted into [`FsMetadata`].
#[derive(Debug, Deserialize)]
struct MetadataResponse {
    /// Entry kind string (`"file"`, `"dir"`, ...), mapped through `From<String> for FsFileType`.
    #[serde(rename = "fileType")]
    file_type: String,
    /// Size in bytes; passed to the download routines as the expected length.
    len: u64,
    /// RFC 3339 timestamp, parsed by `parse_modified_at`.
    #[serde(rename = "modifiedAt")]
    modified_at: Option<String>,
    readonly: bool,
    /// Content hash; equal to `MANIFEST_HASH` when the file is stored as shards.
    hash: Option<String>,
}
1283
1284impl From<MetadataResponse> for FsMetadata {
1285    fn from(value: MetadataResponse) -> Self {
1286        FsMetadata::new_with_hash(
1287            value.file_type.into(),
1288            value.len,
1289            parse_modified_at(value.modified_at.as_deref()),
1290            value.readonly,
1291            value.hash,
1292        )
1293    }
1294}
1295
/// One entry of a remote directory listing, converted into [`DirEntry`] by `read_dir`.
#[derive(Debug, Deserialize)]
struct DirEntryResponse {
    /// Full remote path of the entry.
    path: String,
    #[serde(rename = "fileName")]
    file_name: String,
    /// Entry kind string, mapped through `From<String> for FsFileType`.
    #[serde(rename = "fileType")]
    file_type: String,
}
1304
1305impl From<String> for FsFileType {
1306    fn from(value: String) -> Self {
1307        match value.as_str() {
1308            "dir" => FsFileType::Directory,
1309            "file" => FsFileType::File,
1310            "symlink" => FsFileType::Symlink,
1311            "other" => FsFileType::Other,
1312            _ => FsFileType::Unknown,
1313        }
1314    }
1315}
1316
/// JSON body of `POST /fs/mkdir`, sent by `create_dir` and `create_dir_all`.
#[derive(Debug, Serialize, Deserialize)]
struct CreateDirRequest {
    /// Normalized remote directory path.
    path: String,
    /// `true` for `create_dir_all`, `false` for `create_dir`.
    recursive: bool,
}

/// JSON body of `POST /fs/rename`.
#[derive(Debug, Serialize, Deserialize)]
struct RenameRequest {
    /// Normalized source path.
    from: String,
    /// Normalized destination path.
    to: String,
}

/// JSON body of `POST /fs/set-readonly`.
#[derive(Debug, Serialize, Deserialize)]
struct SetReadonlyRequest {
    /// Normalized remote path.
    path: String,
    /// Desired read-only state.
    readonly: bool,
}

/// Canonical path returned by the server.
/// NOTE(review): presumably decoded by `canonicalize_path`; confirm against that helper.
#[derive(Debug, Deserialize)]
struct CanonicalizeResponse {
    path: String,
}
1339
/// Response carrying a URL to download file contents from.
/// NOTE(review): presumably a presigned URL used by the direct-download path — confirm.
#[derive(Debug, Deserialize)]
struct DownloadUrlResponse {
    #[serde(rename = "downloadUrl")]
    download_url: String,
}

/// Request that opens an upload session (camelCase keys on the wire).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartRequest {
    /// Normalized destination path.
    path: String,
    /// Total size of the upload in bytes.
    size_bytes: i64,
    content_type: Option<String>,
    /// Hex SHA-256 of the whole upload.
    content_sha256: String,
}

/// Server reply identifying a new upload session.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartResponse {
    session_id: String,
    blob_key: String,
    /// Chunk size the client should use when splitting the upload.
    chunk_size_bytes: i64,
}

/// Description of one chunk of an upload session; built from an `UploadTask`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunkDescriptor {
    chunk_index: usize,
    /// Byte offset of the chunk within the upload.
    offset_bytes: i64,
    size_bytes: i64,
    /// Hex SHA-256 of the chunk bytes.
    chunk_sha256: String,
}

/// Request asking the server for upload targets for the listed chunks.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksRequest {
    session_id: String,
    blob_key: String,
    chunks: Vec<UploadSessionChunkDescriptor>,
}

/// Upload targets returned for the requested chunks.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksResponse {
    targets: Vec<UploadChunkTarget>,
}

/// Where and how to upload one chunk; passed to `put_upload_target`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadChunkTarget {
    /// Chunk this target belongs to; matched against `UploadTask::index`.
    chunk_index: usize,
    /// HTTP method to use (e.g. `"PUT"`).
    method: String,
    upload_url: String,
    /// Extra headers to send with the chunk upload.
    headers: HashMap<String, String>,
}

/// Request that finalizes an upload session once all chunks are uploaded.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionCompleteRequest {
    path: String,
    session_id: String,
    blob_key: String,
    size_bytes: i64,
    /// Hex SHA-256 of the whole upload.
    content_sha256: String,
    /// Number of chunks uploaded in the session.
    chunk_count: usize,
    // NOTE(review): meaning of `hash` is set by the caller outside this view — confirm.
    hash: Option<String>,
}
1406
/// Request that starts a legacy multipart upload.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadRequest {
    /// Normalized destination path.
    path: String,
    /// Total size of the upload in bytes.
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    #[serde(rename = "contentType")]
    content_type: Option<String>,
}

/// Server reply identifying a new multipart upload.
#[derive(Debug, Deserialize)]
struct MultipartUploadResponse {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    /// Size of every part except possibly the last; must be positive to be usable.
    #[serde(rename = "partSizeBytes")]
    part_size_bytes: i64,
}

/// Request to presign the upload of one multipart part.
/// NOTE(review): presumably sent by `multipart_presign_part` (fields mirror its arguments) — confirm.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadPartRequest {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    /// 1-based part number.
    #[serde(rename = "partNumber")]
    part_number: i32,
    /// Size of this part in bytes.
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
}
1441
1442#[derive(Debug, Deserialize)]
1443struct MultipartUploadPartResponse {
1444    #[serde(rename = "upload_url")]
1445    upload_url: String,
1446}
1447
/// Request that completes a legacy multipart upload.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadCompleteRequest {
    /// Normalized destination path.
    path: String,
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    /// Total size of the upload in bytes.
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    // NOTE(review): meaning of `hash` is set by the caller outside this view — confirm.
    hash: Option<String>,
    /// Uploaded parts, sorted by part number.
    parts: Vec<MultipartPart>,
}

/// One uploaded multipart part and the ETag the storage returned for it.
#[derive(Debug, Serialize, Deserialize)]
struct MultipartPart {
    #[serde(rename = "partNumber")]
    part_number: i32,
    /// Must be non-empty; an empty ETag aborts the upload.
    etag: String,
}
1469
/// Manifest describing a file stored as multiple shards. Files with this layout
/// report `MANIFEST_HASH` as their metadata hash. There are no serde renames, so
/// keys are snake_case on the wire.
#[derive(Debug, Serialize, Deserialize)]
struct ShardManifest {
    version: u32,
    /// Size of the reassembled file in bytes.
    total_size: u64,
    /// Nominal size of each shard in bytes.
    shard_size: u64,
    /// Shards in file order.
    shards: Vec<ShardEntry>,
}

/// One shard of a sharded file.
#[derive(Debug, Serialize, Deserialize)]
struct ShardEntry {
    /// Remote path of the shard (presumably under `SHARD_PREFIX` — confirm).
    path: String,
    /// Size of this shard in bytes.
    size: u64,
}

/// Server reply for a write session.
#[derive(Debug, Deserialize)]
struct FsWriteSessionResponse {
    // Deserialized but unused (leading underscore).
    #[serde(rename = "sessionId")]
    _session_id: String,
    /// Offset at which the next write should start.
    #[serde(rename = "nextOffset")]
    next_offset: i64,
}
1491
1492fn sha256_hex(data: &[u8]) -> String {
1493    let digest = Sha256::digest(data);
1494    let mut out = String::with_capacity(digest.len() * 2);
1495    for byte in digest {
1496        use std::fmt::Write as _;
1497        let _ = write!(out, "{byte:02x}");
1498    }
1499    out
1500}
1501
1502fn map_http_err(err: reqwest::Error) -> io::Error {
1503    io::Error::other(err)
1504}
1505
1506fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1507    let value = value?;
1508    let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1509    let millis = parsed.timestamp_millis();
1510    if millis < 0 {
1511        return None;
1512    }
1513    Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1514}
1515
1516fn map_url_err(err: url::ParseError) -> io::Error {
1517    io::Error::new(ErrorKind::InvalidInput, err)
1518}
1519
1520fn handle_error(resp: Response) -> io::Result<Response> {
1521    let status = resp.status();
1522    if status.is_success() {
1523        return Ok(resp);
1524    }
1525    let text = resp.text().unwrap_or_else(|_| status.to_string());
1526    let kind = match status {
1527        StatusCode::NOT_FOUND => ErrorKind::NotFound,
1528        StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1529        StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1530        StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1531        _ => ErrorKind::Other,
1532    };
1533    Err(io::Error::new(kind, text))
1534}
1535
1536impl fmt::Debug for RemoteFsProvider {
1537    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1538        f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
1539    }
1540}
1541
1542#[cfg(test)]
1543mod tests {
1544    use super::*;
1545    use crate::data_contract::DataChunkDescriptor;
1546    use axum::extract::{Query, State};
1547    use axum::http::{HeaderMap, StatusCode};
1548    use axum::routing::{delete, get, post, put};
1549    use axum::{Json, Router};
1550    use futures::executor;
1551    use serde::Deserialize;
1552    use std::collections::HashMap;
1553    use std::net::TcpListener as StdTcpListener;
1554    use std::sync::Arc;
1555    use tempfile::tempdir;
1556    use tokio::net::TcpListener as TokioTcpListener;
1557    use tokio::runtime::Runtime;
1558
1559    #[derive(Clone)]
1560    struct Harness {
1561        root: Arc<PathBuf>,
1562        _keeper: Arc<tempfile::TempDir>,
1563        base_url: Arc<Mutex<Option<String>>>,
1564        upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
1565        omit_last_chunk_target: bool,
1566    }
1567
1568    impl Harness {
1569        fn new() -> Self {
1570            Self::with_omit_last_chunk_target(false)
1571        }
1572
1573        fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
1574            let dir = tempdir().expect("tempdir");
1575            let path = dir.path().to_path_buf();
1576            Self {
1577                root: Arc::new(path),
1578                _keeper: Arc::new(dir),
1579                base_url: Arc::new(Mutex::new(None)),
1580                upload_sessions: Arc::new(Mutex::new(HashMap::new())),
1581                omit_last_chunk_target,
1582            }
1583        }
1584
1585        fn resolve(&self, remote_path: &str) -> PathBuf {
1586            let trimmed = remote_path.trim_start_matches('/');
1587            self.root.join(trimmed)
1588        }
1589    }
1590
1591    #[derive(Clone)]
1592    struct UploadSessionTestState {
1593        path: String,
1594        chunk_count: usize,
1595    }
1596
1597    #[derive(Deserialize)]
1598    struct PathParams {
1599        path: String,
1600        offset: Option<u64>,
1601        length: Option<usize>,
1602        truncate: Option<String>,
1603        recursive: Option<String>,
1604    }
1605
1606    #[derive(Deserialize)]
1607    struct UploadChunkQuery {
1608        #[serde(rename = "sessionId")]
1609        session_id: String,
1610        #[serde(rename = "chunkIndex")]
1611        chunk_index: usize,
1612    }
1613
1614    #[derive(Deserialize)]
1615    struct DataManifestQuery {
1616        path: String,
1617        version: Option<String>,
1618    }
1619
1620    async fn metadata_handler(
1621        State(harness): State<Harness>,
1622        Query(params): Query<PathParams>,
1623    ) -> Result<Json<serde_json::Value>, StatusCode> {
1624        let meta =
1625            std::fs::metadata(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1626        let file_type = if meta.is_dir() {
1627            "dir"
1628        } else if meta.is_file() {
1629            "file"
1630        } else {
1631            "other"
1632        };
1633        Ok(Json(serde_json::json!({
1634            "fileType": file_type,
1635            "len": meta.len(),
1636            "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
1637            "readonly": meta.permissions().readonly()
1638        })))
1639    }
1640
1641    async fn dir_handler(
1642        State(harness): State<Harness>,
1643        Query(params): Query<PathParams>,
1644    ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
1645        let mut entries = Vec::new();
1646        for entry in
1647            std::fs::read_dir(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?
1648        {
1649            let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1650            let file_type = entry
1651                .file_type()
1652                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1653            let kind = if file_type.is_dir() {
1654                "dir"
1655            } else if file_type.is_file() {
1656                "file"
1657            } else {
1658                "other"
1659            };
1660            entries.push(serde_json::json!({
1661                "path": format!("/{}", entry.path().strip_prefix(&*harness.root).unwrap().display()),
1662                "fileName": entry.file_name().to_string_lossy(),
1663                "fileType": kind
1664            }));
1665        }
1666        Ok(Json(entries))
1667    }
1668
1669    async fn canonicalize_handler(
1670        State(harness): State<Harness>,
1671        Query(params): Query<PathParams>,
1672    ) -> Result<Json<serde_json::Value>, StatusCode> {
1673        let path = harness.resolve(&params.path);
1674        let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
1675        let rel = canonical.strip_prefix(&*harness.root).unwrap_or(&canonical);
1676        Ok(Json(serde_json::json!({
1677            "path": format!("/{}", rel.display())
1678        })))
1679    }
1680
1681    async fn read_handler(
1682        State(harness): State<Harness>,
1683        Query(params): Query<PathParams>,
1684    ) -> Result<Vec<u8>, StatusCode> {
1685        let mut data =
1686            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1687        let offset = params.offset.unwrap_or(0) as usize;
1688        let length = params.length.unwrap_or(data.len().saturating_sub(offset));
1689        let end = std::cmp::min(offset + length, data.len());
1690        if offset < data.len() {
1691            data = data[offset..end].to_vec();
1692        } else {
1693            data.clear();
1694        }
1695        Ok(data)
1696    }
1697
1698    async fn write_handler(
1699        State(harness): State<Harness>,
1700        Query(params): Query<PathParams>,
1701        body: axum::body::Bytes,
1702    ) -> Result<(), StatusCode> {
1703        let path = harness.resolve(&params.path);
1704        if let Some(parent) = path.parent() {
1705            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1706        }
1707        let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
1708            Vec::new()
1709        } else {
1710            std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1711        };
1712        let offset = params.offset.unwrap_or(0) as usize;
1713        let required = offset + body.len();
1714        if required > data.len() {
1715            data.resize(required, 0);
1716        }
1717        data[offset..offset + body.len()].copy_from_slice(&body);
1718        std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1719        Ok(())
1720    }
1721
1722    async fn upload_session_start_handler(
1723        State(harness): State<Harness>,
1724        Json(req): Json<UploadSessionStartRequest>,
1725    ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
1726        let session_id = Uuid::new_v4().to_string();
1727        let chunk_size_bytes = 1024i64;
1728        harness
1729            .upload_sessions
1730            .lock()
1731            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1732            .insert(
1733                session_id.clone(),
1734                UploadSessionTestState {
1735                    path: req.path,
1736                    chunk_count: 0,
1737                },
1738            );
1739        Ok(Json(UploadSessionStartResponse {
1740            session_id: session_id.clone(),
1741            blob_key: format!("test/blob/{session_id}"),
1742            chunk_size_bytes,
1743        }))
1744    }
1745
1746    async fn upload_session_chunks_handler(
1747        State(harness): State<Harness>,
1748        Json(req): Json<UploadSessionChunksRequest>,
1749    ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
1750        let base = harness
1751            .base_url
1752            .lock()
1753            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1754            .clone()
1755            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1756        let mut sessions = harness
1757            .upload_sessions
1758            .lock()
1759            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1760        let state = sessions
1761            .get_mut(&req.session_id)
1762            .ok_or(StatusCode::NOT_FOUND)?;
1763        state.chunk_count = req.chunks.len();
1764        let mut targets = req
1765            .chunks
1766            .iter()
1767            .map(|chunk| UploadChunkTarget {
1768                chunk_index: chunk.chunk_index,
1769                method: "PUT".to_string(),
1770                upload_url: format!(
1771                    "{base}/upload-chunk?sessionId={}&chunkIndex={}",
1772                    req.session_id, chunk.chunk_index
1773                ),
1774                headers: HashMap::new(),
1775            })
1776            .collect::<Vec<_>>();
1777        if harness.omit_last_chunk_target && !targets.is_empty() {
1778            targets.pop();
1779        }
1780        Ok(Json(UploadSessionChunksResponse { targets }))
1781    }
1782
1783    async fn upload_chunk_handler(
1784        State(harness): State<Harness>,
1785        Query(query): Query<UploadChunkQuery>,
1786        body: axum::body::Bytes,
1787    ) -> Result<(), StatusCode> {
1788        let chunk_path = harness.resolve(&format!(
1789            "/.upload-sessions/{}/{}",
1790            query.session_id, query.chunk_index
1791        ));
1792        if let Some(parent) = chunk_path.parent() {
1793            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1794        }
1795        std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1796        Ok(())
1797    }
1798
1799    async fn data_manifest_handler(
1800        Query(query): Query<DataManifestQuery>,
1801    ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
1802        let updated_at = if query.version.as_deref() == Some("v1") {
1803            "2026-01-01T00:00:00Z"
1804        } else {
1805            "2026-02-02T00:00:00Z"
1806        };
1807        Ok(Json(DataManifestDescriptor {
1808            schema_version: 1,
1809            format: "runmat-data".to_string(),
1810            dataset_id: format!("dataset:{}", query.path),
1811            updated_at: updated_at.to_string(),
1812            txn_sequence: 9,
1813        }))
1814    }
1815
1816    async fn data_chunk_upload_targets_handler(
1817        Json(req): Json<DataChunkUploadRequest>,
1818    ) -> Result<Json<serde_json::Value>, StatusCode> {
1819        let targets = req
1820            .chunks
1821            .iter()
1822            .map(|chunk| {
1823                serde_json::json!({
1824                    "key": chunk.key,
1825                    "method": "PUT",
1826                    "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
1827                    "headers": {
1828                        "x-runmat-hash": chunk.hash,
1829                        "x-runmat-object": chunk.object_id,
1830                    }
1831                })
1832            })
1833            .collect::<Vec<_>>();
1834        Ok(Json(serde_json::json!({ "targets": targets })))
1835    }
1836
1837    async fn upload_session_complete_handler(
1838        State(harness): State<Harness>,
1839        Json(req): Json<UploadSessionCompleteRequest>,
1840    ) -> Result<(), StatusCode> {
1841        let state = harness
1842            .upload_sessions
1843            .lock()
1844            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1845            .remove(&req.session_id)
1846            .ok_or(StatusCode::NOT_FOUND)?;
1847        let mut data = Vec::new();
1848        for chunk_index in 0..state.chunk_count {
1849            let chunk_path = harness.resolve(&format!(
1850                "/.upload-sessions/{}/{}",
1851                req.session_id, chunk_index
1852            ));
1853            let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1854            data.extend_from_slice(&chunk);
1855        }
1856        if sha256_hex(&data) != req.content_sha256 {
1857            return Err(StatusCode::BAD_REQUEST);
1858        }
1859        let target_path = harness.resolve(&state.path);
1860        if let Some(parent) = target_path.parent() {
1861            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1862        }
1863        std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1864        Ok(())
1865    }
1866
1867    async fn mkdir_handler(
1868        State(harness): State<Harness>,
1869        Json(req): Json<CreateDirRequest>,
1870    ) -> Result<(), StatusCode> {
1871        let path = harness.resolve(&req.path);
1872        if req.recursive {
1873            std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1874        } else {
1875            std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1876        }
1877        Ok(())
1878    }
1879
1880    async fn delete_file_handler(
1881        State(harness): State<Harness>,
1882        Query(params): Query<PathParams>,
1883    ) -> Result<(), StatusCode> {
1884        std::fs::remove_file(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1885        Ok(())
1886    }
1887
1888    async fn delete_dir_handler(
1889        State(harness): State<Harness>,
1890        Query(params): Query<PathParams>,
1891    ) -> Result<(), StatusCode> {
1892        let recursive = params.recursive.as_deref() == Some("true");
1893        if recursive {
1894            std::fs::remove_dir_all(harness.resolve(&params.path))
1895                .map_err(|_| StatusCode::NOT_FOUND)?;
1896        } else {
1897            std::fs::remove_dir(harness.resolve(&params.path))
1898                .map_err(|_| StatusCode::NOT_FOUND)?;
1899        }
1900        Ok(())
1901    }
1902
1903    async fn rename_handler(
1904        State(harness): State<Harness>,
1905        Json(req): Json<RenameRequest>,
1906    ) -> Result<(), StatusCode> {
1907        let from = harness.resolve(&req.from);
1908        let to = harness.resolve(&req.to);
1909        if let Some(parent) = to.parent() {
1910            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1911        }
1912        std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
1913        Ok(())
1914    }
1915
1916    async fn set_readonly_handler(
1917        State(harness): State<Harness>,
1918        Json(req): Json<SetReadonlyRequest>,
1919    ) -> Result<(), StatusCode> {
1920        let path = harness.resolve(&req.path);
1921        let mut perms = std::fs::metadata(&path)
1922            .map_err(|_| StatusCode::NOT_FOUND)?
1923            .permissions();
1924        perms.set_readonly(req.readonly);
1925        std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1926        Ok(())
1927    }
1928
1929    async fn read_with_download_handler(
1930        State(harness): State<Harness>,
1931        Query(params): Query<PathParams>,
1932    ) -> Result<Json<serde_json::Value>, StatusCode> {
1933        let base = harness
1934            .base_url
1935            .lock()
1936            .unwrap()
1937            .clone()
1938            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1939        let download_url = format!("{base}/download?path={}", params.path);
1940        Ok(Json(serde_json::json!({
1941            "downloadUrl": download_url,
1942            "expiresAt": 0
1943        })))
1944    }
1945
1946    async fn download_handler(
1947        State(harness): State<Harness>,
1948        Query(params): Query<PathParams>,
1949        headers: HeaderMap,
1950    ) -> Result<Vec<u8>, StatusCode> {
1951        let mut data =
1952            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1953        if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
1954            if let Some((start, end)) = parse_range(range) {
1955                let start = start.min(data.len());
1956                let end = end.min(data.len().saturating_sub(1));
1957                if start < data.len() {
1958                    data = data[start..=end].to_vec();
1959                } else {
1960                    data.clear();
1961                }
1962            }
1963        }
1964        Ok(data)
1965    }
1966
1967    fn parse_range(value: &str) -> Option<(usize, usize)> {
1968        let value = value.strip_prefix("bytes=")?;
1969        let (start, end) = value.split_once('-')?;
1970        let start = start.parse::<usize>().ok()?;
1971        let end = end.parse::<usize>().ok()?;
1972        Some((start, end))
1973    }
1974
1975    fn spawn_server() -> (String, Harness, Runtime) {
1976        let harness = Harness::new();
1977        let router = Router::new()
1978            .route("/fs/metadata", get(metadata_handler))
1979            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
1980            .route("/fs/file", delete(delete_file_handler))
1981            .route("/fs/canonicalize", get(canonicalize_handler))
1982            .route("/fs/read", get(read_handler))
1983            .route("/fs/write", put(write_handler))
1984            .route(
1985                "/fs/upload-session/start",
1986                post(upload_session_start_handler),
1987            )
1988            .route(
1989                "/fs/upload-session/chunks",
1990                post(upload_session_chunks_handler),
1991            )
1992            .route(
1993                "/fs/upload-session/complete",
1994                post(upload_session_complete_handler),
1995            )
1996            .route("/fs/mkdir", post(mkdir_handler))
1997            .route("/fs/rename", post(rename_handler))
1998            .route("/fs/set-readonly", post(set_readonly_handler))
1999            .route("/data/manifest", get(data_manifest_handler))
2000            .route(
2001                "/data/chunks/upload-targets",
2002                post(data_chunk_upload_targets_handler),
2003            )
2004            .route("/upload-chunk", put(upload_chunk_handler))
2005            .with_state(harness.clone());
2006        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2007        std_listener.set_nonblocking(true).expect("nonblocking");
2008        let addr = std_listener.local_addr().unwrap();
2009        let service = router.into_make_service();
2010        let rt = Runtime::new().unwrap();
2011        let base = format!("http://{addr}");
2012        *harness.base_url.lock().unwrap() = Some(base.clone());
2013        rt.spawn(async move {
2014            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2015            axum::serve(listener, service).await.unwrap();
2016        });
2017        (base, harness, rt)
2018    }
2019
2020    fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
2021        let harness = Harness::new();
2022        let router = Router::new()
2023            .route("/fs/metadata", get(metadata_handler))
2024            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2025            .route("/fs/file", delete(delete_file_handler))
2026            .route("/fs/canonicalize", get(canonicalize_handler))
2027            .route("/fs/read", get(read_with_download_handler))
2028            .route("/fs/write", put(write_handler))
2029            .route(
2030                "/fs/upload-session/start",
2031                post(upload_session_start_handler),
2032            )
2033            .route(
2034                "/fs/upload-session/chunks",
2035                post(upload_session_chunks_handler),
2036            )
2037            .route(
2038                "/fs/upload-session/complete",
2039                post(upload_session_complete_handler),
2040            )
2041            .route("/fs/mkdir", post(mkdir_handler))
2042            .route("/fs/rename", post(rename_handler))
2043            .route("/fs/set-readonly", post(set_readonly_handler))
2044            .route("/download", get(download_handler))
2045            .route("/data/manifest", get(data_manifest_handler))
2046            .route(
2047                "/data/chunks/upload-targets",
2048                post(data_chunk_upload_targets_handler),
2049            )
2050            .route("/upload-chunk", put(upload_chunk_handler))
2051            .with_state(harness.clone());
2052        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2053        std_listener.set_nonblocking(true).expect("nonblocking");
2054        let addr = std_listener.local_addr().unwrap();
2055        let service = router.into_make_service();
2056        let rt = Runtime::new().unwrap();
2057        let base = format!("http://{addr}");
2058        *harness.base_url.lock().unwrap() = Some(base.clone());
2059        rt.spawn(async move {
2060            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2061            axum::serve(listener, service).await.unwrap();
2062        });
2063        (base, harness, rt)
2064    }
2065
2066    fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
2067        let harness = Harness::with_omit_last_chunk_target(true);
2068        let router = Router::new()
2069            .route("/fs/metadata", get(metadata_handler))
2070            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2071            .route("/fs/file", delete(delete_file_handler))
2072            .route("/fs/canonicalize", get(canonicalize_handler))
2073            .route("/fs/read", get(read_handler))
2074            .route("/fs/write", put(write_handler))
2075            .route(
2076                "/fs/upload-session/start",
2077                post(upload_session_start_handler),
2078            )
2079            .route(
2080                "/fs/upload-session/chunks",
2081                post(upload_session_chunks_handler),
2082            )
2083            .route(
2084                "/fs/upload-session/complete",
2085                post(upload_session_complete_handler),
2086            )
2087            .route("/fs/mkdir", post(mkdir_handler))
2088            .route("/fs/rename", post(rename_handler))
2089            .route("/fs/set-readonly", post(set_readonly_handler))
2090            .route("/data/manifest", get(data_manifest_handler))
2091            .route(
2092                "/data/chunks/upload-targets",
2093                post(data_chunk_upload_targets_handler),
2094            )
2095            .route("/upload-chunk", put(upload_chunk_handler))
2096            .with_state(harness.clone());
2097        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2098        std_listener.set_nonblocking(true).expect("nonblocking");
2099        let addr = std_listener.local_addr().unwrap();
2100        let service = router.into_make_service();
2101        let rt = Runtime::new().unwrap();
2102        let base = format!("http://{addr}");
2103        *harness.base_url.lock().unwrap() = Some(base.clone());
2104        rt.spawn(async move {
2105            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2106            axum::serve(listener, service).await.unwrap();
2107        });
2108        (base, harness, rt)
2109    }
2110
2111    #[test]
2112    fn remote_provider_roundtrip() {
2113        let (base, harness, _rt) = spawn_server();
2114        let provider = RemoteFsProvider::new(RemoteFsConfig {
2115            base_url: base,
2116            auth_token: None,
2117            chunk_bytes: 1024,
2118            parallel_requests: 4,
2119            direct_read_threshold_bytes: u64::MAX,
2120            direct_write_threshold_bytes: 1024,
2121            timeout: Duration::from_secs(30),
2122            ..RemoteFsConfig::default()
2123        })
2124        .expect("provider");
2125
2126        let data = (0..16_384u32)
2127            .flat_map(|v| v.to_le_bytes())
2128            .collect::<Vec<_>>();
2129        executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
2130        let read_back =
2131            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2132        assert_eq!(data, read_back);
2133        executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
2134        assert!(!harness.resolve("/reports/data.bin").exists());
2135    }
2136
2137    #[test]
2138    fn remote_provider_download_url_read() {
2139        let (base, harness, _rt) = spawn_server_with_download_url();
2140        let provider = RemoteFsProvider::new(RemoteFsConfig {
2141            base_url: base,
2142            auth_token: None,
2143            chunk_bytes: 128,
2144            parallel_requests: 2,
2145            direct_read_threshold_bytes: u64::MAX,
2146            timeout: Duration::from_secs(30),
2147            ..RemoteFsConfig::default()
2148        })
2149        .expect("provider");
2150
2151        let data = (0..1024u32)
2152            .flat_map(|v| v.to_le_bytes())
2153            .collect::<Vec<_>>();
2154        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2155        std::fs::write(harness.resolve("/reports/data.bin"), &data).expect("write");
2156
2157        let read_back =
2158            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2159        assert_eq!(data, read_back);
2160    }
2161
2162    #[test]
2163    fn remote_provider_errors_when_chunk_target_is_missing() {
2164        let (base, _harness, _rt) = spawn_server_with_missing_chunk_target();
2165        let provider = RemoteFsProvider::new(RemoteFsConfig {
2166            base_url: base,
2167            auth_token: None,
2168            chunk_bytes: 128,
2169            parallel_requests: 2,
2170            direct_read_threshold_bytes: u64::MAX,
2171            direct_write_threshold_bytes: 128,
2172            timeout: Duration::from_secs(30),
2173            ..RemoteFsConfig::default()
2174        })
2175        .expect("provider");
2176
2177        let data = vec![7u8; 1024];
2178        let err =
2179            executor::block_on(provider.write(Path::new("/reports/missing-target.bin"), &data))
2180                .expect_err("write should fail when chunk target is missing");
2181        assert_eq!(err.kind(), io::ErrorKind::Other);
2182    }
2183
2184    #[test]
2185    fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
2186        let (base, _harness, _rt) = spawn_server();
2187        let provider = RemoteFsProvider::new(RemoteFsConfig {
2188            base_url: base,
2189            auth_token: None,
2190            timeout: Duration::from_secs(30),
2191            ..RemoteFsConfig::default()
2192        })
2193        .expect("provider");
2194
2195        let descriptor = provider
2196            .data_manifest_descriptor(&DataManifestRequest {
2197                path: "/datasets/alpha.data".to_string(),
2198                version: Some("v1".to_string()),
2199            })
2200            .expect("manifest descriptor");
2201        assert_eq!(descriptor.schema_version, 1);
2202        assert_eq!(descriptor.format, "runmat-data");
2203        assert_eq!(descriptor.dataset_id, "dataset:/datasets/alpha.data");
2204        assert_eq!(descriptor.updated_at, "2026-01-01T00:00:00Z");
2205        assert_eq!(descriptor.txn_sequence, 9);
2206    }
2207
2208    #[test]
2209    fn remote_provider_data_chunk_upload_targets_fetches_targets() {
2210        let (base, _harness, _rt) = spawn_server();
2211        let provider = RemoteFsProvider::new(RemoteFsConfig {
2212            base_url: base,
2213            auth_token: None,
2214            timeout: Duration::from_secs(30),
2215            ..RemoteFsConfig::default()
2216        })
2217        .expect("provider");
2218
2219        let targets = provider
2220            .data_chunk_upload_targets(&DataChunkUploadRequest {
2221                dataset_path: "/datasets/alpha.data".to_string(),
2222                array: "X".to_string(),
2223                chunks: vec![DataChunkDescriptor {
2224                    key: "X/0-0".to_string(),
2225                    object_id: "obj_0".to_string(),
2226                    hash: "sha256:abc".to_string(),
2227                    bytes_raw: 10,
2228                    bytes_stored: 8,
2229                }],
2230            })
2231            .expect("chunk upload targets");
2232
2233        assert_eq!(targets.len(), 1);
2234        assert_eq!(targets[0].key, "X/0-0");
2235        assert_eq!(targets[0].method, "PUT");
2236        assert_eq!(
2237            targets[0].upload_url,
2238            "https://uploads.example.test/X/obj_0"
2239        );
2240        let headers = &targets[0].headers;
2241        assert_eq!(headers.len(), 2);
2242        assert_eq!(
2243            headers.get("x-runmat-hash").map(String::as_str),
2244            Some("sha256:abc")
2245        );
2246        assert_eq!(
2247            headers.get("x-runmat-object").map(String::as_str),
2248            Some("obj_0")
2249        );
2250    }
2251}