Skip to main content

runmat_filesystem/remote/
native.rs

1use crate::data_contract::{
2    DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::DateTime;
7use crossbeam_utils::thread;
8use once_cell::sync::Lazy;
9use reqwest::blocking::{Client, Response};
10use reqwest::StatusCode;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::fmt;
16use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use url::Url;
21use uuid::Uuid;
22
23const MANIFEST_HASH: &str = "manifest:v1";
24const SHARD_PREFIX: &str = "/.runmat/shards";
25
26static USER_AGENT: Lazy<String> = Lazy::new(|| {
27    format!(
28        "runmat-remote-fs/{} ({})",
29        env!("CARGO_PKG_VERSION"),
30        std::env::consts::OS
31    )
32});
33
/// Connection and transfer tuning for the remote filesystem client.
#[derive(Clone, Debug)]
pub struct RemoteFsConfig {
    /// Base URL of the remote service; must be non-empty.
    pub base_url: String,
    /// Optional token sent as `Authorization: Bearer <token>`.
    pub auth_token: Option<String>,
    /// Size of each transfer chunk in bytes (raised to at least 64 KiB when used).
    pub chunk_bytes: usize,
    /// Number of worker threads for chunked transfers (raised to at least 1 when used).
    pub parallel_requests: usize,
    /// Files at or above this size are read through a download URL.
    pub direct_read_threshold_bytes: u64,
    /// Uploads at or above this size take the multipart upload path.
    pub direct_write_threshold_bytes: u64,
    /// Uploads at or above this size are split into shards plus a manifest.
    pub shard_threshold_bytes: u64,
    /// Size of each shard in bytes (raised to at least 8 MiB when used).
    pub shard_size_bytes: u64,
    /// Timeout applied to the HTTP client.
    pub timeout: Duration,
    /// Maximum number of attempts for retried requests (raised to at least 1 when used).
    pub retry_max_attempts: usize,
    /// Delay before the first retry; doubled on each further attempt.
    pub retry_base_delay: Duration,
    /// Upper bound on the delay between retries.
    pub retry_max_delay: Duration,
}

impl Default for RemoteFsConfig {
    /// Defaults: 16 MiB chunks, 4 workers, 64 MiB direct thresholds, 4 GiB shard
    /// threshold with 512 MiB shards, 120 s timeout, 5 attempts backing off 100 ms..2 s.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;
        Self {
            base_url: String::new(),
            auth_token: None,
            chunk_bytes: (16 * MIB) as usize,
            parallel_requests: 4,
            direct_read_threshold_bytes: 64 * MIB,
            direct_write_threshold_bytes: 64 * MIB,
            shard_threshold_bytes: 4 * GIB,
            shard_size_bytes: 512 * MIB,
            timeout: Duration::from_secs(120),
            retry_max_attempts: 5,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(2),
        }
    }
}
68
69struct RemoteInner {
70    client: Client,
71    base: Url,
72    auth_header: Option<String>,
73    chunk_bytes: usize,
74    parallel_requests: usize,
75    direct_read_threshold_bytes: u64,
76    direct_write_threshold_bytes: u64,
77    shard_threshold_bytes: u64,
78    shard_size_bytes: u64,
79    retry_max_attempts: usize,
80    retry_base_delay: Duration,
81    retry_max_delay: Duration,
82}
83
/// Arguments for one `PUT /fs/write` call issued by `RemoteInner::upload_chunk`.
struct FsWriteChunkRequest<'a> {
    /// Remote path, sent as the `path` query parameter.
    path: &'a str,
    /// Byte offset of this chunk, sent as the `offset` query parameter.
    offset: u64,
    /// When set, `truncate=true` is added to the query.
    truncate: bool,
    /// When set, `final=true` is added to the query.
    final_chunk: bool,
    /// When set, `createNew=true` is added to the query.
    create_new: bool,
    /// Chunk payload, sent as the request body.
    data: &'a [u8],
    /// Optional value for the `hash` query parameter.
    hash: Option<&'a str>,
}
93
/// Borrowed arguments for `RemoteInner::upload_session_complete`; each field is
/// copied into the body posted to `/fs/upload-session/complete`.
struct UploadSessionCompleteParams<'a> {
    /// Destination path of the uploaded file.
    path: &'a str,
    /// Identifier of the upload session being completed.
    session_id: &'a str,
    /// Blob key associated with the session.
    blob_key: &'a str,
    /// Total payload size in bytes.
    size_bytes: u64,
    /// Hex SHA-256 digest of the payload.
    content_sha256: &'a str,
    /// Number of chunks in the session.
    chunk_count: usize,
    /// Forwarded as the request's `create_new` flag.
    create_new: bool,
}
103
104struct MultipartCompleteParams {
105    path: String,
106    session_id: String,
107    blob_key: String,
108    upload_id: String,
109    size_bytes: u64,
110    create_new: bool,
111    parts: Vec<MultipartPart>,
112}
113
114impl RemoteInner {
115    fn new(config: RemoteFsConfig) -> io::Result<Self> {
116        if config.base_url.is_empty() {
117            return Err(io::Error::new(
118                ErrorKind::InvalidInput,
119                "RemoteFsConfig.base_url must be provided",
120            ));
121        }
122        let base = Url::parse(&config.base_url).map_err(map_url_err)?;
123        let client = Client::builder()
124            .timeout(config.timeout)
125            .user_agent(USER_AGENT.clone())
126            .build()
127            .map_err(map_http_err)?;
128        Ok(Self {
129            client,
130            base,
131            auth_header: config.auth_token.map(|token| format!("Bearer {token}")),
132            chunk_bytes: config.chunk_bytes.max(64 * 1024),
133            parallel_requests: config.parallel_requests.max(1),
134            direct_read_threshold_bytes: config.direct_read_threshold_bytes,
135            direct_write_threshold_bytes: config.direct_write_threshold_bytes,
136            shard_threshold_bytes: config.shard_threshold_bytes,
137            shard_size_bytes: config.shard_size_bytes.max(8 * 1024 * 1024),
138            retry_max_attempts: config.retry_max_attempts.max(1),
139            retry_base_delay: config.retry_base_delay,
140            retry_max_delay: config.retry_max_delay,
141        })
142    }
143
144    fn should_retry(&self, status: StatusCode) -> bool {
145        matches!(
146            status,
147            StatusCode::TOO_MANY_REQUESTS
148                | StatusCode::INTERNAL_SERVER_ERROR
149                | StatusCode::BAD_GATEWAY
150                | StatusCode::SERVICE_UNAVAILABLE
151                | StatusCode::GATEWAY_TIMEOUT
152        )
153    }
154
155    fn retry_delay(&self, attempt: usize) -> Duration {
156        let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
157        let delay = self.retry_base_delay.as_millis() as u64 * factor;
158        let capped = delay.min(self.retry_max_delay.as_millis() as u64);
159        Duration::from_millis(capped)
160    }
161
162    fn send_with_retry(
163        &self,
164        method: reqwest::Method,
165        route: &str,
166        query: &[(&str, String)],
167    ) -> io::Result<Response> {
168        for attempt in 0..self.retry_max_attempts {
169            let resp = self
170                .request(method.clone(), route, query)
171                .send()
172                .map_err(map_http_err)?;
173            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
174                return Ok(resp);
175            }
176            std::thread::sleep(self.retry_delay(attempt));
177        }
178        Err(io::Error::other("request retries exhausted"))
179    }
180
181    fn get_url_with_retry(&self, url: &str, range: Option<String>) -> io::Result<Response> {
182        for attempt in 0..self.retry_max_attempts {
183            let mut request = self.client.get(url);
184            if let Some(range) = &range {
185                request = request.header(reqwest::header::RANGE, range);
186            }
187            let resp = request.send().map_err(map_http_err)?;
188            if !self.should_retry(resp.status()) || attempt + 1 == self.retry_max_attempts {
189                return Ok(resp);
190            }
191            std::thread::sleep(self.retry_delay(attempt));
192        }
193        Err(io::Error::other("request retries exhausted"))
194    }
195
196    fn endpoint(&self, route: &str) -> Url {
197        self.base
198            .join(route.trim_start_matches('/'))
199            .expect("failed to join remote route")
200    }
201
202    fn request(
203        &self,
204        method: reqwest::Method,
205        route: &str,
206        query: &[(&str, String)],
207    ) -> reqwest::blocking::RequestBuilder {
208        let mut url = self.endpoint(route);
209        {
210            let mut pairs = url.query_pairs_mut();
211            for (k, v) in query {
212                pairs.append_pair(k, v);
213            }
214        }
215        let mut builder = self.client.request(method, url);
216        if let Some(auth) = &self.auth_header {
217            builder = builder.header("Authorization", auth);
218        }
219        builder
220    }
221
222    fn get_json<T: for<'a> Deserialize<'a>>(
223        &self,
224        route: &str,
225        query: &[(&str, String)],
226    ) -> io::Result<T> {
227        let resp = self.send_with_retry(reqwest::Method::GET, route, query)?;
228        handle_error(resp)?.json().map_err(map_http_err)
229    }
230
231    fn post_empty<B: Serialize>(&self, route: &str, body: &B) -> io::Result<()> {
232        let resp = self
233            .request(reqwest::Method::POST, route, &[])
234            .json(body)
235            .send()
236            .map_err(map_http_err)?;
237        handle_error(resp)?;
238        Ok(())
239    }
240
241    fn post_json<T: for<'a> Deserialize<'a>, B: Serialize>(
242        &self,
243        route: &str,
244        body: &B,
245    ) -> io::Result<T> {
246        let resp = self
247            .request(reqwest::Method::POST, route, &[])
248            .json(body)
249            .send()
250            .map_err(map_http_err)?;
251        handle_error(resp)?.json().map_err(map_http_err)
252    }
253
254    fn delete_empty(&self, route: &str, query: &[(&str, String)]) -> io::Result<()> {
255        let resp = self.send_with_retry(reqwest::Method::DELETE, route, query)?;
256        handle_error(resp)?;
257        Ok(())
258    }
259
260    fn download_chunk(&self, path: &str, offset: u64, length: usize) -> io::Result<Vec<u8>> {
261        let resp = self.send_with_retry(
262            reqwest::Method::GET,
263            "/fs/read",
264            &[
265                ("path", path.to_string()),
266                ("offset", offset.to_string()),
267                ("length", length.to_string()),
268            ],
269        )?;
270        let mut body = handle_error(resp)?;
271        if let Some(content_type) = body.headers().get(reqwest::header::CONTENT_TYPE) {
272            if content_type
273                .to_str()
274                .map(|value| value.contains("application/json"))
275                .unwrap_or(false)
276            {
277                let json: DownloadUrlResponse = body.json().map_err(map_http_err)?;
278                return self.download_range_from_url(&json.download_url, offset, length as u64);
279            }
280        }
281        let mut buf = Vec::with_capacity(length);
282        body.copy_to(&mut buf).map_err(map_http_err)?;
283        Ok(buf)
284    }
285
286    fn download_range_from_url(&self, url: &str, offset: u64, length: u64) -> io::Result<Vec<u8>> {
287        if length == 0 {
288            return Ok(Vec::new());
289        }
290        let end = offset + length - 1;
291        let resp = self.get_url_with_retry(url, Some(format!("bytes={offset}-{end}")))?;
292        let mut body = handle_error(resp)?;
293        let mut buf = Vec::with_capacity(length as usize);
294        body.copy_to(&mut buf).map_err(map_http_err)?;
295        Ok(buf)
296    }
297
298    fn fetch_download_url(&self, path: &str) -> io::Result<DownloadUrlResponse> {
299        self.get_json("/fs/download-url", &[("path", path.to_string())])
300    }
301
302    fn upload_chunk(
303        &self,
304        request: FsWriteChunkRequest<'_>,
305    ) -> io::Result<Option<FsWriteSessionResponse>> {
306        let mut query = vec![
307            ("path", request.path.to_string()),
308            ("offset", request.offset.to_string()),
309        ];
310        if request.truncate {
311            query.push(("truncate", "true".to_string()));
312        }
313        if request.final_chunk {
314            query.push(("final", "true".to_string()));
315        }
316        if request.create_new {
317            query.push(("createNew", "true".to_string()));
318        }
319        if let Some(hash) = request.hash {
320            query.push(("hash", hash.to_string()));
321        }
322        let resp = self
323            .request(reqwest::Method::PUT, "/fs/write", &query)
324            .body(request.data.to_vec())
325            .send()
326            .map_err(map_http_err)?;
327        if resp.status() == StatusCode::ACCEPTED {
328            let session = handle_error(resp)?.json().map_err(map_http_err)?;
329            return Ok(Some(session));
330        }
331        handle_error(resp)?;
332        Ok(None)
333    }
334
335    fn multipart_create(&self, path: &str, size_bytes: u64) -> io::Result<MultipartUploadResponse> {
336        self.post_json(
337            "/fs/multipart-upload",
338            &MultipartUploadRequest {
339                path: path.to_string(),
340                size_bytes: size_bytes as i64,
341                content_type: None,
342            },
343        )
344    }
345
346    fn upload_session_start(
347        &self,
348        path: &str,
349        size_bytes: u64,
350        content_sha256: &str,
351    ) -> io::Result<UploadSessionStartResponse> {
352        self.post_json(
353            "/fs/upload-session/start",
354            &UploadSessionStartRequest {
355                path: path.to_string(),
356                size_bytes: size_bytes as i64,
357                content_type: None,
358                content_sha256: content_sha256.to_string(),
359            },
360        )
361    }
362
363    fn upload_session_chunks(
364        &self,
365        session_id: &str,
366        blob_key: &str,
367        chunks: &[UploadSessionChunkDescriptor],
368    ) -> io::Result<UploadSessionChunksResponse> {
369        self.post_json(
370            "/fs/upload-session/chunks",
371            &UploadSessionChunksRequest {
372                session_id: session_id.to_string(),
373                blob_key: blob_key.to_string(),
374                chunks: chunks.to_vec(),
375            },
376        )
377    }
378
379    fn upload_session_complete(&self, params: UploadSessionCompleteParams<'_>) -> io::Result<()> {
380        self.post_empty(
381            "/fs/upload-session/complete",
382            &UploadSessionCompleteRequest {
383                path: params.path.to_string(),
384                session_id: params.session_id.to_string(),
385                blob_key: params.blob_key.to_string(),
386                size_bytes: params.size_bytes as i64,
387                content_sha256: params.content_sha256.to_string(),
388                chunk_count: params.chunk_count,
389                create_new: params.create_new,
390                hash: None,
391            },
392        )
393    }
394
395    fn multipart_presign_part(
396        &self,
397        session_id: &str,
398        blob_key: &str,
399        upload_id: &str,
400        part_number: i32,
401        size_bytes: u64,
402    ) -> io::Result<String> {
403        let response: MultipartUploadPartResponse = self.post_json(
404            "/fs/multipart-upload/part",
405            &MultipartUploadPartRequest {
406                session_id: session_id.to_string(),
407                blob_key: blob_key.to_string(),
408                upload_id: upload_id.to_string(),
409                part_number,
410                size_bytes: size_bytes as i64,
411            },
412        )?;
413        Ok(response.upload_url)
414    }
415
416    fn multipart_complete(&self, params: MultipartCompleteParams) -> io::Result<()> {
417        let resp = self
418            .request(reqwest::Method::POST, "/fs/multipart-upload/complete", &[])
419            .json(&MultipartUploadCompleteRequest {
420                path: params.path,
421                session_id: params.session_id,
422                blob_key: params.blob_key,
423                upload_id: params.upload_id,
424                size_bytes: params.size_bytes as i64,
425                create_new: params.create_new,
426                hash: None,
427                parts: params.parts,
428            })
429            .send()
430            .map_err(map_http_err)?;
431        handle_error(resp)?;
432        Ok(())
433    }
434
435    fn put_upload_url_with_etag(&self, url: &str, data: &[u8]) -> io::Result<String> {
436        let resp = self
437            .client
438            .put(url)
439            .body(data.to_vec())
440            .send()
441            .map_err(map_http_err)?;
442        let resp = handle_error(resp)?;
443        let etag = resp
444            .headers()
445            .get(reqwest::header::ETAG)
446            .and_then(|value| value.to_str().ok())
447            .unwrap_or("")
448            .to_string();
449        Ok(etag)
450    }
451
452    fn put_upload_target(
453        &self,
454        method: &str,
455        url: &str,
456        headers: &HashMap<String, String>,
457        data: &[u8],
458    ) -> io::Result<()> {
459        let method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|err| {
460            io::Error::new(
461                ErrorKind::InvalidInput,
462                format!("invalid upload method `{method}`: {err}"),
463            )
464        })?;
465        let mut request = self.client.request(method, url).body(data.to_vec());
466        for (name, value) in headers {
467            request = request.header(name, value);
468        }
469        let resp = request.send().map_err(map_http_err)?;
470        handle_error(resp)?;
471        Ok(())
472    }
473
474    fn fetch_metadata(&self, path: &str) -> io::Result<MetadataResponse> {
475        self.get_json("/fs/metadata", &[("path", path.to_string())])
476    }
477
478    fn fetch_dir(&self, path: &str) -> io::Result<Vec<DirEntryResponse>> {
479        self.get_json("/fs/dir", &[("path", path.to_string())])
480    }
481
482    fn canonicalize_path(&self, path: &str) -> io::Result<String> {
483        let resp: CanonicalizeResponse =
484            self.get_json("/fs/canonicalize", &[("path", path.to_string())])?;
485        Ok(resp.path)
486    }
487}
488
489pub struct RemoteFsProvider {
490    inner: Arc<RemoteInner>,
491}
492
493impl RemoteFsProvider {
494    pub fn new(config: RemoteFsConfig) -> io::Result<Self> {
495        Ok(Self {
496            inner: Arc::new(RemoteInner::new(config)?),
497        })
498    }
499
500    /// Fetch a dataset manifest descriptor through the provider-neutral data contract endpoint.
501    pub fn data_manifest_descriptor(
502        &self,
503        request: &DataManifestRequest,
504    ) -> io::Result<DataManifestDescriptor> {
505        let mut query = vec![("path", request.path.clone())];
506        if let Some(version) = &request.version {
507            query.push(("version", version.clone()));
508        }
509        self.inner.get_json("/data/manifest", &query)
510    }
511
512    /// Request direct upload targets for dataset chunk objects.
513    pub fn data_chunk_upload_targets(
514        &self,
515        request: &DataChunkUploadRequest,
516    ) -> io::Result<Vec<DataChunkUploadTarget>> {
517        #[derive(Deserialize)]
518        struct DataChunkUploadTargetsResponse {
519            targets: Vec<DataChunkUploadTarget>,
520        }
521        let response: DataChunkUploadTargetsResponse = self
522            .inner
523            .post_json("/data/chunks/upload-targets", request)?;
524        Ok(response.targets)
525    }
526
527    fn normalize(&self, path: &Path) -> String {
528        let mut normalized = PathBuf::new();
529        normalized.push("/");
530        normalized.push(path);
531        normalized.to_string_lossy().replace('\\', "/").to_string()
532    }
533
534    fn ensure_parent_exists(&self, path: &Path) -> io::Result<()> {
535        if let Some(parent) = path.parent() {
536            self.inner.post_empty(
537                "/fs/mkdir",
538                &CreateDirRequest {
539                    path: self.normalize(parent),
540                    recursive: true,
541                },
542            )?;
543        }
544        Ok(())
545    }
546
547    fn download_raw_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
548        if len == 0 {
549            return Ok(Vec::new());
550        }
551        if len >= self.inner.direct_read_threshold_bytes {
552            let url = self.inner.fetch_download_url(path)?.download_url;
553            return self.download_entire_file_from_url(&url, len);
554        }
555        let chunk = self.inner.chunk_bytes as u64;
556        let mut tasks = Vec::new();
557        let mut offset = 0;
558        let mut index = 0;
559        while offset < len {
560            let remaining = len - offset;
561            let length = std::cmp::min(chunk, remaining);
562            tasks.push(ChunkTask {
563                offset,
564                length: length as usize,
565                index,
566            });
567            offset += length;
568            index += 1;
569        }
570        let mut buffer = vec![0u8; len as usize];
571        let path = path.to_string();
572        let inner = self.inner.clone();
573        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
574        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
575        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
576        let threads = inner
577            .parallel_requests
578            .min(queue.lock().unwrap().len())
579            .max(1);
580        thread::scope(|scope| {
581            for _ in 0..threads {
582                let queue = queue.clone();
583                let error = error.clone();
584                let results = results.clone();
585                let inner = inner.clone();
586                let path = path.clone();
587                scope.spawn(move |_| loop {
588                    let task_opt = {
589                        let mut guard = queue.lock().unwrap();
590                        guard.pop_front()
591                    };
592                    let task = match task_opt {
593                        Some(task) => task,
594                        None => break,
595                    };
596                    match inner.download_chunk(&path, task.offset, task.length) {
597                        Ok(bytes) => {
598                            let mut guard = results.lock().unwrap();
599                            guard[task.index] = Some(bytes);
600                        }
601                        Err(err) => {
602                            let mut guard = error.lock().unwrap();
603                            if guard.is_none() {
604                                *guard = Some(err);
605                            }
606                            break;
607                        }
608                    }
609                });
610            }
611        })
612        .expect("remote download scope");
613        if let Some(err) = error.lock().unwrap().take() {
614            return Err(err);
615        }
616        let chunks = Arc::try_unwrap(results)
617            .expect("results still in use")
618            .into_inner()
619            .expect("results poisoned");
620        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
621            let bytes = maybe.expect("missing downloaded chunk for remote file");
622            let start = task.offset as usize;
623            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
624        }
625        Ok(buffer)
626    }
627
628    fn download_sharded_file(&self, path: &str, len: u64) -> io::Result<Vec<u8>> {
629        let manifest_bytes = self.download_raw_file(path, len)?;
630        let manifest: ShardManifest = serde_json::from_slice(&manifest_bytes)
631            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid shard manifest"))?;
632        if manifest.version != 1 {
633            return Err(io::Error::new(
634                ErrorKind::InvalidData,
635                "unsupported manifest",
636            ));
637        }
638        let mut buffer = Vec::with_capacity(manifest.total_size as usize);
639        for shard in manifest.shards {
640            let meta = self.inner.fetch_metadata(&shard.path)?;
641            let bytes = self.download_raw_file(&shard.path, meta.len)?;
642            buffer.extend_from_slice(&bytes);
643        }
644        Ok(buffer)
645    }
646
647    fn download_entire_file_from_url(&self, url: &str, len: u64) -> io::Result<Vec<u8>> {
648        let chunk = self.inner.chunk_bytes as u64;
649        let mut tasks = Vec::new();
650        let mut offset = 0;
651        let mut index = 0;
652        while offset < len {
653            let remaining = len - offset;
654            let length = std::cmp::min(chunk, remaining);
655            tasks.push(ChunkTask {
656                offset,
657                length: length as usize,
658                index,
659            });
660            offset += length;
661            index += 1;
662        }
663        let mut buffer = vec![0u8; len as usize];
664        let url = url.to_string();
665        let inner = self.inner.clone();
666        let queue = Arc::new(Mutex::new(VecDeque::from(tasks.clone())));
667        let results = Arc::new(Mutex::new(vec![None::<Vec<u8>>; tasks.len()]));
668        let error: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
669        let threads = inner
670            .parallel_requests
671            .min(queue.lock().unwrap().len())
672            .max(1);
673        thread::scope(|scope| {
674            for _ in 0..threads {
675                let queue = queue.clone();
676                let error = error.clone();
677                let results = results.clone();
678                let inner = inner.clone();
679                let url = url.clone();
680                scope.spawn(move |_| loop {
681                    let task_opt = {
682                        let mut guard = queue.lock().unwrap();
683                        guard.pop_front()
684                    };
685                    let task = match task_opt {
686                        Some(task) => task,
687                        None => break,
688                    };
689                    match inner.download_range_from_url(&url, task.offset, task.length as u64) {
690                        Ok(bytes) => {
691                            let mut guard = results.lock().unwrap();
692                            guard[task.index] = Some(bytes);
693                        }
694                        Err(err) => {
695                            let mut guard = error.lock().unwrap();
696                            if guard.is_none() {
697                                *guard = Some(err);
698                            }
699                            break;
700                        }
701                    }
702                });
703            }
704        })
705        .expect("remote download scope");
706        if let Some(err) = error.lock().unwrap().take() {
707            return Err(err);
708        }
709        let chunks = Arc::try_unwrap(results)
710            .expect("results still in use")
711            .into_inner()
712            .expect("results poisoned");
713        for (task, maybe) in tasks.iter().zip(chunks.into_iter()) {
714            let bytes = maybe.expect("missing downloaded chunk for remote file");
715            let start = task.offset as usize;
716            buffer[start..start + bytes.len()].copy_from_slice(&bytes);
717        }
718        Ok(buffer)
719    }
720
721    fn upload_entire_file(&self, path: &str, data: &[u8], create_new: bool) -> io::Result<()> {
722        if data.len() as u64 >= self.inner.shard_threshold_bytes {
723            return self.upload_sharded_file(path, data, create_new);
724        }
725        self.upload_unsharded_file(path, data, create_new, None)
726    }
727
728    fn upload_unsharded_file(
729        &self,
730        path: &str,
731        data: &[u8],
732        create_new: bool,
733        hash: Option<&str>,
734    ) -> io::Result<()> {
735        if data.len() as u64 >= self.inner.direct_write_threshold_bytes {
736            return self.upload_multipart_file(path, data, create_new);
737        }
738        if data.is_empty() {
739            self.inner.upload_chunk(FsWriteChunkRequest {
740                path,
741                offset: 0,
742                truncate: true,
743                final_chunk: true,
744                create_new,
745                data,
746                hash,
747            })?;
748            return Ok(());
749        }
750        let chunk = self.inner.chunk_bytes;
751        let mut offset = 0usize;
752        while offset < data.len() {
753            let end = std::cmp::min(offset + chunk, data.len());
754            let slice = &data[offset..end];
755            let truncate = offset == 0;
756            let final_chunk = end == data.len();
757            let result = self.inner.upload_chunk(FsWriteChunkRequest {
758                path,
759                offset: offset as u64,
760                truncate,
761                final_chunk,
762                create_new: create_new && truncate,
763                data: slice,
764                hash,
765            })?;
766            if let Some(session) = result {
767                let expected = offset as u64 + slice.len() as u64;
768                if session.next_offset as u64 != expected {
769                    return Err(io::Error::other("unexpected next offset"));
770                }
771            }
772            offset = end;
773        }
774        Ok(())
775    }
776
777    fn upload_sharded_file(&self, path: &str, data: &[u8], create_new: bool) -> io::Result<()> {
778        let shard_size = self.inner.shard_size_bytes as usize;
779        let mut shards = Vec::new();
780        let mut offset = 0usize;
781        while offset < data.len() {
782            let end = std::cmp::min(offset + shard_size, data.len());
783            let slice = &data[offset..end];
784            let shard_path = format!("{}/{}", SHARD_PREFIX, Uuid::new_v4());
785            self.upload_unsharded_file(&shard_path, slice, false, None)?;
786            shards.push(ShardEntry {
787                path: shard_path,
788                size: slice.len() as u64,
789            });
790            offset = end;
791        }
792        let manifest = ShardManifest {
793            version: 1,
794            total_size: data.len() as u64,
795            shard_size: self.inner.shard_size_bytes,
796            shards,
797        };
798        let bytes = serde_json::to_vec(&manifest)
799            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "invalid manifest"))?;
800        self.upload_unsharded_file(path, &bytes, create_new, Some(MANIFEST_HASH))
801    }
802
803    fn upload_multipart_file(&self, path: &str, data: &[u8], create_new: bool) -> io::Result<()> {
804        if data.is_empty() {
805            self.inner.upload_chunk(FsWriteChunkRequest {
806                path,
807                offset: 0,
808                truncate: true,
809                final_chunk: true,
810                create_new,
811                data,
812                hash: None,
813            })?;
814            return Ok(());
815        }
816        let content_sha256 = sha256_hex(data);
817        let session =
818            match self
819                .inner
820                .upload_session_start(path, data.len() as u64, &content_sha256)
821            {
822                Ok(session) => session,
823                Err(err) if err.kind() == ErrorKind::NotFound => {
824                    return self.upload_multipart_file_legacy(path, data, create_new);
825                }
826                Err(err) => return Err(err),
827            };
828        let chunk_size = (session.chunk_size_bytes as usize).max(1);
829        let mut tasks = Vec::new();
830        let mut offset = 0usize;
831        let mut index = 0usize;
832        while offset < data.len() {
833            let end = std::cmp::min(offset + chunk_size, data.len());
834            let slice = &data[offset..end];
835            tasks.push(UploadTask {
836                index,
837                offset,
838                length: end - offset,
839                chunk_sha256: sha256_hex(slice),
840            });
841            offset = end;
842            index += 1;
843        }
844        let descriptors: Vec<UploadSessionChunkDescriptor> = tasks
845            .iter()
846            .map(|task| UploadSessionChunkDescriptor {
847                chunk_index: task.index,
848                offset_bytes: task.offset as i64,
849                size_bytes: task.length as i64,
850                chunk_sha256: task.chunk_sha256.clone(),
851            })
852            .collect();
853        let chunk_response = self.inner.upload_session_chunks(
854            &session.session_id,
855            &session.blob_key,
856            &descriptors,
857        )?;
858        let targets = Arc::new(chunk_response.targets);
859        let tasks = Arc::new(Mutex::new(VecDeque::from(tasks)));
860        let error = Arc::new(Mutex::new(None));
861        let data = Arc::new(data.to_vec());
862        thread::scope(|scope| {
863            for _ in 0..self.inner.parallel_requests {
864                let tasks = Arc::clone(&tasks);
865                let targets = Arc::clone(&targets);
866                let error = Arc::clone(&error);
867                let inner = Arc::clone(&self.inner);
868                let data = Arc::clone(&data);
869                scope.spawn(move |_| loop {
870                    let task = {
871                        let mut guard = tasks.lock().unwrap();
872                        guard.pop_front()
873                    };
874                    let Some(task) = task else { break };
875                    let target = targets
876                        .iter()
877                        .find(|target| target.chunk_index == task.index)
878                        .cloned();
879                    let result = (|| {
880                        let target = target.ok_or_else(|| {
881                            io::Error::other(format!(
882                                "missing upload target for chunk {}",
883                                task.index
884                            ))
885                        })?;
886                        let slice = &data[task.offset..task.offset + task.length];
887                        inner.put_upload_target(
888                            &target.method,
889                            &target.upload_url,
890                            &target.headers,
891                            slice,
892                        )
893                    })();
894                    if let Err(err) = result {
895                        let mut err_guard = error.lock().unwrap();
896                        if err_guard.is_none() {
897                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
898                        }
899                        break;
900                    }
901                });
902            }
903        })
904        .expect("upload session scope");
905        if let Some(err) = error.lock().unwrap().take() {
906            return Err(err);
907        }
908        self.inner
909            .upload_session_complete(UploadSessionCompleteParams {
910                path,
911                session_id: &session.session_id,
912                blob_key: &session.blob_key,
913                size_bytes: data.len() as u64,
914                content_sha256: &content_sha256,
915                chunk_count: descriptors.len(),
916                create_new,
917            })?;
918        Ok(())
919    }
920
921    fn upload_multipart_file_legacy(
922        &self,
923        path: &str,
924        data: &[u8],
925        create_new: bool,
926    ) -> io::Result<()> {
927        let session = self.inner.multipart_create(path, data.len() as u64)?;
928        let part_size = session.part_size_bytes as usize;
929        let mut tasks = std::collections::VecDeque::new();
930        let mut offset = 0usize;
931        let mut part_number = 1;
932        let mut index = 0usize;
933        while offset < data.len() {
934            let end = std::cmp::min(offset + part_size, data.len());
935            let length = end - offset;
936            tasks.push_back(MultipartTask {
937                index,
938                part_number,
939                offset,
940                length,
941            });
942            offset = end;
943            part_number += 1;
944            index += 1;
945        }
946        let tasks = Arc::new(Mutex::new(tasks));
947        let task_len = tasks.lock().unwrap().len();
948        let mut result_vec: Vec<MultipartResult> = Vec::with_capacity(task_len);
949        for _ in 0..task_len {
950            result_vec.push(None);
951        }
952        let results = Arc::new(Mutex::new(result_vec));
953        let error = Arc::new(Mutex::new(None));
954        let data = Arc::new(data.to_vec());
955        thread::scope(|scope| {
956            for _ in 0..self.inner.parallel_requests {
957                let tasks = Arc::clone(&tasks);
958                let results = Arc::clone(&results);
959                let error = Arc::clone(&error);
960                let inner = Arc::clone(&self.inner);
961                let blob_key = session.blob_key.clone();
962                let upload_id = session.upload_id.clone();
963                let session_id = session.session_id.clone();
964                let data = Arc::clone(&data);
965                scope.spawn(move |_| loop {
966                    let task = {
967                        let mut guard = tasks.lock().unwrap();
968                        guard.pop_front()
969                    };
970                    let Some(task) = task else { break };
971                    let slice = &data[task.offset..task.offset + task.length];
972                    let result = (|| {
973                        let url = inner.multipart_presign_part(
974                            &session_id,
975                            &blob_key,
976                            &upload_id,
977                            task.part_number,
978                            task.length as u64,
979                        )?;
980                        let etag = inner.put_upload_url_with_etag(&url, slice)?;
981                        Ok(MultipartPart {
982                            part_number: task.part_number,
983                            etag,
984                        })
985                    })();
986                    let mut guard = results.lock().unwrap();
987                    guard[task.index] = Some(result);
988                    if let Some(Err(err)) = guard[task.index].as_ref() {
989                        let mut err_guard = error.lock().unwrap();
990                        if err_guard.is_none() {
991                            *err_guard = Some(io::Error::new(err.kind(), err.to_string()));
992                        }
993                        break;
994                    }
995                });
996            }
997        })
998        .expect("multipart upload scope");
999        if let Some(err) = error.lock().unwrap().take() {
1000            return Err(err);
1001        }
1002        let mut parts = Vec::with_capacity(results.lock().unwrap().len());
1003        for maybe in Arc::try_unwrap(results)
1004            .expect("results still in use")
1005            .into_inner()
1006            .expect("results poisoned")
1007        {
1008            let part = maybe.expect("missing part result")?;
1009            if part.etag.is_empty() {
1010                return Err(io::Error::other("missing etag"));
1011            }
1012            parts.push(part);
1013        }
1014        parts.sort_by_key(|part| part.part_number);
1015        self.inner.multipart_complete(MultipartCompleteParams {
1016            path: path.to_string(),
1017            session_id: session.session_id,
1018            blob_key: session.blob_key,
1019            upload_id: session.upload_id,
1020            size_bytes: data.len() as u64,
1021            create_new,
1022            parts,
1023        })?;
1024        Ok(())
1025    }
1026}
1027
/// One unit of chunked work: a byte range plus its position in the sequence.
#[derive(Copy, Clone)]
struct ChunkTask {
    /// Byte offset of the chunk.
    offset: u64,
    /// Number of bytes in the chunk.
    length: usize,
    /// Zero-based position of the chunk.
    index: usize,
}
1034
1035type MultipartResult = Option<Result<MultipartPart, io::Error>>;
1036
/// One part of a legacy multipart upload.
#[derive(Copy, Clone)]
struct MultipartTask {
    /// Zero-based slot of this part in the results vector.
    index: usize,
    /// Part number sent to the server (numbering starts at 1).
    part_number: i32,
    /// Byte offset of the part within the payload.
    offset: usize,
    /// Number of bytes in the part.
    length: usize,
}
1044
/// One chunk of an upload-session transfer.
#[derive(Clone)]
struct UploadTask {
    /// Zero-based chunk index, matched against the server's upload targets.
    index: usize,
    /// Byte offset of the chunk within the payload.
    offset: usize,
    /// Number of bytes in the chunk.
    length: usize,
    /// Hex-encoded SHA-256 digest of the chunk bytes.
    chunk_sha256: String,
}
1052
1053#[async_trait(?Send)]
1054impl FsProvider for RemoteFsProvider {
1055    fn current_dir_override(&self) -> Option<PathBuf> {
1056        Some(PathBuf::from("/"))
1057    }
1058
1059    fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
1060        let normalized = self.normalize(path);
1061        let mut data = Vec::new();
1062        let mut metadata = FsMetadata::new(FsFileType::File, 0, None, false);
1063        let mut content_loaded = false;
1064        let mut metadata_hash_valid = true;
1065        if flags.read || flags.write || flags.append || !flags.create || flags.create_new {
1066            match self.inner.fetch_metadata(&normalized) {
1067                Ok(meta) => {
1068                    if meta.file_type != "file" {
1069                        return Err(io::Error::other("remote path is not a file"));
1070                    }
1071                    if flags.create_new {
1072                        return Err(io::Error::new(
1073                            ErrorKind::AlreadyExists,
1074                            format!("File already exists: {}", path.display()),
1075                        ));
1076                    }
1077                    if flags.read || flags.append {
1078                        data = if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1079                            self.download_sharded_file(&normalized, meta.len)?
1080                        } else {
1081                            self.download_raw_file(&normalized, meta.len)?
1082                        };
1083                        content_loaded = true;
1084                    }
1085                    metadata = meta.into();
1086                }
1087                Err(err)
1088                    if err.kind() == ErrorKind::NotFound && (flags.create || flags.create_new) => {}
1089                Err(err) => return Err(err),
1090            }
1091        }
1092        if flags.truncate {
1093            data.clear();
1094            content_loaded = true;
1095            metadata_hash_valid = false;
1096        }
1097        if flags.create {
1098            self.ensure_parent_exists(path)?;
1099        }
1100        let handle = RemoteFileHandle {
1101            provider: self.clone(),
1102            path: normalized,
1103            data,
1104            metadata,
1105            content_loaded,
1106            metadata_hash_valid,
1107            create_new_pending: flags.create_new,
1108            position: 0,
1109            flags: flags.clone(),
1110            dirty: false,
1111        };
1112        Ok(Box::new(handle))
1113    }
1114
1115    async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
1116        let normalized = self.normalize(path);
1117        let meta = self.inner.fetch_metadata(&normalized)?;
1118        if meta.file_type != "file" {
1119            return Err(io::Error::other("remote path is not a file"));
1120        }
1121        if meta.hash.as_deref() == Some(MANIFEST_HASH) {
1122            self.download_sharded_file(&normalized, meta.len)
1123        } else {
1124            self.download_raw_file(&normalized, meta.len)
1125        }
1126    }
1127
1128    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
1129        let normalized = self.normalize(path);
1130        self.ensure_parent_exists(path)?;
1131        self.upload_entire_file(&normalized, data, false)
1132    }
1133
1134    async fn remove_file(&self, path: &Path) -> io::Result<()> {
1135        let normalized = self.normalize(path);
1136        self.inner
1137            .delete_empty("/fs/file", &[("path", normalized)])?;
1138        Ok(())
1139    }
1140
1141    async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1142        let normalized = self.normalize(path);
1143        let resp = self.inner.fetch_metadata(&normalized)?;
1144        Ok(resp.into())
1145    }
1146
1147    async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
1148        self.metadata(path).await
1149    }
1150
1151    async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
1152        let normalized = self.normalize(path);
1153        let resp = self.inner.fetch_dir(&normalized)?;
1154        Ok(resp
1155            .into_iter()
1156            .map(|entry| DirEntry {
1157                path: PathBuf::from(entry.path),
1158                file_name: entry.file_name.into(),
1159                file_type: entry.file_type.into(),
1160            })
1161            .collect())
1162    }
1163
1164    async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
1165        let normalized = self.normalize(path);
1166        let canonical = self.inner.canonicalize_path(&normalized)?;
1167        Ok(PathBuf::from(canonical))
1168    }
1169
1170    async fn create_dir(&self, path: &Path) -> io::Result<()> {
1171        let normalized = self.normalize(path);
1172        self.inner.post_empty(
1173            "/fs/mkdir",
1174            &CreateDirRequest {
1175                path: normalized,
1176                recursive: false,
1177            },
1178        )
1179    }
1180
1181    async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
1182        let normalized = self.normalize(path);
1183        self.inner.post_empty(
1184            "/fs/mkdir",
1185            &CreateDirRequest {
1186                path: normalized,
1187                recursive: true,
1188            },
1189        )
1190    }
1191
1192    async fn remove_dir(&self, path: &Path) -> io::Result<()> {
1193        let normalized = self.normalize(path);
1194        self.inner.delete_empty(
1195            "/fs/dir",
1196            &[("path", normalized), ("recursive", "false".into())],
1197        )
1198    }
1199
1200    async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
1201        let normalized = self.normalize(path);
1202        self.inner.delete_empty(
1203            "/fs/dir",
1204            &[("path", normalized), ("recursive", "true".into())],
1205        )
1206    }
1207
1208    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
1209        self.inner.post_empty(
1210            "/fs/rename",
1211            &RenameRequest {
1212                from: self.normalize(from),
1213                to: self.normalize(to),
1214            },
1215        )
1216    }
1217
1218    async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
1219        self.inner.post_empty(
1220            "/fs/set-readonly",
1221            &SetReadonlyRequest {
1222                path: self.normalize(path),
1223                readonly,
1224            },
1225        )
1226    }
1227
1228    async fn data_manifest_descriptor(
1229        &self,
1230        request: &DataManifestRequest,
1231    ) -> io::Result<DataManifestDescriptor> {
1232        let mut query = vec![("path", request.path.clone())];
1233        if let Some(version) = &request.version {
1234            query.push(("version", version.clone()));
1235        }
1236        self.inner.get_json("/data/manifest", &query)
1237    }
1238
1239    async fn data_chunk_upload_targets(
1240        &self,
1241        request: &DataChunkUploadRequest,
1242    ) -> io::Result<Vec<DataChunkUploadTarget>> {
1243        #[derive(Deserialize)]
1244        struct DataChunkUploadTargetsResponse {
1245            targets: Vec<DataChunkUploadTarget>,
1246        }
1247        let response: DataChunkUploadTargetsResponse = self
1248            .inner
1249            .post_json("/data/chunks/upload-targets", request)?;
1250        Ok(response.targets)
1251    }
1252
1253    async fn data_upload_chunk(
1254        &self,
1255        target: &DataChunkUploadTarget,
1256        data: &[u8],
1257    ) -> io::Result<()> {
1258        self.inner
1259            .put_upload_target(&target.method, &target.upload_url, &target.headers, data)
1260    }
1261}
1262
1263impl Clone for RemoteFsProvider {
1264    fn clone(&self) -> Self {
1265        Self {
1266            inner: Arc::clone(&self.inner),
1267        }
1268    }
1269}
1270
1271struct RemoteFileHandle {
1272    provider: RemoteFsProvider,
1273    path: String,
1274    data: Vec<u8>,
1275    metadata: FsMetadata,
1276    content_loaded: bool,
1277    metadata_hash_valid: bool,
1278    create_new_pending: bool,
1279    position: usize,
1280    flags: OpenFlags,
1281    dirty: bool,
1282}
1283
1284impl RemoteFileHandle {
1285    fn flush_remote(&mut self) -> io::Result<()> {
1286        if !self.dirty {
1287            return Ok(());
1288        }
1289        self.provider
1290            .upload_entire_file(&self.path, &self.data, self.create_new_pending)?;
1291        self.dirty = false;
1292        self.content_loaded = true;
1293        self.create_new_pending = false;
1294        Ok(())
1295    }
1296
1297    fn metadata_len(&self) -> u64 {
1298        if self.dirty || self.content_loaded {
1299            self.data.len() as u64
1300        } else {
1301            self.metadata.len()
1302        }
1303    }
1304
1305    fn metadata_hash(&self) -> Option<String> {
1306        if self.metadata_hash_valid {
1307            self.metadata.hash().map(str::to_owned)
1308        } else {
1309            None
1310        }
1311    }
1312}
1313
1314impl Read for RemoteFileHandle {
1315    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1316        let remaining = &self.data[self.position..];
1317        let amt = remaining.len().min(buf.len());
1318        buf[..amt].copy_from_slice(&remaining[..amt]);
1319        self.position += amt;
1320        Ok(amt)
1321    }
1322}
1323
1324impl Write for RemoteFileHandle {
1325    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1326        if !self.flags.write && !self.flags.append {
1327            return Err(io::Error::new(
1328                ErrorKind::PermissionDenied,
1329                "remote file not opened for writing",
1330            ));
1331        }
1332        if self.flags.append {
1333            self.position = self.data.len();
1334        }
1335        let required = self.position + buf.len();
1336        if required > self.data.len() {
1337            self.data.resize(required, 0);
1338        }
1339        self.data[self.position..self.position + buf.len()].copy_from_slice(buf);
1340        self.position += buf.len();
1341        self.dirty = true;
1342        self.metadata_hash_valid = false;
1343        Ok(buf.len())
1344    }
1345
1346    fn flush(&mut self) -> io::Result<()> {
1347        self.flush_remote()
1348    }
1349}
1350
1351impl Seek for RemoteFileHandle {
1352    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1353        let new_pos = match pos {
1354            SeekFrom::Start(offset) => offset as i64,
1355            SeekFrom::End(offset) => self.data.len() as i64 + offset,
1356            SeekFrom::Current(offset) => self.position as i64 + offset,
1357        };
1358        if new_pos < 0 {
1359            return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
1360        }
1361        self.position = new_pos as usize;
1362        Ok(self.position as u64)
1363    }
1364}
1365
1366#[async_trait(?Send)]
1367impl FileHandle for RemoteFileHandle {
1368    async fn metadata_async(&self) -> io::Result<FsMetadata> {
1369        Ok(FsMetadata::new_with_hash(
1370            self.metadata.file_type(),
1371            self.metadata_len(),
1372            self.metadata.modified(),
1373            self.metadata.is_readonly(),
1374            self.metadata_hash(),
1375        ))
1376    }
1377}
1378
1379impl Drop for RemoteFileHandle {
1380    fn drop(&mut self) {
1381        if self.dirty {
1382            if let Err(err) = self.flush_remote() {
1383                eprintln!("remote fs flush failed: {err}");
1384            }
1385        }
1386    }
1387}
1388
1389#[derive(Debug, Deserialize)]
1390struct MetadataResponse {
1391    #[serde(rename = "fileType")]
1392    file_type: String,
1393    len: u64,
1394    #[serde(rename = "modifiedAt")]
1395    modified_at: Option<String>,
1396    readonly: bool,
1397    hash: Option<String>,
1398}
1399
1400impl From<MetadataResponse> for FsMetadata {
1401    fn from(value: MetadataResponse) -> Self {
1402        FsMetadata::new_with_hash(
1403            value.file_type.into(),
1404            value.len,
1405            parse_modified_at(value.modified_at.as_deref()),
1406            value.readonly,
1407            value.hash,
1408        )
1409    }
1410}
1411
1412#[derive(Debug, Deserialize)]
1413struct DirEntryResponse {
1414    path: String,
1415    #[serde(rename = "fileName")]
1416    file_name: String,
1417    #[serde(rename = "fileType")]
1418    file_type: String,
1419}
1420
1421impl From<String> for FsFileType {
1422    fn from(value: String) -> Self {
1423        match value.as_str() {
1424            "dir" => FsFileType::Directory,
1425            "file" => FsFileType::File,
1426            "symlink" => FsFileType::Symlink,
1427            "other" => FsFileType::Other,
1428            _ => FsFileType::Unknown,
1429        }
1430    }
1431}
1432
1433#[derive(Debug, Serialize, Deserialize)]
1434struct CreateDirRequest {
1435    path: String,
1436    recursive: bool,
1437}
1438
1439#[derive(Debug, Serialize, Deserialize)]
1440struct RenameRequest {
1441    from: String,
1442    to: String,
1443}
1444
1445#[derive(Debug, Serialize, Deserialize)]
1446struct SetReadonlyRequest {
1447    path: String,
1448    readonly: bool,
1449}
1450
1451#[derive(Debug, Deserialize)]
1452struct CanonicalizeResponse {
1453    path: String,
1454}
1455
1456#[derive(Debug, Deserialize)]
1457struct DownloadUrlResponse {
1458    #[serde(rename = "downloadUrl")]
1459    download_url: String,
1460}
1461
1462#[derive(Debug, Serialize, Deserialize)]
1463#[serde(rename_all = "camelCase")]
1464struct UploadSessionStartRequest {
1465    path: String,
1466    size_bytes: i64,
1467    content_type: Option<String>,
1468    content_sha256: String,
1469}
1470
1471#[derive(Debug, Serialize, Deserialize)]
1472#[serde(rename_all = "camelCase")]
1473struct UploadSessionStartResponse {
1474    session_id: String,
1475    blob_key: String,
1476    chunk_size_bytes: i64,
1477}
1478
1479#[derive(Debug, Clone, Serialize, Deserialize)]
1480#[serde(rename_all = "camelCase")]
1481struct UploadSessionChunkDescriptor {
1482    chunk_index: usize,
1483    offset_bytes: i64,
1484    size_bytes: i64,
1485    chunk_sha256: String,
1486}
1487
1488#[derive(Debug, Serialize, Deserialize)]
1489#[serde(rename_all = "camelCase")]
1490struct UploadSessionChunksRequest {
1491    session_id: String,
1492    blob_key: String,
1493    chunks: Vec<UploadSessionChunkDescriptor>,
1494}
1495
1496#[derive(Debug, Serialize, Deserialize)]
1497#[serde(rename_all = "camelCase")]
1498struct UploadSessionChunksResponse {
1499    targets: Vec<UploadChunkTarget>,
1500}
1501
1502#[derive(Debug, Clone, Serialize, Deserialize)]
1503#[serde(rename_all = "camelCase")]
1504struct UploadChunkTarget {
1505    chunk_index: usize,
1506    method: String,
1507    upload_url: String,
1508    headers: HashMap<String, String>,
1509}
1510
1511#[derive(Debug, Serialize, Deserialize)]
1512#[serde(rename_all = "camelCase")]
1513struct UploadSessionCompleteRequest {
1514    path: String,
1515    session_id: String,
1516    blob_key: String,
1517    size_bytes: i64,
1518    content_sha256: String,
1519    chunk_count: usize,
1520    #[serde(default, skip_serializing_if = "is_false")]
1521    create_new: bool,
1522    hash: Option<String>,
1523}
1524
1525#[derive(Debug, Serialize, Deserialize)]
1526struct MultipartUploadRequest {
1527    path: String,
1528    #[serde(rename = "sizeBytes")]
1529    size_bytes: i64,
1530    #[serde(rename = "contentType")]
1531    content_type: Option<String>,
1532}
1533
1534#[derive(Debug, Deserialize)]
1535struct MultipartUploadResponse {
1536    #[serde(rename = "sessionId")]
1537    session_id: String,
1538    #[serde(rename = "blobKey")]
1539    blob_key: String,
1540    #[serde(rename = "uploadId")]
1541    upload_id: String,
1542    #[serde(rename = "partSizeBytes")]
1543    part_size_bytes: i64,
1544}
1545
1546#[derive(Debug, Serialize, Deserialize)]
1547struct MultipartUploadPartRequest {
1548    #[serde(rename = "sessionId")]
1549    session_id: String,
1550    #[serde(rename = "blobKey")]
1551    blob_key: String,
1552    #[serde(rename = "uploadId")]
1553    upload_id: String,
1554    #[serde(rename = "partNumber")]
1555    part_number: i32,
1556    #[serde(rename = "sizeBytes")]
1557    size_bytes: i64,
1558}
1559
1560#[derive(Debug, Deserialize)]
1561struct MultipartUploadPartResponse {
1562    #[serde(rename = "upload_url")]
1563    upload_url: String,
1564}
1565
1566#[derive(Debug, Serialize, Deserialize)]
1567struct MultipartUploadCompleteRequest {
1568    path: String,
1569    #[serde(rename = "sessionId")]
1570    session_id: String,
1571    #[serde(rename = "blobKey")]
1572    blob_key: String,
1573    #[serde(rename = "uploadId")]
1574    upload_id: String,
1575    #[serde(rename = "sizeBytes")]
1576    size_bytes: i64,
1577    #[serde(rename = "createNew", default, skip_serializing_if = "is_false")]
1578    create_new: bool,
1579    hash: Option<String>,
1580    parts: Vec<MultipartPart>,
1581}
1582
1583#[derive(Debug, Serialize, Deserialize)]
1584struct MultipartPart {
1585    #[serde(rename = "partNumber")]
1586    part_number: i32,
1587    etag: String,
1588}
1589
1590#[derive(Debug, Serialize, Deserialize)]
1591struct ShardManifest {
1592    version: u32,
1593    total_size: u64,
1594    shard_size: u64,
1595    shards: Vec<ShardEntry>,
1596}
1597
1598#[derive(Debug, Serialize, Deserialize)]
1599struct ShardEntry {
1600    path: String,
1601    size: u64,
1602}
1603
1604#[derive(Debug, Deserialize)]
1605struct FsWriteSessionResponse {
1606    #[serde(rename = "sessionId")]
1607    _session_id: String,
1608    #[serde(rename = "nextOffset")]
1609    next_offset: i64,
1610}
1611
1612fn sha256_hex(data: &[u8]) -> String {
1613    let digest = Sha256::digest(data);
1614    let mut out = String::with_capacity(digest.len() * 2);
1615    for byte in digest {
1616        use std::fmt::Write as _;
1617        let _ = write!(out, "{byte:02x}");
1618    }
1619    out
1620}
1621
/// Serde `skip_serializing_if` predicate: true exactly when `value` is `false`.
fn is_false(value: &bool) -> bool {
    matches!(value, false)
}
1625
1626fn map_http_err(err: reqwest::Error) -> io::Error {
1627    io::Error::other(err)
1628}
1629
1630fn parse_modified_at(value: Option<&str>) -> Option<std::time::SystemTime> {
1631    let value = value?;
1632    let parsed = DateTime::parse_from_rfc3339(value).ok()?;
1633    let millis = parsed.timestamp_millis();
1634    if millis < 0 {
1635        return None;
1636    }
1637    Some(std::time::UNIX_EPOCH + Duration::from_millis(millis as u64))
1638}
1639
1640fn map_url_err(err: url::ParseError) -> io::Error {
1641    io::Error::new(ErrorKind::InvalidInput, err)
1642}
1643
1644fn handle_error(resp: Response) -> io::Result<Response> {
1645    let status = resp.status();
1646    if status.is_success() {
1647        return Ok(resp);
1648    }
1649    let text = resp.text().unwrap_or_else(|_| status.to_string());
1650    let kind = match status {
1651        StatusCode::NOT_FOUND => ErrorKind::NotFound,
1652        StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => ErrorKind::PermissionDenied,
1653        StatusCode::CONFLICT => ErrorKind::AlreadyExists,
1654        StatusCode::BAD_REQUEST => ErrorKind::InvalidInput,
1655        _ => ErrorKind::Other,
1656    };
1657    Err(io::Error::new(kind, text))
1658}
1659
1660impl fmt::Debug for RemoteFsProvider {
1661    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1662        f.debug_struct("RemoteFsProvider").finish_non_exhaustive()
1663    }
1664}
1665
1666#[cfg(test)]
1667mod tests {
1668    use super::*;
1669    use crate::data_contract::DataChunkDescriptor;
1670    use axum::extract::{Query, State};
1671    use axum::http::{HeaderMap, StatusCode};
1672    use axum::routing::{delete, get, post, put};
1673    use axum::{Json, Router};
1674    use futures::executor;
1675    use serde::Deserialize;
1676    use std::collections::HashMap;
1677    use std::net::TcpListener as StdTcpListener;
1678    use std::sync::Arc;
1679    use tempfile::tempdir;
1680    use tokio::net::TcpListener as TokioTcpListener;
1681    use tokio::runtime::Runtime;
1682
1683    #[derive(Clone)]
1684    struct Harness {
1685        root: Arc<PathBuf>,
1686        _keeper: Arc<tempfile::TempDir>,
1687        base_url: Arc<Mutex<Option<String>>>,
1688        upload_sessions: Arc<Mutex<HashMap<String, UploadSessionTestState>>>,
1689        omit_last_chunk_target: bool,
1690    }
1691
1692    impl Harness {
1693        fn new() -> Self {
1694            Self::with_omit_last_chunk_target(false)
1695        }
1696
1697        fn with_omit_last_chunk_target(omit_last_chunk_target: bool) -> Self {
1698            let dir = tempdir().expect("tempdir");
1699            let path = dir.path().to_path_buf();
1700            Self {
1701                root: Arc::new(path),
1702                _keeper: Arc::new(dir),
1703                base_url: Arc::new(Mutex::new(None)),
1704                upload_sessions: Arc::new(Mutex::new(HashMap::new())),
1705                omit_last_chunk_target,
1706            }
1707        }
1708
1709        fn resolve(&self, remote_path: &str) -> PathBuf {
1710            let trimmed = remote_path.trim_start_matches('/');
1711            self.root.join(trimmed)
1712        }
1713
1714        fn virtualize(&self, path: &Path) -> String {
1715            let rel = path.strip_prefix(&*self.root).unwrap_or(path);
1716            if rel.as_os_str().is_empty() {
1717                "/".to_string()
1718            } else {
1719                format!("/{}", rel.to_string_lossy().replace('\\', "/"))
1720            }
1721        }
1722    }
1723
1724    #[derive(Clone)]
1725    struct UploadSessionTestState {
1726        path: String,
1727        chunk_count: usize,
1728    }
1729
1730    #[derive(Deserialize)]
1731    struct PathParams {
1732        path: String,
1733        offset: Option<u64>,
1734        length: Option<usize>,
1735        truncate: Option<String>,
1736        #[serde(rename = "createNew")]
1737        create_new: Option<String>,
1738        recursive: Option<String>,
1739    }
1740
1741    #[derive(Deserialize)]
1742    struct UploadChunkQuery {
1743        #[serde(rename = "sessionId")]
1744        session_id: String,
1745        #[serde(rename = "chunkIndex")]
1746        chunk_index: usize,
1747    }
1748
1749    #[derive(Deserialize)]
1750    struct DataManifestQuery {
1751        path: String,
1752        version: Option<String>,
1753    }
1754
1755    async fn metadata_handler(
1756        State(harness): State<Harness>,
1757        Query(params): Query<PathParams>,
1758    ) -> Result<Json<serde_json::Value>, StatusCode> {
1759        let meta =
1760            std::fs::metadata(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1761        let file_type = if meta.is_dir() {
1762            "dir"
1763        } else if meta.is_file() {
1764            "file"
1765        } else {
1766            "other"
1767        };
1768        Ok(Json(serde_json::json!({
1769            "fileType": file_type,
1770            "len": meta.len(),
1771            "modified": meta.modified().ok().and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()).map(|d| d.as_secs()),
1772            "readonly": meta.permissions().readonly()
1773        })))
1774    }
1775
1776    async fn dir_handler(
1777        State(harness): State<Harness>,
1778        Query(params): Query<PathParams>,
1779    ) -> Result<Json<Vec<serde_json::Value>>, StatusCode> {
1780        let mut entries = Vec::new();
1781        for entry in
1782            std::fs::read_dir(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?
1783        {
1784            let entry = entry.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1785            let file_type = entry
1786                .file_type()
1787                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1788            let kind = if file_type.is_dir() {
1789                "dir"
1790            } else if file_type.is_file() {
1791                "file"
1792            } else {
1793                "other"
1794            };
1795            entries.push(serde_json::json!({
1796                "path": harness.virtualize(&entry.path()),
1797                "fileName": entry.file_name().to_string_lossy(),
1798                "fileType": kind
1799            }));
1800        }
1801        Ok(Json(entries))
1802    }
1803
1804    async fn canonicalize_handler(
1805        State(harness): State<Harness>,
1806        Query(params): Query<PathParams>,
1807    ) -> Result<Json<serde_json::Value>, StatusCode> {
1808        let path = harness.resolve(&params.path);
1809        let canonical = std::fs::canonicalize(path).map_err(|_| StatusCode::NOT_FOUND)?;
1810        Ok(Json(serde_json::json!({
1811            "path": harness.virtualize(&canonical)
1812        })))
1813    }
1814
1815    async fn read_handler(
1816        State(harness): State<Harness>,
1817        Query(params): Query<PathParams>,
1818    ) -> Result<Vec<u8>, StatusCode> {
1819        let mut data =
1820            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
1821        let offset = params.offset.unwrap_or(0) as usize;
1822        let length = params.length.unwrap_or(data.len().saturating_sub(offset));
1823        let end = std::cmp::min(offset + length, data.len());
1824        if offset < data.len() {
1825            data = data[offset..end].to_vec();
1826        } else {
1827            data.clear();
1828        }
1829        Ok(data)
1830    }
1831
1832    async fn write_handler(
1833        State(harness): State<Harness>,
1834        Query(params): Query<PathParams>,
1835        body: axum::body::Bytes,
1836    ) -> Result<(), StatusCode> {
1837        let path = harness.resolve(&params.path);
1838        if let Some(parent) = path.parent() {
1839            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1840        }
1841        if params.create_new.as_deref() == Some("true") {
1842            if params.offset.unwrap_or(0) != 0 {
1843                return Err(StatusCode::BAD_REQUEST);
1844            }
1845            let mut file = std::fs::OpenOptions::new()
1846                .write(true)
1847                .create_new(true)
1848                .open(path)
1849                .map_err(|err| {
1850                    if err.kind() == ErrorKind::AlreadyExists {
1851                        StatusCode::CONFLICT
1852                    } else {
1853                        StatusCode::INTERNAL_SERVER_ERROR
1854                    }
1855                })?;
1856            std::io::Write::write_all(&mut file, &body)
1857                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1858            return Ok(());
1859        }
1860        let mut data = if params.truncate.as_deref() == Some("true") || !path.exists() {
1861            Vec::new()
1862        } else {
1863            std::fs::read(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1864        };
1865        let offset = params.offset.unwrap_or(0) as usize;
1866        let required = offset + body.len();
1867        if required > data.len() {
1868            data.resize(required, 0);
1869        }
1870        data[offset..offset + body.len()].copy_from_slice(&body);
1871        std::fs::write(path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1872        Ok(())
1873    }
1874
1875    async fn upload_session_start_handler(
1876        State(harness): State<Harness>,
1877        Json(req): Json<UploadSessionStartRequest>,
1878    ) -> Result<Json<UploadSessionStartResponse>, StatusCode> {
1879        let session_id = Uuid::new_v4().to_string();
1880        let chunk_size_bytes = 1024i64;
1881        harness
1882            .upload_sessions
1883            .lock()
1884            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1885            .insert(
1886                session_id.clone(),
1887                UploadSessionTestState {
1888                    path: req.path,
1889                    chunk_count: 0,
1890                },
1891            );
1892        Ok(Json(UploadSessionStartResponse {
1893            session_id: session_id.clone(),
1894            blob_key: format!("test/blob/{session_id}"),
1895            chunk_size_bytes,
1896        }))
1897    }
1898
1899    async fn upload_session_chunks_handler(
1900        State(harness): State<Harness>,
1901        Json(req): Json<UploadSessionChunksRequest>,
1902    ) -> Result<Json<UploadSessionChunksResponse>, StatusCode> {
1903        let base = harness
1904            .base_url
1905            .lock()
1906            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1907            .clone()
1908            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
1909        let mut sessions = harness
1910            .upload_sessions
1911            .lock()
1912            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1913        let state = sessions
1914            .get_mut(&req.session_id)
1915            .ok_or(StatusCode::NOT_FOUND)?;
1916        state.chunk_count = req.chunks.len();
1917        let mut targets = req
1918            .chunks
1919            .iter()
1920            .map(|chunk| UploadChunkTarget {
1921                chunk_index: chunk.chunk_index,
1922                method: "PUT".to_string(),
1923                upload_url: format!(
1924                    "{base}/upload-chunk?sessionId={}&chunkIndex={}",
1925                    req.session_id, chunk.chunk_index
1926                ),
1927                headers: HashMap::new(),
1928            })
1929            .collect::<Vec<_>>();
1930        if harness.omit_last_chunk_target && !targets.is_empty() {
1931            targets.pop();
1932        }
1933        Ok(Json(UploadSessionChunksResponse { targets }))
1934    }
1935
1936    async fn upload_chunk_handler(
1937        State(harness): State<Harness>,
1938        Query(query): Query<UploadChunkQuery>,
1939        body: axum::body::Bytes,
1940    ) -> Result<(), StatusCode> {
1941        let chunk_path = harness.resolve(&format!(
1942            "/.upload-sessions/{}/{}",
1943            query.session_id, query.chunk_index
1944        ));
1945        if let Some(parent) = chunk_path.parent() {
1946            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1947        }
1948        std::fs::write(chunk_path, body).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1949        Ok(())
1950    }
1951
1952    async fn data_manifest_handler(
1953        Query(query): Query<DataManifestQuery>,
1954    ) -> Result<Json<DataManifestDescriptor>, StatusCode> {
1955        let updated_at = if query.version.as_deref() == Some("v1") {
1956            "2026-01-01T00:00:00Z"
1957        } else {
1958            "2026-02-02T00:00:00Z"
1959        };
1960        Ok(Json(DataManifestDescriptor {
1961            schema_version: 1,
1962            format: "runmat-data".to_string(),
1963            dataset_id: format!("dataset:{}", query.path),
1964            updated_at: updated_at.to_string(),
1965            txn_sequence: 9,
1966        }))
1967    }
1968
1969    async fn data_chunk_upload_targets_handler(
1970        Json(req): Json<DataChunkUploadRequest>,
1971    ) -> Result<Json<serde_json::Value>, StatusCode> {
1972        let targets = req
1973            .chunks
1974            .iter()
1975            .map(|chunk| {
1976                serde_json::json!({
1977                    "key": chunk.key,
1978                    "method": "PUT",
1979                    "upload_url": format!("https://uploads.example.test/{}/{}", req.array, chunk.object_id),
1980                    "headers": {
1981                        "x-runmat-hash": chunk.hash,
1982                        "x-runmat-object": chunk.object_id,
1983                    }
1984                })
1985            })
1986            .collect::<Vec<_>>();
1987        Ok(Json(serde_json::json!({ "targets": targets })))
1988    }
1989
1990    async fn upload_session_complete_handler(
1991        State(harness): State<Harness>,
1992        Json(req): Json<UploadSessionCompleteRequest>,
1993    ) -> Result<(), StatusCode> {
1994        let state = harness
1995            .upload_sessions
1996            .lock()
1997            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
1998            .remove(&req.session_id)
1999            .ok_or(StatusCode::NOT_FOUND)?;
2000        let mut data = Vec::new();
2001        for chunk_index in 0..state.chunk_count {
2002            let chunk_path = harness.resolve(&format!(
2003                "/.upload-sessions/{}/{}",
2004                req.session_id, chunk_index
2005            ));
2006            let chunk = std::fs::read(chunk_path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2007            data.extend_from_slice(&chunk);
2008        }
2009        if sha256_hex(&data) != req.content_sha256 {
2010            return Err(StatusCode::BAD_REQUEST);
2011        }
2012        let target_path = harness.resolve(&state.path);
2013        if let Some(parent) = target_path.parent() {
2014            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2015        }
2016        if req.create_new {
2017            let mut file = std::fs::OpenOptions::new()
2018                .write(true)
2019                .create_new(true)
2020                .open(target_path)
2021                .map_err(|err| {
2022                    if err.kind() == ErrorKind::AlreadyExists {
2023                        StatusCode::CONFLICT
2024                    } else {
2025                        StatusCode::INTERNAL_SERVER_ERROR
2026                    }
2027                })?;
2028            std::io::Write::write_all(&mut file, &data)
2029                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2030        } else {
2031            std::fs::write(target_path, data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2032        }
2033        Ok(())
2034    }
2035
2036    async fn mkdir_handler(
2037        State(harness): State<Harness>,
2038        Json(req): Json<CreateDirRequest>,
2039    ) -> Result<(), StatusCode> {
2040        let path = harness.resolve(&req.path);
2041        if req.recursive {
2042            std::fs::create_dir_all(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2043        } else {
2044            std::fs::create_dir(path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2045        }
2046        Ok(())
2047    }
2048
2049    async fn delete_file_handler(
2050        State(harness): State<Harness>,
2051        Query(params): Query<PathParams>,
2052    ) -> Result<(), StatusCode> {
2053        std::fs::remove_file(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
2054        Ok(())
2055    }
2056
2057    async fn delete_dir_handler(
2058        State(harness): State<Harness>,
2059        Query(params): Query<PathParams>,
2060    ) -> Result<(), StatusCode> {
2061        let recursive = params.recursive.as_deref() == Some("true");
2062        if recursive {
2063            std::fs::remove_dir_all(harness.resolve(&params.path))
2064                .map_err(|_| StatusCode::NOT_FOUND)?;
2065        } else {
2066            std::fs::remove_dir(harness.resolve(&params.path))
2067                .map_err(|_| StatusCode::NOT_FOUND)?;
2068        }
2069        Ok(())
2070    }
2071
2072    async fn rename_handler(
2073        State(harness): State<Harness>,
2074        Json(req): Json<RenameRequest>,
2075    ) -> Result<(), StatusCode> {
2076        let from = harness.resolve(&req.from);
2077        let to = harness.resolve(&req.to);
2078        if let Some(parent) = to.parent() {
2079            std::fs::create_dir_all(parent).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2080        }
2081        std::fs::rename(from, to).map_err(|_| StatusCode::NOT_FOUND)?;
2082        Ok(())
2083    }
2084
2085    async fn set_readonly_handler(
2086        State(harness): State<Harness>,
2087        Json(req): Json<SetReadonlyRequest>,
2088    ) -> Result<(), StatusCode> {
2089        let path = harness.resolve(&req.path);
2090        let mut perms = std::fs::metadata(&path)
2091            .map_err(|_| StatusCode::NOT_FOUND)?
2092            .permissions();
2093        perms.set_readonly(req.readonly);
2094        std::fs::set_permissions(path, perms).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
2095        Ok(())
2096    }
2097
2098    async fn read_with_download_handler(
2099        State(harness): State<Harness>,
2100        Query(params): Query<PathParams>,
2101    ) -> Result<Json<serde_json::Value>, StatusCode> {
2102        let base = harness
2103            .base_url
2104            .lock()
2105            .unwrap()
2106            .clone()
2107            .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
2108        let download_url = format!("{base}/download?path={}", params.path);
2109        Ok(Json(serde_json::json!({
2110            "downloadUrl": download_url,
2111            "expiresAt": 0
2112        })))
2113    }
2114
2115    async fn download_handler(
2116        State(harness): State<Harness>,
2117        Query(params): Query<PathParams>,
2118        headers: HeaderMap,
2119    ) -> Result<Vec<u8>, StatusCode> {
2120        let mut data =
2121            std::fs::read(harness.resolve(&params.path)).map_err(|_| StatusCode::NOT_FOUND)?;
2122        if let Some(range) = headers.get("range").and_then(|value| value.to_str().ok()) {
2123            if let Some((start, end)) = parse_range(range) {
2124                let start = start.min(data.len());
2125                let end = end.min(data.len().saturating_sub(1));
2126                if start < data.len() {
2127                    data = data[start..=end].to_vec();
2128                } else {
2129                    data.clear();
2130                }
2131            }
2132        }
2133        Ok(data)
2134    }
2135
2136    fn parse_range(value: &str) -> Option<(usize, usize)> {
2137        let value = value.strip_prefix("bytes=")?;
2138        let (start, end) = value.split_once('-')?;
2139        let start = start.parse::<usize>().ok()?;
2140        let end = end.parse::<usize>().ok()?;
2141        Some((start, end))
2142    }
2143
2144    fn spawn_server() -> (String, Harness, Runtime) {
2145        let harness = Harness::new();
2146        let router = Router::new()
2147            .route("/fs/metadata", get(metadata_handler))
2148            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2149            .route("/fs/file", delete(delete_file_handler))
2150            .route("/fs/canonicalize", get(canonicalize_handler))
2151            .route("/fs/read", get(read_handler))
2152            .route("/fs/write", put(write_handler))
2153            .route(
2154                "/fs/upload-session/start",
2155                post(upload_session_start_handler),
2156            )
2157            .route(
2158                "/fs/upload-session/chunks",
2159                post(upload_session_chunks_handler),
2160            )
2161            .route(
2162                "/fs/upload-session/complete",
2163                post(upload_session_complete_handler),
2164            )
2165            .route("/fs/mkdir", post(mkdir_handler))
2166            .route("/fs/rename", post(rename_handler))
2167            .route("/fs/set-readonly", post(set_readonly_handler))
2168            .route("/data/manifest", get(data_manifest_handler))
2169            .route(
2170                "/data/chunks/upload-targets",
2171                post(data_chunk_upload_targets_handler),
2172            )
2173            .route("/upload-chunk", put(upload_chunk_handler))
2174            .with_state(harness.clone());
2175        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2176        std_listener.set_nonblocking(true).expect("nonblocking");
2177        let addr = std_listener.local_addr().unwrap();
2178        let service = router.into_make_service();
2179        let rt = Runtime::new().unwrap();
2180        let base = format!("http://{addr}");
2181        *harness.base_url.lock().unwrap() = Some(base.clone());
2182        rt.spawn(async move {
2183            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2184            axum::serve(listener, service).await.unwrap();
2185        });
2186        (base, harness, rt)
2187    }
2188
2189    fn spawn_server_with_download_url() -> (String, Harness, Runtime) {
2190        let harness = Harness::new();
2191        let router = Router::new()
2192            .route("/fs/metadata", get(metadata_handler))
2193            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2194            .route("/fs/file", delete(delete_file_handler))
2195            .route("/fs/canonicalize", get(canonicalize_handler))
2196            .route("/fs/read", get(read_with_download_handler))
2197            .route("/fs/write", put(write_handler))
2198            .route(
2199                "/fs/upload-session/start",
2200                post(upload_session_start_handler),
2201            )
2202            .route(
2203                "/fs/upload-session/chunks",
2204                post(upload_session_chunks_handler),
2205            )
2206            .route(
2207                "/fs/upload-session/complete",
2208                post(upload_session_complete_handler),
2209            )
2210            .route("/fs/mkdir", post(mkdir_handler))
2211            .route("/fs/rename", post(rename_handler))
2212            .route("/fs/set-readonly", post(set_readonly_handler))
2213            .route("/download", get(download_handler))
2214            .route("/data/manifest", get(data_manifest_handler))
2215            .route(
2216                "/data/chunks/upload-targets",
2217                post(data_chunk_upload_targets_handler),
2218            )
2219            .route("/upload-chunk", put(upload_chunk_handler))
2220            .with_state(harness.clone());
2221        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2222        std_listener.set_nonblocking(true).expect("nonblocking");
2223        let addr = std_listener.local_addr().unwrap();
2224        let service = router.into_make_service();
2225        let rt = Runtime::new().unwrap();
2226        let base = format!("http://{addr}");
2227        *harness.base_url.lock().unwrap() = Some(base.clone());
2228        rt.spawn(async move {
2229            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2230            axum::serve(listener, service).await.unwrap();
2231        });
2232        (base, harness, rt)
2233    }
2234
2235    fn spawn_server_with_missing_chunk_target() -> (String, Harness, Runtime) {
2236        let harness = Harness::with_omit_last_chunk_target(true);
2237        let router = Router::new()
2238            .route("/fs/metadata", get(metadata_handler))
2239            .route("/fs/dir", get(dir_handler).delete(delete_dir_handler))
2240            .route("/fs/file", delete(delete_file_handler))
2241            .route("/fs/canonicalize", get(canonicalize_handler))
2242            .route("/fs/read", get(read_handler))
2243            .route("/fs/write", put(write_handler))
2244            .route(
2245                "/fs/upload-session/start",
2246                post(upload_session_start_handler),
2247            )
2248            .route(
2249                "/fs/upload-session/chunks",
2250                post(upload_session_chunks_handler),
2251            )
2252            .route(
2253                "/fs/upload-session/complete",
2254                post(upload_session_complete_handler),
2255            )
2256            .route("/fs/mkdir", post(mkdir_handler))
2257            .route("/fs/rename", post(rename_handler))
2258            .route("/fs/set-readonly", post(set_readonly_handler))
2259            .route("/data/manifest", get(data_manifest_handler))
2260            .route(
2261                "/data/chunks/upload-targets",
2262                post(data_chunk_upload_targets_handler),
2263            )
2264            .route("/upload-chunk", put(upload_chunk_handler))
2265            .with_state(harness.clone());
2266        let std_listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
2267        std_listener.set_nonblocking(true).expect("nonblocking");
2268        let addr = std_listener.local_addr().unwrap();
2269        let service = router.into_make_service();
2270        let rt = Runtime::new().unwrap();
2271        let base = format!("http://{addr}");
2272        *harness.base_url.lock().unwrap() = Some(base.clone());
2273        rt.spawn(async move {
2274            let listener = TokioTcpListener::from_std(std_listener).unwrap();
2275            axum::serve(listener, service).await.unwrap();
2276        });
2277        (base, harness, rt)
2278    }
2279
2280    #[test]
2281    fn remote_provider_roundtrip() {
2282        let (base, harness, _rt) = spawn_server();
2283        let provider = RemoteFsProvider::new(RemoteFsConfig {
2284            base_url: base,
2285            auth_token: None,
2286            chunk_bytes: 1024,
2287            parallel_requests: 4,
2288            direct_read_threshold_bytes: u64::MAX,
2289            direct_write_threshold_bytes: 1024,
2290            timeout: Duration::from_secs(30),
2291            ..RemoteFsConfig::default()
2292        })
2293        .expect("provider");
2294
2295        let data = (0..16_384u32)
2296            .flat_map(|v| v.to_le_bytes())
2297            .collect::<Vec<_>>();
2298        executor::block_on(provider.write(Path::new("/reports/data.bin"), &data)).expect("write");
2299        let read_back =
2300            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2301        assert_eq!(data, read_back);
2302        executor::block_on(provider.remove_file(Path::new("/reports/data.bin"))).expect("remove");
2303        assert!(!harness.resolve("/reports/data.bin").exists());
2304    }
2305
2306    #[test]
2307    fn remote_provider_write_only_handle_metadata_uses_remote_len_until_dirty() {
2308        let (base, harness, _rt) = spawn_server();
2309        let provider = RemoteFsProvider::new(RemoteFsConfig {
2310            base_url: base,
2311            auth_token: None,
2312            chunk_bytes: 1024,
2313            parallel_requests: 4,
2314            direct_read_threshold_bytes: u64::MAX,
2315            direct_write_threshold_bytes: 1024,
2316            timeout: Duration::from_secs(30),
2317            ..RemoteFsConfig::default()
2318        })
2319        .expect("provider");
2320        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2321        std::fs::write(harness.resolve("/reports/data.bin"), b"remote bytes").expect("write");
2322
2323        let flags = OpenFlags {
2324            write: true,
2325            ..OpenFlags::default()
2326        };
2327        let mut handle = provider
2328            .open(Path::new("/reports/data.bin"), &flags)
2329            .expect("open write-only");
2330        let metadata = executor::block_on(handle.metadata_async()).expect("metadata");
2331        assert_eq!(metadata.len(), b"remote bytes".len() as u64);
2332
2333        handle.write_all(b"new").expect("write handle");
2334        let metadata = executor::block_on(handle.metadata_async()).expect("metadata");
2335        assert_eq!(metadata.len(), 3);
2336        assert_eq!(metadata.hash(), None);
2337    }
2338
2339    #[test]
2340    fn remote_provider_create_new_rejects_existing_target_at_direct_write() {
2341        let (base, harness, _rt) = spawn_server();
2342        let provider = RemoteFsProvider::new(RemoteFsConfig {
2343            base_url: base,
2344            auth_token: None,
2345            chunk_bytes: 1024,
2346            parallel_requests: 4,
2347            direct_read_threshold_bytes: u64::MAX,
2348            direct_write_threshold_bytes: 1024,
2349            timeout: Duration::from_secs(30),
2350            ..RemoteFsConfig::default()
2351        })
2352        .expect("provider");
2353
2354        let flags = OpenFlags {
2355            write: true,
2356            create_new: true,
2357            ..OpenFlags::default()
2358        };
2359        let mut handle = provider
2360            .open(Path::new("/reports/new.bin"), &flags)
2361            .expect("open create_new");
2362        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2363        std::fs::write(harness.resolve("/reports/new.bin"), b"winner").expect("write winner");
2364
2365        handle.write_all(b"loser").expect("write handle");
2366        let err = executor::block_on(handle.flush_async()).expect_err("create_new conflict");
2367        assert_eq!(err.kind(), ErrorKind::AlreadyExists);
2368        assert_eq!(
2369            std::fs::read(harness.resolve("/reports/new.bin")).expect("read winner"),
2370            b"winner"
2371        );
2372    }
2373
2374    #[test]
2375    fn remote_provider_create_new_rejects_existing_target_at_upload_complete() {
2376        let (base, harness, _rt) = spawn_server();
2377        let provider = RemoteFsProvider::new(RemoteFsConfig {
2378            base_url: base,
2379            auth_token: None,
2380            chunk_bytes: 1024,
2381            parallel_requests: 2,
2382            direct_read_threshold_bytes: u64::MAX,
2383            direct_write_threshold_bytes: 1024,
2384            timeout: Duration::from_secs(30),
2385            ..RemoteFsConfig::default()
2386        })
2387        .expect("provider");
2388
2389        let flags = OpenFlags {
2390            write: true,
2391            create_new: true,
2392            ..OpenFlags::default()
2393        };
2394        let mut handle = provider
2395            .open(Path::new("/reports/session.bin"), &flags)
2396            .expect("open create_new");
2397        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2398        std::fs::write(harness.resolve("/reports/session.bin"), b"winner").expect("write winner");
2399
2400        let loser = vec![7u8; 4096];
2401        handle.write_all(&loser).expect("write handle");
2402        let err = executor::block_on(handle.flush_async()).expect_err("create_new conflict");
2403        assert_eq!(err.kind(), ErrorKind::AlreadyExists);
2404        assert_eq!(
2405            std::fs::read(harness.resolve("/reports/session.bin")).expect("read winner"),
2406            b"winner"
2407        );
2408    }
2409
2410    #[test]
2411    fn remote_provider_download_url_read() {
2412        let (base, harness, _rt) = spawn_server_with_download_url();
2413        let provider = RemoteFsProvider::new(RemoteFsConfig {
2414            base_url: base,
2415            auth_token: None,
2416            chunk_bytes: 128,
2417            parallel_requests: 2,
2418            direct_read_threshold_bytes: u64::MAX,
2419            timeout: Duration::from_secs(30),
2420            ..RemoteFsConfig::default()
2421        })
2422        .expect("provider");
2423
2424        let data = (0..1024u32)
2425            .flat_map(|v| v.to_le_bytes())
2426            .collect::<Vec<_>>();
2427        std::fs::create_dir_all(harness.resolve("/reports")).expect("mkdir");
2428        std::fs::write(harness.resolve("/reports/data.bin"), &data).expect("write");
2429
2430        let read_back =
2431            executor::block_on(provider.read(Path::new("/reports/data.bin"))).expect("read");
2432        assert_eq!(data, read_back);
2433    }
2434
2435    #[test]
2436    fn remote_provider_errors_when_chunk_target_is_missing() {
2437        let (base, _harness, _rt) = spawn_server_with_missing_chunk_target();
2438        let provider = RemoteFsProvider::new(RemoteFsConfig {
2439            base_url: base,
2440            auth_token: None,
2441            chunk_bytes: 128,
2442            parallel_requests: 2,
2443            direct_read_threshold_bytes: u64::MAX,
2444            direct_write_threshold_bytes: 128,
2445            timeout: Duration::from_secs(30),
2446            ..RemoteFsConfig::default()
2447        })
2448        .expect("provider");
2449
2450        let data = vec![7u8; 1024];
2451        let err =
2452            executor::block_on(provider.write(Path::new("/reports/missing-target.bin"), &data))
2453                .expect_err("write should fail when chunk target is missing");
2454        assert_eq!(err.kind(), io::ErrorKind::Other);
2455    }
2456
2457    #[test]
2458    fn remote_provider_data_manifest_descriptor_fetches_descriptor() {
2459        let (base, _harness, _rt) = spawn_server();
2460        let provider = RemoteFsProvider::new(RemoteFsConfig {
2461            base_url: base,
2462            auth_token: None,
2463            timeout: Duration::from_secs(30),
2464            ..RemoteFsConfig::default()
2465        })
2466        .expect("provider");
2467
2468        let descriptor = provider
2469            .data_manifest_descriptor(&DataManifestRequest {
2470                path: "/datasets/alpha.data".to_string(),
2471                version: Some("v1".to_string()),
2472            })
2473            .expect("manifest descriptor");
2474        assert_eq!(descriptor.schema_version, 1);
2475        assert_eq!(descriptor.format, "runmat-data");
2476        assert_eq!(descriptor.dataset_id, "dataset:/datasets/alpha.data");
2477        assert_eq!(descriptor.updated_at, "2026-01-01T00:00:00Z");
2478        assert_eq!(descriptor.txn_sequence, 9);
2479    }
2480
2481    #[test]
2482    fn remote_provider_data_chunk_upload_targets_fetches_targets() {
2483        let (base, _harness, _rt) = spawn_server();
2484        let provider = RemoteFsProvider::new(RemoteFsConfig {
2485            base_url: base,
2486            auth_token: None,
2487            timeout: Duration::from_secs(30),
2488            ..RemoteFsConfig::default()
2489        })
2490        .expect("provider");
2491
2492        let targets = provider
2493            .data_chunk_upload_targets(&DataChunkUploadRequest {
2494                dataset_path: "/datasets/alpha.data".to_string(),
2495                array: "X".to_string(),
2496                chunks: vec![DataChunkDescriptor {
2497                    key: "X/0-0".to_string(),
2498                    object_id: "obj_0".to_string(),
2499                    hash: "sha256:abc".to_string(),
2500                    bytes_raw: 10,
2501                    bytes_stored: 8,
2502                }],
2503            })
2504            .expect("chunk upload targets");
2505
2506        assert_eq!(targets.len(), 1);
2507        assert_eq!(targets[0].key, "X/0-0");
2508        assert_eq!(targets[0].method, "PUT");
2509        assert_eq!(
2510            targets[0].upload_url,
2511            "https://uploads.example.test/X/obj_0"
2512        );
2513        let headers = &targets[0].headers;
2514        assert_eq!(headers.len(), 2);
2515        assert_eq!(
2516            headers.get("x-runmat-hash").map(String::as_str),
2517            Some("sha256:abc")
2518        );
2519        assert_eq!(
2520            headers.get("x-runmat-object").map(String::as_str),
2521            Some("obj_0")
2522        );
2523    }
2524}