1use crate::error::{Error, Result};
38use crate::procedures::{ProcCursor, ProcRow, ProcedureRegistry};
39use crate::reader::GraphReader;
40use crate::value::Value;
41use meshdb_core::Property;
42use serde::Deserialize;
43use std::collections::HashMap;
44use std::path::PathBuf;
45
46#[derive(Debug, Clone, Default, Deserialize)]
53#[serde(default)]
54pub struct ImportConfig {
55 pub enabled: bool,
58 pub allow_file: bool,
61 pub allow_http: bool,
64 pub file_root: Option<PathBuf>,
71 #[serde(default)]
76 pub url_allowlist: Vec<String>,
77 pub allow_unrestricted: bool,
82}
83
/// A load location that has been classified by [`resolve_source`].
#[derive(Debug)]
pub enum Source {
    /// A path on the local filesystem.
    File(PathBuf),
    /// An `http://` or `https://` URL, stored with its scheme.
    Url(String),
}
93
94pub fn resolve_source(cfg: &ImportConfig, input: &str) -> Result<Source> {
109 if !cfg.enabled {
110 return Err(Error::Procedure(
111 "apoc.load.* is disabled — set apoc.import.enabled = true in the server config".into(),
112 ));
113 }
114 let trimmed = input.trim();
115 if trimmed.is_empty() {
116 return Err(Error::Procedure(
117 "apoc.load.*: source path/URL must not be empty".into(),
118 ));
119 }
120
121 let (is_http, raw_path) = if let Some(rest) = trimmed.strip_prefix("http://") {
125 (true, Some(format!("http://{rest}")))
126 } else if let Some(rest) = trimmed.strip_prefix("https://") {
127 (true, Some(format!("https://{rest}")))
128 } else if let Some(rest) = trimmed.strip_prefix("file://") {
129 (false, Some(rest.to_string()))
130 } else {
131 (false, Some(trimmed.to_string()))
132 };
133
134 if cfg.allow_unrestricted {
135 return if is_http {
136 Ok(Source::Url(raw_path.unwrap()))
137 } else {
138 Ok(Source::File(PathBuf::from(raw_path.unwrap())))
139 };
140 }
141
142 if is_http {
143 if !cfg.allow_http {
144 return Err(Error::Procedure(
145 "apoc.load.*: HTTP access is disabled — set apoc.import.allow_http = true".into(),
146 ));
147 }
148 let url = raw_path.unwrap();
149 if !cfg.url_allowlist.is_empty()
150 && !cfg
151 .url_allowlist
152 .iter()
153 .any(|prefix| url.starts_with(prefix))
154 {
155 return Err(Error::Procedure(format!(
156 "apoc.load.*: URL '{url}' does not match any entry in apoc.import.url_allowlist"
157 )));
158 }
159 Ok(Source::Url(url))
160 } else {
161 if !cfg.allow_file {
162 return Err(Error::Procedure(
163 "apoc.load.*: file access is disabled — set apoc.import.allow_file = true".into(),
164 ));
165 }
166 let path = PathBuf::from(raw_path.unwrap());
167 if let Some(root) = &cfg.file_root {
168 let canonical_root = root
169 .canonicalize()
170 .map_err(|e| Error::Procedure(format!("apoc.load.*: file_root {root:?}: {e}")))?;
171 let target = if path.is_absolute() {
175 path.clone()
176 } else {
177 canonical_root.join(&path)
178 };
179 let canonical_target = target.canonicalize().map_err(|e| {
180 Error::Procedure(format!(
181 "apoc.load.*: failed to resolve path '{}': {e}",
182 path.display()
183 ))
184 })?;
185 if !canonical_target.starts_with(&canonical_root) {
186 return Err(Error::Procedure(format!(
187 "apoc.load.*: path '{}' is outside the configured import root '{}'",
188 canonical_target.display(),
189 canonical_root.display()
190 )));
191 }
192 Ok(Source::File(canonical_target))
193 } else {
194 Ok(Source::File(path))
195 }
196 }
197}
198
/// Validates an export destination against the policy in `cfg` and returns the
/// path that should be written.
///
/// Surrounding whitespace is ignored and an optional `file://` prefix is
/// stripped. HTTP(S) destinations are always rejected.
///
/// Checks run in this order:
/// 1. `cfg.enabled` must be set.
/// 2. `cfg.allow_unrestricted` accepts the path as-is and skips the checks
///    below.
/// 3. `cfg.allow_file` must be set.
/// 4. When `cfg.file_root` is set, the destination's parent directory must
///    already exist and canonicalize to a location under the root. The
///    destination file itself need not exist, but if a symlink is already
///    there, its resolved target must also lie under the root.
///
/// # Errors
///
/// Returns [`Error::Procedure`] when any of the checks above fails, or when the
/// destination has no parent directory or no file-name component.
#[cfg(feature = "apoc-export")]
pub fn resolve_export_path(cfg: &ImportConfig, input: &str) -> Result<PathBuf> {
    if !cfg.enabled {
        return Err(Error::Procedure(
            "apoc.export.* is disabled — set apoc.import.enabled = true in the server config"
                .into(),
        ));
    }
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return Err(Error::Procedure(
            "apoc.export.*: destination path must not be empty".into(),
        ));
    }
    if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
        return Err(Error::Procedure(
            "apoc.export.*: HTTP destinations are not supported — write to a local file".into(),
        ));
    }
    let path = PathBuf::from(trimmed.strip_prefix("file://").unwrap_or(trimmed));
    if cfg.allow_unrestricted {
        return Ok(path);
    }
    if !cfg.allow_file {
        return Err(Error::Procedure(
            "apoc.export.*: file writes disabled — set apoc.import.allow_file = true".into(),
        ));
    }
    let root = match &cfg.file_root {
        Some(root) => root,
        None => return Ok(path),
    };

    let canonical_root = root
        .canonicalize()
        .map_err(|e| Error::Procedure(format!("apoc.export.*: file_root {root:?}: {e}")))?;
    let target = if path.is_absolute() {
        path.clone()
    } else {
        canonical_root.join(&path)
    };
    let parent = target.parent().ok_or_else(|| {
        Error::Procedure(format!(
            "apoc.export.*: destination '{}' has no parent directory",
            target.display()
        ))
    })?;
    let file_name = target.file_name().ok_or_else(|| {
        Error::Procedure(format!(
            "apoc.export.*: destination '{}' has no file name component",
            target.display()
        ))
    })?;
    // The destination may not exist yet, so the containment check is made on
    // its (existing) parent directory.
    let canonical_parent = parent.canonicalize().map_err(|e| {
        Error::Procedure(format!(
            "apoc.export.*: parent directory '{}' does not exist: {e}",
            parent.display()
        ))
    })?;
    if !canonical_parent.starts_with(&canonical_root) {
        return Err(Error::Procedure(format!(
            "apoc.export.*: path '{}' is outside the configured import root '{}'",
            canonical_parent.display(),
            canonical_root.display()
        )));
    }

    let destination = canonical_parent.join(file_name);
    // A writer would follow a symlink sitting at the destination, so a link
    // inside the root that points elsewhere must not slip past the parent check.
    let is_symlink = destination
        .symlink_metadata()
        .map(|meta| meta.file_type().is_symlink())
        .unwrap_or(false);
    if !is_symlink {
        return Ok(destination);
    }
    let resolved = destination.canonicalize().map_err(|e| {
        Error::Procedure(format!(
            "apoc.export.*: failed to resolve symlink '{}': {e}",
            destination.display()
        ))
    })?;
    if !resolved.starts_with(&canonical_root) {
        return Err(Error::Procedure(format!(
            "apoc.export.*: path '{}' is outside the configured import root '{}'",
            resolved.display(),
            canonical_root.display()
        )));
    }
    Ok(resolved)
}
288
289fn fetch_bytes(source: &Source) -> Result<Vec<u8>> {
294 match source {
295 Source::File(path) => std::fs::read(path)
296 .map_err(|e| Error::Procedure(format!("apoc.load.*: cannot read file {path:?}: {e}"))),
297 Source::Url(url) => {
298 let resp = reqwest::blocking::get(url).map_err(|e| {
299 Error::Procedure(format!("apoc.load.*: HTTP request to {url} failed: {e}"))
300 })?;
301 let status = resp.status();
302 if !status.is_success() {
303 return Err(Error::Procedure(format!(
304 "apoc.load.*: HTTP {status} from {url}"
305 )));
306 }
307 resp.bytes()
308 .map(|b| b.to_vec())
309 .map_err(|e| Error::Procedure(format!("apoc.load.*: reading body from {url}: {e}")))
310 }
311 }
312}
313
314fn json_to_property(value: &serde_json::Value) -> Property {
319 match value {
320 serde_json::Value::Null => Property::Null,
321 serde_json::Value::Bool(b) => Property::Bool(*b),
322 serde_json::Value::Number(n) => {
323 if let Some(i) = n.as_i64() {
324 Property::Int64(i)
325 } else {
326 Property::Float64(n.as_f64().unwrap_or(f64::NAN))
327 }
328 }
329 serde_json::Value::String(s) => Property::String(s.clone()),
330 serde_json::Value::Array(items) => {
331 Property::List(items.iter().map(json_to_property).collect())
332 }
333 serde_json::Value::Object(entries) => Property::Map(
334 entries
335 .iter()
336 .map(|(k, v)| (k.clone(), json_to_property(v)))
337 .collect(),
338 ),
339 }
340}
341
342pub struct LoadJsonCursor {
350 config: ImportConfig,
351 input: String,
352 json_pointer: Option<String>,
353 items: Option<Vec<serde_json::Value>>,
357 idx: usize,
358}
359
360impl LoadJsonCursor {
361 pub fn new(config: ImportConfig, input: String, json_pointer: Option<String>) -> Self {
362 Self {
363 config,
364 input,
365 json_pointer,
366 items: None,
367 idx: 0,
368 }
369 }
370
371 fn ensure_loaded(&mut self) -> Result<()> {
375 if self.items.is_some() {
376 return Ok(());
377 }
378 let source = resolve_source(&self.config, &self.input)?;
379 let bytes = fetch_bytes(&source)?;
380 let root: serde_json::Value = serde_json::from_slice(&bytes)
381 .map_err(|e| Error::Procedure(format!("apoc.load.json: parse error: {e}")))?;
382 let descended = if let Some(ptr) = &self.json_pointer {
383 root.pointer(ptr).cloned().ok_or_else(|| {
384 Error::Procedure(format!(
385 "apoc.load.json: JSON pointer '{ptr}' did not resolve in the document"
386 ))
387 })?
388 } else {
389 root
390 };
391 let items = match descended {
395 serde_json::Value::Array(arr) => arr,
396 other => vec![other],
397 };
398 self.items = Some(items);
399 Ok(())
400 }
401}
402
403impl ProcCursor for LoadJsonCursor {
404 fn advance(&mut self, _reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
405 self.ensure_loaded()?;
406 let items = self.items.as_ref().expect("ensure_loaded set self.items");
407 if self.idx >= items.len() {
408 return Ok(None);
409 }
410 let item = &items[self.idx];
411 self.idx += 1;
412 let mut row: ProcRow = HashMap::new();
413 row.insert("value".to_string(), Value::Property(json_to_property(item)));
414 Ok(Some(row))
415 }
416}
417
/// Parsing options for `apoc.load.csv`.
#[derive(Debug, Clone)]
struct LoadCsvConfig {
    /// Whether the first record is a header row.
    headers: bool,
    /// Field separator, as a single byte.
    delimiter: u8,
}

