1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Mutex;
4
5use crate::types::{ArtifactDescriptor, ArtifactFormat, ArtifactSubject};
6use crate::{BinocError, BinocResult, DataAccess, ItemRef};
7
/// [`DataAccess`] implementation backed by the local filesystem.
///
/// Instances are built with [`LocalDataAccess::new`],
/// [`LocalDataAccess::new_for_diff`], [`LocalDataAccess::for_plugin`] or
/// [`LocalDataAccess::with_data_root`].
pub struct LocalDataAccess {
    /// Handle of the session temp directory. `Some` when this instance created
    /// the directory itself (`new`, `new_for_diff`), `None` when the data root
    /// was supplied by the caller. The field is never read; it is held so the
    /// directory behind `data_root` stays alive with this value
    /// (`tempfile::TempDir` removes its directory when dropped).
    _session_dir: Option<tempfile::TempDir>,
    /// Directory under which published artifacts are stored (in `.artifacts`).
    data_root: PathBuf,
    /// Caller-supplied directory (set by `for_plugin`). When present,
    /// workspaces (`ws-N`) and the `_provide` directory are created inside it
    /// instead of in fresh temp directories.
    external_root: Option<PathBuf>,
    /// Supplies the `N` of `ws-N` workspace names under `external_root`.
    workspace_counter: AtomicU32,
    /// Temp directories handed out by `workspace()` when there is no
    /// `external_root`; their handles are retained here.
    workspaces: Mutex<Vec<tempfile::TempDir>>,
    /// Lazily created temp directory used by `provide()` when there is no
    /// `external_root`.
    provide_dir: Mutex<Option<tempfile::TempDir>>,
    /// Decides which resolved paths this instance accepts.
    path_policy: PathPolicy,
}
29
/// Containment rule applied to paths handled by [`LocalDataAccess`].
enum PathPolicy {
    /// No containment check is applied to resolved paths.
    Unrestricted,
    /// A resolved path must lie under one of the roots listed below.
    Restricted {
        /// Canonicalized root of the first snapshot.
        snapshot_a: PathBuf,
        /// Canonicalized root of the second snapshot.
        snapshot_b: PathBuf,
        /// Further canonicalized roots: seeded with the session data root and
        /// extended at runtime as workspaces and the provide directory are
        /// created.
        extra_allowed: Mutex<Vec<PathBuf>>,
    },
}
38
/// Returns the artifact store location for `data_root`, i.e. `data_root`
/// extended by the `.artifacts` component.
fn artifacts_dir(data_root: &Path) -> PathBuf {
    let mut store = data_root.to_path_buf();
    store.push(".artifacts");
    store
}
42
/// Encodes `s` so it can be used as a single path component.
///
/// ASCII alphanumerics, `-`, `_` and `.` are kept as-is; every other byte is
/// written as `%xx` (lowercase hex). Because `%` itself is escaped, distinct
/// inputs always yield distinct outputs.
///
/// The names `"."` and `".."` are made only of allowed bytes but are special
/// path components, so they are escaped in full (`"%2e"` / `"%2e%2e"`).
/// Without that, joining the result onto a directory would refer to the
/// directory itself or to its parent.
fn safe_name(s: &str) -> String {
    if s == "." || s == ".." {
        return "%2e".repeat(s.len());
    }

    // At least one output byte per input byte; escapes grow the buffer on demand.
    let mut encoded = String::with_capacity(s.len());
    for b in s.bytes() {
        if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.') {
            encoded.push(char::from(b));
        } else {
            encoded.push_str(&format!("%{b:02x}"));
        }
    }
    encoded
}
54
55fn subject_dir_name(subject: ArtifactSubject) -> &'static str {
56 match subject {
57 ArtifactSubject::Left => "left",
58 ArtifactSubject::Right => "right",
59 ArtifactSubject::Pair => "pair",
60 }
61}
62
/// Reports whether `path` equals `root` or lies beneath it.
///
/// The comparison is made on whole path components, so `/a/bc` is not
/// considered to be within `/a/b`. No filesystem access takes place; callers
/// pass already-resolved paths.
fn path_is_within(path: &Path, root: &Path) -> bool {
    path.strip_prefix(root).is_ok()
}
67
68fn item_ref_from_physical(physical: &Path, logical: &str) -> ItemRef {
69 ItemRef {
70 logical_path: logical.to_string(),
71 is_dir: physical.is_dir(),
72 content_hash: None,
73 media_type: None,
74 handle: physical.to_string_lossy().to_string(),
75 }
76}
77
78impl LocalDataAccess {
79 pub fn new() -> Self {
80 let session = tempfile::tempdir().expect("failed to create session temp dir");
81 let data_root = session.path().to_path_buf();
82 Self {
83 _session_dir: Some(session),
84 data_root,
85 external_root: None,
86 workspace_counter: AtomicU32::new(0),
87 workspaces: Mutex::new(Vec::new()),
88 provide_dir: Mutex::new(None),
89 path_policy: PathPolicy::Unrestricted,
90 }
91 }
92
93 pub fn new_for_diff(snapshot_a: &Path, snapshot_b: &Path) -> BinocResult<Self> {
97 let session = tempfile::tempdir().map_err(BinocError::Io)?;
98 let data_root = session.path().to_path_buf();
99 let snap_a = std::fs::canonicalize(snapshot_a).map_err(BinocError::Io)?;
100 let snap_b = std::fs::canonicalize(snapshot_b).map_err(BinocError::Io)?;
101 let data_root_canon = std::fs::canonicalize(&data_root).map_err(BinocError::Io)?;
102 Ok(Self {
103 _session_dir: Some(session),
104 data_root,
105 external_root: None,
106 workspace_counter: AtomicU32::new(0),
107 workspaces: Mutex::new(Vec::new()),
108 provide_dir: Mutex::new(None),
109 path_policy: PathPolicy::Restricted {
110 snapshot_a: snap_a,
111 snapshot_b: snap_b,
112 extra_allowed: Mutex::new(vec![data_root_canon]),
113 },
114 })
115 }
116
117 pub fn for_plugin(data_root: PathBuf, workspace: PathBuf) -> Self {
121 Self {
122 _session_dir: None,
123 data_root,
124 external_root: Some(workspace),
125 workspace_counter: AtomicU32::new(0),
126 workspaces: Mutex::new(Vec::new()),
127 provide_dir: Mutex::new(None),
128 path_policy: PathPolicy::Unrestricted,
129 }
130 }
131
132 pub fn with_data_root(data_root: PathBuf) -> Self {
135 Self {
136 _session_dir: None,
137 data_root,
138 external_root: None,
139 workspace_counter: AtomicU32::new(0),
140 workspaces: Mutex::new(Vec::new()),
141 provide_dir: Mutex::new(None),
142 path_policy: PathPolicy::Unrestricted,
143 }
144 }
145
146 fn record_allowed_if_restricted(&self, path: &Path) -> BinocResult<()> {
147 if let PathPolicy::Restricted { extra_allowed, .. } = &self.path_policy {
148 let c = std::fs::canonicalize(path).map_err(BinocError::Io)?;
149 extra_allowed.lock().unwrap().push(c);
150 }
151 Ok(())
152 }
153
154 fn enforce_path_policy_resolved(&self, resolved: &Path) -> BinocResult<()> {
155 match &self.path_policy {
156 PathPolicy::Unrestricted => Ok(()),
157 PathPolicy::Restricted {
158 snapshot_a,
159 snapshot_b,
160 extra_allowed,
161 } => {
162 if path_is_within(resolved, snapshot_a) || path_is_within(resolved, snapshot_b) {
163 return Ok(());
164 }
165 let roots = extra_allowed.lock().unwrap();
166 for root in roots.iter() {
167 if path_is_within(resolved, root) {
168 return Ok(());
169 }
170 }
171 Err(BinocError::PathPolicy(format!(
172 "path must stay under snapshot directories or session workspace: {}",
173 resolved.display()
174 )))
175 }
176 }
177 }
178
179 fn enforce_path_policy(&self, physical: &Path) -> BinocResult<()> {
181 let resolved = std::fs::canonicalize(physical).map_err(BinocError::Io)?;
182 self.enforce_path_policy_resolved(&resolved)
183 }
184
185 fn enforce_policy_for_read_path(&self, path: &Path) -> BinocResult<()> {
187 match &self.path_policy {
188 PathPolicy::Unrestricted => Ok(()),
189 PathPolicy::Restricted { .. } => {
190 if let Ok(c) = std::fs::canonicalize(path) {
191 return self.enforce_path_policy_resolved(&c);
192 }
193 let mut probe: Option<&Path> = Some(path);
194 while let Some(p) = probe {
195 if p.as_os_str().is_empty() {
196 break;
197 }
198 if p.exists() {
199 let base = std::fs::canonicalize(p).map_err(BinocError::Io)?;
200 self.enforce_path_policy_resolved(&base)?;
201 return Ok(());
202 }
203 probe = p.parent();
204 }
205 Err(BinocError::PathPolicy(format!(
206 "cannot resolve path under session: {}",
207 path.display()
208 )))
209 }
210 }
211 }
212
213 fn ensure_provide_dir(&self) -> BinocResult<PathBuf> {
214 if let Some(root) = &self.external_root {
215 let d = root.join("_provide");
216 std::fs::create_dir_all(&d).map_err(BinocError::Io)?;
217 self.record_allowed_if_restricted(&d)?;
218 return Ok(d);
219 }
220 let mut guard = self.provide_dir.lock().unwrap();
221 if guard.is_none() {
222 let dir = tempfile::tempdir().map_err(BinocError::Io)?;
223 self.record_allowed_if_restricted(dir.path())?;
224 *guard = Some(dir);
225 }
226 Ok(guard.as_ref().unwrap().path().to_path_buf())
227 }
228}
229
230impl Default for LocalDataAccess {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl DataAccess for LocalDataAccess {
237 fn read_bytes(&self, item: &ItemRef) -> BinocResult<Vec<u8>> {
238 let p = Path::new(&item.handle);
239 self.enforce_policy_for_read_path(p)?;
240 std::fs::read(p).map_err(BinocError::Io)
241 }
242
243 fn open_read(&self, item: &ItemRef) -> BinocResult<Box<dyn std::io::Read + Send>> {
244 let p = Path::new(&item.handle);
245 self.enforce_policy_for_read_path(p)?;
246 let file = std::fs::File::open(p).map_err(BinocError::Io)?;
247 Ok(Box::new(file))
248 }
249
250 fn local_path(&self, item: &ItemRef) -> BinocResult<PathBuf> {
251 let p = PathBuf::from(&item.handle);
252 self.enforce_policy_for_read_path(&p)?;
253 Ok(p)
254 }
255
256 fn provide(&self, logical_path: &str, content: &[u8]) -> BinocResult<ItemRef> {
257 let dir = self.ensure_provide_dir()?;
258 let safe_name = logical_path.replace(['/', '\\'], "_");
259 let file_path = dir.join(&safe_name);
260 std::fs::write(&file_path, content).map_err(BinocError::Io)?;
261 self.enforce_path_policy(&file_path)?;
262 Ok(item_ref_from_physical(&file_path, logical_path))
263 }
264
265 fn workspace(&self) -> BinocResult<PathBuf> {
266 if let Some(root) = &self.external_root {
267 let n = self.workspace_counter.fetch_add(1, Ordering::Relaxed);
268 let subdir = root.join(format!("ws-{n}"));
269 std::fs::create_dir_all(&subdir).map_err(BinocError::Io)?;
270 self.record_allowed_if_restricted(&subdir)?;
271 return Ok(subdir);
272 }
273 let dir = tempfile::tempdir().map_err(BinocError::Io)?;
274 let path = dir.path().to_path_buf();
275 self.record_allowed_if_restricted(&path)?;
276 self.workspaces.lock().unwrap().push(dir);
277 Ok(path)
278 }
279
280 fn register_local(&self, physical: &Path, logical: &str) -> BinocResult<ItemRef> {
281 self.enforce_path_policy(physical)?;
282 Ok(item_ref_from_physical(physical, logical))
283 }
284
285 fn publish_artifact(
286 &self,
287 format: &ArtifactFormat,
288 subject: ArtifactSubject,
289 producer: &str,
290 data: &[u8],
291 ) -> BinocResult<ArtifactDescriptor> {
292 let dir = artifacts_dir(&self.data_root)
293 .join(safe_name(&format.package))
294 .join(safe_name(&format.name))
295 .join(format!("v{}", format.version))
296 .join(subject_dir_name(subject));
297 std::fs::create_dir_all(&dir).map_err(BinocError::Io)?;
298 let handle = dir.join(safe_name(producer)).to_string_lossy().to_string();
299 std::fs::write(&handle, data).map_err(BinocError::Io)?;
300 Ok(ArtifactDescriptor {
301 format: format.clone(),
302 subject,
303 producer: producer.to_string(),
304 handle,
305 })
306 }
307
308 fn get_artifact(&self, descriptor: &ArtifactDescriptor) -> BinocResult<Option<Vec<u8>>> {
309 let path = PathBuf::from(&descriptor.handle);
310 self.enforce_policy_for_read_path(&path)?;
311 match std::fs::read(&path) {
312 Ok(data) => Ok(Some(data)),
313 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
314 Err(e) => Err(BinocError::Io(e)),
315 }
316 }
317
318 fn data_root(&self) -> BinocResult<PathBuf> {
319 Ok(self.data_root.clone())
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// An artifact that was published can be read back through its descriptor.
    #[test]
    fn publish_and_get_artifact_round_trip() {
        let access = LocalDataAccess::new();
        let format = ArtifactFormat::new("binoc", "tabular", 1);
        let payload: &[u8] = b"hello world";

        let descriptor = access
            .publish_artifact(&format, ArtifactSubject::Left, "binoc.csv", payload)
            .unwrap();
        assert_eq!(descriptor.format, format);
        assert_eq!(descriptor.subject, ArtifactSubject::Left);
        assert_eq!(descriptor.producer, "binoc.csv");

        let fetched = access.get_artifact(&descriptor).unwrap();
        assert_eq!(fetched.as_deref(), Some(payload));
    }

    /// A descriptor whose handle names no file yields `None`, not an error.
    #[test]
    fn get_artifact_missing_returns_none() {
        let access = LocalDataAccess::new();
        let descriptor = ArtifactDescriptor {
            format: ArtifactFormat::new("nonexistent", "thing", 1),
            subject: ArtifactSubject::Pair,
            producer: String::from("test"),
            handle: String::from("/tmp/does-not-exist-binoc-test"),
        };
        let fetched = access.get_artifact(&descriptor).unwrap();
        assert!(fetched.is_none());
    }

    /// A second instance built on the same data root sees published artifacts.
    #[test]
    fn cross_instance_artifact_visibility() {
        let host = LocalDataAccess::new();
        let format = ArtifactFormat::new("binoc", "tabular", 1);
        let payload: &[u8] = b"shared-value";
        let descriptor = host
            .publish_artifact(&format, ArtifactSubject::Right, "binoc.csv", payload)
            .unwrap();

        let shared_root = host.data_root().unwrap();
        let other = LocalDataAccess::with_data_root(shared_root);

        let fetched = other.get_artifact(&descriptor).unwrap();
        assert_eq!(fetched.as_deref(), Some(payload));
    }

    /// Artifacts published through a plugin instance are readable by the host.
    #[test]
    fn for_plugin_shares_artifacts() {
        let host = LocalDataAccess::new();
        let shared_root = host.data_root().unwrap();
        let scratch = host.workspace().unwrap();

        let plugin = LocalDataAccess::for_plugin(shared_root, scratch);
        let format = ArtifactFormat::new("myplugin", "schema", 1);
        let payload: &[u8] = b"plugin-data";
        let descriptor = plugin
            .publish_artifact(&format, ArtifactSubject::Pair, "myplugin", payload)
            .unwrap();

        let fetched = host.get_artifact(&descriptor).unwrap();
        assert_eq!(fetched.as_deref(), Some(payload));
    }

    /// The data root of a fresh instance exists on disk.
    #[test]
    fn data_root_returns_valid_path() {
        let access = LocalDataAccess::new();
        let root = access.data_root().unwrap();
        assert!(root.exists());
    }

    /// A restricted instance refuses to register a file outside both snapshots.
    #[test]
    fn restricted_rejects_register_outside_snapshots() {
        let snapshot_a = tempfile::tempdir().unwrap();
        let snapshot_b = tempfile::tempdir().unwrap();
        let elsewhere = tempfile::tempdir().unwrap();
        let stray = elsewhere.path().join("x.txt");
        std::fs::write(&stray, b"x").unwrap();

        let access = LocalDataAccess::new_for_diff(snapshot_a.path(), snapshot_b.path()).unwrap();
        let outcome = access.register_local(&stray, "x.txt");
        assert!(matches!(outcome, Err(BinocError::PathPolicy(_))));
    }

    /// A restricted instance accepts a file that lives inside a snapshot.
    #[test]
    fn restricted_allows_register_under_snapshot() {
        let snapshot_a = tempfile::tempdir().unwrap();
        let snapshot_b = tempfile::tempdir().unwrap();
        let inside = snapshot_a.path().join("f.txt");
        std::fs::write(&inside, b"ok").unwrap();

        let access = LocalDataAccess::new_for_diff(snapshot_a.path(), snapshot_b.path()).unwrap();
        assert!(access.register_local(&inside, "f.txt").is_ok());
    }
}