1pub(crate) mod builtins;
57pub(crate) mod compile;
58pub(crate) mod data;
59pub(crate) mod exec;
60pub mod io;
61pub(crate) mod ir;
62pub(crate) mod parse;
63pub(crate) mod plan;
64pub(crate) mod util;
65pub(crate) mod vm;
66
67#[cfg(test)]
68mod tests;
69
70use data::value::Val;
71use serde_json::Value;
72use std::cell::{OnceCell, RefCell};
73use std::collections::HashMap;
74use std::sync::Arc;
75use std::sync::Mutex;
76
77pub use data::context::EvalError;
78#[cfg(test)]
79use parse::parser::ParseError;
80use vm::VM;
81
#[cfg(feature = "fuzz_internal")]
/// Re-exports of the parser and physical planner, gated behind the
/// `fuzz_internal` feature (presumably for fuzz harnesses, per the name).
pub mod __fuzz_internal {
    pub use crate::parse::parser::parse;
    pub use crate::parse::parser::ParseError;
    pub use crate::plan::physical::plan_query;
}
90
/// Test-only error that is either a parse failure or an evaluation failure.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// Wraps a [`ParseError`].
    Parse(ParseError),
    /// Wraps an [`EvalError`].
    Eval(EvalError),
}

#[cfg(test)]
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each variant renders exactly like the error it wraps.
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(err) => err,
            Self::Eval(err) => err,
        };
        write!(f, "{}", inner)
    }
}

#[cfg(test)]
impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Only the parse error is chained as a source; eval errors are not.
        match self {
            Self::Eval(_) => None,
            Self::Parse(err) => Some(err),
        }
    }
}

#[cfg(test)]
impl From<ParseError> for Error {
    fn from(err: ParseError) -> Self {
        Self::Parse(err)
    }
}

