Skip to main content

binoc_sdk/
data_access.rs

1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Mutex;
4
5use crate::types::{ArtifactDescriptor, ArtifactFormat, ArtifactSubject};
6use crate::{BinocError, BinocResult, DataAccess, ItemRef};
7
/// In-process DataAccess backed by the local filesystem, temp directories,
/// and a filesystem-backed artifact store under `data_root/.artifacts/`.
///
/// Construction modes:
///
/// - [`Self::new`] — unrestricted paths (tests, ad-hoc tooling).
/// - [`Self::new_for_diff`] — paths must stay under the two snapshot trees or
///   session workspace (used by the controller).
/// - [`Self::for_plugin`] — shares the host's `data_root` for artifact access,
///   plus a pre-allocated workspace for expansion (C ABI plugins).
/// - [`Self::with_data_root`] — shares an existing `data_root` for artifact
///   reads only (no expansion workspace).
///
/// NOTE(review): the "reads only" intent of [`Self::with_data_root`] is a
/// convention, not something this type enforces — that mode uses the
/// unrestricted path policy, and `workspace` / `provide` fall back to fresh
/// temp directories when no external workspace is configured.
pub struct LocalDataAccess {
    /// Session temp directory that `data_root` points into. `Some` for
    /// [`Self::new`] and [`Self::new_for_diff`], `None` when the caller
    /// supplies `data_root`. Never read; presumably held only so the
    /// directory lives as long as this value (`tempfile::TempDir` removes
    /// its directory on drop — confirm against the tempfile docs).
    _session_dir: Option<tempfile::TempDir>,
    /// Root under which the artifact store (`.artifacts/`) is created.
    data_root: PathBuf,
    /// Caller-supplied workspace root (set by [`Self::for_plugin`]). When
    /// present, workspaces (`ws-{n}`) and the provide directory (`_provide`)
    /// are created beneath it instead of in fresh temp directories.
    external_root: Option<PathBuf>,
    /// Next `n` for `ws-{n}` sub-directories under `external_root`.
    workspace_counter: AtomicU32,
    /// Temp-dir workspaces handed out by `workspace()` when there is no
    /// `external_root`; stored here so their handles are not dropped.
    workspaces: Mutex<Vec<tempfile::TempDir>>,
    /// Lazily created temp directory used by `provide()` when there is no
    /// `external_root`.
    provide_dir: Mutex<Option<tempfile::TempDir>>,
    /// Path-confinement mode chosen at construction time.
    path_policy: PathPolicy,
}
29
/// Path-confinement mode applied to filesystem reads and `register_local`.
enum PathPolicy {
    /// No confinement: every path passes the policy check.
    Unrestricted,
    /// A resolved path must be one of the roots below or a descendant of one.
    Restricted {
        /// Canonicalized root of the first snapshot tree.
        snapshot_a: PathBuf,
        /// Canonicalized root of the second snapshot tree.
        snapshot_b: PathBuf,
        /// Additional canonicalized roots. Seeded with the session
        /// `data_root` and extended whenever a workspace or provide
        /// directory is created, hence the interior mutability.
        extra_allowed: Mutex<Vec<PathBuf>>,
    },
}
38
/// Location of the artifact store for `data_root`: `<data_root>/.artifacts`.
fn artifacts_dir(data_root: &Path) -> PathBuf {
    let mut store = data_root.to_path_buf();
    store.push(".artifacts");
    store
}
42
/// Encode `s` so it can be used as a single path component.
///
/// ASCII alphanumerics, `-`, `_` and `.` are kept as-is; every other byte is
/// written as `%xx` (two lowercase hex digits), so separators and non-ASCII
/// bytes can never introduce extra path components.
///
/// The names `.` and `..` are made only of "kept" bytes but mean "current
/// directory" / "parent directory" when joined onto a path. They are
/// therefore encoded in full (`%2e`, `%2e%2e`) so a caller-supplied name can
/// never walk out of the directory it is joined to.
fn safe_name(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    // Only the exact names "." and ".." are navigation components; longer
    // dot runs ("...") and dotted names ("a.b") are ordinary file names.
    let is_navigation = matches!(s, "." | "..");

    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        let keep =
            !is_navigation && (b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.'));
        if keep {
            out.push(char::from(b));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0f)]));
        }
    }
    out
}
54
55fn subject_dir_name(subject: ArtifactSubject) -> &'static str {
56    match subject {
57        ArtifactSubject::Left => "left",
58        ArtifactSubject::Right => "right",
59        ArtifactSubject::Pair => "pair",
60    }
61}
62
/// Reports whether `path` equals `root` or lies beneath it. The comparison is
/// made on whole path components, so `/a/bc` is not considered inside `/a/b`.
fn path_is_within(path: &Path, root: &Path) -> bool {
    path.strip_prefix(root).is_ok()
}
67
68fn item_ref_from_physical(physical: &Path, logical: &str) -> ItemRef {
69    ItemRef {
70        logical_path: logical.to_string(),
71        is_dir: physical.is_dir(),
72        content_hash: None,
73        media_type: None,
74        handle: physical.to_string_lossy().to_string(),
75    }
76}
77
78impl LocalDataAccess {
79    pub fn new() -> Self {
80        let session = tempfile::tempdir().expect("failed to create session temp dir");
81        let data_root = session.path().to_path_buf();
82        Self {
83            _session_dir: Some(session),
84            data_root,
85            external_root: None,
86            workspace_counter: AtomicU32::new(0),
87            workspaces: Mutex::new(Vec::new()),
88            provide_dir: Mutex::new(None),
89            path_policy: PathPolicy::Unrestricted,
90        }
91    }
92
93    /// Session-backed access with path confinement: filesystem reads and
94    /// `register_local` targets must lie under the snapshot roots, the session
95    /// `data_root`, or a workspace / provide directory created by this instance.
96    pub fn new_for_diff(snapshot_a: &Path, snapshot_b: &Path) -> BinocResult<Self> {
97        let session = tempfile::tempdir().map_err(BinocError::Io)?;
98        let data_root = session.path().to_path_buf();
99        let snap_a = std::fs::canonicalize(snapshot_a).map_err(BinocError::Io)?;
100        let snap_b = std::fs::canonicalize(snapshot_b).map_err(BinocError::Io)?;
101        let data_root_canon = std::fs::canonicalize(&data_root).map_err(BinocError::Io)?;
102        Ok(Self {
103            _session_dir: Some(session),
104            data_root,
105            external_root: None,
106            workspace_counter: AtomicU32::new(0),
107            workspaces: Mutex::new(Vec::new()),
108            provide_dir: Mutex::new(None),
109            path_policy: PathPolicy::Restricted {
110                snapshot_a: snap_a,
111                snapshot_b: snap_b,
112                extra_allowed: Mutex::new(vec![data_root_canon]),
113            },
114        })
115    }
116
117    /// Create a LocalDataAccess for a plugin running across the C ABI.
118    /// Shares the host's `data_root` for cache access and uses `workspace`
119    /// for expansion (provide, workspace calls).
120    pub fn for_plugin(data_root: PathBuf, workspace: PathBuf) -> Self {
121        Self {
122            _session_dir: None,
123            data_root,
124            external_root: Some(workspace),
125            workspace_counter: AtomicU32::new(0),
126            workspaces: Mutex::new(Vec::new()),
127            provide_dir: Mutex::new(None),
128            path_policy: PathPolicy::Unrestricted,
129        }
130    }
131
132    /// Create a LocalDataAccess that can only read from an existing data_root
133    /// cache. No workspace for expansion. Used during extract-only access.
134    pub fn with_data_root(data_root: PathBuf) -> Self {
135        Self {
136            _session_dir: None,
137            data_root,
138            external_root: None,
139            workspace_counter: AtomicU32::new(0),
140            workspaces: Mutex::new(Vec::new()),
141            provide_dir: Mutex::new(None),
142            path_policy: PathPolicy::Unrestricted,
143        }
144    }
145
146    fn record_allowed_if_restricted(&self, path: &Path) -> BinocResult<()> {
147        if let PathPolicy::Restricted { extra_allowed, .. } = &self.path_policy {
148            let c = std::fs::canonicalize(path).map_err(BinocError::Io)?;
149            extra_allowed.lock().unwrap().push(c);
150        }
151        Ok(())
152    }
153
154    fn enforce_path_policy_resolved(&self, resolved: &Path) -> BinocResult<()> {
155        match &self.path_policy {
156            PathPolicy::Unrestricted => Ok(()),
157            PathPolicy::Restricted {
158                snapshot_a,
159                snapshot_b,
160                extra_allowed,
161            } => {
162                if path_is_within(resolved, snapshot_a) || path_is_within(resolved, snapshot_b) {
163                    return Ok(());
164                }
165                let roots = extra_allowed.lock().unwrap();
166                for root in roots.iter() {
167                    if path_is_within(resolved, root) {
168                        return Ok(());
169                    }
170                }
171                Err(BinocError::PathPolicy(format!(
172                    "path must stay under snapshot directories or session workspace: {}",
173                    resolved.display()
174                )))
175            }
176        }
177    }
178
179    /// Enforce policy for a path that must already exist on disk (e.g. `register_local`).
180    fn enforce_path_policy(&self, physical: &Path) -> BinocResult<()> {
181        let resolved = std::fs::canonicalize(physical).map_err(BinocError::Io)?;
182        self.enforce_path_policy_resolved(&resolved)
183    }
184
185    /// Enforce policy before reading; allows missing leaf paths under an allowed directory.
186    fn enforce_policy_for_read_path(&self, path: &Path) -> BinocResult<()> {
187        match &self.path_policy {
188            PathPolicy::Unrestricted => Ok(()),
189            PathPolicy::Restricted { .. } => {
190                if let Ok(c) = std::fs::canonicalize(path) {
191                    return self.enforce_path_policy_resolved(&c);
192                }
193                let mut probe: Option<&Path> = Some(path);
194                while let Some(p) = probe {
195                    if p.as_os_str().is_empty() {
196                        break;
197                    }
198                    if p.exists() {
199                        let base = std::fs::canonicalize(p).map_err(BinocError::Io)?;
200                        self.enforce_path_policy_resolved(&base)?;
201                        return Ok(());
202                    }
203                    probe = p.parent();
204                }
205                Err(BinocError::PathPolicy(format!(
206                    "cannot resolve path under session: {}",
207                    path.display()
208                )))
209            }
210        }
211    }
212
213    fn ensure_provide_dir(&self) -> BinocResult<PathBuf> {
214        if let Some(root) = &self.external_root {
215            let d = root.join("_provide");
216            std::fs::create_dir_all(&d).map_err(BinocError::Io)?;
217            self.record_allowed_if_restricted(&d)?;
218            return Ok(d);
219        }
220        let mut guard = self.provide_dir.lock().unwrap();
221        if guard.is_none() {
222            let dir = tempfile::tempdir().map_err(BinocError::Io)?;
223            self.record_allowed_if_restricted(dir.path())?;
224            *guard = Some(dir);
225        }
226        Ok(guard.as_ref().unwrap().path().to_path_buf())
227    }
228}
229
230impl Default for LocalDataAccess {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236impl DataAccess for LocalDataAccess {
237    fn read_bytes(&self, item: &ItemRef) -> BinocResult<Vec<u8>> {
238        let p = Path::new(&item.handle);
239        self.enforce_policy_for_read_path(p)?;
240        std::fs::read(p).map_err(BinocError::Io)
241    }
242
243    fn open_read(&self, item: &ItemRef) -> BinocResult<Box<dyn std::io::Read + Send>> {
244        let p = Path::new(&item.handle);
245        self.enforce_policy_for_read_path(p)?;
246        let file = std::fs::File::open(p).map_err(BinocError::Io)?;
247        Ok(Box::new(file))
248    }
249
250    fn local_path(&self, item: &ItemRef) -> BinocResult<PathBuf> {
251        let p = PathBuf::from(&item.handle);
252        self.enforce_policy_for_read_path(&p)?;
253        Ok(p)
254    }
255
256    fn provide(&self, logical_path: &str, content: &[u8]) -> BinocResult<ItemRef> {
257        let dir = self.ensure_provide_dir()?;
258        let safe_name = logical_path.replace(['/', '\\'], "_");
259        let file_path = dir.join(&safe_name);
260        std::fs::write(&file_path, content).map_err(BinocError::Io)?;
261        self.enforce_path_policy(&file_path)?;
262        Ok(item_ref_from_physical(&file_path, logical_path))
263    }
264
265    fn workspace(&self) -> BinocResult<PathBuf> {
266        if let Some(root) = &self.external_root {
267            let n = self.workspace_counter.fetch_add(1, Ordering::Relaxed);
268            let subdir = root.join(format!("ws-{n}"));
269            std::fs::create_dir_all(&subdir).map_err(BinocError::Io)?;
270            self.record_allowed_if_restricted(&subdir)?;
271            return Ok(subdir);
272        }
273        let dir = tempfile::tempdir().map_err(BinocError::Io)?;
274        let path = dir.path().to_path_buf();
275        self.record_allowed_if_restricted(&path)?;
276        self.workspaces.lock().unwrap().push(dir);
277        Ok(path)
278    }
279
280    fn register_local(&self, physical: &Path, logical: &str) -> BinocResult<ItemRef> {
281        self.enforce_path_policy(physical)?;
282        Ok(item_ref_from_physical(physical, logical))
283    }
284
285    fn publish_artifact(
286        &self,
287        format: &ArtifactFormat,
288        subject: ArtifactSubject,
289        producer: &str,
290        data: &[u8],
291    ) -> BinocResult<ArtifactDescriptor> {
292        let dir = artifacts_dir(&self.data_root)
293            .join(safe_name(&format.package))
294            .join(safe_name(&format.name))
295            .join(format!("v{}", format.version))
296            .join(subject_dir_name(subject));
297        std::fs::create_dir_all(&dir).map_err(BinocError::Io)?;
298        let handle = dir.join(safe_name(producer)).to_string_lossy().to_string();
299        std::fs::write(&handle, data).map_err(BinocError::Io)?;
300        Ok(ArtifactDescriptor {
301            format: format.clone(),
302            subject,
303            producer: producer.to_string(),
304            handle,
305        })
306    }
307
308    fn get_artifact(&self, descriptor: &ArtifactDescriptor) -> BinocResult<Option<Vec<u8>>> {
309        let path = PathBuf::from(&descriptor.handle);
310        self.enforce_policy_for_read_path(&path)?;
311        match std::fs::read(&path) {
312            Ok(data) => Ok(Some(data)),
313            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
314            Err(e) => Err(BinocError::Io(e)),
315        }
316    }
317
318    fn data_root(&self) -> BinocResult<PathBuf> {
319        Ok(self.data_root.clone())
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// A published artifact can be read back through the same instance.
    #[test]
    fn publish_and_get_artifact_round_trip() {
        let da = LocalDataAccess::new();
        let fmt = ArtifactFormat::new("binoc", "tabular", 1);
        let desc = da
            .publish_artifact(&fmt, ArtifactSubject::Left, "binoc.csv", b"hello world")
            .unwrap();
        assert_eq!(desc.format, fmt);
        assert_eq!(desc.subject, ArtifactSubject::Left);
        assert_eq!(desc.producer, "binoc.csv");
        let loaded = da.get_artifact(&desc).unwrap();
        assert_eq!(loaded, Some(b"hello world".to_vec()));
    }

    /// A descriptor whose handle names a missing file yields `None`, not an error.
    #[test]
    fn get_artifact_missing_returns_none() {
        let da = LocalDataAccess::new();
        // Use a path inside this instance's fresh temp data root: it is
        // guaranteed not to exist, unlike a hard-coded `/tmp/...` name.
        let missing = da.data_root().unwrap().join("does-not-exist-binoc-test");
        let desc = ArtifactDescriptor {
            format: ArtifactFormat::new("nonexistent", "thing", 1),
            subject: ArtifactSubject::Pair,
            producer: "test".into(),
            handle: missing.to_string_lossy().into_owned(),
        };
        assert_eq!(da.get_artifact(&desc).unwrap(), None);
    }

    /// An artifact published by one instance is readable by another instance
    /// built over the same data root.
    #[test]
    fn cross_instance_artifact_visibility() {
        let da = LocalDataAccess::new();
        let fmt = ArtifactFormat::new("binoc", "tabular", 1);
        let desc = da
            .publish_artifact(&fmt, ArtifactSubject::Right, "binoc.csv", b"shared-value")
            .unwrap();
        let data_root = da.data_root().unwrap();

        let plugin_da = LocalDataAccess::with_data_root(data_root);
        let loaded = plugin_da.get_artifact(&desc).unwrap();
        assert_eq!(loaded, Some(b"shared-value".to_vec()));
    }

    /// An artifact published through a plugin-mode instance is visible to the host.
    #[test]
    fn for_plugin_shares_artifacts() {
        let da = LocalDataAccess::new();
        let data_root = da.data_root().unwrap();
        let ws = da.workspace().unwrap();

        let plugin_da = LocalDataAccess::for_plugin(data_root, ws);
        let fmt = ArtifactFormat::new("myplugin", "schema", 1);
        let desc = plugin_da
            .publish_artifact(&fmt, ArtifactSubject::Pair, "myplugin", b"plugin-data")
            .unwrap();

        let loaded = da.get_artifact(&desc).unwrap();
        assert_eq!(loaded, Some(b"plugin-data".to_vec()));
    }

    /// The data root of a session-backed instance exists on disk.
    #[test]
    fn data_root_returns_valid_path() {
        let da = LocalDataAccess::new();
        let root = da.data_root().unwrap();
        assert!(root.exists());
    }

    /// Under the restricted policy, registering a file outside both
    /// snapshots is rejected with a policy error.
    #[test]
    fn restricted_rejects_register_outside_snapshots() {
        let tmp_a = tempfile::tempdir().unwrap();
        let tmp_b = tempfile::tempdir().unwrap();
        let outside = tempfile::tempdir().unwrap();
        std::fs::write(outside.path().join("x.txt"), b"x").unwrap();

        let da = LocalDataAccess::new_for_diff(tmp_a.path(), tmp_b.path()).unwrap();
        let p = outside.path().join("x.txt");
        let err = da.register_local(&p, "x.txt").unwrap_err();
        assert!(matches!(err, BinocError::PathPolicy(_)));
    }

    /// Under the restricted policy, a file inside a snapshot tree can be registered.
    #[test]
    fn restricted_allows_register_under_snapshot() {
        let tmp_a = tempfile::tempdir().unwrap();
        let tmp_b = tempfile::tempdir().unwrap();
        let f = tmp_a.path().join("f.txt");
        std::fs::write(&f, b"ok").unwrap();

        let da = LocalDataAccess::new_for_diff(tmp_a.path(), tmp_b.path()).unwrap();
        da.register_local(&f, "f.txt").unwrap();
    }
}