1pub(crate) mod builtins;
57pub(crate) mod compile;
58pub(crate) mod data;
59pub(crate) mod exec;
60pub mod io;
61pub(crate) mod ir;
62pub(crate) mod parse;
63pub(crate) mod plan;
64pub(crate) mod util;
65pub(crate) mod vm;
66
67#[cfg(test)]
68mod tests;
69
70use data::value::Val;
71use serde_json::Value;
72use std::cell::{OnceCell, RefCell};
73use std::collections::HashMap;
74use std::sync::Arc;
75use std::sync::Mutex;
76
77pub use data::context::EvalError;
78#[cfg(test)]
79use parse::parser::ParseError;
80use vm::VM;
81
/// Re-exports of the parser entry point (`parse`, `ParseError`) and the planner
/// entry point (`plan_query`), compiled only when the `fuzz_internal` feature is on.
// NOTE(review): judging by the feature and module names this is meant for fuzz
// harnesses only and is not a stable API — confirm before depending on it.
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    pub use crate::parse::parser::{parse, ParseError};
    pub use crate::plan::physical::plan_query;
}
90
/// Test-only error type that wraps either a parser error or an evaluation error.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// Wraps a `ParseError`.
    Parse(ParseError),
    /// Wraps an `EvalError`.
    Eval(EvalError),
}
97
/// Displays the wrapped error using that error's own `Display` output.
#[cfg(test)]
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the inner value first so there is a single formatting call.
        let inner: &dyn std::fmt::Display = match self {
            Error::Parse(parse_err) => parse_err,
            Error::Eval(eval_err) => eval_err,
        };
        write!(f, "{}", inner)
    }
}
/// Exposes the wrapped `ParseError` as the error source; evaluation errors
/// report no source.
#[cfg(test)]
impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = match self {
            Error::Parse(parse_err) => parse_err,
            Error::Eval(_) => return None,
        };
        Some(cause)
    }
}
116
/// Wraps a `ParseError` in `Error::Parse`, which lets `?` convert it.
#[cfg(test)]
impl From<ParseError> for Error {
    fn from(e: ParseError) -> Self {
        Error::Parse(e)
    }
}
/// Wraps an `EvalError` in `Error::Eval`, which lets `?` convert it.
#[cfg(test)]
impl From<EvalError> for Error {
    fn from(e: EvalError) -> Self {
        Error::Eval(e)
    }
}
129
/// A single JSON document plus lazily built, cached representations of it.
///
/// The `RefCell` and `std::cell::OnceCell` fields make this type usable from
/// one thread at a time only.
pub struct Jetro {
    /// The document as a `serde_json::Value`. `from_bytes` stores `Value::Null`
    /// here when the `simd-json` feature is enabled.
    document: Value,
    /// Cached root `Val`; filled by `root_val` / `root_val_with` on first use,
    /// or up front by `from_val_and_value`.
    root_val: OnceCell<Val>,
    /// The input bytes when built via `from_bytes`; `None` for `new` and
    /// `from_val_and_value`.
    raw_bytes: Option<Arc<[u8]>>,

    /// Cached outcome of parsing `raw_bytes` into a tape (see `lazy_tape`);
    /// a parse failure is kept as its string message.
    #[cfg(feature = "simd-json")]
    tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
    /// Unit placeholder so constructors can initialize `tape` without `simd-json`.
    #[cfg(not(feature = "simd-json"))]
    #[allow(dead_code)]
    tape: OnceCell<()>,

    /// Cached outcome of building a structural index over `raw_bytes`
    /// (see `lazy_structural_index`); a build failure is kept as its message.
    structural_index:
        OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,

    /// Promoted object-vector data, keyed by the address of the source array's
    /// `Arc` allocation (see `get_or_promote_objvec`).
    pub(crate) objvec_cache:
        std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,

