1pub(crate) mod builtins;
57pub(crate) mod compile;
58pub(crate) mod data;
59pub(crate) mod exec;
60pub mod io;
61pub(crate) mod ir;
62pub(crate) mod parse;
63pub(crate) mod plan;
64pub(crate) mod util;
65pub(crate) mod vm;
66
67#[cfg(test)]
68mod tests;
69
70use data::value::Val;
71use serde_json::Value;
72use std::cell::{OnceCell, RefCell};
73use std::collections::HashMap;
74use std::sync::Arc;
75use std::sync::Mutex;
76
77pub use data::context::EvalError;
78#[cfg(test)]
79use parse::parser::ParseError;
80use vm::VM;
81
#[cfg(feature = "fuzz_internal")]
/// Re-exports of internal parser and planner entry points.
///
/// Compiled only when the `fuzz_internal` feature is enabled; going by the name this
/// is meant for fuzzing harnesses and is not part of the stable public API.
pub mod __fuzz_internal {
    pub use crate::parse::parser::parse;
    pub use crate::parse::parser::ParseError;
    pub use crate::plan::physical::plan_query;
}
90
#[cfg(test)]
/// Test-only union of the two failure kinds a query can hit: parsing and evaluation.
#[derive(Debug)]
pub(crate) enum Error {
    /// The query text could not be parsed.
    Parse(ParseError),
    /// The query parsed but failed during evaluation.
    Eval(EvalError),
}
97
#[cfg(test)]
impl std::fmt::Display for Error {
    /// Prints the wrapped error's own message, with no extra prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &dyn std::fmt::Display = match self {
            Error::Parse(parse_err) => parse_err,
            Error::Eval(eval_err) => eval_err,
        };
        write!(f, "{inner}")
    }
}
#[cfg(test)]
impl std::error::Error for Error {
    /// Exposes the `ParseError` as the source; `Eval` reports no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let inner: &(dyn std::error::Error + 'static) = match self {
            Error::Parse(parse_err) => parse_err,
            Error::Eval(_) => return None,
        };
        Some(inner)
    }
}
116
#[cfg(test)]
impl From<ParseError> for Error {
    /// Lets `?` lift a parse failure into the test-only `Error`.
    fn from(parse_err: ParseError) -> Self {
        Self::Parse(parse_err)
    }
}
#[cfg(test)]
impl From<EvalError> for Error {
    /// Lets `?` lift an evaluation failure into the test-only `Error`.
    fn from(eval_err: EvalError) -> Self {
        Self::Eval(eval_err)
    }
}
129
130pub struct Jetro {
135 document: Value,
138 root_val: OnceCell<Val>,
140 raw_bytes: Option<Arc<[u8]>>,
142
143 #[cfg(feature = "simd-json")]
145 tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
146 #[cfg(not(feature = "simd-json"))]
148 #[allow(dead_code)]
149 tape: OnceCell<()>,
150
151 structural_index:
153 OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
154
155 pub(crate) objvec_cache:
158 std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
159
160 vm: RefCell<VM>,
162}
163
164pub struct JetroEngine {
169 plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
171 plan_cache_limit: usize,
173 vm: Mutex<VM>,
175 keys: Arc<crate::data::intern::KeyCache>,
181}
182
183#[derive(Debug)]
186pub enum JetroEngineError {
187 Json(serde_json::Error),
189 Io(std::io::Error),
191 Ndjson(io::RowError),
193 Eval(EvalError),
195}
196
197impl std::fmt::Display for JetroEngineError {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 match self {
200 Self::Json(err) => write!(f, "{}", err),
201 Self::Io(err) => write!(f, "{}", err),
202 Self::Ndjson(err) => write!(f, "{}", err),
203 Self::Eval(err) => write!(f, "{}", err),
204 }
205 }
206}
207
208impl std::error::Error for JetroEngineError {
209 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
210 match self {
211 Self::Json(err) => Some(err),
212 Self::Io(err) => Some(err),
213 Self::Ndjson(err) => Some(err),
214 Self::Eval(_) => None,
215 }
216 }
217}
218
219impl From<serde_json::Error> for JetroEngineError {
220 fn from(err: serde_json::Error) -> Self {
221 Self::Json(err)
222 }
223}
224
225impl From<std::io::Error> for JetroEngineError {
226 fn from(err: std::io::Error) -> Self {
227 Self::Io(err)
228 }
229}
230
231impl From<io::RowError> for JetroEngineError {
232 fn from(err: io::RowError) -> Self {
233 Self::Ndjson(err)
234 }
235}
236
237impl From<EvalError> for JetroEngineError {
238 fn from(err: EvalError) -> Self {
239 Self::Eval(err)
240 }
241}
242
243impl Default for JetroEngine {
244 fn default() -> Self {
245 Self::new()
246 }
247}
248
249impl JetroEngine {
250 const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
252
253 pub fn new() -> Self {
255 Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
256 }
257
258 pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
261 Self {
262 plan_cache: Mutex::new(HashMap::new()),
263 plan_cache_limit,
264 vm: Mutex::new(VM::new()),
265 keys: crate::data::intern::KeyCache::new(),
266 }
267 }
268
269 pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
271 &self.keys
272 }
273
274 pub fn clear_cache(&self) {
277 self.plan_cache.lock().expect("plan cache poisoned").clear();
278 self.keys.clear();
279 }
280
281 pub fn parse_value(&self, document: Value) -> Jetro {
286 let root = Val::from_value_with(&self.keys, &document);
287 Jetro::from_val_and_value(root, document)
288 }
289
290 pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
295 let document = Jetro::from_bytes(bytes)?;
296 let _ = document.root_val_with(&self.keys)?;
300 Ok(document)
301 }
302
303 pub(crate) fn parse_bytes_lazy(
307 &self,
308 bytes: Vec<u8>,
309 ) -> std::result::Result<Jetro, JetroEngineError> {
310 Ok(Jetro::from_bytes(bytes)?)
311 }
312
313 pub fn collect<S: AsRef<str>>(
316 &self,
317 document: &Jetro,
318 expr: S,
319 ) -> std::result::Result<Value, EvalError> {
320 let expr = expr.as_ref();
321 if let Some(rows) = io::collect_document_rows(self, document, expr)? {
322 return Ok(Value::from(rows));
323 }
324 let plan = self.cached_plan(expr, exec::router::planning_context(document));
325 self.collect_prepared(document, &plan)
326 }
327
328 pub(crate) fn collect_prepared(
329 &self,
330 document: &Jetro,
331 plan: &ir::physical::QueryPlan,
332 ) -> std::result::Result<Value, EvalError> {
333 self.collect_prepared_val(document, plan).map(Value::from)
334 }
335
336 pub(crate) fn collect_prepared_val(
337 &self,
338 document: &Jetro,
339 plan: &ir::physical::QueryPlan,
340 ) -> std::result::Result<Val, EvalError> {
341 let mut vm = self.vm.lock().expect("vm cache poisoned");
342 exec::router::collect_plan_val_with_vm(document, plan, &mut vm)
343 }
344
345 pub(crate) fn lock_vm(&self) -> std::sync::MutexGuard<'_, VM> {
346 self.vm.lock().expect("vm cache poisoned")
347 }
348
349 pub fn collect_value<S: AsRef<str>>(
353 &self,
354 document: Value,
355 expr: S,
356 ) -> std::result::Result<Value, EvalError> {
357 let document = self.parse_value(document);
358 self.collect(&document, expr)
359 }
360
361 pub fn collect_bytes<S: AsRef<str>>(
366 &self,
367 bytes: Vec<u8>,
368 expr: S,
369 ) -> std::result::Result<Value, JetroEngineError> {
370 let document = self.parse_bytes(bytes)?;
371 Ok(self.collect(&document, expr)?)
372 }
373
374 pub fn run_ndjson<R, W>(
377 &self,
378 reader: R,
379 query: &str,
380 writer: W,
381 ) -> std::result::Result<usize, JetroEngineError>
382 where
383 R: std::io::BufRead,
384 W: std::io::Write,
385 {
386 io::run_ndjson(self, reader, query, writer)
387 }
388
389 pub fn run_ndjson_file<P, W>(
392 &self,
393 path: P,
394 query: &str,
395 writer: W,
396 ) -> std::result::Result<usize, JetroEngineError>
397 where
398 P: AsRef<std::path::Path>,
399 W: std::io::Write,
400 {
401 io::run_ndjson_file(self, path, query, writer)
402 }
403
404 pub fn run_ndjson_file_with_options<P, W>(
406 &self,
407 path: P,
408 query: &str,
409 writer: W,
410 options: io::NdjsonOptions,
411 ) -> std::result::Result<usize, JetroEngineError>
412 where
413 P: AsRef<std::path::Path>,
414 W: std::io::Write,
415 {
416 io::run_ndjson_file_with_options(self, path, query, writer, options)
417 }
418
419 pub fn run_ndjson_file_limit<P, W>(
421 &self,
422 path: P,
423 query: &str,
424 limit: usize,
425 writer: W,
426 ) -> std::result::Result<usize, JetroEngineError>
427 where
428 P: AsRef<std::path::Path>,
429 W: std::io::Write,
430 {
431 io::run_ndjson_file_limit(self, path, query, limit, writer)
432 }
433
434 pub fn run_ndjson_file_limit_with_options<P, W>(
436 &self,
437 path: P,
438 query: &str,
439 limit: usize,
440 writer: W,
441 options: io::NdjsonOptions,
442 ) -> std::result::Result<usize, JetroEngineError>
443 where
444 P: AsRef<std::path::Path>,
445 W: std::io::Write,
446 {
447 io::run_ndjson_file_limit_with_options(self, path, query, limit, writer, options)
448 }
449
450 pub fn run_ndjson_source<W>(
452 &self,
453 source: io::NdjsonSource,
454 query: &str,
455 writer: W,
456 ) -> std::result::Result<usize, JetroEngineError>
457 where
458 W: std::io::Write,
459 {
460 io::run_ndjson_source(self, source, query, writer)
461 }
462
463 pub fn run_ndjson_source_with_options<W>(
465 &self,
466 source: io::NdjsonSource,
467 query: &str,
468 writer: W,
469 options: io::NdjsonOptions,
470 ) -> std::result::Result<usize, JetroEngineError>
471 where
472 W: std::io::Write,
473 {
474 io::run_ndjson_source_with_options(self, source, query, writer, options)
475 }
476
477 pub fn run_ndjson_source_limit<W>(
480 &self,
481 source: io::NdjsonSource,
482 query: &str,
483 limit: usize,
484 writer: W,
485 ) -> std::result::Result<usize, JetroEngineError>
486 where
487 W: std::io::Write,
488 {
489 io::run_ndjson_source_limit(self, source, query, limit, writer)
490 }
491
492 pub fn run_ndjson_source_limit_with_options<W>(
494 &self,
495 source: io::NdjsonSource,
496 query: &str,
497 limit: usize,
498 writer: W,
499 options: io::NdjsonOptions,
500 ) -> std::result::Result<usize, JetroEngineError>
501 where
502 W: std::io::Write,
503 {
504 io::run_ndjson_source_limit_with_options(self, source, query, limit, writer, options)
505 }
506
507 pub fn run_ndjson_rev<P, W>(
509 &self,
510 path: P,
511 query: &str,
512 writer: W,
513 ) -> std::result::Result<usize, JetroEngineError>
514 where
515 P: AsRef<std::path::Path>,
516 W: std::io::Write,
517 {
518 io::run_ndjson_rev(self, path, query, writer)
519 }
520
521 pub fn run_ndjson_rev_with_options<P, W>(
523 &self,
524 path: P,
525 query: &str,
526 writer: W,
527 options: io::NdjsonOptions,
528 ) -> std::result::Result<usize, JetroEngineError>
529 where
530 P: AsRef<std::path::Path>,
531 W: std::io::Write,
532 {
533 io::run_ndjson_rev_with_options(self, path, query, writer, options)
534 }
535
536 pub fn run_ndjson_rev_limit<P, W>(
539 &self,
540 path: P,
541 query: &str,
542 limit: usize,
543 writer: W,
544 ) -> std::result::Result<usize, JetroEngineError>
545 where
546 P: AsRef<std::path::Path>,
547 W: std::io::Write,
548 {
549 io::run_ndjson_rev_limit(self, path, query, limit, writer)
550 }
551
552 pub fn run_ndjson_rev_limit_with_options<P, W>(
554 &self,
555 path: P,
556 query: &str,
557 limit: usize,
558 writer: W,
559 options: io::NdjsonOptions,
560 ) -> std::result::Result<usize, JetroEngineError>
561 where
562 P: AsRef<std::path::Path>,
563 W: std::io::Write,
564 {
565 io::run_ndjson_rev_limit_with_options(self, path, query, limit, writer, options)
566 }
567
568 pub fn run_ndjson_rev_distinct_by<P, W>(
572 &self,
573 path: P,
574 key_query: &str,
575 query: &str,
576 limit: usize,
577 writer: W,
578 ) -> std::result::Result<usize, JetroEngineError>
579 where
580 P: AsRef<std::path::Path>,
581 W: std::io::Write,
582 {
583 io::run_ndjson_rev_distinct_by(self, path, key_query, query, limit, writer)
584 }
585
586 pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
589 &self,
590 path: P,
591 key_query: &str,
592 query: &str,
593 limit: usize,
594 writer: W,
595 options: io::NdjsonOptions,
596 ) -> std::result::Result<usize, JetroEngineError>
597 where
598 P: AsRef<std::path::Path>,
599 W: std::io::Write,
600 {
601 io::run_ndjson_rev_distinct_by_with_options(
602 self, path, key_query, query, limit, writer, options,
603 )
604 }
605
606 pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
609 &self,
610 path: P,
611 key_query: &str,
612 query: &str,
613 limit: usize,
614 writer: W,
615 ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
616 where
617 P: AsRef<std::path::Path>,
618 W: std::io::Write,
619 {
620 io::run_ndjson_rev_distinct_by_with_stats(self, path, key_query, query, limit, writer)
621 }
622
623 pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
626 &self,
627 path: P,
628 key_query: &str,
629 query: &str,
630 limit: usize,
631 writer: W,
632 options: io::NdjsonOptions,
633 ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
634 where
635 P: AsRef<std::path::Path>,
636 W: std::io::Write,
637 {
638 io::run_ndjson_rev_distinct_by_with_stats_and_options(
639 self, path, key_query, query, limit, writer, options,
640 )
641 }
642
643 pub fn run_ndjson_with_options<R, W>(
645 &self,
646 reader: R,
647 query: &str,
648 writer: W,
649 options: io::NdjsonOptions,
650 ) -> std::result::Result<usize, JetroEngineError>
651 where
652 R: std::io::BufRead,
653 W: std::io::Write,
654 {
655 io::run_ndjson_with_options(self, reader, query, writer, options)
656 }
657
658 pub fn run_ndjson_limit<R, W>(
660 &self,
661 reader: R,
662 query: &str,
663 limit: usize,
664 writer: W,
665 ) -> std::result::Result<usize, JetroEngineError>
666 where
667 R: std::io::BufRead,
668 W: std::io::Write,
669 {
670 io::run_ndjson_limit(self, reader, query, limit, writer)
671 }
672
673 pub fn run_ndjson_limit_with_options<R, W>(
675 &self,
676 reader: R,
677 query: &str,
678 limit: usize,
679 writer: W,
680 options: io::NdjsonOptions,
681 ) -> std::result::Result<usize, JetroEngineError>
682 where
683 R: std::io::BufRead,
684 W: std::io::Write,
685 {
686 io::run_ndjson_limit_with_options(self, reader, query, limit, writer, options)
687 }
688
689 pub fn run_ndjson_matches<R, W>(
692 &self,
693 reader: R,
694 predicate: &str,
695 limit: usize,
696 writer: W,
697 ) -> std::result::Result<usize, JetroEngineError>
698 where
699 R: std::io::BufRead,
700 W: std::io::Write,
701 {
702 io::run_ndjson_matches(self, reader, predicate, limit, writer)
703 }
704
705 pub fn run_ndjson_matches_with_options<R, W>(
707 &self,
708 reader: R,
709 predicate: &str,
710 limit: usize,
711 writer: W,
712 options: io::NdjsonOptions,
713 ) -> std::result::Result<usize, JetroEngineError>
714 where
715 R: std::io::BufRead,
716 W: std::io::Write,
717 {
718 io::run_ndjson_matches_with_options(self, reader, predicate, limit, writer, options)
719 }
720
721 pub fn run_ndjson_matches_file<P, W>(
723 &self,
724 path: P,
725 predicate: &str,
726 limit: usize,
727 writer: W,
728 ) -> std::result::Result<usize, JetroEngineError>
729 where
730 P: AsRef<std::path::Path>,
731 W: std::io::Write,
732 {
733 io::run_ndjson_matches_file(self, path, predicate, limit, writer)
734 }
735
736 pub fn run_ndjson_matches_file_with_options<P, W>(
738 &self,
739 path: P,
740 predicate: &str,
741 limit: usize,
742 writer: W,
743 options: io::NdjsonOptions,
744 ) -> std::result::Result<usize, JetroEngineError>
745 where
746 P: AsRef<std::path::Path>,
747 W: std::io::Write,
748 {
749 io::run_ndjson_matches_file_with_options(self, path, predicate, limit, writer, options)
750 }
751
752 pub fn run_ndjson_matches_source<W>(
755 &self,
756 source: io::NdjsonSource,
757 predicate: &str,
758 limit: usize,
759 writer: W,
760 ) -> std::result::Result<usize, JetroEngineError>
761 where
762 W: std::io::Write,
763 {
764 io::run_ndjson_matches_source(self, source, predicate, limit, writer)
765 }
766
767 pub fn run_ndjson_matches_source_with_options<W>(
769 &self,
770 source: io::NdjsonSource,
771 predicate: &str,
772 limit: usize,
773 writer: W,
774 options: io::NdjsonOptions,
775 ) -> std::result::Result<usize, JetroEngineError>
776 where
777 W: std::io::Write,
778 {
779 io::run_ndjson_matches_source_with_options(self, source, predicate, limit, writer, options)
780 }
781
782 pub fn run_ndjson_rev_matches<P, W>(
785 &self,
786 path: P,
787 predicate: &str,
788 limit: usize,
789 writer: W,
790 ) -> std::result::Result<usize, JetroEngineError>
791 where
792 P: AsRef<std::path::Path>,
793 W: std::io::Write,
794 {
795 io::run_ndjson_rev_matches(self, path, predicate, limit, writer)
796 }
797
798 pub fn run_ndjson_rev_matches_with_options<P, W>(
800 &self,
801 path: P,
802 predicate: &str,
803 limit: usize,
804 writer: W,
805 options: io::NdjsonOptions,
806 ) -> std::result::Result<usize, JetroEngineError>
807 where
808 P: AsRef<std::path::Path>,
809 W: std::io::Write,
810 {
811 io::run_ndjson_rev_matches_with_options(self, path, predicate, limit, writer, options)
812 }
813
814 pub fn collect_ndjson<R>(
817 &self,
818 reader: R,
819 query: &str,
820 ) -> std::result::Result<Vec<Value>, JetroEngineError>
821 where
822 R: std::io::BufRead,
823 {
824 io::collect_ndjson(self, reader, query)
825 }
826
827 pub fn collect_ndjson_file<P>(
829 &self,
830 path: P,
831 query: &str,
832 ) -> std::result::Result<Vec<Value>, JetroEngineError>
833 where
834 P: AsRef<std::path::Path>,
835 {
836 io::collect_ndjson_file(self, path, query)
837 }
838
839 pub fn collect_ndjson_file_with_options<P>(
841 &self,
842 path: P,
843 query: &str,
844 options: io::NdjsonOptions,
845 ) -> std::result::Result<Vec<Value>, JetroEngineError>
846 where
847 P: AsRef<std::path::Path>,
848 {
849 io::collect_ndjson_file_with_options(self, path, query, options)
850 }
851
852 pub fn collect_ndjson_source(
854 &self,
855 source: io::NdjsonSource,
856 query: &str,
857 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
858 io::collect_ndjson_source(self, source, query)
859 }
860
861 pub fn collect_ndjson_source_with_options(
863 &self,
864 source: io::NdjsonSource,
865 query: &str,
866 options: io::NdjsonOptions,
867 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
868 io::collect_ndjson_source_with_options(self, source, query, options)
869 }
870
871 pub fn collect_ndjson_rev<P>(
873 &self,
874 path: P,
875 query: &str,
876 ) -> std::result::Result<Vec<Value>, JetroEngineError>
877 where
878 P: AsRef<std::path::Path>,
879 {
880 io::collect_ndjson_rev(self, path, query)
881 }
882
883 pub fn collect_ndjson_rev_with_options<P>(
885 &self,
886 path: P,
887 query: &str,
888 options: io::NdjsonOptions,
889 ) -> std::result::Result<Vec<Value>, JetroEngineError>
890 where
891 P: AsRef<std::path::Path>,
892 {
893 io::collect_ndjson_rev_with_options(self, path, query, options)
894 }
895
896 pub fn for_each_ndjson_rev<P, F>(
899 &self,
900 path: P,
901 query: &str,
902 f: F,
903 ) -> std::result::Result<usize, JetroEngineError>
904 where
905 P: AsRef<std::path::Path>,
906 F: FnMut(Value),
907 {
908 io::for_each_ndjson_rev(self, path, query, f)
909 }
910
911 pub fn for_each_ndjson_rev_until<P, F>(
914 &self,
915 path: P,
916 query: &str,
917 f: F,
918 ) -> std::result::Result<usize, JetroEngineError>
919 where
920 P: AsRef<std::path::Path>,
921 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
922 {
923 io::for_each_ndjson_rev_with_options(self, path, query, io::NdjsonOptions::default(), f)
924 }
925
926 pub fn for_each_ndjson_rev_until_with_options<P, F>(
928 &self,
929 path: P,
930 query: &str,
931 options: io::NdjsonOptions,
932 f: F,
933 ) -> std::result::Result<usize, JetroEngineError>
934 where
935 P: AsRef<std::path::Path>,
936 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
937 {
938 io::for_each_ndjson_rev_with_options(self, path, query, options, f)
939 }
940
941 pub fn for_each_ndjson_rev_with_options<P, F>(
943 &self,
944 path: P,
945 query: &str,
946 options: io::NdjsonOptions,
947 mut f: F,
948 ) -> std::result::Result<usize, JetroEngineError>
949 where
950 P: AsRef<std::path::Path>,
951 F: FnMut(Value),
952 {
953 io::for_each_ndjson_rev_with_options(self, path, query, options, |value| {
954 f(value);
955 Ok(io::NdjsonControl::Continue)
956 })
957 }
958
959 pub fn collect_ndjson_with_options<R>(
961 &self,
962 reader: R,
963 query: &str,
964 options: io::NdjsonOptions,
965 ) -> std::result::Result<Vec<Value>, JetroEngineError>
966 where
967 R: std::io::BufRead,
968 {
969 io::collect_ndjson_with_options(self, reader, query, options)
970 }
971
972 pub fn collect_ndjson_matches<R>(
975 &self,
976 reader: R,
977 predicate: &str,
978 limit: usize,
979 ) -> std::result::Result<Vec<Value>, JetroEngineError>
980 where
981 R: std::io::BufRead,
982 {
983 io::collect_ndjson_matches(self, reader, predicate, limit)
984 }
985
986 pub fn collect_ndjson_matches_with_options<R>(
988 &self,
989 reader: R,
990 predicate: &str,
991 limit: usize,
992 options: io::NdjsonOptions,
993 ) -> std::result::Result<Vec<Value>, JetroEngineError>
994 where
995 R: std::io::BufRead,
996 {
997 io::collect_ndjson_matches_with_options(self, reader, predicate, limit, options)
998 }
999
1000 pub fn collect_ndjson_matches_file<P>(
1002 &self,
1003 path: P,
1004 predicate: &str,
1005 limit: usize,
1006 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1007 where
1008 P: AsRef<std::path::Path>,
1009 {
1010 io::collect_ndjson_matches_file(self, path, predicate, limit)
1011 }
1012
1013 pub fn collect_ndjson_matches_file_with_options<P>(
1015 &self,
1016 path: P,
1017 predicate: &str,
1018 limit: usize,
1019 options: io::NdjsonOptions,
1020 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1021 where
1022 P: AsRef<std::path::Path>,
1023 {
1024 io::collect_ndjson_matches_file_with_options(self, path, predicate, limit, options)
1025 }
1026
1027 pub fn collect_ndjson_matches_source(
1030 &self,
1031 source: io::NdjsonSource,
1032 predicate: &str,
1033 limit: usize,
1034 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1035 io::collect_ndjson_matches_source(self, source, predicate, limit)
1036 }
1037
1038 pub fn collect_ndjson_matches_source_with_options(
1040 &self,
1041 source: io::NdjsonSource,
1042 predicate: &str,
1043 limit: usize,
1044 options: io::NdjsonOptions,
1045 ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1046 io::collect_ndjson_matches_source_with_options(self, source, predicate, limit, options)
1047 }
1048
1049 pub fn collect_ndjson_rev_matches<P>(
1052 &self,
1053 path: P,
1054 predicate: &str,
1055 limit: usize,
1056 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1057 where
1058 P: AsRef<std::path::Path>,
1059 {
1060 io::collect_ndjson_rev_matches(self, path, predicate, limit)
1061 }
1062
1063 pub fn collect_ndjson_rev_matches_with_options<P>(
1065 &self,
1066 path: P,
1067 predicate: &str,
1068 limit: usize,
1069 options: io::NdjsonOptions,
1070 ) -> std::result::Result<Vec<Value>, JetroEngineError>
1071 where
1072 P: AsRef<std::path::Path>,
1073 {
1074 io::collect_ndjson_rev_matches_with_options(self, path, predicate, limit, options)
1075 }
1076
1077 pub fn for_each_ndjson<R, F>(
1080 &self,
1081 reader: R,
1082 query: &str,
1083 f: F,
1084 ) -> std::result::Result<usize, JetroEngineError>
1085 where
1086 R: std::io::BufRead,
1087 F: FnMut(Value),
1088 {
1089 io::for_each_ndjson(self, reader, query, f)
1090 }
1091
1092 pub fn for_each_ndjson_until<R, F>(
1095 &self,
1096 reader: R,
1097 query: &str,
1098 f: F,
1099 ) -> std::result::Result<usize, JetroEngineError>
1100 where
1101 R: std::io::BufRead,
1102 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1103 {
1104 io::for_each_ndjson_until(self, reader, query, f)
1105 }
1106
1107 pub fn for_each_ndjson_source<F>(
1110 &self,
1111 source: io::NdjsonSource,
1112 query: &str,
1113 f: F,
1114 ) -> std::result::Result<usize, JetroEngineError>
1115 where
1116 F: FnMut(Value),
1117 {
1118 io::for_each_ndjson_source(self, source, query, f)
1119 }
1120
1121 pub fn for_each_ndjson_source_until<F>(
1124 &self,
1125 source: io::NdjsonSource,
1126 query: &str,
1127 f: F,
1128 ) -> std::result::Result<usize, JetroEngineError>
1129 where
1130 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1131 {
1132 io::for_each_ndjson_source_until(self, source, query, f)
1133 }
1134
1135 pub fn for_each_ndjson_source_until_with_options<F>(
1137 &self,
1138 source: io::NdjsonSource,
1139 query: &str,
1140 options: io::NdjsonOptions,
1141 f: F,
1142 ) -> std::result::Result<usize, JetroEngineError>
1143 where
1144 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1145 {
1146 io::for_each_ndjson_source_until_with_options(self, source, query, options, f)
1147 }
1148
1149 pub fn for_each_ndjson_source_with_options<F>(
1151 &self,
1152 source: io::NdjsonSource,
1153 query: &str,
1154 options: io::NdjsonOptions,
1155 f: F,
1156 ) -> std::result::Result<usize, JetroEngineError>
1157 where
1158 F: FnMut(Value),
1159 {
1160 io::for_each_ndjson_source_with_options(self, source, query, options, f)
1161 }
1162
1163 pub fn for_each_ndjson_with_options<R, F>(
1165 &self,
1166 reader: R,
1167 query: &str,
1168 options: io::NdjsonOptions,
1169 f: F,
1170 ) -> std::result::Result<usize, JetroEngineError>
1171 where
1172 R: std::io::BufRead,
1173 F: FnMut(Value),
1174 {
1175 io::for_each_ndjson_with_options(self, reader, query, options, f)
1176 }
1177
1178 pub fn for_each_ndjson_until_with_options<R, F>(
1180 &self,
1181 reader: R,
1182 query: &str,
1183 options: io::NdjsonOptions,
1184 f: F,
1185 ) -> std::result::Result<usize, JetroEngineError>
1186 where
1187 R: std::io::BufRead,
1188 F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1189 {
1190 io::for_each_ndjson_until_with_options(self, reader, query, options, f)
1191 }
1192
1193 pub(crate) fn cached_plan(
1196 &self,
1197 expr: &str,
1198 context: plan::physical::PlanningContext,
1199 ) -> ir::physical::QueryPlan {
1200 let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
1201 let cache_key = format!("{}\0{}", context.cache_key(), expr);
1202 if let Some(plan) = cache.get(&cache_key) {
1203 return plan.clone();
1204 }
1205
1206 let plan = plan::physical::plan_query_with_context(expr, context);
1207 if self.plan_cache_limit > 0 {
1208 if cache.len() >= self.plan_cache_limit {
1209 cache.clear();
1210 }
1211 cache.insert(cache_key, plan.clone());
1212 }
1213 plan
1214 }
1215}
1216
1217impl exec::pipeline::PipelineData for Jetro {
1218 fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
1219 self.get_or_promote_objvec(arr)
1220 }
1221}
1222
1223impl Jetro {
1224 #[cfg(feature = "simd-json")]
1227 pub(crate) fn lazy_tape(
1228 &self,
1229 ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
1230 if let Some(result) = self.tape.get() {
1231 return result
1232 .as_ref()
1233 .map(Some)
1234 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1235 }
1236 let Some(raw) = self.raw_bytes.as_ref() else {
1237 return Ok(None);
1238 };
1239 let bytes: Vec<u8> = (**raw).to_vec();
1240 let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
1241 let _ = self.tape.set(parsed);
1242 self.tape
1243 .get()
1244 .expect("tape cache initialized")
1245 .as_ref()
1246 .map(Some)
1247 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1248 }
1249
1250 pub(crate) fn get_or_promote_objvec(
1253 &self,
1254 arr: &Arc<Vec<Val>>,
1255 ) -> Option<Arc<crate::data::value::ObjVecData>> {
1256 let key = Arc::as_ptr(arr) as usize;
1257 if let Ok(cache) = self.objvec_cache.lock() {
1258 if let Some(d) = cache.get(&key) {
1259 return Some(Arc::clone(d));
1260 }
1261 }
1262 let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
1263 if let Ok(mut cache) = self.objvec_cache.lock() {
1264 cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
1265 }
1266 Some(promoted)
1267 }
1268
1269 pub(crate) fn new(document: Value) -> Self {
1271 Self {
1272 document,
1273 root_val: OnceCell::new(),
1274 objvec_cache: Default::default(),
1275 raw_bytes: None,
1276 tape: OnceCell::new(),
1277 structural_index: OnceCell::new(),
1278 vm: RefCell::new(VM::new()),
1279 }
1280 }
1281
1282 pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
1287 let root_val = OnceCell::new();
1288 let _ = root_val.set(root);
1289 Self {
1290 document,
1291 root_val,
1292 objvec_cache: Default::default(),
1293 raw_bytes: None,
1294 tape: OnceCell::new(),
1295 structural_index: OnceCell::new(),
1296 vm: RefCell::new(VM::new()),
1297 }
1298 }
1299
1300 pub(crate) fn root_val_with(
1305 &self,
1306 keys: &crate::data::intern::KeyCache,
1307 ) -> std::result::Result<Val, EvalError> {
1308 if let Some(root) = self.root_val.get() {
1309 return Ok(root.clone());
1310 }
1311 let root = {
1312 #[cfg(feature = "simd-json")]
1313 {
1314 if let Some(tape) = self.lazy_tape()? {
1315 Val::from_tape_data_with(keys, tape)
1316 } else {
1317 Val::from_value_with(keys, &self.document)
1318 }
1319 }
1320 #[cfg(not(feature = "simd-json"))]
1321 {
1322 Val::from_value_with(keys, &self.document)
1323 }
1324 };
1325 let _ = self.root_val.set(root);
1326 Ok(self.root_val.get().expect("root val initialized").clone())
1327 }
1328
1329 pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
1333 #[cfg(feature = "simd-json")]
1334 {
1335 return Ok(Self {
1336 document: Value::Null,
1337 root_val: OnceCell::new(),
1338 objvec_cache: Default::default(),
1339 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1340 tape: OnceCell::new(),
1341 structural_index: OnceCell::new(),
1342 vm: RefCell::new(VM::new()),
1343 });
1344 }
1345 #[allow(unreachable_code)]
1346 {
1347 let document: Value = serde_json::from_slice(&bytes)?;
1348 Ok(Self {
1349 document,
1350 root_val: OnceCell::new(),
1351 objvec_cache: Default::default(),
1352 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1353 tape: OnceCell::new(),
1354 structural_index: OnceCell::new(),
1355 vm: RefCell::new(VM::new()),
1356 })
1357 }
1358 }
1359
1360 pub(crate) fn with_vm<F, R>(&self, f: F) -> R
1362 where
1363 F: FnOnce(&mut VM) -> R,
1364 {
1365 match self.vm.try_borrow_mut() {
1366 Ok(mut vm) => f(&mut vm),
1367 Err(_) => {
1368 let mut vm = VM::new();
1369 f(&mut vm)
1370 }
1371 }
1372 }
1373
1374 pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
1377 self.raw_bytes.as_deref()
1378 }
1379
1380 pub(crate) fn lazy_structural_index(
1383 &self,
1384 ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
1385 if let Some(result) = self.structural_index.get() {
1386 return result
1387 .as_ref()
1388 .map(Some)
1389 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1390 }
1391 let Some(raw) = self.raw_bytes.as_ref() else {
1392 return Ok(None);
1393 };
1394 let built = jetro_experimental::from_bytes_with(
1395 raw.as_ref(),
1396 jetro_experimental::BuildOptions::keys_only(),
1397 )
1398 .map(Arc::new)
1399 .map_err(|err| err.to_string());
1400 let _ = self.structural_index.set(built);
1401 self.structural_index
1402 .get()
1403 .expect("structural index cache initialized")
1404 .as_ref()
1405 .map(Some)
1406 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1407 }
1408
1409 pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
1412 if let Some(root) = self.root_val.get() {
1413 return Ok(root.clone());
1414 }
1415 let root = {
1416 #[cfg(feature = "simd-json")]
1417 {
1418 if let Some(tape) = self.lazy_tape()? {
1419 Val::from_tape_data(tape)
1420 } else {
1421 Val::from(&self.document)
1422 }
1423 }
1424 #[cfg(not(feature = "simd-json"))]
1425 {
1426 Val::from(&self.document)
1427 }
1428 };
1429 let _ = self.root_val.set(root);
1430 Ok(self.root_val.get().expect("root val initialized").clone())
1431 }
1432
1433 #[cfg(test)]
1436 pub(crate) fn root_val_is_materialized(&self) -> bool {
1437 self.root_val.get().is_some()
1438 }
1439
1440 #[cfg(test)]
1441 pub(crate) fn structural_index_is_built(&self) -> bool {
1442 self.structural_index.get().is_some()
1443 }
1444
1445 #[cfg(all(test, feature = "simd-json"))]
1446 pub(crate) fn tape_is_built(&self) -> bool {
1447 self.tape.get().is_some()
1448 }
1449
1450 #[cfg(all(test, feature = "simd-json"))]
1451 pub(crate) fn reset_tape_materialized_subtrees(&self) {
1452 if let Ok(Some(tape)) = self.lazy_tape() {
1453 tape.reset_materialized_subtrees();
1454 }
1455 }
1456
1457 #[cfg(all(test, feature = "simd-json"))]
1458 pub(crate) fn tape_materialized_subtrees(&self) -> usize {
1459 self.lazy_tape()
1460 .ok()
1461 .flatten()
1462 .map(|tape| tape.materialized_subtrees())
1463 .unwrap_or(0)
1464 }
1465
1466 pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
1470 exec::router::collect_json(self, expr.as_ref())
1471 }
1472}
1473
1474impl From<Value> for Jetro {
1478 fn from(v: Value) -> Self {
1480 Self::new(v)
1481 }
1482}