#[cfg(test)]
impl From<EvalError> for Error {
    fn from(err: EvalError) -> Self {
        Self::Eval(err)
    }
}
129
130pub struct Jetro {
135 document: Value,
138 root_val: OnceCell<Val>,
140 raw_bytes: Option<Arc<[u8]>>,
142
143 tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
145
146 structural_index:
148 OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
149
150 pub(crate) objvec_cache:
153 std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
154
155 vm: RefCell<VM>,
157}
158
159pub struct JetroEngine {
164 plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
166 plan_cache_limit: usize,
168 vm: Mutex<VM>,
170 keys: Arc<crate::data::intern::KeyCache>,
176}
177
178#[derive(Debug)]
181pub enum JetroEngineError {
182 Json(serde_json::Error),
184 Io(std::io::Error),
186 Ndjson(io::RowError),
188 Eval(EvalError),
190}
191
192impl std::fmt::Display for JetroEngineError {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 match self {
195 Self::Json(err) => write!(f, "{}", err),
196 Self::Io(err) => write!(f, "{}", err),
197 Self::Ndjson(err) => write!(f, "{}", err),
198 Self::Eval(err) => write!(f, "{}", err),
199 }
200 }
201}
202
203impl std::error::Error for JetroEngineError {
204 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
205 match self {
206 Self::Json(err) => Some(err),
207 Self::Io(err) => Some(err),
208 Self::Ndjson(err) => Some(err),
209 Self::Eval(_) => None,
210 }
211 }
212}
213
214impl From<serde_json::Error> for JetroEngineError {
215 fn from(err: serde_json::Error) -> Self {
216 Self::Json(err)
217 }
218}
219
220impl From<std::io::Error> for JetroEngineError {
221 fn from(err: std::io::Error) -> Self {
222 Self::Io(err)
223 }
224}
225
226impl From<io::RowError> for JetroEngineError {
227 fn from(err: io::RowError) -> Self {
228 Self::Ndjson(err)
229 }
230}
231
232impl From<EvalError> for JetroEngineError {
233 fn from(err: EvalError) -> Self {
234 Self::Eval(err)
235 }
236}
237
238impl Default for JetroEngine {
239 fn default() -> Self {
240 Self::new()
241 }
242}
243
244impl JetroEngine {
245 const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
247
248 pub fn new() -> Self {
250 Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
251 }
252
253 pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
256 Self {
257 plan_cache: Mutex::new(HashMap::new()),
258 plan_cache_limit,
259 vm: Mutex::new(VM::new()),
260 keys: crate::data::intern::KeyCache::new(),
261 }
262 }
263
264 pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
266 &self.keys
267 }
268
269 pub fn clear_cache(&self) {
272 self.plan_cache.lock().expect("plan cache poisoned").clear();
273 self.keys.clear();
274 }
275
276 pub fn parse_value(&self, document: Value) -> Jetro {
281 let root = Val::from_value_with(&self.keys, &document);
282 Jetro::from_val_and_value(root, document)
283 }
284
285 pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
290 let document = Jetro::from_bytes(bytes)?;
291 let _ = document.root_val_with(&self.keys)?;
295 Ok(document)
296 }
297
298 pub(crate) fn parse_bytes_lazy(
302 &self,
303 bytes: Vec<u8>,
304 ) -> std::result::Result<Jetro, JetroEngineError> {
305 Ok(Jetro::from_bytes(bytes)?)
306 }
307
308 pub fn collect<S: AsRef<str>>(
311 &self,
312 document: &Jetro,
313 expr: S,
314 ) -> std::result::Result<Value, EvalError> {
315 let expr = expr.as_ref();
316 if let Some(rows) = io::collect_document_rows(self, document, expr)? {
317 return Ok(Value::from(rows));
318 }
319 let plan = self.cached_plan(expr, exec::router::planning_context(document));
320 self.collect_prepared(document, &plan)
321 }
322
323 pub(crate) fn collect_prepared(
324 &self,
325 document: &Jetro,
326 plan: &ir::physical::QueryPlan,
327 ) -> std::result::Result<Value, EvalError> {
328 self.collect_prepared_val(document, plan).map(Value::from)
329 }
330
331 pub(crate) fn collect_prepared_val(
332 &self,
333 document: &Jetro,
334 plan: &ir::physical::QueryPlan,
335 ) -> std::result::Result<Val, EvalError> {
336 let mut vm = self.vm.lock().expect("vm cache poisoned");
337 exec::router::collect_plan_val_with_vm(document, plan, &mut vm)
338 }
339
340 pub(crate) fn lock_vm(&self) -> std::sync::MutexGuard<'_, VM> {
341 self.vm.lock().expect("vm cache poisoned")
342 }
343
344 pub fn collect_value<S: AsRef<str>>(
348 &self,
349 document: Value,
350 expr: S,
351 ) -> std::result::Result<Value, EvalError> {
352 let document = self.parse_value(document);
353 self.collect(&document, expr)
354 }
355
356 pub fn collect_bytes<S: AsRef<str>>(
361 &self,
362 bytes: Vec<u8>,
363 expr: S,
364 ) -> std::result::Result<Value, JetroEngineError> {
365 let document = self.parse_bytes(bytes)?;
366 Ok(self.collect(&document, expr)?)
367 }
368
369 pub fn run_ndjson<R, W>(
372 &self,
373 reader: R,
374 query: &str,
375 writer: W,
376 ) -> std::result::Result<usize, JetroEngineError>
377 where
378 R: std::io::BufRead,
379 W: std::io::Write,
380 {
381 io::run_ndjson(self, reader, query, writer)
382 }
383
384 pub fn run_ndjson_file<P, W>(
387 &self,
388 path: P,
389 query: &str,
390 writer: W,
391 ) -> std::result::Result<usize, JetroEngineError>
392 where
393 P: AsRef<std::path::Path>,
394 W: std::io::Write,
395 {
396 io::run_ndjson_file(self, path, query, writer)
397 }
398
399 pub fn run_ndjson_file_with_options<P, W>(
401 &self,
402 path: P,
403 query: &str,
404 writer: W,
405 options: io::NdjsonOptions,
406 ) -> std::result::Result<usize, JetroEngineError>
407 where
408 P: AsRef<std::path::Path>,
409 W: std::io::Write,
410 {
411 io::run_ndjson_file_with_options(self, path, query, writer, options)
412 }
413
414 pub fn run_ndjson_file_with_report<P, W>(
416 &self,
417 path: P,
418 query: &str,
419 writer: W,
420 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
421 where
422 P: AsRef<std::path::Path>,
423 W: std::io::Write,
424 {
425 io::run_ndjson_file_with_report(self, path, query, writer)
426 }
427
428 pub fn run_ndjson_file_with_report_and_options<P, W>(
430 &self,
431 path: P,
432 query: &str,
433 writer: W,
434 options: io::NdjsonOptions,
435 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
436 where
437 P: AsRef<std::path::Path>,
438 W: std::io::Write,
439 {
440 io::run_ndjson_file_with_report_and_options(self, path, query, writer, options)
441 }
442
443 pub fn run_ndjson_file_limit<P, W>(
445 &self,
446 path: P,
447 query: &str,
448 limit: usize,
449 writer: W,
450 ) -> std::result::Result<usize, JetroEngineError>
451 where
452 P: AsRef<std::path::Path>,
453 W: std::io::Write,
454 {
455 io::run_ndjson_file_limit(self, path, query, limit, writer)
456 }
457
458 pub fn run_ndjson_file_limit_with_options<P, W>(
460 &self,
461 path: P,
462 query: &str,
463 limit: usize,
464 writer: W,
465 options: io::NdjsonOptions,
466 ) -> std::result::Result<usize, JetroEngineError>
467 where
468 P: AsRef<std::path::Path>,
469 W: std::io::Write,
470 {
471 io::run_ndjson_file_limit_with_options(self, path, query, limit, writer, options)
472 }
473
474 pub fn run_ndjson_file_limit_with_report<P, W>(
476 &self,
477 path: P,
478 query: &str,
479 limit: usize,
480 writer: W,
481 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
482 where
483 P: AsRef<std::path::Path>,
484 W: std::io::Write,
485 {
486 io::run_ndjson_file_limit_with_report(self, path, query, limit, writer)
487 }
488
489 pub fn run_ndjson_file_limit_with_report_and_options<P, W>(
491 &self,
492 path: P,
493 query: &str,
494 limit: usize,
495 writer: W,
496 options: io::NdjsonOptions,
497 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
498 where
499 P: AsRef<std::path::Path>,
500 W: std::io::Write,
501 {
502 io::run_ndjson_file_limit_with_report_and_options(self, path, query, limit, writer, options)
503 }
504
505 pub fn run_ndjson_source<W>(
507 &self,
508 source: io::NdjsonSource,
509 query: &str,
510 writer: W,
511 ) -> std::result::Result<usize, JetroEngineError>
512 where
513 W: std::io::Write,
514 {
515 io::run_ndjson_source(self, source, query, writer)
516 }
517
518 pub fn run_ndjson_source_with_options<W>(
520 &self,
521 source: io::NdjsonSource,
522 query: &str,
523 writer: W,
524 options: io::NdjsonOptions,
525 ) -> std::result::Result<usize, JetroEngineError>
526 where
527 W: std::io::Write,
528 {
529 io::run_ndjson_source_with_options(self, source, query, writer, options)
530 }
531
532 pub fn run_ndjson_source_with_report<W>(
534 &self,
535 source: io::NdjsonSource,
536 query: &str,
537 writer: W,
538 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
539 where
540 W: std::io::Write,
541 {
542 io::run_ndjson_source_with_report(self, source, query, writer)
543 }
544
545 pub fn run_ndjson_source_with_report_and_options<W>(
547 &self,
548 source: io::NdjsonSource,
549 query: &str,
550 writer: W,
551 options: io::NdjsonOptions,
552 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
553 where
554 W: std::io::Write,
555 {
556 io::run_ndjson_source_with_report_and_options(self, source, query, writer, options)
557 }
558
559 pub fn run_ndjson_source_limit<W>(
562 &self,
563 source: io::NdjsonSource,
564 query: &str,
565 limit: usize,
566 writer: W,
567 ) -> std::result::Result<usize, JetroEngineError>
568 where
569 W: std::io::Write,
570 {
571 io::run_ndjson_source_limit(self, source, query, limit, writer)
572 }
573
574 pub fn run_ndjson_source_limit_with_options<W>(
576 &self,
577 source: io::NdjsonSource,
578 query: &str,
579 limit: usize,
580 writer: W,
581 options: io::NdjsonOptions,
582 ) -> std::result::Result<usize, JetroEngineError>
583 where
584 W: std::io::Write,
585 {
586 io::run_ndjson_source_limit_with_options(self, source, query, limit, writer, options)
587 }
588
589 pub fn run_ndjson_source_limit_with_report<W>(
591 &self,
592 source: io::NdjsonSource,
593 query: &str,
594 limit: usize,
595 writer: W,
596 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
597 where
598 W: std::io::Write,
599 {
600 io::run_ndjson_source_limit_with_report(self, source, query, limit, writer)
601 }
602
603 pub fn run_ndjson_source_limit_with_report_and_options<W>(
605 &self,
606 source: io::NdjsonSource,
607 query: &str,
608 limit: usize,
609 writer: W,
610 options: io::NdjsonOptions,
611 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
612 where
613 W: std::io::Write,
614 {
615 io::run_ndjson_source_limit_with_report_and_options(
616 self, source, query, limit, writer, options,
617 )
618 }
619
620 pub fn run_ndjson_rev<P, W>(
622 &self,
623 path: P,
624 query: &str,
625 writer: W,
626 ) -> std::result::Result<usize, JetroEngineError>
627 where
628 P: AsRef<std::path::Path>,
629 W: std::io::Write,
630 {
631 io::run_ndjson_rev(self, path, query, writer)
632 }
633
634 pub fn run_ndjson_rev_with_options<P, W>(
636 &self,
637 path: P,
638 query: &str,
639 writer: W,
640 options: io::NdjsonOptions,
641 ) -> std::result::Result<usize, JetroEngineError>
642 where
643 P: AsRef<std::path::Path>,
644 W: std::io::Write,
645 {
646 io::run_ndjson_rev_with_options(self, path, query, writer, options)
647 }
648
649 pub fn run_ndjson_rev_limit<P, W>(
652 &self,
653 path: P,
654 query: &str,
655 limit: usize,
656 writer: W,
657 ) -> std::result::Result<usize, JetroEngineError>
658 where
659 P: AsRef<std::path::Path>,
660 W: std::io::Write,
661 {
662 io::run_ndjson_rev_limit(self, path, query, limit, writer)
663 }
664
665 pub fn run_ndjson_rev_limit_with_options<P, W>(
667 &self,
668 path: P,
669 query: &str,
670 limit: usize,
671 writer: W,
672 options: io::NdjsonOptions,
673 ) -> std::result::Result<usize, JetroEngineError>
674 where
675 P: AsRef<std::path::Path>,
676 W: std::io::Write,
677 {
678 io::run_ndjson_rev_limit_with_options(self, path, query, limit, writer, options)
679 }
680
681 pub fn run_ndjson_rev_distinct_by<P, W>(
685 &self,
686 path: P,
687 key_query: &str,
688 query: &str,
689 limit: usize,
690 writer: W,
691 ) -> std::result::Result<usize, JetroEngineError>
692 where
693 P: AsRef<std::path::Path>,
694 W: std::io::Write,
695 {
696 io::run_ndjson_rev_distinct_by(self, path, key_query, query, limit, writer)
697 }
698
699 pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
702 &self,
703 path: P,
704 key_query: &str,
705 query: &str,
706 limit: usize,
707 writer: W,
708 options: io::NdjsonOptions,
709 ) -> std::result::Result<usize, JetroEngineError>
710 where
711 P: AsRef<std::path::Path>,
712 W: std::io::Write,
713 {
714 io::run_ndjson_rev_distinct_by_with_options(
715 self, path, key_query, query, limit, writer, options,
716 )
717 }
718
719 pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
722 &self,
723 path: P,
724 key_query: &str,
725 query: &str,
726 limit: usize,
727 writer: W,
728 ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
729 where
730 P: AsRef<std::path::Path>,
731 W: std::io::Write,
732 {
733 io::run_ndjson_rev_distinct_by_with_stats(self, path, key_query, query, limit, writer)
734 }
735
736 pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
739 &self,
740 path: P,
741 key_query: &str,
742 query: &str,
743 limit: usize,
744 writer: W,
745 options: io::NdjsonOptions,
746 ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
747 where
748 P: AsRef<std::path::Path>,
749 W: std::io::Write,
750 {
751 io::run_ndjson_rev_distinct_by_with_stats_and_options(
752 self, path, key_query, query, limit, writer, options,
753 )
754 }
755
756 pub fn run_ndjson_rev_distinct_by_with_report<P, W>(
759 &self,
760 path: P,
761 key_query: &str,
762 query: &str,
763 limit: usize,
764 writer: W,
765 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
766 where
767 P: AsRef<std::path::Path>,
768 W: std::io::Write,
769 {
770 io::run_ndjson_rev_distinct_by_with_report(
771 self, path, key_query, query, limit, writer,
772 )
773 }
774
775 pub fn run_ndjson_rev_distinct_by_with_report_and_options<P, W>(
777 &self,
778 path: P,
779 key_query: &str,
780 query: &str,
781 limit: usize,
782 writer: W,
783 options: io::NdjsonOptions,
784 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
785 where
786 P: AsRef<std::path::Path>,
787 W: std::io::Write,
788 {
789 io::run_ndjson_rev_distinct_by_with_report_and_options(
790 self, path, key_query, query, limit, writer, options,
791 )
792 }
793
794 pub fn run_ndjson_with_options<R, W>(
796 &self,
797 reader: R,
798 query: &str,
799 writer: W,
800 options: io::NdjsonOptions,
801 ) -> std::result::Result<usize, JetroEngineError>
802 where
803 R: std::io::BufRead,
804 W: std::io::Write,
805 {
806 io::run_ndjson_with_options(self, reader, query, writer, options)
807 }
808
809 pub fn run_ndjson_with_report<R, W>(
811 &self,
812 reader: R,
813 query: &str,
814 writer: W,
815 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
816 where
817 R: std::io::BufRead,
818 W: std::io::Write,
819 {
820 io::run_ndjson_with_report(self, reader, query, writer)
821 }
822
823 pub fn run_ndjson_with_report_and_options<R, W>(
825 &self,
826 reader: R,
827 query: &str,
828 writer: W,
829 options: io::NdjsonOptions,
830 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
831 where
832 R: std::io::BufRead,
833 W: std::io::Write,
834 {
835 io::run_ndjson_with_report_and_options(self, reader, query, writer, options)
836 }
837
838 pub fn run_ndjson_limit<R, W>(
840 &self,
841 reader: R,
842 query: &str,
843 limit: usize,
844 writer: W,
845 ) -> std::result::Result<usize, JetroEngineError>
846 where
847 R: std::io::BufRead,
848 W: std::io::Write,
849 {
850 io::run_ndjson_limit(self, reader, query, limit, writer)
851 }
852
853 pub fn run_ndjson_limit_with_options<R, W>(
855 &self,
856 reader: R,
857 query: &str,
858 limit: usize,
859 writer: W,
860 options: io::NdjsonOptions,
861 ) -> std::result::Result<usize, JetroEngineError>
862 where
863 R: std::io::BufRead,
864 W: std::io::Write,
865 {
866 io::run_ndjson_limit_with_options(self, reader, query, limit, writer, options)
867 }
868
869 pub fn run_ndjson_limit_with_report<R, W>(
871 &self,
872 reader: R,
873 query: &str,
874 limit: usize,
875 writer: W,
876 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
877 where
878 R: std::io::BufRead,
879 W: std::io::Write,
880 {
881 io::run_ndjson_limit_with_report(self, reader, query, limit, writer)
882 }
883
884 pub fn run_ndjson_limit_with_report_and_options<R, W>(
886 &self,
887 reader: R,
888 query: &str,
889 limit: usize,
890 writer: W,
891 options: io::NdjsonOptions,
892 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
893 where
894 R: std::io::BufRead,
895 W: std::io::Write,
896 {
897 io::run_ndjson_limit_with_report_and_options(self, reader, query, limit, writer, options)
898 }
899
900 pub fn run_ndjson_matches<R, W>(
903 &self,
904 reader: R,
905 predicate: &str,
906 limit: usize,
907 writer: W,
908 ) -> std::result::Result<usize, JetroEngineError>
909 where
910 R: std::io::BufRead,
911 W: std::io::Write,
912 {
913 io::run_ndjson_matches(self, reader, predicate, limit, writer)
914 }
915
916 pub fn run_ndjson_matches_with_options<R, W>(
918 &self,
919 reader: R,
920 predicate: &str,
921 limit: usize,
922 writer: W,
923 options: io::NdjsonOptions,
924 ) -> std::result::Result<usize, JetroEngineError>
925 where
926 R: std::io::BufRead,
927 W: std::io::Write,
928 {
929 io::run_ndjson_matches_with_options(self, reader, predicate, limit, writer, options)
930 }
931
932 pub fn run_ndjson_matches_with_report<R, W>(
934 &self,
935 reader: R,
936 predicate: &str,
937 limit: usize,
938 writer: W,
939 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
940 where
941 R: std::io::BufRead,
942 W: std::io::Write,
943 {
944 io::run_ndjson_matches_with_report(self, reader, predicate, limit, writer)
945 }
946
947 pub fn run_ndjson_matches_with_report_and_options<R, W>(
949 &self,
950 reader: R,
951 predicate: &str,
952 limit: usize,
953 writer: W,
954 options: io::NdjsonOptions,
955 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
956 where
957 R: std::io::BufRead,
958 W: std::io::Write,
959 {
960 io::run_ndjson_matches_with_report_and_options(
961 self, reader, predicate, limit, writer, options,
962 )
963 }
964
965 pub fn run_ndjson_matches_file<P, W>(
967 &self,
968 path: P,
969 predicate: &str,
970 limit: usize,
971 writer: W,
972 ) -> std::result::Result<usize, JetroEngineError>
973 where
974 P: AsRef<std::path::Path>,
975 W: std::io::Write,
976 {
977 io::run_ndjson_matches_file(self, path, predicate, limit, writer)
978 }
979
980 pub fn run_ndjson_matches_file_with_options<P, W>(
982 &self,
983 path: P,
984 predicate: &str,
985 limit: usize,
986 writer: W,
987 options: io::NdjsonOptions,
988 ) -> std::result::Result<usize, JetroEngineError>
989 where
990 P: AsRef<std::path::Path>,
991 W: std::io::Write,
992 {
993 io::run_ndjson_matches_file_with_options(self, path, predicate, limit, writer, options)
994 }
995
996 pub fn run_ndjson_matches_file_with_report<P, W>(
998 &self,
999 path: P,
1000 predicate: &str,
1001 limit: usize,
1002 writer: W,
1003 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1004 where
1005 P: AsRef<std::path::Path>,
1006 W: std::io::Write,
1007 {
1008 io::run_ndjson_matches_file_with_report(self, path, predicate, limit, writer)
1009 }
1010
1011 pub fn run_ndjson_matches_file_with_report_and_options<P, W>(
1013 &self,
1014 path: P,
1015 predicate: &str,
1016 limit: usize,
1017 writer: W,
1018 options: io::NdjsonOptions,
1019 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1020 where
1021 P: AsRef<std::path::Path>,
1022 W: std::io::Write,
1023 {
1024 io::run_ndjson_matches_file_with_report_and_options(
1025 self, path, predicate, limit, writer, options,
1026 )
1027 }
1028
1029 pub fn run_ndjson_matches_source<W>(
1032 &self,
1033 source: io::NdjsonSource,
1034 predicate: &str,
1035 limit: usize,
1036 writer: W,
1037 ) -> std::result::Result<usize, JetroEngineError>
1038 where
1039 W: std::io::Write,
1040 {
1041 io::run_ndjson_matches_source(self, source, predicate, limit, writer)
1042 }
1043
1044 pub fn run_ndjson_matches_source_with_options<W>(
1046 &self,
1047 source: io::NdjsonSource,
1048 predicate: &str,
1049 limit: usize,
1050 writer: W,
1051 options: io::NdjsonOptions,
1052 ) -> std::result::Result<usize, JetroEngineError>
1053 where
1054 W: std::io::Write,
1055 {
1056 io::run_ndjson_matches_source_with_options(self, source, predicate, limit, writer, options)
1057 }
1058
1059 pub fn run_ndjson_matches_source_with_report<W>(
1061 &self,
1062 source: io::NdjsonSource,
1063 predicate: &str,
1064 limit: usize,
1065 writer: W,
1066 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1067 where
1068 W: std::io::Write,
1069 {
1070 io::run_ndjson_matches_source_with_report(self, source, predicate, limit, writer)
1071 }
1072
1073 pub fn run_ndjson_matches_source_with_report_and_options<W>(
1075 &self,
1076 source: io::NdjsonSource,
1077 predicate: &str,
1078 limit: usize,
1079 writer: W,
1080 options: io::NdjsonOptions,
1081 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1082 where
1083 W: std::io::Write,
1084 {
1085 io::run_ndjson_matches_source_with_report_and_options(
1086 self, source, predicate, limit, writer, options,
1087 )
1088 }
1089
1090 pub fn run_ndjson_rev_matches<P, W>(
1093 &self,
1094 path: P,
1095 predicate: &str,
1096 limit: usize,
1097 writer: W,
1098 ) -> std::result::Result<usize, JetroEngineError>
1099 where
1100 P: AsRef<std::path::Path>,
1101 W: std::io::Write,
1102 {
1103 io::run_ndjson_rev_matches(self, path, predicate, limit, writer)
1104 }
1105
1106 pub fn run_ndjson_rev_matches_with_options<P, W>(
1108 &self,
1109 path: P,
1110 predicate: &str,
1111 limit: usize,
1112 writer: W,
1113 options: io::NdjsonOptions,
1114 ) -> std::result::Result<usize, JetroEngineError>
1115 where
1116 P: AsRef<std::path::Path>,
1117 W: std::io::Write,
1118 {
1119 io::run_ndjson_rev_matches_with_options(self, path, predicate, limit, writer, options)
1120 }
1121
1122 pub fn run_ndjson_rev_matches_with_report<P, W>(
1124 &self,
1125 path: P,
1126 predicate: &str,
1127 limit: usize,
1128 writer: W,
1129 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1130 where
1131 P: AsRef<std::path::Path>,
1132 W: std::io::Write,
1133 {
1134 io::run_ndjson_rev_matches_with_report(self, path, predicate, limit, writer)
1135 }
1136
1137 pub fn run_ndjson_rev_matches_with_report_and_options<P, W>(
1139 &self,
1140 path: P,
1141 predicate: &str,
1142 limit: usize,
1143 writer: W,
1144 options: io::NdjsonOptions,
1145 ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1146 where
1147 P: AsRef<std::path::Path>,
1148 W: std::io::Write,
1149 {
1150 io::run_ndjson_rev_matches_with_report_and_options(
1151 self, path, predicate, limit, writer, options,
1152 )
1153 }
1154
1155 pub fn collect_ndjson<R>(
1158 &self,
1159 reader: R,
1160 query: &str,
1161 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1162 where
1163 R: std::io::BufRead,
1164 {
1165 io::collect_ndjson(self, reader, query)
1166 }
1167
1168 pub fn collect_ndjson_file<P>(
1170 &self,
1171 path: P,
1172 query: &str,
1173 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1174 where
1175 P: AsRef<std::path::Path>,
1176 {
1177 io::collect_ndjson_file(self, path, query)
1178 }
1179
1180 pub fn collect_ndjson_file_with_options<P>(
1182 &self,
1183 path: P,
1184 query: &str,
1185 options: io::NdjsonOptions,
1186 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1187 where
1188 P: AsRef<std::path::Path>,
1189 {
1190 io::collect_ndjson_file_with_options(self, path, query, options)
1191 }
1192
1193 pub fn collect_ndjson_source(
1195 &self,
1196 source: io::NdjsonSource,
1197 query: &str,
1198 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1199 io::collect_ndjson_source(self, source, query)
1200 }
1201
1202 pub fn collect_ndjson_source_with_options(
1204 &self,
1205 source: io::NdjsonSource,
1206 query: &str,
1207 options: io::NdjsonOptions,
1208 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1209 io::collect_ndjson_source_with_options(self, source, query, options)
1210 }
1211
1212 pub fn collect_ndjson_rev<P>(
1214 &self,
1215 path: P,
1216 query: &str,
1217 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1218 where
1219 P: AsRef<std::path::Path>,
1220 {
1221 io::collect_ndjson_rev(self, path, query)
1222 }
1223
1224 pub fn collect_ndjson_rev_with_options<P>(
1226 &self,
1227 path: P,
1228 query: &str,
1229 options: io::NdjsonOptions,
1230 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1231 where
1232 P: AsRef<std::path::Path>,
1233 {
1234 io::collect_ndjson_rev_with_options(self, path, query, options)
1235 }
1236
1237 pub fn for_each_ndjson_rev<P, F>(
1240 &self,
1241 path: P,
1242 query: &str,
1243 f: F,
1244 ) -> std::result::Result<usize, JetroEngineError>
1245 where
1246 P: AsRef<std::path::Path>,
1247 F: FnMut(Value),
1248 {
1249 io::for_each_ndjson_rev(self, path, query, f)
1250 }
1251
1252 pub fn for_each_ndjson_rev_until<P, F>(
1255 &self,
1256 path: P,
1257 query: &str,
1258 f: F,
1259 ) -> std::result::Result<usize, JetroEngineError>
1260 where
1261 P: AsRef<std::path::Path>,
1262 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1263 {
1264 io::for_each_ndjson_rev_with_options(self, path, query, io::NdjsonOptions::default(), f)
1265 }
1266
1267 pub fn for_each_ndjson_rev_until_with_options<P, F>(
1269 &self,
1270 path: P,
1271 query: &str,
1272 options: io::NdjsonOptions,
1273 f: F,
1274 ) -> std::result::Result<usize, JetroEngineError>
1275 where
1276 P: AsRef<std::path::Path>,
1277 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1278 {
1279 io::for_each_ndjson_rev_with_options(self, path, query, options, f)
1280 }
1281
1282 pub fn for_each_ndjson_rev_with_options<P, F>(
1284 &self,
1285 path: P,
1286 query: &str,
1287 options: io::NdjsonOptions,
1288 mut f: F,
1289 ) -> std::result::Result<usize, JetroEngineError>
1290 where
1291 P: AsRef<std::path::Path>,
1292 F: FnMut(Value),
1293 {
1294 io::for_each_ndjson_rev_with_options(self, path, query, options, |value| {
1295 f(value);
1296 Ok(io::NdjsonControl::Continue)
1297 })
1298 }
1299
1300 pub fn collect_ndjson_with_options<R>(
1302 &self,
1303 reader: R,
1304 query: &str,
1305 options: io::NdjsonOptions,
1306 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1307 where
1308 R: std::io::BufRead,
1309 {
1310 io::collect_ndjson_with_options(self, reader, query, options)
1311 }
1312
1313 pub fn collect_ndjson_matches<R>(
1316 &self,
1317 reader: R,
1318 predicate: &str,
1319 limit: usize,
1320 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1321 where
1322 R: std::io::BufRead,
1323 {
1324 io::collect_ndjson_matches(self, reader, predicate, limit)
1325 }
1326
1327 pub fn collect_ndjson_matches_with_options<R>(
1329 &self,
1330 reader: R,
1331 predicate: &str,
1332 limit: usize,
1333 options: io::NdjsonOptions,
1334 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1335 where
1336 R: std::io::BufRead,
1337 {
1338 io::collect_ndjson_matches_with_options(self, reader, predicate, limit, options)
1339 }
1340
1341 pub fn collect_ndjson_matches_file<P>(
1343 &self,
1344 path: P,
1345 predicate: &str,
1346 limit: usize,
1347 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1348 where
1349 P: AsRef<std::path::Path>,
1350 {
1351 io::collect_ndjson_matches_file(self, path, predicate, limit)
1352 }
1353
1354 pub fn collect_ndjson_matches_file_with_options<P>(
1356 &self,
1357 path: P,
1358 predicate: &str,
1359 limit: usize,
1360 options: io::NdjsonOptions,
1361 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1362 where
1363 P: AsRef<std::path::Path>,
1364 {
1365 io::collect_ndjson_matches_file_with_options(self, path, predicate, limit, options)
1366 }
1367
1368 pub fn collect_ndjson_matches_source(
1371 &self,
1372 source: io::NdjsonSource,
1373 predicate: &str,
1374 limit: usize,
1375 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1376 io::collect_ndjson_matches_source(self, source, predicate, limit)
1377 }
1378
1379 pub fn collect_ndjson_matches_source_with_options(
1381 &self,
1382 source: io::NdjsonSource,
1383 predicate: &str,
1384 limit: usize,
1385 options: io::NdjsonOptions,
1386 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1387 io::collect_ndjson_matches_source_with_options(self, source, predicate, limit, options)
1388 }
1389
1390 pub fn collect_ndjson_rev_matches<P>(
1393 &self,
1394 path: P,
1395 predicate: &str,
1396 limit: usize,
1397 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1398 where
1399 P: AsRef<std::path::Path>,
1400 {
1401 io::collect_ndjson_rev_matches(self, path, predicate, limit)
1402 }
1403
1404 pub fn collect_ndjson_rev_matches_with_options<P>(
1406 &self,
1407 path: P,
1408 predicate: &str,
1409 limit: usize,
1410 options: io::NdjsonOptions,
1411 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1412 where
1413 P: AsRef<std::path::Path>,
1414 {
1415 io::collect_ndjson_rev_matches_with_options(self, path, predicate, limit, options)
1416 }
1417
1418 pub fn for_each_ndjson<R, F>(
1421 &self,
1422 reader: R,
1423 query: &str,
1424 f: F,
1425 ) -> std::result::Result<usize, JetroEngineError>
1426 where
1427 R: std::io::BufRead,
1428 F: FnMut(Value),
1429 {
1430 io::for_each_ndjson(self, reader, query, f)
1431 }
1432
1433 pub fn for_each_ndjson_until<R, F>(
1436 &self,
1437 reader: R,
1438 query: &str,
1439 f: F,
1440 ) -> std::result::Result<usize, JetroEngineError>
1441 where
1442 R: std::io::BufRead,
1443 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1444 {
1445 io::for_each_ndjson_until(self, reader, query, f)
1446 }
1447
1448 pub fn for_each_ndjson_source<F>(
1451 &self,
1452 source: io::NdjsonSource,
1453 query: &str,
1454 f: F,
1455 ) -> std::result::Result<usize, JetroEngineError>
1456 where
1457 F: FnMut(Value),
1458 {
1459 io::for_each_ndjson_source(self, source, query, f)
1460 }
1461
1462 pub fn for_each_ndjson_source_until<F>(
1465 &self,
1466 source: io::NdjsonSource,
1467 query: &str,
1468 f: F,
1469 ) -> std::result::Result<usize, JetroEngineError>
1470 where
1471 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1472 {
1473 io::for_each_ndjson_source_until(self, source, query, f)
1474 }
1475
1476 pub fn for_each_ndjson_source_until_with_options<F>(
1478 &self,
1479 source: io::NdjsonSource,
1480 query: &str,
1481 options: io::NdjsonOptions,
1482 f: F,
1483 ) -> std::result::Result<usize, JetroEngineError>
1484 where
1485 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1486 {
1487 io::for_each_ndjson_source_until_with_options(self, source, query, options, f)
1488 }
1489
1490 pub fn for_each_ndjson_source_with_options<F>(
1492 &self,
1493 source: io::NdjsonSource,
1494 query: &str,
1495 options: io::NdjsonOptions,
1496 f: F,
1497 ) -> std::result::Result<usize, JetroEngineError>
1498 where
1499 F: FnMut(Value),
1500 {
1501 io::for_each_ndjson_source_with_options(self, source, query, options, f)
1502 }
1503
1504 pub fn for_each_ndjson_with_options<R, F>(
1506 &self,
1507 reader: R,
1508 query: &str,
1509 options: io::NdjsonOptions,
1510 f: F,
1511 ) -> std::result::Result<usize, JetroEngineError>
1512 where
1513 R: std::io::BufRead,
1514 F: FnMut(Value),
1515 {
1516 io::for_each_ndjson_with_options(self, reader, query, options, f)
1517 }
1518
1519 pub fn for_each_ndjson_until_with_options<R, F>(
1521 &self,
1522 reader: R,
1523 query: &str,
1524 options: io::NdjsonOptions,
1525 f: F,
1526 ) -> std::result::Result<usize, JetroEngineError>
1527 where
1528 R: std::io::BufRead,
1529 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1530 {
1531 io::for_each_ndjson_until_with_options(self, reader, query, options, f)
1532 }
1533
1534 pub(crate) fn cached_plan(
1537 &self,
1538 expr: &str,
1539 context: plan::physical::PlanningContext,
1540 ) -> ir::physical::QueryPlan {
1541 let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
1542 let cache_key = format!("{}\0{}", context.cache_key(), expr);
1543 if let Some(plan) = cache.get(&cache_key) {
1544 return plan.clone();
1545 }
1546
1547 let plan = plan::physical::plan_query_with_context(expr, context);
1548 if self.plan_cache_limit > 0 {
1549 if cache.len() >= self.plan_cache_limit {
1550 cache.clear();
1551 }
1552 cache.insert(cache_key, plan.clone());
1553 }
1554 plan
1555 }
1556}
1557
1558impl exec::pipeline::PipelineData for Jetro {
1559 fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
1560 self.get_or_promote_objvec(arr)
1561 }
1562}
1563
1564impl Jetro {
1565 pub(crate) fn lazy_tape(
1568 &self,
1569 ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
1570 if let Some(result) = self.tape.get() {
1571 return result
1572 .as_ref()
1573 .map(Some)
1574 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1575 }
1576 let Some(raw) = self.raw_bytes.as_ref() else {
1577 return Ok(None);
1578 };
1579 let bytes: Vec<u8> = (**raw).to_vec();
1580 let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
1581 let _ = self.tape.set(parsed);
1582 self.tape
1583 .get()
1584 .expect("tape cache initialized")
1585 .as_ref()
1586 .map(Some)
1587 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1588 }
1589
1590 pub(crate) fn get_or_promote_objvec(
1593 &self,
1594 arr: &Arc<Vec<Val>>,
1595 ) -> Option<Arc<crate::data::value::ObjVecData>> {
1596 let key = Arc::as_ptr(arr) as usize;
1597 if let Ok(cache) = self.objvec_cache.lock() {
1598 if let Some(d) = cache.get(&key) {
1599 return Some(Arc::clone(d));
1600 }
1601 }
1602 let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
1603 if let Ok(mut cache) = self.objvec_cache.lock() {
1604 cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
1605 }
1606 Some(promoted)
1607 }
1608
1609 pub(crate) fn new(document: Value) -> Self {
1611 Self {
1612 document,
1613 root_val: OnceCell::new(),
1614 objvec_cache: Default::default(),
1615 raw_bytes: None,
1616 tape: OnceCell::new(),
1617 structural_index: OnceCell::new(),
1618 vm: RefCell::new(VM::new()),
1619 }
1620 }
1621
1622 pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
1627 let root_val = OnceCell::new();
1628 let _ = root_val.set(root);
1629 Self {
1630 document,
1631 root_val,
1632 objvec_cache: Default::default(),
1633 raw_bytes: None,
1634 tape: OnceCell::new(),
1635 structural_index: OnceCell::new(),
1636 vm: RefCell::new(VM::new()),
1637 }
1638 }
1639
1640 pub(crate) fn root_val_with(
1645 &self,
1646 keys: &crate::data::intern::KeyCache,
1647 ) -> std::result::Result<Val, EvalError> {
1648 if let Some(root) = self.root_val.get() {
1649 return Ok(root.clone());
1650 }
1651 let root = {
1652 if let Some(tape) = self.lazy_tape()? {
1653 Val::from_tape_data_with(keys, tape)
1654 } else {
1655 Val::from_value_with(keys, &self.document)
1656 }
1657 };
1658 let _ = self.root_val.set(root);
1659 Ok(self.root_val.get().expect("root val initialized").clone())
1660 }
1661
1662 pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
1666 Ok(Self {
1667 document: Value::Null,
1668 root_val: OnceCell::new(),
1669 objvec_cache: Default::default(),
1670 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1671 tape: OnceCell::new(),
1672 structural_index: OnceCell::new(),
1673 vm: RefCell::new(VM::new()),
1674 })
1675 }
1676
1677 pub(crate) fn with_vm<F, R>(&self, f: F) -> R
1679 where
1680 F: FnOnce(&mut VM) -> R,
1681 {
1682 match self.vm.try_borrow_mut() {
1683 Ok(mut vm) => f(&mut vm),
1684 Err(_) => {
1685 let mut vm = VM::new();
1686 f(&mut vm)
1687 }
1688 }
1689 }
1690
1691 pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
1694 self.raw_bytes.as_deref()
1695 }
1696
1697 pub(crate) fn lazy_structural_index(
1700 &self,
1701 ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
1702 if let Some(result) = self.structural_index.get() {
1703 return result
1704 .as_ref()
1705 .map(Some)
1706 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1707 }
1708 let Some(raw) = self.raw_bytes.as_ref() else {
1709 return Ok(None);
1710 };
1711 let built = jetro_experimental::from_bytes_with(
1712 raw.as_ref(),
1713 jetro_experimental::BuildOptions::keys_only(),
1714 )
1715 .map(Arc::new)
1716 .map_err(|err| err.to_string());
1717 let _ = self.structural_index.set(built);
1718 self.structural_index
1719 .get()
1720 .expect("structural index cache initialized")
1721 .as_ref()
1722 .map(Some)
1723 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1724 }
1725
1726 pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
1729 if let Some(root) = self.root_val.get() {
1730 return Ok(root.clone());
1731 }
1732 let root = {
1733 if let Some(tape) = self.lazy_tape()? {
1734 Val::from_tape_data(tape)
1735 } else {
1736 Val::from(&self.document)
1737 }
1738 };
1739 let _ = self.root_val.set(root);
1740 Ok(self.root_val.get().expect("root val initialized").clone())
1741 }
1742
1743 #[cfg(test)]
1746 pub(crate) fn root_val_is_materialized(&self) -> bool {
1747 self.root_val.get().is_some()
1748 }
1749
1750 #[cfg(test)]
1751 pub(crate) fn structural_index_is_built(&self) -> bool {
1752 self.structural_index.get().is_some()
1753 }
1754
1755 #[cfg(test)]
1756 pub(crate) fn tape_is_built(&self) -> bool {
1757 self.tape.get().is_some()
1758 }
1759
1760 #[cfg(test)]
1761 pub(crate) fn reset_tape_materialized_subtrees(&self) {
1762 if let Ok(Some(tape)) = self.lazy_tape() {
1763 tape.reset_materialized_subtrees();
1764 }
1765 }
1766
1767 #[cfg(test)]
1768 pub(crate) fn tape_materialized_subtrees(&self) -> usize {
1769 self.lazy_tape()
1770 .ok()
1771 .flatten()
1772 .map(|tape| tape.materialized_subtrees())
1773 .unwrap_or(0)
1774 }
1775
1776 pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
1780 exec::router::collect_json(self, expr.as_ref())
1781 }
1782}
1783
1784impl From<Value> for Jetro {
1788 fn from(v: Value) -> Self {
1790 Self::new(v)
1791 }
1792}