    /// VM owned by this document and handed out through `with_vm`.
    vm: RefCell<VM>,
}
163
/// Reusable query engine: a plan cache, a mutex-guarded VM, and a key cache.
pub struct JetroEngine {
    /// Plans keyed by `"<planning-context key>\0<expression>"` (see `cached_plan`).
    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
    /// Once the cache holds this many entries, `cached_plan` clears it before
    /// inserting. A value of `0` means plans are never stored.
    plan_cache_limit: usize,
    /// VM used for every execution that goes through this engine.
    vm: Mutex<VM>,
    /// Key cache handed to `Val` construction in `parse_value` / `parse_bytes`.
    keys: Arc<crate::data::intern::KeyCache>,
}
182
/// Error type returned by the byte-based and NDJSON entry points of `JetroEngine`.
#[derive(Debug)]
pub enum JetroEngineError {
    /// A `serde_json` error.
    Json(serde_json::Error),
    /// A standard I/O error.
    Io(std::io::Error),
    /// An `io::RowError` from the NDJSON layer.
    Ndjson(io::RowError),
    /// A query evaluation error.
    Eval(EvalError),
}
196
197impl std::fmt::Display for JetroEngineError {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 match self {
200 Self::Json(err) => write!(f, "{}", err),
201 Self::Io(err) => write!(f, "{}", err),
202 Self::Ndjson(err) => write!(f, "{}", err),
203 Self::Eval(err) => write!(f, "{}", err),
204 }
205 }
206}
207
208impl std::error::Error for JetroEngineError {
209 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
210 match self {
211 Self::Json(err) => Some(err),
212 Self::Io(err) => Some(err),
213 Self::Ndjson(err) => Some(err),
214 Self::Eval(_) => None,
215 }
216 }
217}
218
/// Wraps a `serde_json::Error` in `JetroEngineError::Json`, which lets `?` convert it.
impl From<serde_json::Error> for JetroEngineError {
    fn from(err: serde_json::Error) -> Self {
        Self::Json(err)
    }
}
224
/// Wraps a `std::io::Error` in `JetroEngineError::Io`, which lets `?` convert it.
impl From<std::io::Error> for JetroEngineError {
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}
230
/// Wraps an `io::RowError` in `JetroEngineError::Ndjson`, which lets `?` convert it.
impl From<io::RowError> for JetroEngineError {
    fn from(err: io::RowError) -> Self {
        Self::Ndjson(err)
    }
}
236
/// Wraps an `EvalError` in `JetroEngineError::Eval`, which lets `?` convert it.
impl From<EvalError> for JetroEngineError {
    fn from(err: EvalError) -> Self {
        Self::Eval(err)
    }
}
242
/// The default engine is the same as `JetroEngine::new()`.
impl Default for JetroEngine {
    fn default() -> Self {
        Self::new()
    }
}
248
249impl JetroEngine {
    /// Plan-cache limit used by `new()`.
    const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
252
    /// Creates an engine with the default plan-cache limit
    /// (`DEFAULT_PLAN_CACHE_LIMIT`).
    pub fn new() -> Self {
        Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
    }
257
    /// Creates an engine with an empty plan cache, a fresh VM, and a fresh key cache.
    ///
    /// `plan_cache_limit` is the entry count at which `cached_plan` clears the
    /// cache before inserting; `0` disables plan caching entirely.
    pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
        Self {
            plan_cache: Mutex::new(HashMap::new()),
            plan_cache_limit,
            vm: Mutex::new(VM::new()),
            keys: crate::data::intern::KeyCache::new(),
        }
    }
268
    /// Returns a reference to the engine's shared key cache.
    pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
        &self.keys
    }
273
274 pub fn clear_cache(&self) {
277 self.plan_cache.lock().expect("plan cache poisoned").clear();
278 self.keys.clear();
279 }
280
281 pub fn parse_value(&self, document: Value) -> Jetro {
286 let root = Val::from_value_with(&self.keys, &document);
287 Jetro::from_val_and_value(root, document)
288 }
289
290 pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
295 let document = Jetro::from_bytes(bytes)?;
296 let _ = document.root_val_with(&self.keys)?;
300 Ok(document)
301 }
302
303 pub(crate) fn parse_bytes_lazy(
307 &self,
308 bytes: Vec<u8>,
309 ) -> std::result::Result<Jetro, JetroEngineError> {
310 Ok(Jetro::from_bytes(bytes)?)
311 }
312
313 pub fn collect<S: AsRef<str>>(
316 &self,
317 document: &Jetro,
318 expr: S,
319 ) -> std::result::Result<Value, EvalError> {
320 let plan = self.cached_plan(expr.as_ref(), exec::router::planning_context(document));
321 self.collect_prepared(document, &plan)
322 }
323
324 pub(crate) fn collect_prepared(
325 &self,
326 document: &Jetro,
327 plan: &ir::physical::QueryPlan,
328 ) -> std::result::Result<Value, EvalError> {
329 self.collect_prepared_val(document, plan).map(Value::from)
330 }
331
332 pub(crate) fn collect_prepared_val(
333 &self,
334 document: &Jetro,
335 plan: &ir::physical::QueryPlan,
336 ) -> std::result::Result<Val, EvalError> {
337 let mut vm = self.vm.lock().expect("vm cache poisoned");
338 exec::router::collect_plan_val_with_vm(document, plan, &mut vm)
339 }
340
    /// Locks the engine's VM and returns the guard.
    ///
    /// # Panics
    /// Panics with `"vm cache poisoned"` if the VM mutex is poisoned.
    pub(crate) fn lock_vm(&self) -> std::sync::MutexGuard<'_, VM> {
        self.vm.lock().expect("vm cache poisoned")
    }
