// runmat_filesystem/native.rs

1use crate::data_contract::{
2    DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::Utc;
7use serde_json::Value as JsonValue;
8use std::fs;
9use std::io;
10use std::path::{Path, PathBuf};
11use url::Url;
12
/// Filesystem provider that operates directly on the host filesystem via `std::fs`.
///
/// It is a stateless unit struct, so constructing, copying, and debug-printing it is free.
#[derive(Debug, Default, Clone, Copy)]
pub struct NativeFsProvider;
15
16#[async_trait(?Send)]
17impl FsProvider for NativeFsProvider {
18    fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
19        let mut opts = fs::OpenOptions::new();
20        opts.read(flags.read);
21        opts.write(flags.write);
22        opts.append(flags.append);
23        opts.truncate(flags.truncate);
24        opts.create(flags.create);
25        opts.create_new(flags.create_new);
26        let file = opts.open(path)?;
27        Ok(Box::new(file))
28    }
29
30    async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
31        fs::read(path)
32    }
33
34    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
35        fs::write(path, data)
36    }
37
38    async fn remove_file(&self, path: &Path) -> io::Result<()> {
39        fs::remove_file(path)
40    }
41
42    async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
43        fs::metadata(path).map(FsMetadata::from)
44    }
45
46    async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
47        fs::symlink_metadata(path).map(FsMetadata::from)
48    }
49
50    async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
51        let entries = fs::read_dir(path)?;
52        let mut out = Vec::new();
53        for entry in entries {
54            let entry = entry?;
55            let file_type = entry
56                .file_type()
57                .ok()
58                .map(FsFileType::from)
59                .unwrap_or(FsFileType::Unknown);
60            out.push(DirEntry {
61                path: entry.path(),
62                file_name: entry.file_name(),
63                file_type,
64            });
65        }
66        Ok(out)
67    }
68
69    async fn canonicalize(&self, path: &Path) -> io::Result<std::path::PathBuf> {
70        fs::canonicalize(path)
71    }
72
73    async fn create_dir(&self, path: &Path) -> io::Result<()> {
74        fs::create_dir(path)
75    }
76
77    async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
78        fs::create_dir_all(path)
79    }
80
81    async fn remove_dir(&self, path: &Path) -> io::Result<()> {
82        fs::remove_dir(path)
83    }
84
85    async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
86        fs::remove_dir_all(path)
87    }
88
89    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
90        fs::rename(from, to)
91    }
92
93    async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
94        let mut perms = fs::metadata(path)?.permissions();
95        perms.set_readonly(readonly);
96        fs::set_permissions(path, perms)
97    }
98
99    async fn data_manifest_descriptor(
100        &self,
101        request: &DataManifestRequest,
102    ) -> io::Result<DataManifestDescriptor> {
103        let path = dataset_manifest_path(&request.path);
104        let bytes = fs::read(&path)?;
105        let json: JsonValue = serde_json::from_slice(&bytes)
106            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
107        Ok(DataManifestDescriptor {
108            schema_version: json
109                .get("schema_version")
110                .or_else(|| json.get("schemaVersion"))
111                .and_then(|v| v.as_u64())
112                .unwrap_or(1) as u32,
113            format: json
114                .get("format")
115                .and_then(|v| v.as_str())
116                .unwrap_or("runmat-data")
117                .to_string(),
118            dataset_id: json
119                .get("dataset_id")
120                .or_else(|| json.get("datasetId"))
121                .and_then(|v| v.as_str())
122                .unwrap_or_default()
123                .to_string(),
124            updated_at: json
125                .get("updated_at")
126                .or_else(|| json.get("updatedAt"))
127                .and_then(|v| v.as_str())
128                .map(ToString::to_string)
129                .unwrap_or_else(|| Utc::now().to_rfc3339()),
130            txn_sequence: json
131                .get("txn_sequence")
132                .or_else(|| json.get("txnSequence"))
133                .and_then(|v| v.as_u64())
134                .unwrap_or(0),
135        })
136    }
137
138    async fn data_chunk_upload_targets(
139        &self,
140        request: &DataChunkUploadRequest,
141    ) -> io::Result<Vec<DataChunkUploadTarget>> {
142        let root = dataset_chunk_root(&request.dataset_path, &request.array);
143        fs::create_dir_all(&root)?;
144        request
145            .chunks
146            .iter()
147            .map(|chunk| {
148                let path = root.join(format!("{}.bin", sanitize_segment(&chunk.object_id)));
149                let canonical = path_to_file_url(&path)?;
150                Ok(DataChunkUploadTarget {
151                    key: chunk.key.clone(),
152                    method: "PUT".to_string(),
153                    upload_url: canonical,
154                    headers: std::collections::HashMap::new(),
155                })
156            })
157            .collect()
158    }
159
160    async fn data_upload_chunk(
161        &self,
162        target: &DataChunkUploadTarget,
163        data: &[u8],
164    ) -> io::Result<()> {
165        if !target.method.eq_ignore_ascii_case("PUT") {
166            return Err(io::Error::new(
167                io::ErrorKind::InvalidInput,
168                format!("unsupported upload method '{}'", target.method),
169            ));
170        }
171        let path = file_url_to_path(&target.upload_url)?;
172        if let Some(parent) = path.parent() {
173            fs::create_dir_all(parent)?;
174        }
175        fs::write(path, data)
176    }
177}
178
/// Maps a dataset reference to the path of its manifest file.
///
/// A reference ending in `.json` already names the manifest and is used verbatim. Any other
/// reference is treated as a dataset directory whose manifest is `<path>/manifest.json`.
fn dataset_manifest_path(path: &str) -> PathBuf {
    let mut manifest = PathBuf::from(path);
    if !path.ends_with(".json") {
        manifest.push("manifest.json");
    }
    manifest
}
185
186fn dataset_chunk_root(dataset_path: &str, array: &str) -> PathBuf {
187    PathBuf::from(dataset_path)
188        .join("arrays")
189        .join(sanitize_segment(array))
190        .join("chunks")
191}
192
/// Turns an arbitrary identifier into a string safe to use as a single path component.
///
/// Each character that is not an ASCII letter or digit, or one of `-`, `_`, `.`, is replaced
/// by one `_`. This is per `char`, so a multi-byte character becomes a single underscore.
/// The mapping is lossy: e.g. `"a/b"` and `"a_b"` both become `"a_b"`.
///
/// The results `""`, `"."` and `".."` are not normal path components. Joining them would
/// stay in, or climb out of, the parent directory. They are therefore replaced with
/// underscores (`"_"`, `"_"`, `"__"`).
fn sanitize_segment(input: &str) -> String {
    let sanitized: String = input
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
                ch
            } else {
                '_'
            }
        })
        .collect();
    if matches!(sanitized.as_str(), "" | "." | "..") {
        "_".repeat(sanitized.len().max(1))
    } else {
        sanitized
    }
}
205
206fn path_to_file_url(path: &Path) -> io::Result<String> {
207    let abs = if path.is_absolute() {
208        path.to_path_buf()
209    } else {
210        std::env::current_dir()?.join(path)
211    };
212    Url::from_file_path(abs)
213        .map(|url| url.to_string())
214        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid local path"))
215}
216
217fn file_url_to_path(upload_url: &str) -> io::Result<PathBuf> {
218    let url = Url::parse(upload_url)
219        .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err.to_string()))?;
220    if url.scheme() != "file" {
221        return Err(io::Error::new(
222            io::ErrorKind::InvalidInput,
223            format!("unsupported upload url scheme '{}'", url.scheme()),
224        ));
225    }
226    url.to_file_path().map_err(|_| {
227        io::Error::new(
228            io::ErrorKind::InvalidInput,
229            "failed to decode local file upload url",
230        )
231    })
232}
233
234impl From<fs::Metadata> for FsMetadata {
235    fn from(meta: fs::Metadata) -> Self {
236        let file_type = FsFileType::from(meta.file_type());
237        FsMetadata {
238            file_type,
239            len: meta.len(),
240            modified: meta.modified().ok(),
241            readonly: meta.permissions().readonly(),
242            hash: None,
243        }
244    }
245}
246
247impl From<fs::FileType> for FsFileType {
248    fn from(ft: fs::FileType) -> Self {
249        if ft.is_dir() {
250            FsFileType::Directory
251        } else if ft.is_file() {
252            FsFileType::File
253        } else if ft.is_symlink() {
254            FsFileType::Symlink
255        } else {
256            FsFileType::Other
257        }
258    }
259}