impl Default for LoadCsvConfig {
    /// Comma-separated input with a header row.
    fn default() -> Self {
        LoadCsvConfig {
            delimiter: b',',
            headers: true,
        }
    }
}
440
441pub struct LoadCsvCursor {
447 config: ImportConfig,
448 input: String,
449 csv_config: LoadCsvConfig,
450 state: Option<CsvState>,
453 line_no: i64,
454}
455
456struct CsvState {
459 reader: csv::Reader<Box<dyn std::io::Read>>,
460 headers: Vec<String>,
463}
464
465impl LoadCsvCursor {
466 pub fn new(config: ImportConfig, input: String, csv_config_map: Option<&Property>) -> Self {
467 let csv_config = match csv_config_map {
468 Some(Property::Map(entries)) => parse_csv_config(entries),
469 _ => LoadCsvConfig::default(),
470 };
471 Self {
472 config,
473 input,
474 csv_config,
475 state: None,
476 line_no: 0,
477 }
478 }
479
480 fn ensure_opened(&mut self) -> Result<()> {
481 if self.state.is_some() {
482 return Ok(());
483 }
484 let source = resolve_source(&self.config, &self.input)?;
485 let reader_box: Box<dyn std::io::Read> = match source {
493 Source::File(path) => {
494 let f = std::fs::File::open(&path).map_err(|e| {
495 Error::Procedure(format!("apoc.load.csv: cannot open file {path:?}: {e}"))
496 })?;
497 Box::new(f)
498 }
499 Source::Url(url) => {
500 let resp = reqwest::blocking::get(&url).map_err(|e| {
501 Error::Procedure(format!("apoc.load.csv: HTTP request to {url} failed: {e}"))
502 })?;
503 let status = resp.status();
504 if !status.is_success() {
505 return Err(Error::Procedure(format!(
506 "apoc.load.csv: HTTP {status} from {url}"
507 )));
508 }
509 let bytes = resp.bytes().map_err(|e| {
510 Error::Procedure(format!("apoc.load.csv: reading body from {url}: {e}"))
511 })?;
512 Box::new(std::io::Cursor::new(bytes.to_vec()))
513 }
514 };
515 let mut builder = csv::ReaderBuilder::new();
516 builder
517 .has_headers(self.csv_config.headers)
518 .delimiter(self.csv_config.delimiter);
519 let mut reader = builder.from_reader(reader_box);
520 let headers = if self.csv_config.headers {
521 reader
522 .headers()
523 .map_err(|e| Error::Procedure(format!("apoc.load.csv: reading headers: {e}")))?
524 .iter()
525 .map(|s| s.to_string())
526 .collect()
527 } else {
528 Vec::new()
529 };
530 self.state = Some(CsvState { reader, headers });
531 Ok(())
532 }
533}
534
535fn parse_csv_config(entries: &HashMap<String, Property>) -> LoadCsvConfig {
540 let mut cfg = LoadCsvConfig::default();
541 if let Some(Property::Bool(b)) = entries.get("header") {
542 cfg.headers = *b;
543 }
544 if let Some(Property::String(s)) = entries.get("sep") {
545 if let Some(first) = s.bytes().next() {
546 cfg.delimiter = first;
547 }
548 }
549 cfg
550}
551
552impl ProcCursor for LoadCsvCursor {
553 fn advance(&mut self, _reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
554 self.ensure_opened()?;
555 let state = self.state.as_mut().expect("ensure_opened set state");
556 let mut record = csv::StringRecord::new();
557 let has_record = state
558 .reader
559 .read_record(&mut record)
560 .map_err(|e| Error::Procedure(format!("apoc.load.csv: reading record: {e}")))?;
561 if !has_record {
562 return Ok(None);
563 }
564 self.line_no += 1;
565 let list: Vec<Property> = record
566 .iter()
567 .map(|s| Property::String(s.to_string()))
568 .collect();
569 let map: HashMap<String, Property> = if state.headers.is_empty() {
570 HashMap::new()
571 } else {
572 state
573 .headers
574 .iter()
575 .zip(record.iter())
576 .map(|(h, v)| (h.clone(), Property::String(v.to_string())))
577 .collect()
578 };
579 let mut row: ProcRow = HashMap::new();
580 row.insert(
581 "lineNo".to_string(),
582 Value::Property(Property::Int64(self.line_no)),
583 );
584 row.insert("list".to_string(), Value::Property(Property::List(list)));
585 row.insert("map".to_string(), Value::Property(Property::Map(map)));
586 Ok(Some(row))
587 }
588}
589
590pub fn import_config_from_registry(registry: &ProcedureRegistry) -> ImportConfig {
595 registry.import_config().cloned().unwrap_or_default()
596}
597
598pub fn expect_source_arg(v: &Value, position: &str) -> Result<String> {
602 match v {
603 Value::Property(Property::String(s)) => Ok(s.clone()),
604 Value::Null | Value::Property(Property::Null) => Err(Error::Procedure(format!(
605 "apoc.load.*: {position} must be a string, got null"
606 ))),
607 other => Err(Error::Procedure(format!(
608 "apoc.load.*: {position} must be a string, got {other:?}"
609 ))),
610 }
611}
612
613pub fn expect_optional_string(v: &Value, position: &str) -> Result<Option<String>> {
616 match v {
617 Value::Property(Property::String(s)) if s.is_empty() => Ok(None),
618 Value::Property(Property::String(s)) => Ok(Some(s.clone())),
619 Value::Null | Value::Property(Property::Null) => Ok(None),
620 other => Err(Error::Procedure(format!(
621 "apoc.load.*: {position} must be a string or null, got {other:?}"
622 ))),
623 }
624}
625
626pub fn expect_optional_config_map(v: &Value) -> Result<Option<Property>> {
629 match v {
630 Value::Null | Value::Property(Property::Null) => Ok(None),
631 Value::Property(Property::Map(_)) => Ok(Some(match v {
632 Value::Property(p) => p.clone(),
633 _ => unreachable!(),
634 })),
635 Value::Map(entries) => {
636 let mut out: HashMap<String, Property> = HashMap::new();
640 for (k, v) in entries {
641 if let Value::Property(p) = v {
642 out.insert(k.clone(), p.clone());
643 }
644 }
645 Ok(Some(Property::Map(out)))
646 }
647 other => Err(Error::Procedure(format!(
648 "apoc.load.*: config argument must be a map or null, got {other:?}"
649 ))),
650 }
651}
652
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// The default config: everything off.
    fn strict_disabled() -> ImportConfig {
        ImportConfig::default()
    }

    /// A config that permits file access only, optionally confined to `root`.
    fn allow_files_only(root: Option<&Path>) -> ImportConfig {
        ImportConfig {
            enabled: true,
            allow_file: true,
            allow_http: false,
            file_root: root.map(PathBuf::from),
            url_allowlist: Vec::new(),
            allow_unrestricted: false,
        }
    }

    #[test]
    fn resolve_source_strict_disabled_refuses_everything() {
        let cfg = strict_disabled();
        let err = resolve_source(&cfg, "/tmp/whatever.json").unwrap_err();
        assert!(err.to_string().contains("apoc.import.enabled"));
    }

    #[test]
    fn resolve_source_http_refused_when_allow_http_false() {
        let cfg = allow_files_only(None);
        let err = resolve_source(&cfg, "https://example.com/data.json").unwrap_err();
        assert!(err.to_string().contains("allow_http"));
    }

    #[test]
    fn resolve_source_file_refused_when_allow_file_false() {
        let cfg = ImportConfig {
            enabled: true,
            allow_file: false,
            allow_http: true,
            ..ImportConfig::default()
        };
        let err = resolve_source(&cfg, "/tmp/data.csv").unwrap_err();
        assert!(err.to_string().contains("allow_file"));
    }

    #[test]
    fn resolve_source_file_root_rejects_traversal_outside() {
        let dir = tempfile::tempdir().unwrap();
        let outside = tempfile::NamedTempFile::new().unwrap();
        let cfg = allow_files_only(Some(dir.path()));
        let err = resolve_source(&cfg, outside.path().to_str().unwrap()).unwrap_err();
        assert!(err
            .to_string()
            .contains("outside the configured import root"));
    }

    #[test]
    fn resolve_source_file_root_accepts_files_inside() {
        let dir = tempfile::tempdir().unwrap();
        let inside = dir.path().join("data.json");
        std::fs::write(&inside, b"{}").unwrap();
        let cfg = allow_files_only(Some(dir.path()));
        let resolved = resolve_source(&cfg, inside.to_str().unwrap()).unwrap();
        // A bare `matches!` evaluates to a bool and checks nothing on its own.
        assert!(matches!(resolved, Source::File(_)));
    }

    #[test]
    fn resolve_source_url_allowlist_gates_prefix() {
        let cfg = ImportConfig {
            enabled: true,
            allow_file: false,
            allow_http: true,
            url_allowlist: vec!["https://data.example.com/".into()],
            ..ImportConfig::default()
        };
        let ok = resolve_source(&cfg, "https://data.example.com/foo.json").unwrap();
        // A bare `matches!` evaluates to a bool and checks nothing on its own.
        assert!(matches!(ok, Source::Url(_)));
        let err = resolve_source(&cfg, "https://other.example.com/foo.json").unwrap_err();
        assert!(err.to_string().contains("url_allowlist"));
    }

    #[test]
    fn resolve_source_unrestricted_bypasses_allowlists_but_not_enabled() {
        let cfg = ImportConfig {
            enabled: true,
            allow_unrestricted: true,
            ..ImportConfig::default()
        };
        assert!(resolve_source(&cfg, "/tmp/data.json").is_ok());
        assert!(resolve_source(&cfg, "https://anything.example/").is_ok());
        let disabled = ImportConfig {
            enabled: false,
            allow_unrestricted: true,
            ..ImportConfig::default()
        };
        assert!(resolve_source(&disabled, "/tmp/data.json").is_err());
    }

    #[test]
    fn json_number_conversion_prefers_int_when_possible() {
        let int_val: serde_json::Value = serde_json::from_str("42").unwrap();
        assert!(matches!(json_to_property(&int_val), Property::Int64(42)));
        let float_val: serde_json::Value = serde_json::from_str("3.14").unwrap();
        assert!(matches!(json_to_property(&float_val), Property::Float64(_)));
    }

    #[test]
    fn json_nested_structure_rounds_through_property() {
        let doc: serde_json::Value =
            serde_json::from_str(r#"{"items": [1, 2, {"x": "y"}]}"#).unwrap();
        let p = json_to_property(&doc);
        if let Property::Map(m) = &p {
            assert!(m.contains_key("items"));
            if let Some(Property::List(items)) = m.get("items") {
                assert_eq!(items.len(), 3);
            } else {
                panic!("expected list under items");
            }
        } else {
            panic!("expected map, got {p:?}");
        }
    }
}