344
345 pub fn collect_value<S: AsRef<str>>(
349 &self,
350 document: Value,
351 expr: S,
352 ) -> std::result::Result<Value, EvalError> {
353 let document = self.parse_value(document);
354 self.collect(&document, expr)
355 }
356
357 pub fn collect_bytes<S: AsRef<str>>(
362 &self,
363 bytes: Vec<u8>,
364 expr: S,
365 ) -> std::result::Result<Value, JetroEngineError> {
366 let document = self.parse_bytes(bytes)?;
367 Ok(self.collect(&document, expr)?)
368 }
369
    /// Forwards to `io::run_ndjson` with this engine, a buffered `reader`,
    /// the `query` text, and an output `writer`.
    ///
    /// NOTE(review): the meaning of the returned count is defined in `io`
    /// (presumably rows written) — confirm there.
    pub fn run_ndjson<R, W>(
        &self,
        reader: R,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson(self, reader, query, writer)
    }
384
    /// Path-based form of `run_ndjson`: forwards `path`, `query`, and `writer`
    /// to `io::run_ndjson_file`.
    ///
    /// NOTE(review): return-count semantics live in `io` — confirm there.
    pub fn run_ndjson_file<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file(self, path, query, writer)
    }
399
    /// Same as `run_ndjson_file`, but additionally forwards `options` to
    /// `io::run_ndjson_file_with_options`.
    pub fn run_ndjson_file_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_with_options(self, path, query, writer, options)
    }
414
    /// Forwards to `io::run_ndjson_file_limit` with `path`, `query`, `limit`,
    /// and `writer`.
    ///
    /// NOTE(review): `limit` presumably caps the number of emitted results —
    /// confirm in `io`.
    pub fn run_ndjson_file_limit<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_limit(self, path, query, limit, writer)
    }
429
    /// Same as `run_ndjson_file_limit`, but additionally forwards `options` to
    /// `io::run_ndjson_file_limit_with_options`.
    pub fn run_ndjson_file_limit_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_limit_with_options(self, path, query, limit, writer, options)
    }
445
    /// Source-based form of `run_ndjson`: takes an `io::NdjsonSource` and
    /// forwards it with `query` and `writer` to `io::run_ndjson_source`.
    pub fn run_ndjson_source<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source(self, source, query, writer)
    }
458
    /// Same as `run_ndjson_source`, but additionally forwards `options` to
    /// `io::run_ndjson_source_with_options`.
    pub fn run_ndjson_source_with_options<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_with_options(self, source, query, writer, options)
    }
472
    /// Forwards to `io::run_ndjson_source_limit` with `source`, `query`,
    /// `limit`, and `writer`.
    ///
    /// NOTE(review): `limit` presumably caps the number of emitted results —
    /// confirm in `io`.
    pub fn run_ndjson_source_limit<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_limit(self, source, query, limit, writer)
    }
487
    /// Same as `run_ndjson_source_limit`, but additionally forwards `options`
    /// to `io::run_ndjson_source_limit_with_options`.
    pub fn run_ndjson_source_limit_with_options<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_limit_with_options(self, source, query, limit, writer, options)
    }
502
    /// Forwards to `io::run_ndjson_rev` with `path`, `query`, and `writer`.
    ///
    /// NOTE(review): per the `rev` name this presumably processes the file's
    /// rows from last to first — confirm in `io`.
    pub fn run_ndjson_rev<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev(self, path, query, writer)
    }
