Skip to main content

meshdb_executor/
apoc_load.rs

1//! Implementation of the `apoc.load.*` procedure family.
2//!
3//! The shipped surface is `apoc.load.json` and `apoc.load.csv`
4//! — enough to cover the common "pull data into the graph
//! from somewhere" pattern without adding the XML parser. Both
//! hand rows to the caller one at a time: CSV read from a file
//! streams record-by-record via the `csv` crate (a CSV fetched
//! over HTTP is buffered in full first); JSON always fetches the
//! full body first (unavoidable without a streaming JSON parser
//! — Neo4j APOC does the same) and then iterates over the
//! top-level array.
10//!
11//! # Security model
12//!
13//! `apoc.load.*` opens a door — once enabled, a Cypher query
14//! can read any file the server process can access and make
15//! arbitrary HTTP requests. The default [`ImportConfig`] is
16//! strict-disabled; operators opt in explicitly via the
17//! `[apoc.import]` section of the server config. Gates:
18//!
19//! * **Master switch** (`enabled`): off by default. When off,
20//!   every `apoc.load.*` call fails with a message explaining
21//!   which config key to flip.
22//! * **File vs HTTP**: `allow_file` / `allow_http` are
23//!   independent — an operator who wants read-only-from-disk
24//!   sets `allow_file` but leaves `allow_http` off, closing
25//!   the SSRF vector.
26//! * **Scoping**: `file_root` restricts file reads to a
27//!   directory tree (no traversal past symlinks). `url_allowlist`
28//!   restricts HTTP fetches to prefix-matched URLs.
//! * **Dev escape hatch**: `allow_unrestricted` disables every
//!   check except the master switch, for local dev only. Never
//!   flip it on a prod server.
31//!
32//! [`resolve_source`] is the single choke point — every cursor
33//! calls it before opening any I/O handle. Adding a new load
34//! surface means consuming the resolved [`Source`], never
35//! re-parsing the input string.
36
37use crate::error::{Error, Result};
38use crate::procedures::{ProcCursor, ProcRow, ProcedureRegistry};
39use crate::reader::GraphReader;
40use crate::value::Value;
41use meshdb_core::Property;
42use serde::Deserialize;
43use std::collections::HashMap;
44use std::path::PathBuf;
45
46/// Runtime security configuration for `apoc.load.*` (and,
47/// ultimately, `apoc.export.*`). The top-level `enabled` flag
48/// is the master switch; a still-enabled config with both
49/// `allow_file` and `allow_http` off is legal but makes every
50/// call fail — useful for wiring the feature in on a staging
51/// host while the allowlists are still being decided.
52#[derive(Debug, Clone, Default, Deserialize)]
53#[serde(default)]
54pub struct ImportConfig {
55    /// Master gate. When `false`, every `apoc.load.*` call
56    /// refuses with a message naming the config key.
57    pub enabled: bool,
58    /// When `true`, `file://` and plain-path inputs are
59    /// accepted. Subject to [`Self::file_root`].
60    pub allow_file: bool,
61    /// When `true`, `http://` and `https://` URLs are accepted.
62    /// Subject to [`Self::url_allowlist`].
63    pub allow_http: bool,
64    /// When set, file reads are pinned to this directory tree.
65    /// An input path must be inside it after canonicalisation;
66    /// symlinks pointing outside are rejected. `None` means no
67    /// restriction — `allow_file = true` with no root is
68    /// equivalent to "any file the process can read", so prefer
69    /// setting it even in trusted environments.
70    pub file_root: Option<PathBuf>,
71    /// When non-empty, an HTTP URL must start with one of these
72    /// prefixes. Simple prefix match — no regex — to keep the
73    /// audit story straightforward. An empty list plus
74    /// `allow_http = true` permits any URL.
75    #[serde(default)]
76    pub url_allowlist: Vec<String>,
77    /// Dev-only escape hatch. When `true`, all other checks are
78    /// bypassed — useful for local testing but absolutely never
79    /// for production. Keeps the separate gates visible in
80    /// config review.
81    pub allow_unrestricted: bool,
82}
83
/// An import source that has already passed the [`ImportConfig`]
/// gates. [`resolve_source`] is the intended way to obtain one;
/// cursors open their I/O handle from this value instead of
/// re-reading the raw user-supplied string.
#[derive(Debug)]
pub enum Source {
    /// Local file. Canonicalised when `file_root` is configured;
    /// otherwise the path as given, minus any `file://` prefix.
    File(PathBuf),
    /// An `http://` or `https://` URL with surrounding whitespace
    /// trimmed.
    Url(String),
}
93
94/// Apply every security gate and resolve `input` to a
95/// [`Source`] or a clear error naming which gate refused.
96///
97/// Rejection precedence (highest first):
98///
99/// 1. `enabled` off → refuse everything.
100/// 2. Scheme detection (`http://` / `https://` / `file://` /
101///    bare path).
102/// 3. `allow_http` / `allow_file` off for the detected scheme.
103/// 4. `file_root` canonicalisation mismatch / `url_allowlist`
104///    prefix miss.
105///
106/// `allow_unrestricted` short-circuits 2–4 but never 1 —
107/// disabling the feature always wins.
108pub fn resolve_source(cfg: &ImportConfig, input: &str) -> Result<Source> {
109    if !cfg.enabled {
110        return Err(Error::Procedure(
111            "apoc.load.* is disabled — set apoc.import.enabled = true in the server config".into(),
112        ));
113    }
114    let trimmed = input.trim();
115    if trimmed.is_empty() {
116        return Err(Error::Procedure(
117            "apoc.load.*: source path/URL must not be empty".into(),
118        ));
119    }
120
121    // Scheme detection. `http://` / `https://` → URL; `file://`
122    // strips the scheme and falls through to the path branch;
123    // anything else is treated as a local path.
124    let (is_http, raw_path) = if let Some(rest) = trimmed.strip_prefix("http://") {
125        (true, Some(format!("http://{rest}")))
126    } else if let Some(rest) = trimmed.strip_prefix("https://") {
127        (true, Some(format!("https://{rest}")))
128    } else if let Some(rest) = trimmed.strip_prefix("file://") {
129        (false, Some(rest.to_string()))
130    } else {
131        (false, Some(trimmed.to_string()))
132    };
133
134    if cfg.allow_unrestricted {
135        return if is_http {
136            Ok(Source::Url(raw_path.unwrap()))
137        } else {
138            Ok(Source::File(PathBuf::from(raw_path.unwrap())))
139        };
140    }
141
142    if is_http {
143        if !cfg.allow_http {
144            return Err(Error::Procedure(
145                "apoc.load.*: HTTP access is disabled — set apoc.import.allow_http = true".into(),
146            ));
147        }
148        let url = raw_path.unwrap();
149        if !cfg.url_allowlist.is_empty()
150            && !cfg
151                .url_allowlist
152                .iter()
153                .any(|prefix| url.starts_with(prefix))
154        {
155            return Err(Error::Procedure(format!(
156                "apoc.load.*: URL '{url}' does not match any entry in apoc.import.url_allowlist"
157            )));
158        }
159        Ok(Source::Url(url))
160    } else {
161        if !cfg.allow_file {
162            return Err(Error::Procedure(
163                "apoc.load.*: file access is disabled — set apoc.import.allow_file = true".into(),
164            ));
165        }
166        let path = PathBuf::from(raw_path.unwrap());
167        if let Some(root) = &cfg.file_root {
168            let canonical_root = root
169                .canonicalize()
170                .map_err(|e| Error::Procedure(format!("apoc.load.*: file_root {root:?}: {e}")))?;
171            // Resolve relative paths against the root, absolute
172            // paths stay absolute; either way we canonicalise
173            // and confirm the result is inside the root.
174            let target = if path.is_absolute() {
175                path.clone()
176            } else {
177                canonical_root.join(&path)
178            };
179            let canonical_target = target.canonicalize().map_err(|e| {
180                Error::Procedure(format!(
181                    "apoc.load.*: failed to resolve path '{}': {e}",
182                    path.display()
183                ))
184            })?;
185            if !canonical_target.starts_with(&canonical_root) {
186                return Err(Error::Procedure(format!(
187                    "apoc.load.*: path '{}' is outside the configured import root '{}'",
188                    canonical_target.display(),
189                    canonical_root.display()
190                )));
191            }
192            Ok(Source::File(canonical_target))
193        } else {
194            Ok(Source::File(path))
195        }
196    }
197}
198
/// Resolve a file-destination string for `apoc.export.*` against
/// the same [`ImportConfig`] gates [`resolve_source`] uses.
/// Differences:
///
/// * HTTP URLs are rejected — exports are local-only in this
///   release.
/// * The target file may not exist yet, so only the parent
///   directory is canonicalised; the file name is joined back on
///   afterwards and the parent must sit inside `file_root` when
///   one is configured.
/// * If the final component already exists as a symlink, the link
///   is resolved and must also land inside `file_root`. Without
///   this, a link inside the root could redirect the write (and
///   the truncation) to a file outside it.
/// * Existing files are silently truncated on write (the
///   individual export functions are responsible for opening
///   with `File::create` — this helper only vets the path).
///
/// # Errors
///
/// Returns [`Error::Procedure`] when a gate refuses, when the
/// destination has no parent or file-name component, when the
/// parent directory cannot be canonicalised, or when the parent
/// (or an existing symlink at the destination) resolves outside
/// `file_root`.
#[cfg(feature = "apoc-export")]
pub fn resolve_export_path(cfg: &ImportConfig, input: &str) -> Result<PathBuf> {
    if !cfg.enabled {
        return Err(Error::Procedure(
            "apoc.export.* is disabled — set apoc.import.enabled = true in the server config"
                .into(),
        ));
    }
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return Err(Error::Procedure(
            "apoc.export.*: destination path must not be empty".into(),
        ));
    }
    // Reject HTTP explicitly — the user is probably expecting
    // something we don't support, so a clear error beats
    // accepting the URL as a file name.
    if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
        return Err(Error::Procedure(
            "apoc.export.*: HTTP destinations are not supported — write to a local file".into(),
        ));
    }
    let path = PathBuf::from(trimmed.strip_prefix("file://").unwrap_or(trimmed));
    if cfg.allow_unrestricted {
        return Ok(path);
    }
    if !cfg.allow_file {
        return Err(Error::Procedure(
            "apoc.export.*: file writes disabled — set apoc.import.allow_file = true".into(),
        ));
    }
    let root = match &cfg.file_root {
        Some(root) => root,
        // No root configured: the path is handed back untouched.
        None => return Ok(path),
    };
    let canonical_root = root
        .canonicalize()
        .map_err(|e| Error::Procedure(format!("apoc.export.*: file_root {root:?}: {e}")))?;
    // Relative destinations are resolved against the root; absolute
    // ones stay absolute.
    let target = if path.is_absolute() {
        path
    } else {
        canonical_root.join(&path)
    };
    // Split into parent + file name so only the parent (which must
    // exist) is canonicalised; the file itself may not exist yet.
    let parent = target.parent().ok_or_else(|| {
        Error::Procedure(format!(
            "apoc.export.*: destination '{}' has no parent directory",
            target.display()
        ))
    })?;
    let file_name = target.file_name().ok_or_else(|| {
        Error::Procedure(format!(
            "apoc.export.*: destination '{}' has no file name component",
            target.display()
        ))
    })?;
    let canonical_parent = parent.canonicalize().map_err(|e| {
        Error::Procedure(format!(
            "apoc.export.*: parent directory '{}' does not exist: {e}",
            parent.display()
        ))
    })?;
    if !canonical_parent.starts_with(&canonical_root) {
        return Err(Error::Procedure(format!(
            "apoc.export.*: path '{}' is outside the configured import root '{}'",
            canonical_parent.display(),
            canonical_root.display()
        )));
    }
    let destination = canonical_parent.join(file_name);
    // Vetting the parent is not enough on its own: if the final
    // component already exists as a symlink, `File::create` follows
    // it and truncates whatever it points at. Resolve such a link
    // and hold the result to the same containment rule. A missing
    // destination (the common case) makes `symlink_metadata` fail
    // and is simply treated as "not a symlink".
    let is_symlink = std::fs::symlink_metadata(&destination)
        .map(|meta| meta.file_type().is_symlink())
        .unwrap_or(false);
    if is_symlink {
        let resolved = destination.canonicalize().map_err(|e| {
            Error::Procedure(format!(
                "apoc.export.*: failed to resolve symlink '{}': {e}",
                destination.display()
            ))
        })?;
        if !resolved.starts_with(&canonical_root) {
            return Err(Error::Procedure(format!(
                "apoc.export.*: path '{}' is outside the configured import root '{}'",
                resolved.display(),
                canonical_root.display()
            )));
        }
    }
    Ok(destination)
}
288
289/// Fetch a resolved [`Source`] as a byte buffer. Files go
290/// through `std::fs`; URLs go through `reqwest::blocking`.
291/// Both errors surface with a clear prefix so the call site
292/// doesn't need to re-decorate them.
293fn fetch_bytes(source: &Source) -> Result<Vec<u8>> {
294    match source {
295        Source::File(path) => std::fs::read(path)
296            .map_err(|e| Error::Procedure(format!("apoc.load.*: cannot read file {path:?}: {e}"))),
297        Source::Url(url) => {
298            let resp = reqwest::blocking::get(url).map_err(|e| {
299                Error::Procedure(format!("apoc.load.*: HTTP request to {url} failed: {e}"))
300            })?;
301            let status = resp.status();
302            if !status.is_success() {
303                return Err(Error::Procedure(format!(
304                    "apoc.load.*: HTTP {status} from {url}"
305                )));
306            }
307            resp.bytes()
308                .map(|b| b.to_vec())
309                .map_err(|e| Error::Procedure(format!("apoc.load.*: reading body from {url}: {e}")))
310        }
311    }
312}
313
314/// Convert a `serde_json::Value` to a Mesh `Property`. Numbers
315/// that fit in `i64` stay integer; bigger / fractional numbers
316/// become `Float64`. JSON null maps to `Property::Null`.
317/// Containers recurse.
318fn json_to_property(value: &serde_json::Value) -> Property {
319    match value {
320        serde_json::Value::Null => Property::Null,
321        serde_json::Value::Bool(b) => Property::Bool(*b),
322        serde_json::Value::Number(n) => {
323            if let Some(i) = n.as_i64() {
324                Property::Int64(i)
325            } else {
326                Property::Float64(n.as_f64().unwrap_or(f64::NAN))
327            }
328        }
329        serde_json::Value::String(s) => Property::String(s.clone()),
330        serde_json::Value::Array(items) => {
331            Property::List(items.iter().map(json_to_property).collect())
332        }
333        serde_json::Value::Object(entries) => Property::Map(
334            entries
335                .iter()
336                .map(|(k, v)| (k.clone(), json_to_property(v)))
337                .collect(),
338        ),
339    }
340}
341
342/// Cursor for `apoc.load.json(urlOrPath [, path])`. Fetches
343/// the full body on first advance (there's no streaming JSON
344/// parser in the workspace and Neo4j APOC uses the same
345/// strategy), then walks the top-level container one element
346/// at a time. `path` (a JSON Pointer like `/items`) descends
347/// into the document before iteration starts; a missing
348/// pointer target errors loudly.
349pub struct LoadJsonCursor {
350    config: ImportConfig,
351    input: String,
352    json_pointer: Option<String>,
353    /// Parsed body after the first advance. `None` until
354    /// iteration starts, then `Some` with the array / single-
355    /// element vector.
356    items: Option<Vec<serde_json::Value>>,
357    idx: usize,
358}
359
360impl LoadJsonCursor {
361    pub fn new(config: ImportConfig, input: String, json_pointer: Option<String>) -> Self {
362        Self {
363            config,
364            input,
365            json_pointer,
366            items: None,
367            idx: 0,
368        }
369    }
370
371    /// Lazily fetch + parse the source on first advance. Caches
372    /// the resulting Vec in `self.items` so subsequent calls
373    /// just walk the index.
374    fn ensure_loaded(&mut self) -> Result<()> {
375        if self.items.is_some() {
376            return Ok(());
377        }
378        let source = resolve_source(&self.config, &self.input)?;
379        let bytes = fetch_bytes(&source)?;
380        let root: serde_json::Value = serde_json::from_slice(&bytes)
381            .map_err(|e| Error::Procedure(format!("apoc.load.json: parse error: {e}")))?;
382        let descended = if let Some(ptr) = &self.json_pointer {
383            root.pointer(ptr).cloned().ok_or_else(|| {
384                Error::Procedure(format!(
385                    "apoc.load.json: JSON pointer '{ptr}' did not resolve in the document"
386                ))
387            })?
388        } else {
389            root
390        };
391        // Arrays iterate; anything else yields a single row.
392        // An empty object still yields one row ({}) per APOC
393        // convention; an empty array yields zero rows.
394        let items = match descended {
395            serde_json::Value::Array(arr) => arr,
396            other => vec![other],
397        };
398        self.items = Some(items);
399        Ok(())
400    }
401}
402
403impl ProcCursor for LoadJsonCursor {
404    fn advance(&mut self, _reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
405        self.ensure_loaded()?;
406        let items = self.items.as_ref().expect("ensure_loaded set self.items");
407        if self.idx >= items.len() {
408            return Ok(None);
409        }
410        let item = &items[self.idx];
411        self.idx += 1;
412        let mut row: ProcRow = HashMap::new();
413        row.insert("value".to_string(), Value::Property(json_to_property(item)));
414        Ok(Some(row))
415    }
416}
417
/// Options for a single `apoc.load.csv` call, taken from the
/// procedure's optional config-map argument.
#[derive(Debug, Clone)]
struct LoadCsvConfig {
    /// Whether the first row is a header (default `true`). With a
    /// header, the `map` output column is keyed by the header
    /// names; without one, `map` stays empty and only `lineNo`
    /// and `list` carry data.
    headers: bool,
    /// Field separator as a single byte (default `,`).
    /// Multi-byte separators are not supported — Neo4j APOC does
    /// not support them either.
    delimiter: u8,
}
431
432impl Default for LoadCsvConfig {
433    fn default() -> Self {
434        Self {
435            headers: true,
436            delimiter: b',',
437        }
438    }
439}
440
441/// Cursor for `apoc.load.csv(urlOrPath [, config])`. Uses
442/// `csv::Reader` so records stream rather than materialising
443/// the full file. The header row (when enabled) is read
444/// eagerly on first advance so every subsequent row can zip
445/// against it.
446pub struct LoadCsvCursor {
447    config: ImportConfig,
448    input: String,
449    csv_config: LoadCsvConfig,
450    /// Lazy reader — initialised on first advance since
451    /// construction is fallible (network / file open).
452    state: Option<CsvState>,
453    line_no: i64,
454}
455
456/// Initialised CSV reader state. Separated out so the `Option`
457/// wrapper on the cursor is clean.
458struct CsvState {
459    reader: csv::Reader<Box<dyn std::io::Read>>,
460    /// Header names captured off the first record; empty when
461    /// `headers = false`.
462    headers: Vec<String>,
463}
464
465impl LoadCsvCursor {
466    pub fn new(config: ImportConfig, input: String, csv_config_map: Option<&Property>) -> Self {
467        let csv_config = match csv_config_map {
468            Some(Property::Map(entries)) => parse_csv_config(entries),
469            _ => LoadCsvConfig::default(),
470        };
471        Self {
472            config,
473            input,
474            csv_config,
475            state: None,
476            line_no: 0,
477        }
478    }
479
480    fn ensure_opened(&mut self) -> Result<()> {
481        if self.state.is_some() {
482            return Ok(());
483        }
484        let source = resolve_source(&self.config, &self.input)?;
485        // For file sources stream from disk; for URLs we have
486        // to fetch the full body first (reqwest::blocking hands
487        // back `bytes()` not a `Read`), then wrap in a Cursor.
488        // True HTTP streaming would want `reqwest::blocking::Response::copy_to`
489        // but the csv::Reader API takes owned Read, so we'd
490        // need an adapter — skip for now; large CSVs from HTTP
491        // pay the buffer cost.
492        let reader_box: Box<dyn std::io::Read> = match source {
493            Source::File(path) => {
494                let f = std::fs::File::open(&path).map_err(|e| {
495                    Error::Procedure(format!("apoc.load.csv: cannot open file {path:?}: {e}"))
496                })?;
497                Box::new(f)
498            }
499            Source::Url(url) => {
500                let resp = reqwest::blocking::get(&url).map_err(|e| {
501                    Error::Procedure(format!("apoc.load.csv: HTTP request to {url} failed: {e}"))
502                })?;
503                let status = resp.status();
504                if !status.is_success() {
505                    return Err(Error::Procedure(format!(
506                        "apoc.load.csv: HTTP {status} from {url}"
507                    )));
508                }
509                let bytes = resp.bytes().map_err(|e| {
510                    Error::Procedure(format!("apoc.load.csv: reading body from {url}: {e}"))
511                })?;
512                Box::new(std::io::Cursor::new(bytes.to_vec()))
513            }
514        };
515        let mut builder = csv::ReaderBuilder::new();
516        builder
517            .has_headers(self.csv_config.headers)
518            .delimiter(self.csv_config.delimiter);
519        let mut reader = builder.from_reader(reader_box);
520        let headers = if self.csv_config.headers {
521            reader
522                .headers()
523                .map_err(|e| Error::Procedure(format!("apoc.load.csv: reading headers: {e}")))?
524                .iter()
525                .map(|s| s.to_string())
526                .collect()
527        } else {
528            Vec::new()
529        };
530        self.state = Some(CsvState { reader, headers });
531        Ok(())
532    }
533}
534
535/// Parse the per-call config map passed as the third arg. Only
536/// `header` (singular, per Neo4j APOC) and `sep` (alias for
537/// delimiter) are honoured today; unknown keys are ignored so
538/// future additions don't break existing queries.
539fn parse_csv_config(entries: &HashMap<String, Property>) -> LoadCsvConfig {
540    let mut cfg = LoadCsvConfig::default();
541    if let Some(Property::Bool(b)) = entries.get("header") {
542        cfg.headers = *b;
543    }
544    if let Some(Property::String(s)) = entries.get("sep") {
545        if let Some(first) = s.bytes().next() {
546            cfg.delimiter = first;
547        }
548    }
549    cfg
550}
551
552impl ProcCursor for LoadCsvCursor {
553    fn advance(&mut self, _reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
554        self.ensure_opened()?;
555        let state = self.state.as_mut().expect("ensure_opened set state");
556        let mut record = csv::StringRecord::new();
557        let has_record = state
558            .reader
559            .read_record(&mut record)
560            .map_err(|e| Error::Procedure(format!("apoc.load.csv: reading record: {e}")))?;
561        if !has_record {
562            return Ok(None);
563        }
564        self.line_no += 1;
565        let list: Vec<Property> = record
566            .iter()
567            .map(|s| Property::String(s.to_string()))
568            .collect();
569        let map: HashMap<String, Property> = if state.headers.is_empty() {
570            HashMap::new()
571        } else {
572            state
573                .headers
574                .iter()
575                .zip(record.iter())
576                .map(|(h, v)| (h.clone(), Property::String(v.to_string())))
577                .collect()
578        };
579        let mut row: ProcRow = HashMap::new();
580        row.insert(
581            "lineNo".to_string(),
582            Value::Property(Property::Int64(self.line_no)),
583        );
584        row.insert("list".to_string(), Value::Property(Property::List(list)));
585        row.insert("map".to_string(), Value::Property(Property::Map(map)));
586        Ok(Some(row))
587    }
588}
589
590/// Extract the [`ImportConfig`] from the [`ProcedureRegistry`].
591/// A missing config is treated as "all-disabled" so
592/// mis-configured servers fail closed rather than surprising
593/// the operator.
594pub fn import_config_from_registry(registry: &ProcedureRegistry) -> ImportConfig {
595    registry.import_config().cloned().unwrap_or_default()
596}
597
598/// Helper for constructing cursors: extract a String arg with
599/// null-propagation semantics replaced by an explicit error
600/// (null-as-source doesn't make sense for load procedures).
601pub fn expect_source_arg(v: &Value, position: &str) -> Result<String> {
602    match v {
603        Value::Property(Property::String(s)) => Ok(s.clone()),
604        Value::Null | Value::Property(Property::Null) => Err(Error::Procedure(format!(
605            "apoc.load.*: {position} must be a string, got null"
606        ))),
607        other => Err(Error::Procedure(format!(
608            "apoc.load.*: {position} must be a string, got {other:?}"
609        ))),
610    }
611}
612
613/// Helper for extracting an optional string arg (used by
614/// `apoc.load.json`'s second `path` / JSON Pointer argument).
615pub fn expect_optional_string(v: &Value, position: &str) -> Result<Option<String>> {
616    match v {
617        Value::Property(Property::String(s)) if s.is_empty() => Ok(None),
618        Value::Property(Property::String(s)) => Ok(Some(s.clone())),
619        Value::Null | Value::Property(Property::Null) => Ok(None),
620        other => Err(Error::Procedure(format!(
621            "apoc.load.*: {position} must be a string or null, got {other:?}"
622        ))),
623    }
624}
625
626/// Hint that the argument is also allowed to be a `Property::Map`,
627/// which we pass straight through to [`LoadCsvCursor::new`].
628pub fn expect_optional_config_map(v: &Value) -> Result<Option<Property>> {
629    match v {
630        Value::Null | Value::Property(Property::Null) => Ok(None),
631        Value::Property(Property::Map(_)) => Ok(Some(match v {
632            Value::Property(p) => p.clone(),
633            _ => unreachable!(),
634        })),
635        Value::Map(entries) => {
636            // Lower Value::Map to Property::Map, dropping any
637            // graph-element entries (a config map has no reason
638            // to carry them).
639            let mut out: HashMap<String, Property> = HashMap::new();
640            for (k, v) in entries {
641                if let Value::Property(p) = v {
642                    out.insert(k.clone(), p.clone());
643                }
644            }
645            Ok(Some(Property::Map(out)))
646        }
647        other => Err(Error::Procedure(format!(
648            "apoc.load.*: config argument must be a map or null, got {other:?}"
649        ))),
650    }
651}
652
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// The all-`false` default: every call must be refused.
    fn strict_disabled() -> ImportConfig {
        ImportConfig::default()
    }

    /// Enabled config that permits file reads only, optionally
    /// pinned to `root`.
    fn allow_files_only(root: Option<&Path>) -> ImportConfig {
        ImportConfig {
            enabled: true,
            allow_file: true,
            allow_http: false,
            file_root: root.map(PathBuf::from),
            url_allowlist: Vec::new(),
            allow_unrestricted: false,
        }
    }

    #[test]
    fn resolve_source_strict_disabled_refuses_everything() {
        let cfg = strict_disabled();
        let err = resolve_source(&cfg, "/tmp/whatever.json").unwrap_err();
        assert!(err.to_string().contains("apoc.import.enabled"));
    }

    #[test]
    fn resolve_source_http_refused_when_allow_http_false() {
        let cfg = allow_files_only(None);
        let err = resolve_source(&cfg, "https://example.com/data.json").unwrap_err();
        assert!(err.to_string().contains("allow_http"));
    }

    #[test]
    fn resolve_source_file_refused_when_allow_file_false() {
        let cfg = ImportConfig {
            enabled: true,
            allow_file: false,
            allow_http: true,
            ..ImportConfig::default()
        };
        let err = resolve_source(&cfg, "/tmp/data.csv").unwrap_err();
        assert!(err.to_string().contains("allow_file"));
    }

    #[test]
    fn resolve_source_file_root_rejects_traversal_outside() {
        let dir = tempfile::tempdir().unwrap();
        let outside = tempfile::NamedTempFile::new().unwrap();
        let cfg = allow_files_only(Some(dir.path()));
        let err = resolve_source(&cfg, outside.path().to_str().unwrap()).unwrap_err();
        assert!(err
            .to_string()
            .contains("outside the configured import root"));
    }

    #[test]
    fn resolve_source_file_root_accepts_files_inside() {
        let dir = tempfile::tempdir().unwrap();
        let inside = dir.path().join("data.json");
        std::fs::write(&inside, b"{}").unwrap();
        let cfg = allow_files_only(Some(dir.path()));
        let resolved = resolve_source(&cfg, inside.to_str().unwrap()).unwrap();
        // A bare `matches!(..)` statement discards its result, so
        // assert on the variant and on the canonicalised path.
        match resolved {
            Source::File(path) => assert_eq!(path, inside.canonicalize().unwrap()),
            other => panic!("expected Source::File, got {other:?}"),
        }
    }

    #[test]
    fn resolve_source_url_allowlist_gates_prefix() {
        let cfg = ImportConfig {
            enabled: true,
            allow_file: false,
            allow_http: true,
            url_allowlist: vec!["https://data.example.com/".into()],
            ..ImportConfig::default()
        };
        let ok = resolve_source(&cfg, "https://data.example.com/foo.json").unwrap();
        match ok {
            Source::Url(url) => assert_eq!(url, "https://data.example.com/foo.json"),
            other => panic!("expected Source::Url, got {other:?}"),
        }
        let err = resolve_source(&cfg, "https://other.example.com/foo.json").unwrap_err();
        assert!(err.to_string().contains("url_allowlist"));
    }

    #[test]
    fn resolve_source_unrestricted_bypasses_allowlists_but_not_enabled() {
        let cfg = ImportConfig {
            enabled: true,
            allow_unrestricted: true,
            ..ImportConfig::default()
        };
        // File and URL both work, and each maps to its own variant.
        assert!(matches!(
            resolve_source(&cfg, "/tmp/data.json"),
            Ok(Source::File(_))
        ));
        assert!(matches!(
            resolve_source(&cfg, "https://anything.example/"),
            Ok(Source::Url(_))
        ));
        // Master switch still wins.
        let disabled = ImportConfig {
            enabled: false,
            allow_unrestricted: true,
            ..ImportConfig::default()
        };
        assert!(resolve_source(&disabled, "/tmp/data.json").is_err());
    }

    #[test]
    fn json_number_conversion_prefers_int_when_possible() {
        let int_val: serde_json::Value = serde_json::from_str("42").unwrap();
        assert!(matches!(json_to_property(&int_val), Property::Int64(42)));
        let float_val: serde_json::Value = serde_json::from_str("3.14").unwrap();
        assert!(matches!(json_to_property(&float_val), Property::Float64(_)));
    }

    #[test]
    fn json_nested_structure_rounds_through_property() {
        let doc: serde_json::Value =
            serde_json::from_str(r#"{"items": [1, 2, {"x": "y"}]}"#).unwrap();
        let p = json_to_property(&doc);
        if let Property::Map(m) = &p {
            assert!(m.contains_key("items"));
            if let Some(Property::List(items)) = m.get("items") {
                assert_eq!(items.len(), 3);
            } else {
                panic!("expected list under items");
            }
        } else {
            panic!("expected map, got {p:?}");
        }
    }
}