1#![doc = include_str!("README.md")]
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::pin::Pin;
20use std::sync::{atomic::Ordering, Arc};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use anyhow::{anyhow, bail, Context as _, Result};
25use arrow_array::{builder::Int32Builder, Array, ArrayRef, BooleanArray, RecordBatch};
26use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
27use futures_util::{FutureExt, Stream};
28use rquickjs::context::intrinsic::{All, Base};
29pub use rquickjs::runtime::MemoryUsage;
30use rquickjs::{
31 async_with, function::Args, module::Evaluated, Array as JsArray, AsyncContext, AsyncRuntime,
32 Ctx, FromJs, IteratorJs as _, Module, Object, Persistent, Promise, Value,
33};
34
35use crate::into_field::IntoField;
36use crate::CallMode;
37
38#[cfg(feature = "javascript-fetch")]
39mod fetch;
40mod jsarrow;
41
/// A JavaScript UDF runtime.
///
/// Holds the registered functions and aggregates together with the QuickJS
/// runtime and context in which they are evaluated and called.
pub struct Runtime {
    /// Registered functions, keyed by the name they were added under.
    functions: HashMap<String, Function>,
    /// Registered aggregate functions, keyed by the name they were added under.
    aggregates: HashMap<String, Aggregate>,
    /// Converts between Arrow arrays and JavaScript values.
    converter: jsarrow::Converter,
    /// The underlying QuickJS runtime.
    runtime: AsyncRuntime,
    /// The QuickJS context in which all user code is declared and called.
    context: AsyncContext,
    /// Timeout applied to each call into user code; `None` arms no deadline.
    timeout: Option<Duration>,
    /// Deadline of the call in flight, shared with the interrupt handler
    /// installed by `set_timeout`.
    deadline: Arc<atomic_time::AtomicOptionInstant>,
}
78
79impl Debug for Runtime {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("Runtime")
82 .field("functions", &self.functions.keys())
83 .field("aggregates", &self.aggregates.keys())
84 .field("timeout", &self.timeout)
85 .finish()
86 }
87}
88
/// A registered JavaScript function.
struct Function {
    /// Persistent handle to the exported JavaScript function.
    function: JsFunction,
    /// Arrow field describing the values the function returns.
    return_field: FieldRef,
    /// Options the function was registered with.
    options: FunctionOptions,
}
95
/// A registered JavaScript aggregate function.
struct Aggregate {
    /// Arrow field describing the aggregate state.
    state_field: FieldRef,
    /// Arrow field describing the final output.
    output_field: FieldRef,
    /// The exported `create_state` function (required).
    create_state: JsFunction,
    /// The exported `accumulate` function (required).
    accumulate: JsFunction,
    /// The exported `retract` function, if the module provides one.
    retract: Option<JsFunction>,
    /// The exported `finish` function, if the module provides one.
    finish: Option<JsFunction>,
    /// The exported `merge` function, if the module provides one.
    merge: Option<JsFunction>,
    /// Options the aggregate was registered with.
    options: AggregateOptions,
}
107
// NOTE(review): `Function` and `Aggregate` store `JsFunction` handles. The
// invariant that makes moving/sharing them across threads sound is not visible
// in this file; presumably the handles are only cloned, restored and called
// inside `async_with!` on the owning context — confirm and record it here as a
// `// SAFETY:` comment.
unsafe impl Send for Function {}
unsafe impl Sync for Function {}
unsafe impl Send for Aggregate {}
unsafe impl Sync for Aggregate {}
116
/// A JavaScript function stored as a [`Persistent`] handle, so it can be kept
/// in a struct with a `'static` lifetime and restored against a `Ctx` when it
/// needs to be called.
type JsFunction = Persistent<rquickjs::Function<'static>>;
119
// NOTE(review): the soundness argument for these impls is not visible in this
// file; presumably all JavaScript state is only touched through the
// `AsyncContext` — confirm and document it as a `// SAFETY:` comment.
unsafe impl Send for Runtime {}
unsafe impl Sync for Runtime {}
123
/// Options for registering a function with `Runtime::add_function`.
#[derive(Debug, Clone, Default)]
pub struct FunctionOptions {
    /// How rows containing null inputs are handled.
    pub call_mode: CallMode,
    /// Whether the function is called as an async function, i.e. the call is
    /// expected to return a promise that is then awaited.
    pub is_async: bool,
    /// Whether the function receives whole columns (one JavaScript array per
    /// input column) instead of being called once per row.
    pub is_batched: bool,
    /// Name of the exported function to look up in the module. When `None`,
    /// the registered function name is used.
    pub handler: Option<String>,
}
137
138impl FunctionOptions {
139 pub fn return_null_on_null_input(mut self) -> Self {
142 self.call_mode = CallMode::ReturnNullOnNullInput;
143 self
144 }
145
146 pub fn async_mode(mut self) -> Self {
148 self.is_async = true;
149 self
150 }
151
152 pub fn batched(mut self) -> Self {
154 self.is_batched = true;
155 self
156 }
157
158 pub fn handler(mut self, handler: impl Into<String>) -> Self {
160 self.handler = Some(handler.into());
161 self
162 }
163}
164
/// Options for registering an aggregate with `Runtime::add_aggregate`.
#[derive(Debug, Clone, Default)]
pub struct AggregateOptions {
    /// How null inputs are handled by the aggregate entry points.
    pub call_mode: CallMode,
    /// Whether the aggregate's JavaScript functions are called as async
    /// functions, i.e. each call is expected to return a promise that is awaited.
    pub is_async: bool,
}
173
174impl AggregateOptions {
175 pub fn return_null_on_null_input(mut self) -> Self {
178 self.call_mode = CallMode::ReturnNullOnNullInput;
179 self
180 }
181
182 pub fn async_mode(mut self) -> Self {
184 self.is_async = true;
185 self
186 }
187}
188
189impl Runtime {
190 pub async fn new() -> Result<Self> {
202 let runtime = AsyncRuntime::new().context("failed to create quickjs runtime")?;
203 let context = AsyncContext::custom::<(Base, All)>(&runtime)
204 .await
205 .context("failed to create quickjs context")?;
206
207 Ok(Self {
208 functions: HashMap::new(),
209 aggregates: HashMap::new(),
210 runtime,
211 context,
212 timeout: None,
213 deadline: Default::default(),
214 converter: jsarrow::Converter::new(),
215 })
216 }
217
218 pub async fn set_memory_limit(&self, limit: Option<usize>) {
230 self.runtime.set_memory_limit(limit.unwrap_or(0)).await;
231 }
232
233 pub async fn set_timeout(&mut self, timeout: Option<Duration>) {
246 self.timeout = timeout;
247 if timeout.is_some() {
248 let deadline = self.deadline.clone();
249 self.runtime
250 .set_interrupt_handler(Some(Box::new(move || {
251 if let Some(deadline) = deadline.load(Ordering::Relaxed) {
252 return deadline <= Instant::now();
253 }
254 false
255 })))
256 .await;
257 } else {
258 self.runtime.set_interrupt_handler(None).await;
259 }
260 }
261
/// Returns a reference to the underlying QuickJS runtime.
pub fn inner(&self) -> &AsyncRuntime {
    &self.runtime
}
266
/// Returns a mutable reference to the Arrow/JavaScript converter, allowing
/// callers to adjust it.
pub fn converter_mut(&mut self) -> &mut jsarrow::Converter {
    &mut self.converter
}
271
272 pub async fn add_function(
329 &mut self,
330 name: &str,
331 return_type: impl IntoField + Send,
332 code: &str,
333 options: FunctionOptions,
334 ) -> Result<()> {
335 let function = async_with!(self.context => |ctx| {
336 let (module, _) = Module::declare(ctx.clone(), name, code)
337 .map_err(|e| check_exception(e, &ctx))
338 .context("failed to declare module")?
339 .eval()
340 .map_err(|e| check_exception(e, &ctx))
341 .context("failed to evaluate module")?;
342 let function = Self::get_function(&ctx, &module, options.handler.as_deref().unwrap_or(name))?;
343 Ok(Function {
344 function,
345 return_field: return_type.into_field(name).into(),
346 options,
347 }) as Result<Function>
348 })
349 .await?;
350 self.functions.insert(name.to_string(), function);
351 Ok(())
352 }
353
354 fn get_function<'a>(
356 ctx: &Ctx<'a>,
357 module: &Module<'a, Evaluated>,
358 name: &str,
359 ) -> Result<JsFunction> {
360 let function: rquickjs::Function = module.get(name).with_context(|| {
361 format!("function \"{name}\" not found. HINT: make sure the function is exported")
362 })?;
363 Ok(Persistent::save(ctx, function))
364 }
365
366 pub async fn add_aggregate(
424 &mut self,
425 name: &str,
426 state_type: impl IntoField + Send,
427 output_type: impl IntoField + Send,
428 code: &str,
429 options: AggregateOptions,
430 ) -> Result<()> {
431 let aggregate = async_with!(self.context => |ctx| {
432 let (module, _) = Module::declare(ctx.clone(), name, code)
433 .map_err(|e| check_exception(e, &ctx))
434 .context("failed to declare module")?
435 .eval()
436 .map_err(|e| check_exception(e, &ctx))
437 .context("failed to evaluate module")?;
438 Ok(Aggregate {
439 state_field: state_type.into_field(name).into(),
440 output_field: output_type.into_field(name).into(),
441 create_state: Self::get_function(&ctx, &module, "create_state")?,
442 accumulate: Self::get_function(&ctx, &module, "accumulate")?,
443 retract: Self::get_function(&ctx, &module, "retract").ok(),
444 finish: Self::get_function(&ctx, &module, "finish").ok(),
445 merge: Self::get_function(&ctx, &module, "merge").ok(),
446 options,
447 }) as Result<Aggregate>
448 })
449 .await?;
450
451 if aggregate.finish.is_none() && aggregate.state_field != aggregate.output_field {
452 bail!("`output_type` must be the same as `state_type` when `finish` is not defined");
453 }
454 self.aggregates.insert(name.to_string(), aggregate);
455 Ok(())
456 }
457
458 #[doc = include_str!("doc_create_function.txt")]
465 pub async fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch> {
481 let function = self.functions.get(name).context("function not found")?;
482
483 async_with!(self.context => |ctx| {
484 if function.options.is_batched {
485 self.call_batched_function(&ctx, function, input).await
486 } else {
487 self.call_non_batched_function(&ctx, function, input).await
488 }
489 })
490 .await
491 }
492
493 async fn call_non_batched_function(
494 &self,
495 ctx: &Ctx<'_>,
496 function: &Function,
497 input: &RecordBatch,
498 ) -> Result<RecordBatch> {
499 let js_function = function.function.clone().restore(ctx)?;
500
501 let mut results = Vec::with_capacity(input.num_rows());
502 let mut row = Vec::with_capacity(input.num_columns());
503 for i in 0..input.num_rows() {
504 row.clear();
505 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
506 let val = self
507 .converter
508 .get_jsvalue(ctx, field, column, i)
509 .context("failed to get jsvalue from arrow array")?;
510
511 row.push(val);
512 }
513 if function.options.call_mode == CallMode::ReturnNullOnNullInput
514 && row.iter().any(|v| v.is_null())
515 {
516 results.push(Value::new_null(ctx.clone()));
517 continue;
518 }
519 let mut args = Args::new(ctx.clone(), row.len());
520 args.push_args(row.drain(..))?;
521 let result = self
522 .call_user_fn(ctx, &js_function, args, function.options.is_async)
523 .await
524 .context("failed to call function")?;
525 results.push(result);
526 }
527
528 let array = self
529 .converter
530 .build_array(&function.return_field, ctx, results)
531 .context("failed to build arrow array from return values")?;
532 let schema = Schema::new(vec![function.return_field.clone()]);
533 Ok(RecordBatch::try_new(Arc::new(schema), vec![array])?)
534 }
535
536 async fn call_batched_function(
537 &self,
538 ctx: &Ctx<'_>,
539 function: &Function,
540 input: &RecordBatch,
541 ) -> Result<RecordBatch> {
542 let js_function = function.function.clone().restore(ctx)?;
543
544 let mut js_columns = Vec::with_capacity(input.num_columns());
545 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
546 let mut js_values = Vec::with_capacity(input.num_rows());
547 for i in 0..input.num_rows() {
548 let val = self
549 .converter
550 .get_jsvalue(ctx, field, column, i)
551 .context("failed to get jsvalue from arrow array")?;
552 js_values.push(val);
553 }
554 js_columns.push(js_values);
555 }
556
557 let result = match function.options.call_mode {
558 CallMode::CalledOnNullInput => {
559 let mut args = Args::new(ctx.clone(), input.num_columns());
560 for js_values in js_columns {
561 let js_array = js_values.into_iter().collect_js::<JsArray>(ctx)?;
562 args.push_arg(js_array)?;
563 }
564 self.call_user_fn(ctx, &js_function, args, function.options.is_async)
565 .await
566 .context("failed to call function")?
567 }
568 CallMode::ReturnNullOnNullInput => {
569 let n_cols = input.num_columns();
572 let n_rows = input.num_rows();
573
574 let mut bitmap = Vec::with_capacity(n_rows);
576 for i in 0..n_rows {
577 let has_null = (0..n_cols).any(|j| js_columns[j][i].is_null());
578 bitmap.push(!has_null);
579 }
580
581 let mut filtered_columns = Vec::with_capacity(n_cols);
583 for js_values in js_columns {
584 let filtered_js_values: Vec<_> = js_values
585 .into_iter()
586 .zip(bitmap.iter())
587 .filter(|(_, b)| **b)
588 .map(|(v, _)| v)
589 .collect();
590 filtered_columns.push(filtered_js_values);
591 }
592
593 let mut args = Args::new(ctx.clone(), filtered_columns.len());
595 for js_values in filtered_columns {
596 let js_array = js_values.into_iter().collect_js::<JsArray>(ctx)?;
597 args.push_arg(js_array)?;
598 }
599 let filtered_result: Vec<_> = self
600 .call_user_fn(ctx, &js_function, args, function.options.is_async)
601 .await
602 .context("failed to call function")?;
603 let mut iter = filtered_result.into_iter();
604
605 let mut result = Vec::with_capacity(n_rows);
607 for b in bitmap.iter() {
608 if *b {
609 let v = iter.next().expect("filtered result length mismatch");
610 result.push(v);
611 } else {
612 result.push(Value::new_null(ctx.clone()));
613 }
614 }
615 assert!(iter.next().is_none(), "filtered result length mismatch");
616 result
617 }
618 };
619 let array = self
620 .converter
621 .build_array(&function.return_field, ctx, result)?;
622 let schema = Schema::new(vec![function.return_field.clone()]);
623 Ok(RecordBatch::try_new(Arc::new(schema), vec![array])?)
624 }
625
626 #[doc = include_str!("doc_create_function.txt")]
633 pub fn call_table_function<'a>(
655 &'a self,
656 name: &'a str,
657 input: &'a RecordBatch,
658 chunk_size: usize,
659 ) -> Result<RecordBatchIter<'a>> {
660 assert!(chunk_size > 0);
661 let function = self.functions.get(name).context("function not found")?;
662 if function.options.is_batched {
663 bail!("table function does not support batched mode");
664 }
665
666 Ok(RecordBatchIter {
668 rt: self,
669 input,
670 function,
671 schema: Arc::new(Schema::new(vec![
672 Arc::new(Field::new("row", DataType::Int32, false)),
673 function.return_field.clone(),
674 ])),
675 chunk_size,
676 row: 0,
677 generator: None,
678 converter: &self.converter,
679 })
680 }
681
682 #[doc = include_str!("doc_create_aggregate.txt")]
688 pub async fn create_state(&self, name: &str) -> Result<ArrayRef> {
693 let aggregate = self.aggregates.get(name).context("function not found")?;
694 let state = async_with!(self.context => |ctx| {
695 let create_state = aggregate.create_state.clone().restore(&ctx)?;
696 let state = self
697 .call_user_fn(&ctx, &create_state, Args::new(ctx.clone(), 0), aggregate.options.is_async)
698 .await
699 .context("failed to call create_state")?;
700 let state = self
701 .converter
702 .build_array(&aggregate.state_field, &ctx, vec![state])?;
703 Ok(state) as Result<_>
704 })
705 .await?;
706 Ok(state)
707 }
708
709 #[doc = include_str!("doc_create_aggregate.txt")]
715 pub async fn accumulate(
726 &self,
727 name: &str,
728 state: &dyn Array,
729 input: &RecordBatch,
730 ) -> Result<ArrayRef> {
731 let aggregate = self.aggregates.get(name).context("function not found")?;
732 let new_state = async_with!(self.context => |ctx| {
734 let accumulate = aggregate.accumulate.clone().restore(&ctx)?;
735 let mut state = self
736 .converter
737 .get_jsvalue(&ctx, &aggregate.state_field, state, 0)?;
738
739 let mut row = Vec::with_capacity(1 + input.num_columns());
740 for i in 0..input.num_rows() {
741 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput
742 && input.columns().iter().any(|column| column.is_null(i))
743 {
744 continue;
745 }
746 row.clear();
747 row.push(state.clone());
748 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
749 let pyobj = self.converter.get_jsvalue(&ctx, field, column, i)?;
750 row.push(pyobj);
751 }
752 let mut args = Args::new(ctx.clone(), row.len());
753 args.push_args(row.drain(..))?;
754 state = self
755 .call_user_fn(&ctx, &accumulate, args, aggregate.options.is_async)
756 .await
757 .context("failed to call accumulate")?;
758 }
759 let output = self
760 .converter
761 .build_array(&aggregate.state_field, &ctx, vec![state])?;
762 Ok(output) as Result<_>
763 })
764 .await?;
765 Ok(new_state)
766 }
767
768 #[doc = include_str!("doc_create_aggregate.txt")]
777 pub async fn accumulate_or_retract(
789 &self,
790 name: &str,
791 state: &dyn Array,
792 ops: &BooleanArray,
793 input: &RecordBatch,
794 ) -> Result<ArrayRef> {
795 let aggregate = self.aggregates.get(name).context("function not found")?;
796 let new_state = async_with!(self.context => |ctx| {
798 let accumulate = aggregate.accumulate.clone().restore(&ctx)?;
799 let retract = aggregate
800 .retract
801 .clone()
802 .context("function does not support retraction")?
803 .restore(&ctx)?;
804
805 let mut state = self
806 .converter
807 .get_jsvalue(&ctx, &aggregate.state_field, state, 0)?;
808
809 let mut row = Vec::with_capacity(1 + input.num_columns());
810 for i in 0..input.num_rows() {
811 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput
812 && input.columns().iter().any(|column| column.is_null(i))
813 {
814 continue;
815 }
816 row.clear();
817 row.push(state.clone());
818 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
819 let pyobj = self.converter.get_jsvalue(&ctx, field, column, i)?;
820 row.push(pyobj);
821 }
822 let func = if ops.is_valid(i) && ops.value(i) {
823 &retract
824 } else {
825 &accumulate
826 };
827 let mut args = Args::new(ctx.clone(), row.len());
828 args.push_args(row.drain(..))?;
829 state = self
830 .call_user_fn(&ctx, func, args, aggregate.options.is_async)
831 .await
832 .context("failed to call accumulate or retract")?;
833 }
834 let output = self
835 .converter
836 .build_array(&aggregate.state_field, &ctx, vec![state])?;
837 Ok(output) as Result<_>
838 })
839 .await?;
840 Ok(new_state)
841 }
842
843 #[doc = include_str!("doc_create_aggregate.txt")]
849 pub async fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef> {
856 let aggregate = self.aggregates.get(name).context("function not found")?;
857 let output = async_with!(self.context => |ctx| {
858 let merge = aggregate
859 .merge
860 .clone()
861 .context("merge not found")?
862 .restore(&ctx)?;
863 let mut state = self
864 .converter
865 .get_jsvalue(&ctx, &aggregate.state_field, states, 0)?;
866 for i in 1..states.len() {
867 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput && states.is_null(i) {
868 continue;
869 }
870 let state2 = self
871 .converter
872 .get_jsvalue(&ctx, &aggregate.state_field, states, i)?;
873 let mut args = Args::new(ctx.clone(), 2);
874 args.push_args([state, state2])?;
875 state = self
876 .call_user_fn(&ctx, &merge, args, aggregate.options.is_async)
877 .await
878 .context("failed to call accumulate or retract")?;
879 }
880 let output = self
881 .converter
882 .build_array(&aggregate.state_field, &ctx, vec![state])?;
883 Ok(output) as Result<_>
884 })
885 .await?;
886 Ok(output)
887 }
888
889 #[doc = include_str!("doc_create_aggregate.txt")]
897 pub async fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef> {
904 let aggregate = self.aggregates.get(name).context("function not found")?;
905 if aggregate.finish.is_none() {
906 return Ok(states.clone());
907 };
908 let output = async_with!(self.context => |ctx| {
909 let finish = aggregate.finish.clone().unwrap().restore(&ctx)?;
910 let mut results = Vec::with_capacity(states.len());
911 for i in 0..states.len() {
912 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput && states.is_null(i) {
913 results.push(Value::new_null(ctx.clone()));
914 continue;
915 }
916 let state =
917 self.converter
918 .get_jsvalue(&ctx, &aggregate.state_field, states, i)?;
919 let mut args = Args::new(ctx.clone(), 1);
920 args.push_args([state])?;
921 let result = self
922 .call_user_fn(&ctx, &finish, args, aggregate.options.is_async)
923 .await
924 .context("failed to call finish")?;
925 results.push(result);
926 }
927 let output = self
928 .converter
929 .build_array(&aggregate.output_field, &ctx, results)?;
930 Ok(output) as Result<_>
931 })
932 .await?;
933 Ok(output)
934 }
935
936 async fn call_user_fn<'js, T: FromJs<'js>>(
940 &self,
941 ctx: &Ctx<'js>,
942 f: &rquickjs::Function<'js>,
943 args: Args<'js>,
944 is_async: bool,
945 ) -> Result<T> {
946 if is_async {
947 Self::call_user_fn_async(self, ctx, f, args).await
948 } else {
949 Self::call_user_fn_sync(self, ctx, f, args)
950 }
951 }
952
953 async fn call_user_fn_async<'js, T: FromJs<'js>>(
954 &self,
955 ctx: &Ctx<'js>,
956 f: &rquickjs::Function<'js>,
957 args: Args<'js>,
958 ) -> Result<T> {
959 let call_result = if let Some(timeout) = self.timeout {
960 self.deadline
961 .store(Some(Instant::now() + timeout), Ordering::Relaxed);
962 let call_result = f.call_arg::<Promise>(args);
963 self.deadline.store(None, Ordering::Relaxed);
964 call_result
965 } else {
966 f.call_arg::<Promise>(args)
967 };
968 let promise = call_result.map_err(|e| check_exception(e, ctx))?;
969 promise
970 .into_future::<T>()
971 .await
972 .map_err(|e| check_exception(e, ctx))
973 }
974
975 fn call_user_fn_sync<'js, T: FromJs<'js>>(
976 &self,
977 ctx: &Ctx<'js>,
978 f: &rquickjs::Function<'js>,
979 args: Args<'js>,
980 ) -> Result<T> {
981 let result = if let Some(timeout) = self.timeout {
982 self.deadline
983 .store(Some(Instant::now() + timeout), Ordering::Relaxed);
984 let result = f.call_arg(args);
985 self.deadline.store(None, Ordering::Relaxed);
986 result
987 } else {
988 f.call_arg(args)
989 };
990 result.map_err(|e| check_exception(e, ctx))
991 }
992
/// Returns a reference to the QuickJS context used by this runtime.
pub fn context(&self) -> &AsyncContext {
    &self.context
}
996
/// Enables the fetch integration for this runtime by delegating to
/// `fetch::enable_fetch` with the runtime and context.
///
/// Only available with the `javascript-fetch` feature.
#[cfg(feature = "javascript-fetch")]
pub async fn enable_fetch(&self) -> Result<()> {
    fetch::enable_fetch(&self.runtime, &self.context).await
}
1004}
1005
/// An iterator over the batches produced by a table function.
pub struct RecordBatchIter<'a> {
    /// The runtime whose context is used to call the function.
    rt: &'a Runtime,
    /// The input batch; the function is called once per input row.
    input: &'a RecordBatch,
    /// The table function being called.
    function: &'a Function,
    /// Schema of the produced batches.
    schema: SchemaRef,
    /// Maximum number of rows per produced batch.
    chunk_size: usize,
    /// Index of the input row currently being processed.
    row: usize,
    /// Generator of the current row, saved between calls to `next` while it
    /// still has values to yield.
    generator: Option<Persistent<Object<'static>>>,
    /// Converts between Arrow arrays and JavaScript values.
    converter: &'a jsarrow::Converter,
}
1021
// NOTE(review): the iterator stores a `Persistent` generator handle; the
// invariant that makes sending it across threads sound is not visible in this
// file — confirm and document it as a `// SAFETY:` comment.
unsafe impl Send for RecordBatchIter<'_> {}
1024
1025impl RecordBatchIter<'_> {
1026 pub fn schema(&self) -> &Schema {
1028 &self.schema
1029 }
1030
1031 pub async fn next(&mut self) -> Result<Option<RecordBatch>> {
1032 if self.row == self.input.num_rows() {
1033 return Ok(None);
1034 }
1035 async_with!(self.rt.context => |ctx| {
1036 let js_function = self.function.function.clone().restore(&ctx)?;
1037 let mut indexes = Int32Builder::with_capacity(self.chunk_size);
1038 let mut results = Vec::with_capacity(self.input.num_rows());
1039 let mut row = Vec::with_capacity(self.input.num_columns());
1040 let mut generator = match self.generator.take() {
1042 Some(generator) => {
1043 let gen = generator.restore(&ctx)?;
1044 let next: rquickjs::Function =
1045 gen.get("next").context("failed to get 'next' method")?;
1046 Some((gen, next))
1047 }
1048 None => None,
1049 };
1050 while self.row < self.input.num_rows() && results.len() < self.chunk_size {
1051 let (gen, next) = if let Some(g) = generator.as_ref() {
1052 g
1053 } else {
1054 row.clear();
1056 for (column, field) in
1057 (self.input.columns().iter()).zip(self.input.schema().fields())
1058 {
1059 let val = self
1060 .converter
1061 .get_jsvalue(&ctx, field, column, self.row)
1062 .context("failed to get jsvalue from arrow array")?;
1063 row.push(val);
1064 }
1065 if self.function.options.call_mode == CallMode::ReturnNullOnNullInput
1066 && row.iter().any(|v| v.is_null())
1067 {
1068 self.row += 1;
1069 continue;
1070 }
1071 let mut args = Args::new(ctx.clone(), row.len());
1072 args.push_args(row.drain(..))?;
1073 let gen: Object = self
1077 .rt
1078 .call_user_fn(&ctx, &js_function, args, false).await
1079 .context("failed to call function")?;
1080 let next: rquickjs::Function =
1081 gen.get("next").context("failed to get 'next' method")?;
1082 let mut args = Args::new(ctx.clone(), 0);
1083 args.this(gen.clone())?;
1084 generator.insert((gen, next))
1085 };
1086 let mut args = Args::new(ctx.clone(), 0);
1087 args.this(gen.clone())?;
1088 let object: Object = self
1089 .rt
1090 .call_user_fn(&ctx, next, args, self.function.options.is_async).await
1091 .context("failed to call next")?;
1092 let value: Value = object.get("value")?;
1093 let done: bool = object.get("done")?;
1094 if done {
1095 self.row += 1;
1096 generator = None;
1097 continue;
1098 }
1099 indexes.append_value(self.row as i32);
1100 results.push(value);
1101 }
1102 self.generator = generator.map(|(gen, _)| Persistent::save(&ctx, gen));
1103
1104 if results.is_empty() {
1105 return Ok(None);
1106 }
1107 let indexes = Arc::new(indexes.finish());
1108 let array = self
1109 .converter
1110 .build_array(&self.function.return_field, &ctx, results)
1111 .context("failed to build arrow array from return values")?;
1112 Ok(Some(RecordBatch::try_new(
1113 self.schema.clone(),
1114 vec![indexes, array],
1115 )?))
1116 })
1117 .await
1118 }
1119}
1120
1121impl Stream for RecordBatchIter<'_> {
1122 type Item = Result<RecordBatch>;
1123 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1124 Box::pin(self.next().map(|v| v.transpose()))
1125 .as_mut()
1126 .poll_unpin(cx)
1127 }
1128}
1129
1130pub(crate) fn check_exception(err: rquickjs::Error, ctx: &Ctx) -> anyhow::Error {
1132 match err {
1133 rquickjs::Error::Exception => {
1134 anyhow!("exception generated by QuickJS: {:?}", ctx.catch())
1135 }
1136 e => e.into(),
1137 }
1138}