516
    /// Same as `run_ndjson_rev`, but additionally forwards `options` to
    /// `io::run_ndjson_rev_with_options`.
    pub fn run_ndjson_rev_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_with_options(self, path, query, writer, options)
    }
531
    /// Forwards to `io::run_ndjson_rev_limit` with `path`, `query`, `limit`,
    /// and `writer`.
    ///
    /// NOTE(review): presumably the reverse-order run capped at `limit`
    /// results — confirm in `io`.
    pub fn run_ndjson_rev_limit<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_limit(self, path, query, limit, writer)
    }
547
    /// Same as `run_ndjson_rev_limit`, but additionally forwards `options` to
    /// `io::run_ndjson_rev_limit_with_options`.
    pub fn run_ndjson_rev_limit_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_limit_with_options(self, path, query, limit, writer, options)
    }
563
    /// Same as `run_ndjson`, but additionally forwards `options` to
    /// `io::run_ndjson_with_options`.
    pub fn run_ndjson_with_options<R, W>(
        &self,
        reader: R,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_with_options(self, reader, query, writer, options)
    }
578
    /// Forwards to `io::run_ndjson_limit` with `reader`, `query`, `limit`,
    /// and `writer`.
    ///
    /// NOTE(review): `limit` presumably caps the number of emitted results —
    /// confirm in `io`.
    pub fn run_ndjson_limit<R, W>(
        &self,
        reader: R,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_limit(self, reader, query, limit, writer)
    }
593
    /// Same as `run_ndjson_limit`, but additionally forwards `options` to
    /// `io::run_ndjson_limit_with_options`.
    pub fn run_ndjson_limit_with_options<R, W>(
        &self,
        reader: R,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_limit_with_options(self, reader, query, limit, writer, options)
    }
609
    /// Forwards to `io::run_ndjson_matches` with `reader`, `predicate`,
    /// `limit`, and `writer`.
    ///
    /// NOTE(review): presumably writes the rows for which `predicate` holds,
    /// up to `limit` of them — confirm in `io`.
    pub fn run_ndjson_matches<R, W>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_matches(self, reader, predicate, limit, writer)
    }
625
    /// Same as `run_ndjson_matches`, but additionally forwards `options` to
    /// `io::run_ndjson_matches_with_options`.
    pub fn run_ndjson_matches_with_options<R, W>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_matches_with_options(self, reader, predicate, limit, writer, options)
    }
641
    /// Path-based form of `run_ndjson_matches`: forwards `path`, `predicate`,
    /// `limit`, and `writer` to `io::run_ndjson_matches_file`.
    pub fn run_ndjson_matches_file<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_matches_file(self, path, predicate, limit, writer)
    }
656
    /// Same as `run_ndjson_matches_file`, but additionally forwards `options`
    /// to `io::run_ndjson_matches_file_with_options`.
    pub fn run_ndjson_matches_file_with_options<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_matches_file_with_options(self, path, predicate, limit, writer, options)
    }
672
    /// Source-based form of `run_ndjson_matches`: forwards `source`,
    /// `predicate`, `limit`, and `writer` to `io::run_ndjson_matches_source`.
    pub fn run_ndjson_matches_source<W>(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_matches_source(self, source, predicate, limit, writer)
    }
687
    /// Same as `run_ndjson_matches_source`, but additionally forwards
    /// `options` to `io::run_ndjson_matches_source_with_options`.
    pub fn run_ndjson_matches_source_with_options<W>(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_matches_source_with_options(self, source, predicate, limit, writer, options)
    }
702
    /// Forwards to `io::run_ndjson_rev_matches` with `path`, `predicate`,
    /// `limit`, and `writer`.
    ///
    /// NOTE(review): presumably the reverse-order counterpart of
    /// `run_ndjson_matches_file` — confirm in `io`.
    pub fn run_ndjson_rev_matches<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_matches(self, path, predicate, limit, writer)
    }
718
    /// Same as `run_ndjson_rev_matches`, but additionally forwards `options`
    /// to `io::run_ndjson_rev_matches_with_options`.
    pub fn run_ndjson_rev_matches_with_options<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_matches_with_options(self, path, predicate, limit, writer, options)
    }
734
    /// Forwards to `io::collect_ndjson` with `reader` and `query`, returning
    /// the collected values as a `Vec<Value>`.
    ///
    /// NOTE(review): presumably one result per input row — confirm in `io`.
    pub fn collect_ndjson<R>(
        &self,
        reader: R,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson(self, reader, query)
    }
