runmat_filesystem/
native.rs1use crate::data_contract::{
2 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3};
4use crate::{DirEntry, FileHandle, FsFileType, FsMetadata, FsProvider, OpenFlags};
5use async_trait::async_trait;
6use chrono::Utc;
7use serde_json::Value as JsonValue;
8use std::fs;
9use std::io;
10use std::path::{Path, PathBuf};
11use url::Url;
12
/// Filesystem provider that performs every operation directly on the host
/// filesystem through `std::fs`.
///
/// The provider carries no state, so it is trivially cheap to copy and clone.
#[derive(Debug, Clone, Copy, Default)]
pub struct NativeFsProvider;
15
16#[async_trait(?Send)]
17impl FsProvider for NativeFsProvider {
18 fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
19 let mut opts = fs::OpenOptions::new();
20 opts.read(flags.read);
21 opts.write(flags.write);
22 opts.append(flags.append);
23 opts.truncate(flags.truncate);
24 opts.create(flags.create);
25 opts.create_new(flags.create_new);
26 let file = opts.open(path)?;
27 Ok(Box::new(file))
28 }
29
30 async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
31 fs::read(path)
32 }
33
34 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
35 fs::write(path, data)
36 }
37
38 async fn remove_file(&self, path: &Path) -> io::Result<()> {
39 fs::remove_file(path)
40 }
41
42 async fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
43 fs::metadata(path).map(FsMetadata::from)
44 }
45
46 async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata> {
47 fs::symlink_metadata(path).map(FsMetadata::from)
48 }
49
50 async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
51 let entries = fs::read_dir(path)?;
52 let mut out = Vec::new();
53 for entry in entries {
54 let entry = entry?;
55 let file_type = entry
56 .file_type()
57 .ok()
58 .map(FsFileType::from)
59 .unwrap_or(FsFileType::Unknown);
60 out.push(DirEntry {
61 path: entry.path(),
62 file_name: entry.file_name(),
63 file_type,
64 });
65 }
66 Ok(out)
67 }
68
69 async fn canonicalize(&self, path: &Path) -> io::Result<std::path::PathBuf> {
70 fs::canonicalize(path)
71 }
72
73 async fn create_dir(&self, path: &Path) -> io::Result<()> {
74 fs::create_dir(path)
75 }
76
77 async fn create_dir_all(&self, path: &Path) -> io::Result<()> {
78 fs::create_dir_all(path)
79 }
80
81 async fn remove_dir(&self, path: &Path) -> io::Result<()> {
82 fs::remove_dir(path)
83 }
84
85 async fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
86 fs::remove_dir_all(path)
87 }
88
89 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
90 fs::rename(from, to)
91 }
92
93 async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()> {
94 let mut perms = fs::metadata(path)?.permissions();
95 perms.set_readonly(readonly);
96 fs::set_permissions(path, perms)
97 }
98
99 async fn data_manifest_descriptor(
100 &self,
101 request: &DataManifestRequest,
102 ) -> io::Result<DataManifestDescriptor> {
103 let path = dataset_manifest_path(&request.path);
104 let bytes = fs::read(&path)?;
105 let json: JsonValue = serde_json::from_slice(&bytes)
106 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
107 Ok(DataManifestDescriptor {
108 schema_version: json
109 .get("schema_version")
110 .or_else(|| json.get("schemaVersion"))
111 .and_then(|v| v.as_u64())
112 .unwrap_or(1) as u32,
113 format: json
114 .get("format")
115 .and_then(|v| v.as_str())
116 .unwrap_or("runmat-data")
117 .to_string(),
118 dataset_id: json
119 .get("dataset_id")
120 .or_else(|| json.get("datasetId"))
121 .and_then(|v| v.as_str())
122 .unwrap_or_default()
123 .to_string(),
124 updated_at: json
125 .get("updated_at")
126 .or_else(|| json.get("updatedAt"))
127 .and_then(|v| v.as_str())
128 .map(ToString::to_string)
129 .unwrap_or_else(|| Utc::now().to_rfc3339()),
130 txn_sequence: json
131 .get("txn_sequence")
132 .or_else(|| json.get("txnSequence"))
133 .and_then(|v| v.as_u64())
134 .unwrap_or(0),
135 })
136 }
137
138 async fn data_chunk_upload_targets(
139 &self,
140 request: &DataChunkUploadRequest,
141 ) -> io::Result<Vec<DataChunkUploadTarget>> {
142 let root = dataset_chunk_root(&request.dataset_path, &request.array);
143 fs::create_dir_all(&root)?;
144 request
145 .chunks
146 .iter()
147 .map(|chunk| {
148 let path = root.join(format!("{}.bin", sanitize_segment(&chunk.object_id)));
149 let canonical = path_to_file_url(&path)?;
150 Ok(DataChunkUploadTarget {
151 key: chunk.key.clone(),
152 method: "PUT".to_string(),
153 upload_url: canonical,
154 headers: std::collections::HashMap::new(),
155 })
156 })
157 .collect()
158 }
159
160 async fn data_upload_chunk(
161 &self,
162 target: &DataChunkUploadTarget,
163 data: &[u8],
164 ) -> io::Result<()> {
165 if !target.method.eq_ignore_ascii_case("PUT") {
166 return Err(io::Error::new(
167 io::ErrorKind::InvalidInput,
168 format!("unsupported upload method '{}'", target.method),
169 ));
170 }
171 let path = file_url_to_path(&target.upload_url)?;
172 if let Some(parent) = path.parent() {
173 fs::create_dir_all(parent)?;
174 }
175 fs::write(path, data)
176 }
177}
178
/// Resolves the manifest file for a dataset location.
///
/// A `path` that already ends in `.json` is taken to be the manifest itself.
/// Anything else is treated as the dataset directory, and `manifest.json`
/// inside it is returned.
fn dataset_manifest_path(path: &str) -> PathBuf {
    let location = PathBuf::from(path);
    if !path.ends_with(".json") {
        return location.join("manifest.json");
    }
    location
}
185
186fn dataset_chunk_root(dataset_path: &str, array: &str) -> PathBuf {
187 PathBuf::from(dataset_path)
188 .join("arrays")
189 .join(sanitize_segment(array))
190 .join("chunks")
191}
192
/// Turns arbitrary text into a single path component.
///
/// ASCII letters, digits, `-`, `_` and `.` are kept. Every other character,
/// including path separators and non-ASCII characters, becomes `_`.
///
/// Results made only of dots (`.`, `..`, ...) and the empty result are
/// rewritten to underscores. Joining the result onto a directory therefore
/// always names a child entry, never the directory itself or its parent.
fn sanitize_segment(input: &str) -> String {
    let sanitized: String = input
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
                ch
            } else {
                '_'
            }
        })
        .collect();
    // `.` survives the filter above, so `.` / `..` would otherwise reach
    // `Path::join` and escape the intended directory. `all` is vacuously true
    // for the empty string, which is handled here too.
    if sanitized.chars().all(|ch| ch == '.') {
        return "_".repeat(sanitized.len().max(1));
    }
    sanitized
}
205
206fn path_to_file_url(path: &Path) -> io::Result<String> {
207 let abs = if path.is_absolute() {
208 path.to_path_buf()
209 } else {
210 std::env::current_dir()?.join(path)
211 };
212 Url::from_file_path(abs)
213 .map(|url| url.to_string())
214 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid local path"))
215}
216
217fn file_url_to_path(upload_url: &str) -> io::Result<PathBuf> {
218 let url = Url::parse(upload_url)
219 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err.to_string()))?;
220 if url.scheme() != "file" {
221 return Err(io::Error::new(
222 io::ErrorKind::InvalidInput,
223 format!("unsupported upload url scheme '{}'", url.scheme()),
224 ));
225 }
226 url.to_file_path().map_err(|_| {
227 io::Error::new(
228 io::ErrorKind::InvalidInput,
229 "failed to decode local file upload url",
230 )
231 })
232}
233
234impl From<fs::Metadata> for FsMetadata {
235 fn from(meta: fs::Metadata) -> Self {
236 let file_type = FsFileType::from(meta.file_type());
237 FsMetadata {
238 file_type,
239 len: meta.len(),
240 modified: meta.modified().ok(),
241 readonly: meta.permissions().readonly(),
242 hash: None,
243 }
244 }
245}
246
247impl From<fs::FileType> for FsFileType {
248 fn from(ft: fs::FileType) -> Self {
249 if ft.is_dir() {
250 FsFileType::Directory
251 } else if ft.is_file() {
252 FsFileType::File
253 } else if ft.is_symlink() {
254 FsFileType::Symlink
255 } else {
256 FsFileType::Other
257 }
258 }
259}