1#![doc = include_str!("../README.md")]
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::pin::Pin;
20use std::sync::{atomic::Ordering, Arc};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use anyhow::{anyhow, bail, Context as _, Result};
25use arrow_array::{builder::Int32Builder, Array, ArrayRef, BooleanArray, RecordBatch};
26use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
27use futures_util::{FutureExt, Stream};
28use rquickjs::context::intrinsic::{All, Base};
29pub use rquickjs::runtime::MemoryUsage;
30use rquickjs::{
31 async_with, function::Args, module::Evaluated, Array as JsArray, AsyncContext, AsyncRuntime,
32 Ctx, FromJs, IteratorJs as _, Module, Object, Persistent, Promise, Value,
33};
34
35pub use self::into_field::IntoField;
36
37#[cfg(feature = "fetch")]
38mod fetch;
39mod into_field;
40mod jsarrow;
41
42pub struct Runtime {
67 functions: HashMap<String, Function>,
68 aggregates: HashMap<String, Aggregate>,
69 converter: jsarrow::Converter,
71 runtime: AsyncRuntime,
72 context: AsyncContext,
73 timeout: Option<Duration>,
75 deadline: Arc<atomic_time::AtomicOptionInstant>,
77}
78
79impl Debug for Runtime {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("Runtime")
82 .field("functions", &self.functions.keys())
83 .field("aggregates", &self.aggregates.keys())
84 .field("timeout", &self.timeout)
85 .finish()
86 }
87}
88
89struct Function {
91 function: JsFunction,
92 return_field: FieldRef,
93 options: FunctionOptions,
94}
95
96struct Aggregate {
98 state_field: FieldRef,
99 output_field: FieldRef,
100 create_state: JsFunction,
101 accumulate: JsFunction,
102 retract: Option<JsFunction>,
103 finish: Option<JsFunction>,
104 merge: Option<JsFunction>,
105 options: AggregateOptions,
106}
107
108unsafe impl Send for Function {}
113unsafe impl Sync for Function {}
114unsafe impl Send for Aggregate {}
115unsafe impl Sync for Aggregate {}
116
117type JsFunction = Persistent<rquickjs::Function<'static>>;
119
120unsafe impl Send for Runtime {}
122unsafe impl Sync for Runtime {}
123
124#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
126pub enum CallMode {
127 #[default]
130 CalledOnNullInput,
131
132 ReturnNullOnNullInput,
136}
137
138#[derive(Debug, Clone, Default)]
140pub struct FunctionOptions {
141 pub call_mode: CallMode,
143 pub is_async: bool,
145 pub is_batched: bool,
147 pub handler: Option<String>,
150}
151
152impl FunctionOptions {
153 pub fn return_null_on_null_input(mut self) -> Self {
156 self.call_mode = CallMode::ReturnNullOnNullInput;
157 self
158 }
159
160 pub fn async_mode(mut self) -> Self {
162 self.is_async = true;
163 self
164 }
165
166 pub fn batched(mut self) -> Self {
168 self.is_batched = true;
169 self
170 }
171
172 pub fn handler(mut self, handler: impl Into<String>) -> Self {
174 self.handler = Some(handler.into());
175 self
176 }
177}
178
179#[derive(Debug, Clone, Default)]
181pub struct AggregateOptions {
182 pub call_mode: CallMode,
184 pub is_async: bool,
186}
187
188impl AggregateOptions {
189 pub fn return_null_on_null_input(mut self) -> Self {
192 self.call_mode = CallMode::ReturnNullOnNullInput;
193 self
194 }
195
196 pub fn async_mode(mut self) -> Self {
198 self.is_async = true;
199 self
200 }
201}
202
203impl Runtime {
204 pub async fn new() -> Result<Self> {
216 let runtime = AsyncRuntime::new().context("failed to create quickjs runtime")?;
217 let context = AsyncContext::custom::<(Base, All)>(&runtime)
218 .await
219 .context("failed to create quickjs context")?;
220
221 Ok(Self {
222 functions: HashMap::new(),
223 aggregates: HashMap::new(),
224 runtime,
225 context,
226 timeout: None,
227 deadline: Default::default(),
228 converter: jsarrow::Converter::new(),
229 })
230 }
231
232 pub async fn set_memory_limit(&self, limit: Option<usize>) {
244 self.runtime.set_memory_limit(limit.unwrap_or(0)).await;
245 }
246
247 pub async fn set_timeout(&mut self, timeout: Option<Duration>) {
260 self.timeout = timeout;
261 if timeout.is_some() {
262 let deadline = self.deadline.clone();
263 self.runtime
264 .set_interrupt_handler(Some(Box::new(move || {
265 if let Some(deadline) = deadline.load(Ordering::Relaxed) {
266 return deadline <= Instant::now();
267 }
268 false
269 })))
270 .await;
271 } else {
272 self.runtime.set_interrupt_handler(None).await;
273 }
274 }
275
276 pub fn inner(&self) -> &AsyncRuntime {
278 &self.runtime
279 }
280
281 pub fn converter_mut(&mut self) -> &mut jsarrow::Converter {
283 &mut self.converter
284 }
285
286 pub async fn add_function(
343 &mut self,
344 name: &str,
345 return_type: impl IntoField + Send,
346 code: &str,
347 options: FunctionOptions,
348 ) -> Result<()> {
349 let function = async_with!(self.context => |ctx| {
350 let (module, _) = Module::declare(ctx.clone(), name, code)
351 .map_err(|e| check_exception(e, &ctx))
352 .context("failed to declare module")?
353 .eval()
354 .map_err(|e| check_exception(e, &ctx))
355 .context("failed to evaluate module")?;
356 let function = Self::get_function(&ctx, &module, options.handler.as_deref().unwrap_or(name))?;
357 Ok(Function {
358 function,
359 return_field: return_type.into_field(name).into(),
360 options,
361 }) as Result<Function>
362 })
363 .await?;
364 self.functions.insert(name.to_string(), function);
365 Ok(())
366 }
367
368 fn get_function<'a>(
370 ctx: &Ctx<'a>,
371 module: &Module<'a, Evaluated>,
372 name: &str,
373 ) -> Result<JsFunction> {
374 let function: rquickjs::Function = module.get(name).with_context(|| {
375 format!("function \"{name}\" not found. HINT: make sure the function is exported")
376 })?;
377 Ok(Persistent::save(ctx, function))
378 }
379
380 pub async fn add_aggregate(
438 &mut self,
439 name: &str,
440 state_type: impl IntoField + Send,
441 output_type: impl IntoField + Send,
442 code: &str,
443 options: AggregateOptions,
444 ) -> Result<()> {
445 let aggregate = async_with!(self.context => |ctx| {
446 let (module, _) = Module::declare(ctx.clone(), name, code)
447 .map_err(|e| check_exception(e, &ctx))
448 .context("failed to declare module")?
449 .eval()
450 .map_err(|e| check_exception(e, &ctx))
451 .context("failed to evaluate module")?;
452 Ok(Aggregate {
453 state_field: state_type.into_field(name).into(),
454 output_field: output_type.into_field(name).into(),
455 create_state: Self::get_function(&ctx, &module, "create_state")?,
456 accumulate: Self::get_function(&ctx, &module, "accumulate")?,
457 retract: Self::get_function(&ctx, &module, "retract").ok(),
458 finish: Self::get_function(&ctx, &module, "finish").ok(),
459 merge: Self::get_function(&ctx, &module, "merge").ok(),
460 options,
461 }) as Result<Aggregate>
462 })
463 .await?;
464
465 if aggregate.finish.is_none() && aggregate.state_field != aggregate.output_field {
466 bail!("`output_type` must be the same as `state_type` when `finish` is not defined");
467 }
468 self.aggregates.insert(name.to_string(), aggregate);
469 Ok(())
470 }
471
472 #[doc = include_str!("doc_create_function.txt")]
479 pub async fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch> {
495 let function = self.functions.get(name).context("function not found")?;
496
497 async_with!(self.context => |ctx| {
498 if function.options.is_batched {
499 self.call_batched_function(&ctx, function, input).await
500 } else {
501 self.call_non_batched_function(&ctx, function, input).await
502 }
503 })
504 .await
505 }
506
507 async fn call_non_batched_function(
508 &self,
509 ctx: &Ctx<'_>,
510 function: &Function,
511 input: &RecordBatch,
512 ) -> Result<RecordBatch> {
513 let js_function = function.function.clone().restore(ctx)?;
514
515 let mut results = Vec::with_capacity(input.num_rows());
516 let mut row = Vec::with_capacity(input.num_columns());
517 for i in 0..input.num_rows() {
518 row.clear();
519 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
520 let val = self
521 .converter
522 .get_jsvalue(ctx, field, column, i)
523 .context("failed to get jsvalue from arrow array")?;
524
525 row.push(val);
526 }
527 if function.options.call_mode == CallMode::ReturnNullOnNullInput
528 && row.iter().any(|v| v.is_null())
529 {
530 results.push(Value::new_null(ctx.clone()));
531 continue;
532 }
533 let mut args = Args::new(ctx.clone(), row.len());
534 args.push_args(row.drain(..))?;
535 let result = self
536 .call_user_fn(ctx, &js_function, args, function.options.is_async)
537 .await
538 .context("failed to call function")?;
539 results.push(result);
540 }
541
542 let array = self
543 .converter
544 .build_array(&function.return_field, ctx, results)
545 .context("failed to build arrow array from return values")?;
546 let schema = Schema::new(vec![function.return_field.clone()]);
547 Ok(RecordBatch::try_new(Arc::new(schema), vec![array])?)
548 }
549
550 async fn call_batched_function(
551 &self,
552 ctx: &Ctx<'_>,
553 function: &Function,
554 input: &RecordBatch,
555 ) -> Result<RecordBatch> {
556 let js_function = function.function.clone().restore(ctx)?;
557
558 let mut js_columns = Vec::with_capacity(input.num_columns());
559 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
560 let mut js_values = Vec::with_capacity(input.num_rows());
561 for i in 0..input.num_rows() {
562 let val = self
563 .converter
564 .get_jsvalue(ctx, field, column, i)
565 .context("failed to get jsvalue from arrow array")?;
566 js_values.push(val);
567 }
568 js_columns.push(js_values);
569 }
570
571 let result = match function.options.call_mode {
572 CallMode::CalledOnNullInput => {
573 let mut args = Args::new(ctx.clone(), input.num_columns());
574 for js_values in js_columns {
575 let js_array = js_values.into_iter().collect_js::<JsArray>(ctx)?;
576 args.push_arg(js_array)?;
577 }
578 self.call_user_fn(ctx, &js_function, args, function.options.is_async)
579 .await
580 .context("failed to call function")?
581 }
582 CallMode::ReturnNullOnNullInput => {
583 let n_cols = input.num_columns();
586 let n_rows = input.num_rows();
587
588 let mut bitmap = Vec::with_capacity(n_rows);
590 for i in 0..n_rows {
591 let has_null = (0..n_cols).any(|j| js_columns[j][i].is_null());
592 bitmap.push(!has_null);
593 }
594
595 let mut filtered_columns = Vec::with_capacity(n_cols);
597 for js_values in js_columns {
598 let filtered_js_values: Vec<_> = js_values
599 .into_iter()
600 .zip(bitmap.iter())
601 .filter(|(_, b)| **b)
602 .map(|(v, _)| v)
603 .collect();
604 filtered_columns.push(filtered_js_values);
605 }
606
607 let mut args = Args::new(ctx.clone(), filtered_columns.len());
609 for js_values in filtered_columns {
610 let js_array = js_values.into_iter().collect_js::<JsArray>(ctx)?;
611 args.push_arg(js_array)?;
612 }
613 let filtered_result: Vec<_> = self
614 .call_user_fn(ctx, &js_function, args, function.options.is_async)
615 .await
616 .context("failed to call function")?;
617 let mut iter = filtered_result.into_iter();
618
619 let mut result = Vec::with_capacity(n_rows);
621 for b in bitmap.iter() {
622 if *b {
623 let v = iter.next().expect("filtered result length mismatch");
624 result.push(v);
625 } else {
626 result.push(Value::new_null(ctx.clone()));
627 }
628 }
629 assert!(iter.next().is_none(), "filtered result length mismatch");
630 result
631 }
632 };
633 let array = self
634 .converter
635 .build_array(&function.return_field, ctx, result)?;
636 let schema = Schema::new(vec![function.return_field.clone()]);
637 Ok(RecordBatch::try_new(Arc::new(schema), vec![array])?)
638 }
639
640 #[doc = include_str!("doc_create_function.txt")]
647 pub fn call_table_function<'a>(
669 &'a self,
670 name: &'a str,
671 input: &'a RecordBatch,
672 chunk_size: usize,
673 ) -> Result<RecordBatchIter<'a>> {
674 assert!(chunk_size > 0);
675 let function = self.functions.get(name).context("function not found")?;
676 if function.options.is_batched {
677 bail!("table function does not support batched mode");
678 }
679
680 Ok(RecordBatchIter {
682 rt: self,
683 input,
684 function,
685 schema: Arc::new(Schema::new(vec![
686 Arc::new(Field::new("row", DataType::Int32, false)),
687 function.return_field.clone(),
688 ])),
689 chunk_size,
690 row: 0,
691 generator: None,
692 converter: &self.converter,
693 })
694 }
695
696 #[doc = include_str!("doc_create_aggregate.txt")]
702 pub async fn create_state(&self, name: &str) -> Result<ArrayRef> {
707 let aggregate = self.aggregates.get(name).context("function not found")?;
708 let state = async_with!(self.context => |ctx| {
709 let create_state = aggregate.create_state.clone().restore(&ctx)?;
710 let state = self
711 .call_user_fn(&ctx, &create_state, Args::new(ctx.clone(), 0), aggregate.options.is_async)
712 .await
713 .context("failed to call create_state")?;
714 let state = self
715 .converter
716 .build_array(&aggregate.state_field, &ctx, vec![state])?;
717 Ok(state) as Result<_>
718 })
719 .await?;
720 Ok(state)
721 }
722
723 #[doc = include_str!("doc_create_aggregate.txt")]
729 pub async fn accumulate(
740 &self,
741 name: &str,
742 state: &dyn Array,
743 input: &RecordBatch,
744 ) -> Result<ArrayRef> {
745 let aggregate = self.aggregates.get(name).context("function not found")?;
746 let new_state = async_with!(self.context => |ctx| {
748 let accumulate = aggregate.accumulate.clone().restore(&ctx)?;
749 let mut state = self
750 .converter
751 .get_jsvalue(&ctx, &aggregate.state_field, state, 0)?;
752
753 let mut row = Vec::with_capacity(1 + input.num_columns());
754 for i in 0..input.num_rows() {
755 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput
756 && input.columns().iter().any(|column| column.is_null(i))
757 {
758 continue;
759 }
760 row.clear();
761 row.push(state.clone());
762 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
763 let pyobj = self.converter.get_jsvalue(&ctx, field, column, i)?;
764 row.push(pyobj);
765 }
766 let mut args = Args::new(ctx.clone(), row.len());
767 args.push_args(row.drain(..))?;
768 state = self
769 .call_user_fn(&ctx, &accumulate, args, aggregate.options.is_async)
770 .await
771 .context("failed to call accumulate")?;
772 }
773 let output = self
774 .converter
775 .build_array(&aggregate.state_field, &ctx, vec![state])?;
776 Ok(output) as Result<_>
777 })
778 .await?;
779 Ok(new_state)
780 }
781
782 #[doc = include_str!("doc_create_aggregate.txt")]
791 pub async fn accumulate_or_retract(
803 &self,
804 name: &str,
805 state: &dyn Array,
806 ops: &BooleanArray,
807 input: &RecordBatch,
808 ) -> Result<ArrayRef> {
809 let aggregate = self.aggregates.get(name).context("function not found")?;
810 let new_state = async_with!(self.context => |ctx| {
812 let accumulate = aggregate.accumulate.clone().restore(&ctx)?;
813 let retract = aggregate
814 .retract
815 .clone()
816 .context("function does not support retraction")?
817 .restore(&ctx)?;
818
819 let mut state = self
820 .converter
821 .get_jsvalue(&ctx, &aggregate.state_field, state, 0)?;
822
823 let mut row = Vec::with_capacity(1 + input.num_columns());
824 for i in 0..input.num_rows() {
825 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput
826 && input.columns().iter().any(|column| column.is_null(i))
827 {
828 continue;
829 }
830 row.clear();
831 row.push(state.clone());
832 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
833 let pyobj = self.converter.get_jsvalue(&ctx, field, column, i)?;
834 row.push(pyobj);
835 }
836 let func = if ops.is_valid(i) && ops.value(i) {
837 &retract
838 } else {
839 &accumulate
840 };
841 let mut args = Args::new(ctx.clone(), row.len());
842 args.push_args(row.drain(..))?;
843 state = self
844 .call_user_fn(&ctx, func, args, aggregate.options.is_async)
845 .await
846 .context("failed to call accumulate or retract")?;
847 }
848 let output = self
849 .converter
850 .build_array(&aggregate.state_field, &ctx, vec![state])?;
851 Ok(output) as Result<_>
852 })
853 .await?;
854 Ok(new_state)
855 }
856
857 #[doc = include_str!("doc_create_aggregate.txt")]
863 pub async fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef> {
870 let aggregate = self.aggregates.get(name).context("function not found")?;
871 let output = async_with!(self.context => |ctx| {
872 let merge = aggregate
873 .merge
874 .clone()
875 .context("merge not found")?
876 .restore(&ctx)?;
877 let mut state = self
878 .converter
879 .get_jsvalue(&ctx, &aggregate.state_field, states, 0)?;
880 for i in 1..states.len() {
881 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput && states.is_null(i) {
882 continue;
883 }
884 let state2 = self
885 .converter
886 .get_jsvalue(&ctx, &aggregate.state_field, states, i)?;
887 let mut args = Args::new(ctx.clone(), 2);
888 args.push_args([state, state2])?;
889 state = self
890 .call_user_fn(&ctx, &merge, args, aggregate.options.is_async)
891 .await
892 .context("failed to call accumulate or retract")?;
893 }
894 let output = self
895 .converter
896 .build_array(&aggregate.state_field, &ctx, vec![state])?;
897 Ok(output) as Result<_>
898 })
899 .await?;
900 Ok(output)
901 }
902
903 #[doc = include_str!("doc_create_aggregate.txt")]
911 pub async fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef> {
918 let aggregate = self.aggregates.get(name).context("function not found")?;
919 if aggregate.finish.is_none() {
920 return Ok(states.clone());
921 };
922 let output = async_with!(self.context => |ctx| {
923 let finish = aggregate.finish.clone().unwrap().restore(&ctx)?;
924 let mut results = Vec::with_capacity(states.len());
925 for i in 0..states.len() {
926 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput && states.is_null(i) {
927 results.push(Value::new_null(ctx.clone()));
928 continue;
929 }
930 let state =
931 self.converter
932 .get_jsvalue(&ctx, &aggregate.state_field, states, i)?;
933 let mut args = Args::new(ctx.clone(), 1);
934 args.push_args([state])?;
935 let result = self
936 .call_user_fn(&ctx, &finish, args, aggregate.options.is_async)
937 .await
938 .context("failed to call finish")?;
939 results.push(result);
940 }
941 let output = self
942 .converter
943 .build_array(&aggregate.output_field, &ctx, results)?;
944 Ok(output) as Result<_>
945 })
946 .await?;
947 Ok(output)
948 }
949
950 async fn call_user_fn<'js, T: FromJs<'js>>(
954 &self,
955 ctx: &Ctx<'js>,
956 f: &rquickjs::Function<'js>,
957 args: Args<'js>,
958 is_async: bool,
959 ) -> Result<T> {
960 if is_async {
961 Self::call_user_fn_async(self, ctx, f, args).await
962 } else {
963 Self::call_user_fn_sync(self, ctx, f, args)
964 }
965 }
966
967 async fn call_user_fn_async<'js, T: FromJs<'js>>(
968 &self,
969 ctx: &Ctx<'js>,
970 f: &rquickjs::Function<'js>,
971 args: Args<'js>,
972 ) -> Result<T> {
973 let call_result = if let Some(timeout) = self.timeout {
974 self.deadline
975 .store(Some(Instant::now() + timeout), Ordering::Relaxed);
976 let call_result = f.call_arg::<Promise>(args);
977 self.deadline.store(None, Ordering::Relaxed);
978 call_result
979 } else {
980 f.call_arg::<Promise>(args)
981 };
982 let promise = call_result.map_err(|e| check_exception(e, ctx))?;
983 promise
984 .into_future::<T>()
985 .await
986 .map_err(|e| check_exception(e, ctx))
987 }
988
989 fn call_user_fn_sync<'js, T: FromJs<'js>>(
990 &self,
991 ctx: &Ctx<'js>,
992 f: &rquickjs::Function<'js>,
993 args: Args<'js>,
994 ) -> Result<T> {
995 let result = if let Some(timeout) = self.timeout {
996 self.deadline
997 .store(Some(Instant::now() + timeout), Ordering::Relaxed);
998 let result = f.call_arg(args);
999 self.deadline.store(None, Ordering::Relaxed);
1000 result
1001 } else {
1002 f.call_arg(args)
1003 };
1004 result.map_err(|e| check_exception(e, ctx))
1005 }
1006
1007 pub fn context(&self) -> &AsyncContext {
1008 &self.context
1009 }
1010
1011 #[cfg(feature = "fetch")]
1015 pub async fn enable_fetch(&self) -> Result<()> {
1016 fetch::enable_fetch(&self.runtime, &self.context).await
1017 }
1018}
1019
1020pub struct RecordBatchIter<'a> {
1022 rt: &'a Runtime,
1023 input: &'a RecordBatch,
1024 function: &'a Function,
1026 schema: SchemaRef,
1027 chunk_size: usize,
1028 row: usize,
1031 generator: Option<Persistent<Object<'static>>>,
1033 converter: &'a jsarrow::Converter,
1034}
1035
1036unsafe impl Send for RecordBatchIter<'_> {}
1038
1039impl RecordBatchIter<'_> {
1040 pub fn schema(&self) -> &Schema {
1042 &self.schema
1043 }
1044
1045 pub async fn next(&mut self) -> Result<Option<RecordBatch>> {
1046 if self.row == self.input.num_rows() {
1047 return Ok(None);
1048 }
1049 async_with!(self.rt.context => |ctx| {
1050 let js_function = self.function.function.clone().restore(&ctx)?;
1051 let mut indexes = Int32Builder::with_capacity(self.chunk_size);
1052 let mut results = Vec::with_capacity(self.input.num_rows());
1053 let mut row = Vec::with_capacity(self.input.num_columns());
1054 let mut generator = match self.generator.take() {
1056 Some(generator) => {
1057 let gen = generator.restore(&ctx)?;
1058 let next: rquickjs::Function =
1059 gen.get("next").context("failed to get 'next' method")?;
1060 Some((gen, next))
1061 }
1062 None => None,
1063 };
1064 while self.row < self.input.num_rows() && results.len() < self.chunk_size {
1065 let (gen, next) = if let Some(g) = generator.as_ref() {
1066 g
1067 } else {
1068 row.clear();
1070 for (column, field) in
1071 (self.input.columns().iter()).zip(self.input.schema().fields())
1072 {
1073 let val = self
1074 .converter
1075 .get_jsvalue(&ctx, field, column, self.row)
1076 .context("failed to get jsvalue from arrow array")?;
1077 row.push(val);
1078 }
1079 if self.function.options.call_mode == CallMode::ReturnNullOnNullInput
1080 && row.iter().any(|v| v.is_null())
1081 {
1082 self.row += 1;
1083 continue;
1084 }
1085 let mut args = Args::new(ctx.clone(), row.len());
1086 args.push_args(row.drain(..))?;
1087 let gen: Object = self
1091 .rt
1092 .call_user_fn(&ctx, &js_function, args, false).await
1093 .context("failed to call function")?;
1094 let next: rquickjs::Function =
1095 gen.get("next").context("failed to get 'next' method")?;
1096 let mut args = Args::new(ctx.clone(), 0);
1097 args.this(gen.clone())?;
1098 generator.insert((gen, next))
1099 };
1100 let mut args = Args::new(ctx.clone(), 0);
1101 args.this(gen.clone())?;
1102 let object: Object = self
1103 .rt
1104 .call_user_fn(&ctx, next, args, self.function.options.is_async).await
1105 .context("failed to call next")?;
1106 let value: Value = object.get("value")?;
1107 let done: bool = object.get("done")?;
1108 if done {
1109 self.row += 1;
1110 generator = None;
1111 continue;
1112 }
1113 indexes.append_value(self.row as i32);
1114 results.push(value);
1115 }
1116 self.generator = generator.map(|(gen, _)| Persistent::save(&ctx, gen));
1117
1118 if results.is_empty() {
1119 return Ok(None);
1120 }
1121 let indexes = Arc::new(indexes.finish());
1122 let array = self
1123 .converter
1124 .build_array(&self.function.return_field, &ctx, results)
1125 .context("failed to build arrow array from return values")?;
1126 Ok(Some(RecordBatch::try_new(
1127 self.schema.clone(),
1128 vec![indexes, array],
1129 )?))
1130 })
1131 .await
1132 }
1133}
1134
1135impl Stream for RecordBatchIter<'_> {
1136 type Item = Result<RecordBatch>;
1137 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1138 Box::pin(self.next().map(|v| v.transpose()))
1139 .as_mut()
1140 .poll_unpin(cx)
1141 }
1142}
1143
1144pub(crate) fn check_exception(err: rquickjs::Error, ctx: &Ctx) -> anyhow::Error {
1146 match err {
1147 rquickjs::Error::Exception => {
1148 anyhow!("exception generated by QuickJS: {:?}", ctx.catch())
1149 }
1150 e => e.into(),
1151 }
1152}