747
    /// Path-based form of `collect_ndjson`: forwards `path` and `query` to
    /// `io::collect_ndjson_file`.
    pub fn collect_ndjson_file<P>(
        &self,
        path: P,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_file(self, path, query)
    }
759
    /// Same as `collect_ndjson_file`, but additionally forwards `options` to
    /// `io::collect_ndjson_file_with_options`.
    pub fn collect_ndjson_file_with_options<P>(
        &self,
        path: P,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_file_with_options(self, path, query, options)
    }
772
    /// Source-based form of `collect_ndjson`: forwards `source` and `query`
    /// to `io::collect_ndjson_source`.
    pub fn collect_ndjson_source(
        &self,
        source: io::NdjsonSource,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_source(self, source, query)
    }
781
    /// Same as `collect_ndjson_source`, but additionally forwards `options`
    /// to `io::collect_ndjson_source_with_options`.
    pub fn collect_ndjson_source_with_options(
        &self,
        source: io::NdjsonSource,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_source_with_options(self, source, query, options)
    }
791
    /// Forwards to `io::collect_ndjson_rev` with `path` and `query`.
    ///
    /// NOTE(review): presumably collects results in reverse row order, per
    /// the `rev` name — confirm in `io`.
    pub fn collect_ndjson_rev<P>(
        &self,
        path: P,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev(self, path, query)
    }
803
    /// Same as `collect_ndjson_rev`, but additionally forwards `options` to
    /// `io::collect_ndjson_rev_with_options`.
    pub fn collect_ndjson_rev_with_options<P>(
        &self,
        path: P,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev_with_options(self, path, query, options)
    }
816
    /// Forwards to `io::for_each_ndjson_rev` with `path`, `query`, and the
    /// callback `f`, which receives each produced `Value`.
    ///
    /// NOTE(review): presumably visits rows from last to first and returns
    /// the number of rows visited — confirm in `io`.
    pub fn for_each_ndjson_rev<P, F>(
        &self,
        path: P,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        F: FnMut(Value),
    {
        io::for_each_ndjson_rev(self, path, query, f)
    }
831
    /// Calls `io::for_each_ndjson_rev_with_options` with
    /// `io::NdjsonOptions::default()` and a callback that returns an
    /// `io::NdjsonControl` (or an error) for each produced `Value`.
    ///
    /// NOTE(review): the control value presumably lets the callback stop the
    /// iteration early — confirm in `io`.
    pub fn for_each_ndjson_rev_until<P, F>(
        &self,
        path: P,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_rev_with_options(self, path, query, io::NdjsonOptions::default(), f)
    }
846
    /// Same as `for_each_ndjson_rev_until`, but with caller-supplied `options`
    /// instead of the defaults; forwards to
    /// `io::for_each_ndjson_rev_with_options`.
    pub fn for_each_ndjson_rev_until_with_options<P, F>(
        &self,
        path: P,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_rev_with_options(self, path, query, options, f)
    }
861
862 pub fn for_each_ndjson_rev_with_options<P, F>(
864 &self,
865 path: P,
866 query: &str,
867 options: io::NdjsonOptions,
868 mut f: F,
869 ) -> std::result::Result<usize, JetroEngineError>
870 where
871 P: AsRef<std::path::Path>,
872 F: FnMut(Value),
873 {
874 io::for_each_ndjson_rev_with_options(self, path, query, options, |value| {
875 f(value);
876 Ok(io::NdjsonControl::Continue)
877 })
878 }
879
    /// Same as `collect_ndjson`, but additionally forwards `options` to
    /// `io::collect_ndjson_with_options`.
    pub fn collect_ndjson_with_options<R>(
        &self,
        reader: R,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson_with_options(self, reader, query, options)
    }
892
    /// Forwards to `io::collect_ndjson_matches` with `reader`, `predicate`,
    /// and `limit`.
    ///
    /// NOTE(review): presumably collects up to `limit` rows for which
    /// `predicate` holds — confirm in `io`.
    pub fn collect_ndjson_matches<R>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson_matches(self, reader, predicate, limit)
    }
906
    /// Same as `collect_ndjson_matches`, but additionally forwards `options`
    /// to `io::collect_ndjson_matches_with_options`.
    pub fn collect_ndjson_matches_with_options<R>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson_matches_with_options(self, reader, predicate, limit, options)
    }
