runmat_filesystem/remote/native.rs

use crate::data_contract::{
    DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
};
use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
use async_trait::async_trait;
use chrono::DateTime;
use crossbeam_utils::thread;
use once_cell::sync::Lazy;
use reqwest::blocking::{Client, Response};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt;
use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use url::Url;
use uuid::Uuid;

const MANIFEST_HASH: &str = "manifest:v1";
const SHARD_PREFIX: &str = "/.runmat/shards";

static USER_AGENT: Lazy<String> = Lazy::new(|| {
    format!(
        "runmat-remote-fs/{} ({})",
        env!("CARGO_PKG_VERSION"),
        std::env::consts::OS
    )
});

#[derive(Clone, Debug)]
pub struct RemoteFsConfig {
    pub base_url: String,
    pub auth_token: Option<String>,
    pub chunk_bytes: usize,
    pub parallel_requests: usize,
    pub direct_read_threshold_bytes: u64,
    pub direct_write_threshold_bytes: u64,
    pub shard_threshold_bytes: u64,
    pub shard_size_bytes: u64,
    pub timeout: Duration,
    pub retry_max_attempts: usize,
    pub retry_base_delay: Duration,
    pub retry_max_delay: Duration,
}

impl Default for RemoteFsConfig {
    fn default() -> Self {
        Self {
            base_url: String::new(),
            auth_token: None,
            chunk_bytes: 16 * 1024 * 1024,
            parallel_requests: 4,
            direct_read_threshold_bytes: 64 * 1024 * 1024,
            direct_write_threshold_bytes: 64 * 1024 * 1024,
            shard_threshold_bytes: 4 * 1024 * 1024 * 1024,
            shard_size_bytes: 512 * 1024 * 1024,
            timeout: Duration::from_secs(120),
            retry_max_attempts: 5,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(2),
        }
    }
}
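// Usage sketch (not part of the crate; the endpoint URL and token are
// illustrative): override only the fields that matter and keep the defaults
// above for the rest via struct-update syntax.
//
//     let provider = RemoteFsProvider::new(RemoteFsConfig {
//         base_url: "https://fs.example.invalid/api/".to_string(),
//         auth_token: Some("token".to_string()),
//         chunk_bytes: 4 * 1024 * 1024,
//         ..RemoteFsConfig::default()
//     })?;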

struct RemoteInner {
    client: Client,
    base: Url,
    auth_header: Option<String>,
    chunk_bytes: usize,
    parallel_requests: usize,
    direct_read_threshold_bytes: u64,
    direct_write_threshold_bytes: u64,
    shard_threshold_bytes: u64,
    shard_size_bytes: u64,
    retry_max_attempts: usize,
    retry_base_delay: Duration,
    retry_max_delay: Duration,
}

impl RemoteInner {
    fn new(config: RemoteFsConfig) -> io::Result<Self> {
        if config.base_url.is_empty() {
            return Err(io::Error::new(
                ErrorKind::InvalidInput,
                "RemoteFsConfig.base_url must be provided",
            ));
        }
        let base = Url::parse(&config.base_url).map_err(map_url_err)?;
        let client = Client::builder()
            .timeout(config.timeout)
            .user_agent(USER_AGENT.clone())
            .build()
            .map_err(map_http_err)?;
        Ok(Self {
            client,
            base,
            auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
            chunk_bytes: config.chunk_bytes.max(64 * 1024),
            parallel_requests: config.parallel_requests.max(1),
            direct_read_threshold_bytes: config.direct_read_threshold_bytes,
            direct_write_threshold_bytes: config.direct_write_threshold_bytes,
            shard_threshold_bytes: config.shard_threshold_bytes,
            shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
            retry_max_attempts: config.retry_max_attempts.max(1),
            retry_base_delay: config.retry_base_delay,
            retry_max_delay: config.retry_max_delay,
        })
    }

    fn should_retry(&self, status: StatusCode) -> bool {
        matches!(
            status,
            StatusCode::TOO_MANY_REQUESTS
                | StatusCode::INTERNAL_SERVER_ERROR
                | StatusCode::BAD_GATEWAY
                | StatusCode::SERVICE_UNAVAILABLE
                | StatusCode::GATEWAY_TIMEOUT
        )
    }

    fn retry_delay(&self, attempt: usize) -> Duration {
        let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
        let delay = self.retry_base_delay.as_millis() as u64 * factor;
        let capped = delay.min(self.retry_max_delay.as_millis() as u64);
        Duration::from_millis(capped)
    }
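    // With the defaults above (base 100 ms, cap 2 s), the exponential backoff
    // yields 100 ms, 200 ms, 400 ms, 800 ms, 1600 ms, capped at 2000 ms from
    // then on; since retry loops sleep only between attempts, the default five
    // attempts ever sleep just the first four delays (100-800 ms). The shift
    // saturates instead of overflowing for very large attempt counts.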

    fn send_with_retry(
        &self,
        method: reqwest::Method,
        route: &str,
        query: &[(&str, String)],
    ) -> io::Result<Response> {
        for attempt in 0..self.retry_max_attempts {
            let resp = self
                .request(method.clone(), route, query)
                .send()
                .map_err(map_http_err)?;
            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
                return Ok(resp);
            }
            std::thread::sleep(self.retry_delay(attempt));
        }
        Err(io::Error::other("request retries exhausted"))
    }

    fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
        for attempt in 0..self.retry_max_attempts {
            let mut request = self.client.get(url);
            if let Some(range) = &range {
                request = request.header(reqwest::header::RANGE, range);
            }
            let resp = request.send().map_err(map_http_err)?;
            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
                return Ok(resp);
            }
            std::thread::sleep(self.retry_delay(attempt));
        }
        Err(io::Error::other("request retries exhausted"))
    }

    fn endpoint(&self, route: &str) -> Url {
        self.base
            .join(route.trim_start_matches('/'))
            .expect("failed to join remote route")
    }

    fn request(
        &self,
        method: reqwest::Method,
        route: &str,
        query: &[(&str, String)],
    ) -> reqwest::blocking::RequestBuilder {
        let mut url = self.endpoint(route);
        {
            let mut pairs = url.query_pairs_mut();
            for (k, v) in query {
                pairs.append_pair(k, v);
            }
        }
        let mut builder = self.client.request(method, url);
        if let Some(auth) = &self.auth_header {
            builder = builder.header("Authorization", auth);
        }
        builder
    }

    fn get_json<T: for<'a> Deserialize<'a>>(
        &self,
        route: &str,
        query: &[(&str, String)],
    ) -> io::Result<T> {
        let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
        handle_error(resp)?.json().map_err(map_http_err)
    }

    fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
        let resp = self
            .request(reqwest::Method::POST, route, &[])
            .json(body)
            .send()
            .map_err(map_http_err)?;
        handle_error(resp)?;
        Ok(())
    }

    fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
        &self,
        route: &str,
        body: &B,
    ) -> io::Result<T> {
        let resp = self
            .request(reqwest::Method::POST, route, &[])
            .json(body)
            .send()
            .map_err(map_http_err)?;
        handle_error(resp)?.json().map_err(map_http_err)
    }

    fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
        let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
        handle_error(resp)?;
        Ok(())
    }

    fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
        let resp = self.send_with_retry(
            reqwest::Method::GET,
            "/fs/read",
            &[
                ("path", path.to_string()),
                ("offset", offset.to_string()),
                ("length", length.to_string()),
            ],
        )?;
        let mut body = handle_error(resp)?;
        if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
            if content_type
                .to_str()
                .map(|value| value.contains("application/json"))
                .unwrap_or(false)
            {
                let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
                return self.download_range_from_url(&json.download_url, offset, length as u64);
            }
        }
        let mut buf = Vec::with_capacity(length);
        body.copy_to(&mut buf).map_err(map_http_err)?;
        Ok(buf)
    }

    fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
        if length == 0 {
            return Ok(Vec::new());
        }
        let end = offset + length - 1;
        let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
        let mut body = handle_error(resp)?;
        let mut buf = Vec::with_capacity(length as usize);
        body.copy_to(&mut buf).map_err(map_http_err)?;
        Ok(buf)
    }

    fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
        self.get_json("/fs/download-url", &[("path", path.to_string())])
    }

    fn upload_chunk(
        &self,
        path: &str,
        offset: u64,
        truncate: bool,
        final_chunk: bool,
        data: &[u8],
        hash: Option<&str>,
    ) -> io::Result<Option<FsWriteSessionResponse>> {
        let mut query = vec![("path", path.to_string()), ("offset", offset.to_string())];
        if truncate {
            query.push(("truncate", "true".to_string()));
        }
        if final_chunk {
            query.push(("final", "true".to_string()));
        }
        if let Some(hash) = hash {
            query.push(("hash", hash.to_string()));
        }
        let resp = self
            .request(reqwest::Method::PUT, "/fs/write", &query)
            .body(data.to_vec())
            .send()
            .map_err(map_http_err)?;
        if resp.status() == StatusCode::ACCEPTED {
            let session = handle_error(resp)?.json().map_err(map_http_err)?;
            return Ok(Some(session));
        }
        handle_error(resp)?;
        Ok(None)
    }

    fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
        self.post_json(
            "/fs/multipart-upload",
            &MultipartUploadRequest {
                path: path.to_string(),
                size_bytes: size_bytes as i64,
                content_type: None,
            },
        )
    }

    fn upload_session_start(
        &self,
        path: &str,
        size_bytes: u64,
        content_sha256: &str,
    ) -> io::Result<UploadSessionStartResponse> {
        self.post_json(
            "/fs/upload-session/start",
            &UploadSessionStartRequest {
                path: path.to_string(),
                size_bytes: size_bytes as i64,
                content_type: None,
                content_sha256: content_sha256.to_string(),
            },
        )
    }

    fn upload_session_chunks(
        &self,
        session_id: &str,
        blob_key: &str,
        chunks: &[UploadSessionChunkDescriptor],
    ) -> io::Result<UploadSessionChunksResponse> {
        self.post_json(
            "/fs/upload-session/chunks",
            &UploadSessionChunksRequest {
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                chunks: chunks.to_vec(),
            },
        )
    }

    fn upload_session_complete(
        &self,
        path: &str,
        session_id: &str,
        blob_key: &str,
        size_bytes: u64,
        content_sha256: &str,
        chunk_count: usize,
    ) -> io::Result<()> {
        self.post_empty(
            "/fs/upload-session/complete",
            &UploadSessionCompleteRequest {
                path: path.to_string(),
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                size_bytes: size_bytes as i64,
                content_sha256: content_sha256.to_string(),
                chunk_count,
                hash: None,
            },
        )
    }

    fn multipart_presign_part(
        &self,
        session_id: &str,
        blob_key: &str,
        upload_id: &str,
        part_number: i32,
        size_bytes: u64,
    ) -> io::Result<String> {
        let response: MultipartUploadPartResponse = self.post_json(
            "/fs/multipart-upload/part",
            &MultipartUploadPartRequest {
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                upload_id: upload_id.to_string(),
                part_number,
                size_bytes: size_bytes as i64,
            },
        )?;
        Ok(response.upload_url)
    }

    fn multipart_complete(
        &self,
        path: &str,
        session_id: &str,
        blob_key: &str,
        upload_id: &str,
        size_bytes: u64,
        parts: Vec<MultipartPart>,
    ) -> io::Result<()> {
        let resp = self
            .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
            .json(&MultipartUploadCompleteRequest {
                path: path.to_string(),
                session_id: session_id.to_string(),
                blob_key: blob_key.to_string(),
                upload_id: upload_id.to_string(),
                size_bytes: size_bytes as i64,
                hash: None,
                parts,
            })
            .send()
            .map_err(map_http_err)?;
        handle_error(resp)?;
        Ok(())
    }

    fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
        let resp = self
            .client
            .put(url)
            .body(data.to_vec())
            .send()
            .map_err(map_http_err)?;
        let resp = handle_error(resp)?;
        let etag = resp
            .headers()
            .get(reqwest::header::ETAG)
            .and_then(|value| value.to_str().ok())
            .unwrap_or("")
            .to_string();
        Ok(etag)
    }

    fn put_upload_target(
        &self,
        method: &str,
        url: &str,
        headers: &HashMap<String, String>,
        data: &[u8],
    ) -> io::Result<()> {
        let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
            io::Error::new(
                ErrorKind::InvalidInput,
                format!("invalid upload method `{method}`: {err}"),
            )
        })?;
        let mut request = self.client.request(method, url).body(data.to_vec());
        for (name, value) in headers {
            request = request.header(name, value);
        }
        let resp = request.send().map_err(map_http_err)?;
        handle_error(resp)?;
        Ok(())
    }

    fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
        self.get_json("/fs/metadata", &[("path", path.to_string())])
    }

    fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
        self.get_json("/fs/dir", &[("path", path.to_string())])
    }

    fn canonicalize_path(&self, path: &str) -> io::Result<String> {
        let resp: CanonicalizeResponse =
            self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
        Ok(resp.path)
    }
}

pub struct RemoteFsProvider {
    inner: Arc<RemoteInner>,
}

impl RemoteFsProvider {
    pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
        Ok(Self {
            inner: Arc::new(RemoteInner::new(config)?),
        })
    }

    /// Fetch a dataset manifest descriptor through the provider-neutral data contract endpoint.
    pub fn data_manifest_descriptor(
        &self,
        request: &DataManifestRequest,
    ) -> io::Result<DataManifestDescriptor> {
        let mut query = vec![("path", request.path.clone())];
        if let Some(version) = &request.version {
            query.push(("version", version.clone()));
        }
        self.inner.get_json("/data/manifest", &query)
    }

    /// Request direct upload targets for dataset chunk objects.
    pub fn data_chunk_upload_targets(
        &self,
        request: &DataChunkUploadRequest,
    ) -> io::Result<Vec<DataChunkUploadTarget>> {
        #[derive(Deserialize)]
        struct DataChunkUploadTargetsResponse {
            targets: Vec<DataChunkUploadTarget>,
        }
        let response: DataChunkUploadTargetsResponse = self
            .inner
            .post_json("/data/chunks/upload-targets", request)?;
        Ok(response.targets)
    }

    fn normalize(&self, path: &Path) -> String {
        let mut normalized = PathBuf::new();
        normalized.push("/");
        normalized.push(path);
        normalized.to_string_lossy().replace('\\', "/").to_string()
    }
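    // Illustrative behavior: normalize(Path::new("data\\run1.mat")) yields
    // "/data/run1.mat". A leading "/" is always forced and backslashes are
    // rewritten, so the server only ever sees rooted, forward-slash paths.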

    fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
        if let Some(parent) = path.parent() {
            self.inner.post_empty(
                "/fs/mkdir",
                &CreateDirRequest {
                    path: self.normalize(parent),
                    recursive: true,
                },
            )?;
        }
        Ok(())
    }

    fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
        if len == 0 {
            return Ok(Vec::new());
        }
        if len >= self.inner.direct_read_threshold_bytes {
            let url = self.inner.fetch_download_url(path)?.download_url;
            return self.download_entire_file_from_url(&url, len);
        }
        let chunk = self.inner.chunk_bytes as u64;
        let mut tasks = Vec::new();
        let mut offset = 0;
        let mut index = 0;
        while offset < len {
            let remaining = len - offset;
            let length = std::cmp::min(chunk, remaining);
            tasks.push(ChunkTask {
                offset,
                length: length as usize,
                index,
            });
            offset += length;
            index += 1;
        }
        let mut buffer = vec![0u8; len as usize];
        let path = path.to_string();
        let inner = self.inner.clone();
        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
        let threads = inner
            .parallel_requests
            .min(queue.lock().unwrap().len())
            .max(1);
        thread::scope(|scope| {
            for _ in 0..threads {
                let queue = queue.clone();
                let error = error.clone();
                let results = results.clone();
                let inner = inner.clone();
                let path = path.clone();
                scope.spawn(move |_| loop {
                    let task_opt = {
                        let mut guard = queue.lock().unwrap();
                        guard.pop_front()
                    };
                    let task = match task_opt {
                        Some(task) => task,
                        None => break,
                    };
                    match inner.download_chunk(&path, task.offset, task.length) {
                        Ok(bytes) => {
                            let mut guard = results.lock().unwrap();
                            guard[task.index] = Some(bytes);
                        }
                        Err(err) => {
                            let mut guard = error.lock().unwrap();
                            if guard.is_none() {
                                *guard = Some(err);
                            }
                            break;
                        }
                    }
                });
            }
        })
        .expect("remote download scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        let chunks = Arc::try_unwrap(results)
            .expect("results still in use")
            .into_inner()
            .expect("results poisoned");
        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
            let bytes = maybe.expect("missing downloaded chunk for remote file");
            let start = task.offset as usize;
            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
        }
        Ok(buffer)
    }
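    // Worked example (illustrative numbers only): a 40 MiB file sits below
    // the 64 MiB direct-read threshold, so with the default 16 MiB chunk size
    // it is split into three tasks (16 + 16 + 8 MiB). `threads` becomes
    // min(parallel_requests = 4, tasks = 3) = 3, so three scoped workers drain
    // the shared queue and each chunk lands in `buffer` at its own offset.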

    fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
        let manifest_bytes = self.download_raw_file(path, len)?;
        let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
        if manifest.version != 1 {
            return Err(io::Error::new(
                ErrorKind::InvalidData,
                "unsupported manifest",
            ));
        }
        let mut buffer = Vec::with_capacity(manifest.total_size as usize);
        for shard in manifest.shards {
            let meta = self.inner.fetch_metadata(&shard.path)?;
            let bytes = self.download_raw_file(&shard.path, meta.len)?;
            buffer.extend_from_slice(&bytes);
        }
        Ok(buffer)
    }

    fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
        let chunk = self.inner.chunk_bytes as u64;
        let mut tasks = Vec::new();
        let mut offset = 0;
        let mut index = 0;
        while offset < len {
            let remaining = len - offset;
            let length = std::cmp::min(chunk, remaining);
            tasks.push(ChunkTask {
                offset,
                length: length as usize,
                index,
            });
            offset += length;
            index += 1;
        }
        let mut buffer = vec![0u8; len as usize];
        let url = url.to_string();
        let inner = self.inner.clone();
        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
        let threads = inner
            .parallel_requests
            .min(queue.lock().unwrap().len())
            .max(1);
        thread::scope(|scope| {
            for _ in 0..threads {
                let queue = queue.clone();
                let error = error.clone();
                let results = results.clone();
                let inner = inner.clone();
                let url = url.clone();
                scope.spawn(move |_| loop {
                    let task_opt = {
                        let mut guard = queue.lock().unwrap();
                        guard.pop_front()
                    };
                    let task = match task_opt {
                        Some(task) => task,
                        None => break,
                    };
                    match inner.download_range_from_url(&url, task.offset, task.length as u64) {
                        Ok(bytes) => {
                            let mut guard = results.lock().unwrap();
                            guard[task.index] = Some(bytes);
                        }
                        Err(err) => {
                            let mut guard = error.lock().unwrap();
                            if guard.is_none() {
                                *guard = Some(err);
                            }
                            break;
                        }
                    }
                });
            }
        })
        .expect("remote download scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        let chunks = Arc::try_unwrap(results)
            .expect("results still in use")
            .into_inner()
            .expect("results poisoned");
        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
            let bytes = maybe.expect("missing downloaded chunk for remote file");
            let start = task.offset as usize;
            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
        }
        Ok(buffer)
    }

    fn upload_entire_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
        if data.len() as u64 >= self.inner.shard_threshold_bytes {
            return self.upload_sharded_file(path, data);
        }
        self.upload_unsharded_file(path, data, None)
    }

    fn upload_unsharded_file(&self, path: &str, data: &[u8], hash: Option<&str>) -> io::Result<()> {
        if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
            return self.upload_multipart_file(path, data);
        }
        if data.is_empty() {
            self.inner.upload_chunk(path, 0, true, true, data, hash)?;
            return Ok(());
        }
        let chunk = self.inner.chunk_bytes;
        let mut offset = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + chunk, data.len());
            let slice = &data[offset..end];
            let truncate = offset == 0;
            let final_chunk = end == data.len();
            let result =
                self.inner
                    .upload_chunk(path, offset as u64, truncate, final_chunk, slice, hash)?;
            if let Some(session) = result {
                let expected = offset as u64 + slice.len() as u64;
                if session.next_offset as u64 != expected {
                    return Err(io::Error::other("unexpected next offset"));
                }
            }
            offset = end;
        }
        Ok(())
    }

    fn upload_sharded_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
        let shard_size = self.inner.shard_size_bytes as usize;
        let mut shards = Vec::new();
        let mut offset = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + shard_size, data.len());
            let slice = &data[offset..end];
            let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
            self.upload_unsharded_file(&shard_path, slice, None)?;
            shards.push(ShardEntry {
                path: shard_path,
                size: slice.len() as u64,
            });
            offset = end;
        }
        let manifest = ShardManifest {
            version: 1,
            total_size: data.len() as u64,
            shard_size: self.inner.shard_size_bytes,
            shards,
        };
        let bytes = serde_json::to_vec(&manifest)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
        self.upload_unsharded_file(path, &bytes, Some(MANIFEST_HASH))
    }
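    // With the default 512 MiB shard size, a 5 GiB upload produces ten shards
    // and a manifest at `path` that serializes roughly as (UUID elided, sizes
    // illustrative):
    //
    //     {
    //       "version": 1,
    //       "total_size": 5368709120,
    //       "shard_size": 536870912,
    //       "shards": [
    //         { "path": "/.runmat/shards/<uuid>", "size": 536870912 },
    //         ...
    //       ]
    //     }
    //
    // Readers recognize a manifest via the `manifest:v1` hash marker on the
    // file's metadata rather than by sniffing the body.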

    fn upload_multipart_file(&self, path: &str, data: &[u8]) -> io::Result<()> {
        if data.is_empty() {
            self.inner.upload_chunk(path, 0, true, true, data, None)?;
            return Ok(());
        }
        let content_sha256 = sha256_hex(data);
        let session =
            match self
                .inner
                .upload_session_start(path, data.len() as u64, &content_sha256)
            {
                Ok(session) => session,
                Err(err) if err.kind() == ErrorKind::NotFound => {
                    return self.upload_multipart_file_legacy(path, data);
                }
                Err(err) => return Err(err),
            };
        let chunk_size = (session.chunk_size_bytes as usize).max(1);
        let mut tasks = Vec::new();
        let mut offset = 0usize;
        let mut index = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + chunk_size, data.len());
            let slice = &data[offset..end];
            tasks.push(UploadTask {
                index,
                offset,
                length: end - offset,
                chunk_sha256: sha256_hex(slice),
            });
            offset = end;
            index += 1;
        }
        let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
            .iter()
            .map(|task| UploadSessionChunkDescriptor {
                chunk_index: task.index,
                offset_bytes: task.offset as i64,
                size_bytes: task.length as i64,
                chunk_sha256: task.chunk_sha256.clone(),
            })
            .collect();
        let chunk_response = self.inner.upload_session_chunks(
            &session.session_id,
            &session.blob_key,
            &descriptors,
        )?;
        let targets = Arc::new(chunk_response.targets);
        let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
        let error = Arc::new(Mutex::new(None));
        let data = Arc::new(data.to_vec());
        thread::scope(|scope| {
            for _ in 0..self.inner.parallel_requests {
                let tasks = Arc::clone(&tasks);
                let targets = Arc::clone(&targets);
                let error = Arc::clone(&error);
                let inner = Arc::clone(&self.inner);
                let data = Arc::clone(&data);
                scope.spawn(move |_| loop {
                    let task = {
                        let mut guard = tasks.lock().unwrap();
                        guard.pop_front()
                    };
                    let Some(task) = task else { break };
                    let target = targets
                        .iter()
                        .find(|target| target.chunk_index == task.index)
                        .cloned();
                    let result = (|| {
                        let target = target.ok_or_else(|| {
                            io::Error::other(format!(
                                "missing upload target for chunk {}",
                                task.index
                            ))
                        })?;
                        let slice = &data[task.offset..task.offset + task.length];
                        inner.put_upload_target(
                            &target.method,
                            &target.upload_url,
                            &target.headers,
                            slice,
                        )
                    })();
                    if let Err(err) = result {
                        let mut err_guard = error.lock().unwrap();
                        if err_guard.is_none() {
                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
                        }
                        break;
                    }
                });
            }
        })
        .expect("upload session scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        self.inner.upload_session_complete(
            path,
            &session.session_id,
            &session.blob_key,
            data.len() as u64,
            &content_sha256,
            descriptors.len(),
        )?;
        Ok(())
    }
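    // Session protocol, in order: POST /fs/upload-session/start (whole-file
    // SHA-256) -> POST /fs/upload-session/chunks (per-chunk offsets, sizes,
    // and hashes, answered with presigned targets) -> parallel PUTs of each
    // slice to its target -> POST /fs/upload-session/complete. A NotFound on
    // start falls back to the legacy multipart path below.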

    fn upload_multipart_file_legacy(&self, path: &str, data: &[u8]) -> io::Result<()> {
        let session = self.inner.multipart_create(path, data.len() as u64)?;
        let part_size = session.part_size_bytes as usize;
        let mut tasks = std::collections::VecDeque::new();
        let mut offset = 0usize;
        let mut part_number = 1;
        let mut index = 0usize;
        while offset < data.len() {
            let end = std::cmp::min(offset + part_size, data.len());
            let length = end - offset;
            tasks.push_back(MultipartTask {
                index,
                part_number,
                offset,
                length,
            });
            offset = end;
            part_number += 1;
            index += 1;
        }
        let tasks = Arc::new(Mutex::new(tasks));
        let task_len = tasks.lock().unwrap().len();
        let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
        for _ in 0..task_len {
            result_vec.push(None);
        }
        let results = Arc::new(Mutex::new(result_vec));
        let error = Arc::new(Mutex::new(None));
        let data = Arc::new(data.to_vec());
        thread::scope(|scope| {
            for _ in 0..self.inner.parallel_requests {
                let tasks = Arc::clone(&tasks);
                let results = Arc::clone(&results);
                let error = Arc::clone(&error);
                let inner = Arc::clone(&self.inner);
                let blob_key = session.blob_key.clone();
                let upload_id = session.upload_id.clone();
                let session_id = session.session_id.clone();
                let data = Arc::clone(&data);
                scope.spawn(move |_| loop {
                    let task = {
                        let mut guard = tasks.lock().unwrap();
                        guard.pop_front()
                    };
                    let Some(task) = task else { break };
                    let slice = &data[task.offset..task.offset + task.length];
                    let result = (|| {
                        let url = inner.multipart_presign_part(
                            &session_id,
                            &blob_key,
                            &upload_id,
                            task.part_number,
                            task.length as u64,
                        )?;
                        let etag = inner.put_upload_url_with_etag(&url, slice)?;
                        Ok(MultipartPart {
                            part_number: task.part_number,
                            etag,
                        })
                    })();
                    let mut guard = results.lock().unwrap();
                    guard[task.index] = Some(result);
                    if let Some(Err(err)) = guard[task.index].as_ref() {
                        let mut err_guard = error.lock().unwrap();
                        if err_guard.is_none() {
                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
                        }
                        break;
                    }
                });
            }
        })
        .expect("multipart upload scope");
        if let Some(err) = error.lock().unwrap().take() {
            return Err(err);
        }
        let mut parts = Vec::with_capacity(results.lock().unwrap().len());
        for maybe in Arc::try_unwrap(results)
            .expect("results still in use")
            .into_inner()
            .expect("results poisoned")
        {
            let part = maybe.expect("missing part result")?;
            if part.etag.is_empty() {
                return Err(io::Error::other("missing etag"));
            }
            parts.push(part);
        }
        parts.sort_by_key(|part| part.part_number);
        self.inner.multipart_complete(
            path,
            &session.session_id,
            &session.blob_key,
            &session.upload_id,
            data.len() as u64,
            parts,
        )?;
        Ok(())
    }
}

#[derive(Clone, Copy)]
struct ChunkTask {
    offset: u64,
    length: usize,
    index: usize,
}

type MultipartResult = Option<Result<MultipartPart, io::Error>>;

#[derive(Clone, Copy)]
struct MultipartTask {
    index: usize,
    part_number: i32,
    offset: usize,
    length: usize,
}

#[derive(Clone)]
struct UploadTask {
    index: usize,
    offset: usize,
    length: usize,
    chunk_sha256: String,
}

#[async_trait(?Send)]
impl FsProvider for RemoteFsProvider {
    fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
        let normalized = self.normalize(path);
        let mut data = Vec::new();
        if flags.read {
            let meta = self.inner.fetch_metadata(&normalized)?;
            if meta.file_type != "file" {
                return Err(io::Error::other("remote path is not a file"));
            }
            data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
                self.download_sharded_file(&normalized, meta.len)?
            } else {
                self.download_raw_file(&normalized, meta.len)?
            };
        }
        if flags.truncate {
            data.clear();
        }
        if flags.create {
            self.ensure_parent_exists(path)?;
        }
        let handle = RemoteFileHandle {
            provider: self.clone(),
            path: normalized,
            data,
            position: 0,
            flags: flags.clone(),
            dirty: false,
        };
        Ok(Box::new(handle))
    }

    async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
        let normalized = self.normalize(path);
        let meta = self.inner.fetch_metadata(&normalized)?;
        if meta.file_type != "file" {
            return Err(io::Error::other("remote path is not a file"));
        }
        if meta.hash.as_deref() == Some(MANIFEST_HASH) {
            self.download_sharded_file(&normalized, meta.len)
        } else {
            self.download_raw_file(&normalized, meta.len)
        }
    }

    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.ensure_parent_exists(path)?;
        self.upload_entire_file(&normalized, data)
    }

    async fn remove_file(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner
            .delete_empty("/fs/file", &[("path", normalized)])?;
        Ok(())
    }

    async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
        let normalized = self.normalize(path);
        let resp = self.inner.fetch_metadata(&normalized)?;
        Ok(resp.into())
    }

    async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
        self.metadata(path).await
    }

    async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
        let normalized = self.normalize(path);
        let resp = self.inner.fetch_dir(&normalized)?;
        Ok(resp
            .into_iter()
            .map(|entry| DirEntry {
                path: PathBuf::from(entry.path),
                file_name: entry.file_name.into(),
                file_type: entry.file_type.into(),
            })
            .collect())
    }

    async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
        let normalized = self.normalize(path);
        let canonical = self.inner.canonicalize_path(&normalized)?;
        Ok(PathBuf::from(canonical))
    }

    async fn create_dir(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.post_empty(
            "/fs/mkdir",
            &CreateDirRequest {
                path: normalized,
                recursive: false,
            },
        )
    }

    async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.post_empty(
            "/fs/mkdir",
            &CreateDirRequest {
                path: normalized,
                recursive: true,
            },
        )
    }

    async fn remove_dir(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.delete_empty(
            "/fs/dir",
            &[("path", normalized), ("recursive", "false".into())],
        )
    }

    async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
        let normalized = self.normalize(path);
        self.inner.delete_empty(
            "/fs/dir",
            &[("path", normalized), ("recursive", "true".into())],
        )
    }

    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        self.inner.post_empty(
            "/fs/rename",
            &RenameRequest {
                from: self.normalize(from),
                to: self.normalize(to),
            },
        )
    }

    async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
        self.inner.post_empty(
            "/fs/set-readonly",
            &SetReadonlyRequest {
                path: self.normalize(path),
                readonly,
            },
        )
    }

    async fn data_manifest_descriptor(
        &self,
        request: &DataManifestRequest,
    ) -> io::Result<DataManifestDescriptor> {
        let mut query = vec![("path", request.path.clone())];
        if let Some(version) = &request.version {
            query.push(("version", version.clone()));
        }
        self.inner.get_json("/data/manifest", &query)
    }

    async fn data_chunk_upload_targets(
        &self,
        request: &DataChunkUploadRequest,
    ) -> io::Result<Vec<DataChunkUploadTarget>> {
        #[derive(Deserialize)]
        struct DataChunkUploadTargetsResponse {
            targets: Vec<DataChunkUploadTarget>,
        }
        let response: DataChunkUploadTargetsResponse = self
            .inner
            .post_json("/data/chunks/upload-targets", request)?;
        Ok(response.targets)
    }

    async fn data_upload_chunk(
        &self,
        target: &DataChunkUploadTarget,
        data: &[u8],
    ) -> io::Result<()> {
        self.inner
            .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
    }
}

impl Clone for RemoteFsProvider {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

struct RemoteFileHandle {
    provider: RemoteFsProvider,
    path: String,
    data: Vec<u8>,
    position: usize,
    flags: OpenFlags,
    dirty: bool,
}

impl RemoteFileHandle {
    fn flush_remote(&mut self) -> io::Result<()> {
        if !self.dirty {
            return Ok(());
        }
        self.provider.upload_entire_file(&self.path, &self.data)?;
        self.dirty = false;
        Ok(())
    }
}

impl Read for RemoteFileHandle {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Guard against a position seeked past EOF; direct slicing would panic.
        let remaining = self.data.get(self.position..).unwrap_or(&[]);
        let amt = remaining.len().min(buf.len());
        buf[..amt].copy_from_slice(&remaining[..amt]);
        self.position += amt;
        Ok(amt)
    }
}

impl Write for RemoteFileHandle {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if !self.flags.write && !self.flags.append {
            return Err(io::Error::new(
                ErrorKind::PermissionDenied,
                "remote file not opened for writing",
            ));
        }
        if self.flags.append {
            self.position = self.data.len();
        }
        let required = self.position + buf.len();
        if required > self.data.len() {
            self.data.resize(required, 0);
        }
        self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
        self.position += buf.len();
        self.dirty = true;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.flush_remote()
    }
}

impl Seek for RemoteFileHandle {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let new_pos = match pos {
            SeekFrom::Start(offset) => offset as i64,
            SeekFrom::End(offset) => self.data.len() as i64 + offset,
            SeekFrom::Current(offset) => self.position as i64 + offset,
        };
        if new_pos < 0 {
            return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
        }
        self.position = new_pos as usize;
        Ok(self.position as u64)
    }
}
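// Note: seeking past EOF is allowed. A read at such a position returns 0
// bytes, while a subsequent write zero-fills the gap (see the `resize` in
// `write`), mirroring how seeking past the end of a std::fs::File behaves.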

impl Drop for RemoteFileHandle {
    fn drop(&mut self) {
        if self.dirty {
            if let Err(err) = self.provider.upload_entire_file(&self.path, &self.data) {
                eprintln!("remote fs flush failed: {err}");
            }
        }
    }
}

#[derive(Debug, Deserialize)]
struct MetadataResponse {
    #[serde(rename = "fileType")]
    file_type: String,
    len: u64,
    #[serde(rename = "modifiedAt")]
    modified_at: Option<String>,
    readonly: bool,
    hash: Option<String>,
}

impl From<MetadataResponse> for FsMetadata {
    fn from(value: MetadataResponse) -> Self {
        FsMetadata::new_with_hash(
            value.file_type.into(),
            value.len,
            parse_modified_at(value.modified_at.as_deref()),
            value.readonly,
            value.hash,
        )
    }
}

#[derive(Debug, Deserialize)]
struct DirEntryResponse {
    path: String,
    #[serde(rename = "fileName")]
    file_name: String,
    #[serde(rename = "fileType")]
    file_type: String,
}

impl From<String> for FsFileType {
    fn from(value: String) -> Self {
        match value.as_str() {
            "dir" => FsFileType::Directory,
            "file" => FsFileType::File,
            "symlink" => FsFileType::Symlink,
            "other" => FsFileType::Other,
            _ => FsFileType::Unknown,
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
struct CreateDirRequest {
    path: String,
    recursive: bool,
}

#[derive(Debug, Serialize, Deserialize)]
struct RenameRequest {
    from: String,
    to: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct SetReadonlyRequest {
    path: String,
    readonly: bool,
}

#[derive(Debug, Deserialize)]
struct CanonicalizeResponse {
    path: String,
}

#[derive(Debug, Deserialize)]
struct DownloadUrlResponse {
    #[serde(rename = "downloadUrl")]
    download_url: String,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartRequest {
    path: String,
    size_bytes: i64,
    content_type: Option<String>,
    content_sha256: String,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionStartResponse {
    session_id: String,
    blob_key: String,
    chunk_size_bytes: i64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunkDescriptor {
    chunk_index: usize,
    offset_bytes: i64,
    size_bytes: i64,
    chunk_sha256: String,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksRequest {
    session_id: String,
    blob_key: String,
    chunks: Vec<UploadSessionChunkDescriptor>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionChunksResponse {
    targets: Vec<UploadChunkTarget>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadChunkTarget {
    chunk_index: usize,
    method: String,
    upload_url: String,
    headers: HashMap<String, String>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadSessionCompleteRequest {
    path: String,
    session_id: String,
    blob_key: String,
    size_bytes: i64,
    content_sha256: String,
    chunk_count: usize,
    hash: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadRequest {
    path: String,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    #[serde(rename = "contentType")]
    content_type: Option<String>,
}

#[derive(Debug, Deserialize)]
struct MultipartUploadResponse {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    #[serde(rename = "partSizeBytes")]
    part_size_bytes: i64,
}

#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadPartRequest {
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    #[serde(rename = "partNumber")]
    part_number: i32,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
}

#[derive(Debug, Deserialize)]
struct MultipartUploadPartResponse {
    #[serde(rename = "upload_url")]
    upload_url: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct MultipartUploadCompleteRequest {
    path: String,
    #[serde(rename = "sessionId")]
    session_id: String,
    #[serde(rename = "blobKey")]
    blob_key: String,
    #[serde(rename = "uploadId")]
    upload_id: String,
    #[serde(rename = "sizeBytes")]
    size_bytes: i64,
    hash: Option<String>,
    parts: Vec<MultipartPart>,
}

#[derive(Debug, Serialize, Deserialize)]
struct MultipartPart {
    #[serde(rename = "partNumber")]
    part_number: i32,
    etag: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct ShardManifest {
    version: u32,
    total_size: u64,
    shard_size: u64,
    shards: Vec<ShardEntry>,
}

#[derive(Debug, Serialize, Deserialize)]
struct ShardEntry {
    path: String,
    size: u64,
}

#[derive(Debug, Deserialize)]
struct FsWriteSessionResponse {
    #[serde(rename = "sessionId")]
    _session_id: String,
    #[serde(rename = "nextOffset")]
    next_offset: i64,
}

fn sha256_hex(data: &[u8]) -> String {
    let digest = Sha256::digest(data);
    let mut out = String::with_capacity(digest.len() * 2);
    for byte in digest {
        use std::fmt::Write as _;
        let _ = write!(out, "{byte:02x}");
    }
    out
}
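// Sanity check: sha256_hex(b"") yields the well-known empty-input digest
// "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".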
1498
1499fn map_http_err(err: reqwest::Error) -> io::Error {
1500    io::Error::other(err)
1501}
1502
1503fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1504    let value = value?;
1505    let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1506    let millis = parsed.timestamp_millis();
1507    if millis < 0 {
1508        return None;
1509    }
1510    Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1511}
1512
1513fn map_url_err(err: url::ParseError) -> io::Error {
1514    io::Error::new(ErrorKind::InvalidInput, err)
1515}
1516
1517fn handle_error(resp: Response) -> io::Result<Response> {
1518    let status = resp.status();
1519    if status.is_success() {
1520        return Ok(resp);
1521    }
1522    let text = resp.text().unwrap_or_else(|_| status.to_string());
1523    let kind = match status {
1524        StatusCode::NOT_FOUND => ErrorKind::NotFound,
1525        StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1526        StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1527        StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1528        _ => ErrorKind::Other,
1529    };
1530    Err(io::Error::new(kind, text))
1531}
1532
1533impl fmt::Debug for RemoteFsProvider {
1534    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1535        f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
1536    }
1537}
1538
1539#[cfg(test)]
1540mod tests {
1541    use super::*;
1542    use crate::data_contract::DataChunkDescriptor;
1543    use axum::extract::{Query, State};
1544    use axum::http::{HeaderMap, StatusCode};
1545    use axum::routing::{delete, get, post, put};
1546    use axum::{Json, Router};
1547    use futures::executor;
1548    use serde::Deserialize;
1549    use std::collections::HashMap;
1550    use std::net::TcpListener as StdTcpListener;
1551    use std::sync::Arc;
1552    use tempfile::tempdir;
1553    use tokio::net::TcpListener as TokioTcpListener;
1554    use tokio::runtime::Runtime;
1555
    #[derive(Clone)]
    struct Harness {
        root: Arc<PathBuf>,
        _keeper: Arc<tempfile::TempDir>,
        base_url: Arc<Mutex<Option<String>>>,
        upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
        omit_last_chunk_target: bool,
    }

    impl Harness {
        fn new() -> Self {
            Self::with_omit_last_chunk_target(false)
        }

        fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
            let dir = tempdir().expect("tempdir");
            let path = dir.path().to_path_buf();
            Self {
                root: Arc::new(path),
                _keeper: Arc::new(dir),
                base_url: Arc::new(Mutex::new(None)),
                upload_sessions: Arc::new(Mutex::new(HashMap::new())),
                omit_last_chunk_target,
            }
        }

        fn resolve(&self, remote_path: &str) -> PathBuf {
            let trimmed = remote_path.trim_start_matches('/');
            self.root.join(trimmed)
        }
    }

    #[derive(Clone)]
    struct UploadSessionTestState {
        path: String,
        chunk_count: usize,
    }

    #[derive(Deserialize)]
    struct PathParams {
        path: String,
        offset: Option<u64>,
        length: Option<usize>,
        truncate: Option<String>,
        recursive: Option<String>,
    }

    #[derive(Deserialize)]
    struct UploadChunkQuery {
        #[serde(rename = "sessionId")]
        session_id: String,
        #[serde(rename = "chunkIndex")]
        chunk_index: usize,
    }

    #[derive(Deserialize)]
    struct DataManifestQuery {
        path: String,
        version: Option<String>,
    }

    async fn metadata_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let meta =
            std::fs::metadata(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        let file_type = if meta.is_dir() {
            "dir"
        } else if meta.is_file() {
            "file"
        } else {
            "other"
        };
        Ok(Json(serde_json::json!({
            "fileType": file_type,
            "len": meta.len(),
            "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
            "readonly": meta.permissions().readonly()
        })))
    }

    async fn dir_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
        let mut entries = Vec::new();
        for entry in
            std::fs::read_dir(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?
        {
            let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            let file_type = entry
                .file_type()
                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            let kind = if file_type.is_dir() {
                "dir"
            } else if file_type.is_file() {
                "file"
            } else {
                "other"
            };
            entries.push(serde_json::json!({
                "path": format!("/{}", entry.path().strip_prefix(&*harness.root).unwrap().display()),
                "fileName": entry.file_name().to_string_lossy(),
                "fileType": kind
            }));
        }
        Ok(Json(entries))
    }

    async fn canonicalize_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let path = harness.resolve(&params.path);
        let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
        let rel = canonical.strip_prefix(&*harness.root).unwrap_or(&canonical);
        Ok(Json(serde_json::json!({
            "path": format!("/{}", rel.display())
        })))
    }

    async fn read_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Vec<u8>, StatusCode> {
        let mut data =
            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        let offset = params.offset.unwrap_or(0) as usize;
        let length = params.length.unwrap_or(data.len().saturating_sub(offset));
        // Saturate so a huge client-supplied length cannot overflow the sum.
        let end = std::cmp::min(offset.saturating_add(length), data.len());
        if offset < data.len() {
            data = data[offset..end].to_vec();
        } else {
            data.clear();
        }
        Ok(data)
    }

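    // Emulates a ranged write: read the current contents (unless truncating),
    // grow the buffer as needed, splice the body in at `offset`, and write
    // the whole file back.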
    async fn write_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
        body: axum::body::Bytes,
    ) -> Result<(), StatusCode> {
        let path = harness.resolve(&params.path);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
            Vec::new()
        } else {
            std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        };
        let offset = params.offset.unwrap_or(0) as usize;
        let required = offset + body.len();
        if required > data.len() {
            data.resize(required, 0);
        }
        data[offset..offset + body.len()].copy_from_slice(&body);
        std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }

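    // The next three handlers model the chunked-upload protocol: `start`
    // registers a session, `chunks` hands back one PUT target per chunk
    // (optionally dropping the last target to simulate a faulty server), and
    // `complete` reassembles the chunks and verifies the SHA-256 digest
    // before committing the file.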
    async fn upload_session_start_handler(
        State(harness): State<Harness>,
        Json(req): Json<UploadSessionStartRequest>,
    ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
        let session_id = Uuid::new_v4().to_string();
        let chunk_size_bytes = 1024i64;
        harness
            .upload_sessions
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
            .insert(
                session_id.clone(),
                UploadSessionTestState {
                    path: req.path,
                    chunk_count: 0,
                },
            );
        Ok(Json(UploadSessionStartResponse {
            session_id: session_id.clone(),
            blob_key: format!("test/blob/{session_id}"),
            chunk_size_bytes,
        }))
    }

    async fn upload_session_chunks_handler(
        State(harness): State<Harness>,
        Json(req): Json<UploadSessionChunksRequest>,
    ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
        let base = harness
            .base_url
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
            .clone()
            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
        let mut sessions = harness
            .upload_sessions
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        let state = sessions
            .get_mut(&req.session_id)
            .ok_or(StatusCode::NOT_FOUND)?;
        state.chunk_count = req.chunks.len();
        let mut targets = req
            .chunks
            .iter()
            .map(|chunk| UploadChunkTarget {
                chunk_index: chunk.chunk_index,
                method: "PUT".to_string(),
                upload_url: format!(
                    "{base}/upload-chunk?sessionId={}&chunkIndex={}",
                    req.session_id, chunk.chunk_index
                ),
                headers: HashMap::new(),
            })
            .collect::<Vec<_>>();
        if harness.omit_last_chunk_target && !targets.is_empty() {
            targets.pop();
        }
        Ok(Json(UploadSessionChunksResponse { targets }))
    }

    async fn upload_chunk_handler(
        State(harness): State<Harness>,
        Query(query): Query<UploadChunkQuery>,
        body: axum::body::Bytes,
    ) -> Result<(), StatusCode> {
        let chunk_path = harness.resolve(&format!(
            "/.upload-sessions/{}/{}",
            query.session_id, query.chunk_index
        ));
        if let Some(parent) = chunk_path.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }

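    // Returns a canned manifest whose `updated_at` depends on the requested
    // version, so tests can tell a pinned-version fetch from a latest fetch.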
    async fn data_manifest_handler(
        Query(query): Query<DataManifestQuery>,
    ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
        let updated_at = if query.version.as_deref() == Some("v1") {
            "2026-01-01T00:00:00Z"
        } else {
            "2026-02-02T00:00:00Z"
        };
        Ok(Json(DataManifestDescriptor {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: format!("dataset:{}", query.path),
            updated_at: updated_at.to_string(),
            txn_sequence: 9,
        }))
    }

    async fn data_chunk_upload_targets_handler(
        Json(req): Json<DataChunkUploadRequest>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let targets = req
            .chunks
            .iter()
            .map(|chunk| {
                serde_json::json!({
                    "key": chunk.key,
                    "method": "PUT",
                    "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
                    "headers": {
                        "x-runmat-hash": chunk.hash,
                        "x-runmat-object": chunk.object_id,
                    }
                })
            })
            .collect::<Vec<_>>();
        Ok(Json(serde_json::json!({ "targets": targets })))
    }

    async fn upload_session_complete_handler(
        State(harness): State<Harness>,
        Json(req): Json<UploadSessionCompleteRequest>,
    ) -> Result<(), StatusCode> {
        let state = harness
            .upload_sessions
            .lock()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
            .remove(&req.session_id)
            .ok_or(StatusCode::NOT_FOUND)?;
        let mut data = Vec::new();
        for chunk_index in 0..state.chunk_count {
            let chunk_path = harness.resolve(&format!(
                "/.upload-sessions/{}/{}",
                req.session_id, chunk_index
            ));
            let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            data.extend_from_slice(&chunk);
        }
        if sha256_hex(&data) != req.content_sha256 {
            return Err(StatusCode::BAD_REQUEST);
        }
        let target_path = harness.resolve(&state.path);
        if let Some(parent) = target_path.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }

    async fn mkdir_handler(
        State(harness): State<Harness>,
        Json(req): Json<CreateDirRequest>,
    ) -> Result<(), StatusCode> {
        let path = harness.resolve(&req.path);
        if req.recursive {
            std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        } else {
            std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        Ok(())
    }

    async fn delete_file_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<(), StatusCode> {
        std::fs::remove_file(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        Ok(())
    }

    async fn delete_dir_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<(), StatusCode> {
        let recursive = params.recursive.as_deref() == Some("true");
        if recursive {
            std::fs::remove_dir_all(harness.resolve(&params.path))
                .map_err(|_| StatusCode::NOT_FOUND)?;
        } else {
            std::fs::remove_dir(harness.resolve(&params.path))
                .map_err(|_| StatusCode::NOT_FOUND)?;
        }
        Ok(())
    }

    async fn rename_handler(
        State(harness): State<Harness>,
        Json(req): Json<RenameRequest>,
    ) -> Result<(), StatusCode> {
        let from = harness.resolve(&req.from);
        let to = harness.resolve(&req.to);
        if let Some(parent) = to.parent() {
            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        }
        std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
        Ok(())
    }

    async fn set_readonly_handler(
        State(harness): State<Harness>,
        Json(req): Json<SetReadonlyRequest>,
    ) -> Result<(), StatusCode> {
        let path = harness.resolve(&req.path);
        let mut perms = std::fs::metadata(&path)
            .map_err(|_| StatusCode::NOT_FOUND)?
            .permissions();
        perms.set_readonly(req.readonly);
        std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok(())
    }

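    // Variant read endpoint: instead of streaming bytes, it answers with a
    // presigned-style `downloadUrl` pointing back at this server's
    // `/download` route, which honors single-range `Range` headers.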
    async fn read_with_download_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let base = harness
            .base_url
            .lock()
            .unwrap()
            .clone()
            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
        let download_url = format!("{base}/download?path={}", params.path);
        Ok(Json(serde_json::json!({
            "downloadUrl": download_url,
            "expiresAt": 0
        })))
    }

    async fn download_handler(
        State(harness): State<Harness>,
        Query(params): Query<PathParams>,
        headers: HeaderMap,
    ) -> Result<Vec<u8>, StatusCode> {
        let mut data =
            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
        if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
            if let Some((start, end)) = parse_range(range) {
                let end = end.min(data.len().saturating_sub(1));
                // Guard against descending ranges (e.g. "bytes=5-2"), which
                // would otherwise panic when slicing.
                if start < data.len() && start <= end {
                    data = data[start..=end].to_vec();
                } else {
                    data.clear();
                }
            }
        }
        Ok(data)
    }

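    // Minimal `Range` parser: accepts only the single-range form
    // `bytes=<start>-<end>` with both bounds present; open-ended and
    // multi-range requests are deliberately unsupported here.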
    fn parse_range(value: &str) -> Option<(usize, usize)> {
        let value = value.strip_prefix("bytes=")?;
        let (start, end) = value.split_once('-')?;
        let start = start.parse::<usize>().ok()?;
        let end = end.parse::<usize>().ok()?;
        Some((start, end))
    }

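    // Server bootstrap: `common_routes` wires every shared endpoint, and
    // `spawn_with` binds an ephemeral port (127.0.0.1:0), records the base
    // URL in the harness, and serves the router on a background Tokio
    // runtime. The three `spawn_server*` variants differ only in which read
    // endpoint they expose and whether the harness drops a chunk target.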
    fn common_routes() -> Router<Harness> {
        Router::new()
            .route("/fs/metadata", get(metadata_handler))
            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
            .route("/fs/file", delete(delete_file_handler))
            .route("/fs/canonicalize", get(canonicalize_handler))
            .route("/fs/write", put(write_handler))
            .route(
                "/fs/upload-session/start",
                post(upload_session_start_handler),
            )
            .route(
                "/fs/upload-session/chunks",
                post(upload_session_chunks_handler),
            )
            .route(
                "/fs/upload-session/complete",
                post(upload_session_complete_handler),
            )
            .route("/fs/mkdir", post(mkdir_handler))
            .route("/fs/rename", post(rename_handler))
            .route("/fs/set-readonly", post(set_readonly_handler))
            .route("/data/manifest", get(data_manifest_handler))
            .route(
                "/data/chunks/upload-targets",
                post(data_chunk_upload_targets_handler),
            )
            .route("/upload-chunk", put(upload_chunk_handler))
    }

    fn spawn_with(harness: Harness, router: Router) -> (String, Harness, Runtime) {
        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
        std_listener.set_nonblocking(true).expect("nonblocking");
        let addr = std_listener.local_addr().unwrap();
        let service = router.into_make_service();
        let rt = Runtime::new().unwrap();
        let base = format!("http://{addr}");
        *harness.base_url.lock().unwrap() = Some(base.clone());
        rt.spawn(async move {
            let listener = TokioTcpListener::from_std(std_listener).unwrap();
            axum::serve(listener, service).await.unwrap();
        });
        (base, harness, rt)
    }

    fn spawn_server() -> (String, Harness, Runtime) {
        let harness = Harness::new();
        let router = common_routes()
            .route("/fs/read", get(read_handler))
            .with_state(harness.clone());
        spawn_with(harness, router)
    }

    fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
        let harness = Harness::new();
        let router = common_routes()
            .route("/fs/read", get(read_with_download_handler))
            .route("/download", get(download_handler))
            .with_state(harness.clone());
        spawn_with(harness, router)
    }

    fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
        let harness = Harness::with_omit_last_chunk_target(true);
        let router = common_routes()
            .route("/fs/read", get(read_handler))
            .with_state(harness.clone());
        spawn_with(harness, router)
    }

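    // End-to-end roundtrip: writes a 64 KiB payload (large enough, given the
    // 1024-byte direct-write threshold, to exercise the chunked
    // upload-session path), reads it back, then removes it and checks the
    // backing tempdir directly.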
    #[test]
    fn remote_provider_roundtrip() {
        let (base, harness, _rt) = spawn_server();
        let provider = RemoteFsProvider::new(RemoteFsConfig {
            base_url: base,
            auth_token: None,
            chunk_bytes: 1024,
            parallel_requests: 4,
            direct_read_threshold_bytes: u64::MAX,
            direct_write_threshold_bytes: 1024,
            timeout: Duration::from_secs(30),
            ..RemoteFsConfig::default()
        })
        .expect("provider");

        let data = (0..16_384u32)
            .flat_map(|v| v.to_le_bytes())
            .collect::<Vec<_>>();
        executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
        let read_back =
            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
        assert_eq!(data, read_back);
        executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
        assert!(!harness.resolve("/reports/data.bin").exists());
    }

    #[test]
    fn remote_provider_download_url_read() {
        let (base, harness, _rt) = spawn_server_with_download_url();
        let provider = RemoteFsProvider::new(RemoteFsConfig {
            base_url: base,
            auth_token: None,
            chunk_bytes: 128,
            parallel_requests: 2,
            direct_read_threshold_bytes: u64::MAX,
            timeout: Duration::from_secs(30),
            ..RemoteFsConfig::default()
        })
        .expect("provider");

        let data = (0..1024u32)
            .flat_map(|v| v.to_le_bytes())
            .collect::<Vec<_>>();
        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
        std::fs::write(harness.resolve("/reports/data.bin"), &data).expect("write");

        let read_back =
            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
        assert_eq!(data, read_back);
    }

    #[test]
    fn remote_provider_errors_when_chunk_target_is_missing() {
        let (base, _harness, _rt) = spawn_server_with_missing_chunk_target();
        let provider = RemoteFsProvider::new(RemoteFsConfig {
            base_url: base,
            auth_token: None,
            chunk_bytes: 128,
            parallel_requests: 2,
            direct_read_threshold_bytes: u64::MAX,
            direct_write_threshold_bytes: 128,
            timeout: Duration::from_secs(30),
            ..RemoteFsConfig::default()
        })
        .expect("provider");

        let data = vec![7u8; 1024];
        let err =
            executor::block_on(provider.write(Path::new("/reports/missing-target.bin"), &data))
                .expect_err("write should fail when chunk target is missing");
        assert_eq!(err.kind(), io::ErrorKind::Other);
    }

    #[test]
    fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
        let (base, _harness, _rt) = spawn_server();
        let provider = RemoteFsProvider::new(RemoteFsConfig {
            base_url: base,
            auth_token: None,
            timeout: Duration::from_secs(30),
            ..RemoteFsConfig::default()
        })
        .expect("provider");

        let descriptor = provider
            .data_manifest_descriptor(&DataManifestRequest {
                path: "/datasets/alpha.data".to_string(),
                version: Some("v1".to_string()),
            })
            .expect("manifest descriptor");
        assert_eq!(descriptor.schema_version, 1);
        assert_eq!(descriptor.format, "runmat-data");
        assert_eq!(descriptor.dataset_id, "dataset:/datasets/alpha.data");
        assert_eq!(descriptor.updated_at, "2026-01-01T00:00:00Z");
        assert_eq!(descriptor.txn_sequence, 9);
    }

    #[test]
    fn remote_provider_data_chunk_upload_targets_fetches_targets() {
        let (base, _harness, _rt) = spawn_server();
        let provider = RemoteFsProvider::new(RemoteFsConfig {
            base_url: base,
            auth_token: None,
            timeout: Duration::from_secs(30),
            ..RemoteFsConfig::default()
        })
        .expect("provider");

        let targets = provider
            .data_chunk_upload_targets(&DataChunkUploadRequest {
                dataset_path: "/datasets/alpha.data".to_string(),
                array: "X".to_string(),
                chunks: vec![DataChunkDescriptor {
                    key: "X/0-0".to_string(),
                    object_id: "obj_0".to_string(),
                    hash: "sha256:abc".to_string(),
                    bytes_raw: 10,
                    bytes_stored: 8,
                }],
            })
            .expect("chunk upload targets");

        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].key, "X/0-0");
        assert_eq!(targets[0].method, "PUT");
        assert_eq!(targets[0].upload_url, "https://uploads.example.test/X/obj_0");
        let headers = &targets[0].headers;
        assert_eq!(headers.len(), 2);
        assert_eq!(
            headers.get("x-runmat-hash").map(String::as_str),
            Some("sha256:abc")
        );
        assert_eq!(
            headers.get("x-runmat-object").map(String::as_str),
            Some("obj_0")
        );
    }
}