920
    /// Path-based form of `collect_ndjson_matches`: forwards `path`,
    /// `predicate`, and `limit` to `io::collect_ndjson_matches_file`.
    pub fn collect_ndjson_matches_file<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_matches_file(self, path, predicate, limit)
    }
933
    /// Same as `collect_ndjson_matches_file`, but additionally forwards
    /// `options` to `io::collect_ndjson_matches_file_with_options`.
    pub fn collect_ndjson_matches_file_with_options<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_matches_file_with_options(self, path, predicate, limit, options)
    }
947
    /// Source-based form of `collect_ndjson_matches`: forwards `source`,
    /// `predicate`, and `limit` to `io::collect_ndjson_matches_source`.
    pub fn collect_ndjson_matches_source(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_matches_source(self, source, predicate, limit)
    }
958
    /// Same as `collect_ndjson_matches_source`, but additionally forwards
    /// `options` to `io::collect_ndjson_matches_source_with_options`.
    pub fn collect_ndjson_matches_source_with_options(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_matches_source_with_options(self, source, predicate, limit, options)
    }
969
    /// Forwards to `io::collect_ndjson_rev_matches` with `path`, `predicate`,
    /// and `limit`.
    ///
    /// NOTE(review): presumably the reverse-order counterpart of
    /// `collect_ndjson_matches_file` — confirm in `io`.
    pub fn collect_ndjson_rev_matches<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev_matches(self, path, predicate, limit)
    }
983
    /// Same as `collect_ndjson_rev_matches`, but additionally forwards
    /// `options` to `io::collect_ndjson_rev_matches_with_options`.
    pub fn collect_ndjson_rev_matches_with_options<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev_matches_with_options(self, path, predicate, limit, options)
    }
997
    /// Forwards to `io::for_each_ndjson` with `reader`, `query`, and the
    /// callback `f`, which receives each produced `Value`.
    ///
    /// NOTE(review): the returned count is defined in `io` (presumably rows
    /// visited) — confirm there.
    pub fn for_each_ndjson<R, F>(
        &self,
        reader: R,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value),
    {
        io::for_each_ndjson(self, reader, query, f)
    }
1012
    /// Forwards to `io::for_each_ndjson_until`; the callback returns an
    /// `io::NdjsonControl` (or an error) for each produced `Value`.
    ///
    /// NOTE(review): the control value presumably lets the callback stop the
    /// iteration early — confirm in `io`.
    pub fn for_each_ndjson_until<R, F>(
        &self,
        reader: R,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_until(self, reader, query, f)
    }
1027
    /// Source-based form of `for_each_ndjson`: forwards `source`, `query`,
    /// and `f` to `io::for_each_ndjson_source`.
    pub fn for_each_ndjson_source<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value),
    {
        io::for_each_ndjson_source(self, source, query, f)
    }
1041
    /// Source-based form of `for_each_ndjson_until`: forwards `source`,
    /// `query`, and the control-returning callback to
    /// `io::for_each_ndjson_source_until`.
    pub fn for_each_ndjson_source_until<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_source_until(self, source, query, f)
    }
1055
    /// Same as `for_each_ndjson_source_until`, but additionally forwards
    /// `options` to `io::for_each_ndjson_source_until_with_options`.
    pub fn for_each_ndjson_source_until_with_options<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_source_until_with_options(self, source, query, options, f)
    }
1069
    /// Same as `for_each_ndjson_source`, but additionally forwards `options`
    /// to `io::for_each_ndjson_source_with_options`.
    pub fn for_each_ndjson_source_with_options<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value),
    {
        io::for_each_ndjson_source_with_options(self, source, query, options, f)
    }
1083
    /// Same as `for_each_ndjson`, but additionally forwards `options` to
    /// `io::for_each_ndjson_with_options`.
    pub fn for_each_ndjson_with_options<R, F>(
        &self,
        reader: R,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value),
    {
        io::for_each_ndjson_with_options(self, reader, query, options, f)
    }
1098
    /// Same as `for_each_ndjson_until`, but additionally forwards `options`
    /// to `io::for_each_ndjson_until_with_options`.
    pub fn for_each_ndjson_until_with_options<R, F>(
        &self,
        reader: R,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_until_with_options(self, reader, query, options, f)
    }
1113
1114 pub(crate) fn cached_plan(
1117 &self,
1118 expr: &str,
1119 context: plan::physical::PlanningContext,
1120 ) -> ir::physical::QueryPlan {
1121 let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
1122 let cache_key = format!("{}\0{}", context.cache_key(), expr);
1123 if let Some(plan) = cache.get(&cache_key) {
1124 return plan.clone();
1125 }
1126
1127 let plan = plan::physical::plan_query_with_context(expr, context);
1128 if self.plan_cache_limit > 0 {
1129 if cache.len() >= self.plan_cache_limit {
1130 cache.clear();
1131 }
1132 cache.insert(cache_key, plan.clone());
1133 }
1134 plan
1135 }
1136}
1137
/// Lets the pipeline promote arrays through this document's object-vector cache.
impl exec::pipeline::PipelineData for Jetro {
    /// Delegates to `Jetro::get_or_promote_objvec`.
    fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
        self.get_or_promote_objvec(arr)
    }
}
1143
1144impl Jetro {
1145 #[cfg(feature = "simd-json")]
1148 pub(crate) fn lazy_tape(
1149 &self,
1150 ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
1151 if let Some(result) = self.tape.get() {
1152 return result
1153 .as_ref()
1154 .map(Some)
1155 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1156 }
1157 let Some(raw) = self.raw_bytes.as_ref() else {
1158 return Ok(None);
1159 };
1160 let bytes: Vec<u8> = (**raw).to_vec();
1161 let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
1162 let _ = self.tape.set(parsed);
1163 self.tape
1164 .get()
1165 .expect("tape cache initialized")
1166 .as_ref()
1167 .map(Some)
1168 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1169 }
1170
1171 pub(crate) fn get_or_promote_objvec(
1174 &self,
1175 arr: &Arc<Vec<Val>>,
1176 ) -> Option<Arc<crate::data::value::ObjVecData>> {
1177 let key = Arc::as_ptr(arr) as usize;
1178 if let Ok(cache) = self.objvec_cache.lock() {
1179 if let Some(d) = cache.get(&key) {
1180 return Some(Arc::clone(d));
1181 }
1182 }
1183 let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
1184 if let Ok(mut cache) = self.objvec_cache.lock() {
1185 cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
1186 }
1187 Some(promoted)
1188 }
1189
    /// Wraps an already-parsed `Value`. There are no raw bytes, every lazy
    /// cache starts empty, and the document gets a fresh VM.
    pub(crate) fn new(document: Value) -> Self {
        Self {
            document,
            root_val: OnceCell::new(),
            objvec_cache: Default::default(),
            raw_bytes: None,
            tape: OnceCell::new(),
            structural_index: OnceCell::new(),
            vm: RefCell::new(VM::new()),
        }
    }
1202
1203 pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
1208 let root_val = OnceCell::new();
1209 let _ = root_val.set(root);
1210 Self {
1211 document,
1212 root_val,
1213 objvec_cache: Default::default(),
1214 raw_bytes: None,
1215 tape: OnceCell::new(),
1216 structural_index: OnceCell::new(),
1217 vm: RefCell::new(VM::new()),
1218 }
1219 }
1220
1221 pub(crate) fn root_val_with(
1226 &self,
1227 keys: &crate::data::intern::KeyCache,
1228 ) -> std::result::Result<Val, EvalError> {
1229 if let Some(root) = self.root_val.get() {
1230 return Ok(root.clone());
1231 }
1232 let root = {
1233 #[cfg(feature = "simd-json")]
1234 {
1235 if let Some(tape) = self.lazy_tape()? {
1236 Val::from_tape_data_with(keys, tape)
1237 } else {
1238 Val::from_value_with(keys, &self.document)
1239 }
1240 }
1241 #[cfg(not(feature = "simd-json"))]
1242 {
1243 Val::from_value_with(keys, &self.document)
1244 }
1245 };
1246 let _ = self.root_val.set(root);
1247 Ok(self.root_val.get().expect("root val initialized").clone())
1248 }
1249
1250 pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
1254 #[cfg(feature = "simd-json")]
1255 {
1256 return Ok(Self {
1257 document: Value::Null,
1258 root_val: OnceCell::new(),
1259 objvec_cache: Default::default(),
1260 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1261 tape: OnceCell::new(),
1262 structural_index: OnceCell::new(),
1263 vm: RefCell::new(VM::new()),
1264 });
1265 }
1266 #[allow(unreachable_code)]
1267 {
1268 let document: Value = serde_json::from_slice(&bytes)?;
1269 Ok(Self {
1270 document,
1271 root_val: OnceCell::new(),
1272 objvec_cache: Default::default(),
1273 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1274 tape: OnceCell::new(),
1275 structural_index: OnceCell::new(),
1276 vm: RefCell::new(VM::new()),
1277 })
1278 }
1279 }
1280
1281 pub(crate) fn with_vm<F, R>(&self, f: F) -> R
1283 where
1284 F: FnOnce(&mut VM) -> R,
1285 {
1286 match self.vm.try_borrow_mut() {
1287 Ok(mut vm) => f(&mut vm),
1288 Err(_) => {
1289 let mut vm = VM::new();
1290 f(&mut vm)
1291 }
1292 }
1293 }
1294
    /// Returns the original input bytes, or `None` when the document was not
    /// built from bytes.
    pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
        self.raw_bytes.as_deref()
    }
1300
1301 pub(crate) fn lazy_structural_index(
1304 &self,
1305 ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
1306 if let Some(result) = self.structural_index.get() {
1307 return result
1308 .as_ref()
1309 .map(Some)
1310 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1311 }
1312 let Some(raw) = self.raw_bytes.as_ref() else {
1313 return Ok(None);
1314 };
1315 let built = jetro_experimental::from_bytes_with(
1316 raw.as_ref(),
1317 jetro_experimental::BuildOptions::keys_only(),
1318 )
1319 .map(Arc::new)
1320 .map_err(|err| err.to_string());
1321 let _ = self.structural_index.set(built);
1322 self.structural_index
1323 .get()
1324 .expect("structural index cache initialized")
1325 .as_ref()
1326 .map(Some)
1327 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1328 }
1329
1330 pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
1333 if let Some(root) = self.root_val.get() {
1334 return Ok(root.clone());
1335 }
1336 let root = {
1337 #[cfg(feature = "simd-json")]
1338 {
1339 if let Some(tape) = self.lazy_tape()? {
1340 Val::from_tape_data(tape)
1341 } else {
1342 Val::from(&self.document)
1343 }
1344 }
1345 #[cfg(not(feature = "simd-json"))]
1346 {
1347 Val::from(&self.document)
1348 }
1349 };
1350 let _ = self.root_val.set(root);
1351 Ok(self.root_val.get().expect("root val initialized").clone())
1352 }
1353
    /// Test helper: reports whether the root `Val` cache has been filled.
    #[cfg(test)]
    pub(crate) fn root_val_is_materialized(&self) -> bool {
        self.root_val.get().is_some()
    }
1360
    /// Test helper: reports whether a structural-index build has been
    /// attempted. The cell is also filled when the build failed.
    #[cfg(test)]
    pub(crate) fn structural_index_is_built(&self) -> bool {
        self.structural_index.get().is_some()
    }
1365
    /// Test helper: reports whether a tape parse has been attempted. The cell
    /// is also filled when the parse failed.
    #[cfg(all(test, feature = "simd-json"))]
    pub(crate) fn tape_is_built(&self) -> bool {
        self.tape.get().is_some()
    }
1370
    /// Test helper: calls `reset_materialized_subtrees` on the tape when one
    /// is available, and does nothing otherwise.
    ///
    /// Goes through `lazy_tape`, so it may trigger the tape parse.
    #[cfg(all(test, feature = "simd-json"))]
    pub(crate) fn reset_tape_materialized_subtrees(&self) {
        if let Ok(Some(tape)) = self.lazy_tape() {
            tape.reset_materialized_subtrees();
        }
    }
1377
1378 #[cfg(all(test, feature = "simd-json"))]
1379 pub(crate) fn tape_materialized_subtrees(&self) -> usize {
1380 self.lazy_tape()
1381 .ok()
1382 .flatten()
1383 .map(|tape| tape.materialized_subtrees())
1384 .unwrap_or(0)
1385 }
1386
    /// Evaluates `expr` against this document by delegating to
    /// `exec::router::collect_json`, returning the result as JSON.
    pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
        exec::router::collect_json(self, expr.as_ref())
    }
1393}
1394
/// Wraps a `serde_json::Value` in a `Jetro` via `Jetro::new`.
impl From<Value> for Jetro {
    fn from(v: Value) -> Self {
        Self::new(v)
    }
}
1403}