1#![doc = include_str!("README.md")]
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::pin::Pin;
20use std::sync::{Arc, atomic::Ordering};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use anyhow::{Context as _, Result, anyhow, bail};
25use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, builder::Int32Builder};
26use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
27use futures_util::{FutureExt, Stream};
28use rquickjs::context::intrinsic::{All, Base};
29pub use rquickjs::runtime::MemoryUsage;
30use rquickjs::{
31 Array as JsArray, AsyncContext, AsyncRuntime, Ctx, FromJs, IteratorJs as _, Module, Object,
32 Persistent, Promise, Value, async_with, function::Args, module::Evaluated,
33};
34
35use crate::CallMode;
36use crate::into_field::IntoField;
37
38#[cfg(feature = "javascript-fetch")]
39mod fetch;
40mod jsarrow;
41
/// Hosts JavaScript user-defined functions and aggregates on a QuickJS
/// runtime and calls them with Arrow data.
pub struct Runtime {
    // NOTE(review): fields are dropped in declaration order, so the JS handles
    // stored in `functions`/`aggregates` go away before `runtime`/`context`.
    // Presumably QuickJS needs that ordering - confirm before reordering fields.
    /// Registered functions, keyed by the name passed to `add_function`.
    functions: HashMap<String, Function>,
    /// Registered aggregates, keyed by the name passed to `add_aggregate`.
    aggregates: HashMap<String, Aggregate>,
    /// Converts between Arrow array elements and JS values.
    converter: jsarrow::Converter,
    /// The QuickJS async runtime.
    runtime: AsyncRuntime,
    /// The QuickJS context created on `runtime`; all JS code runs in it.
    context: AsyncContext,
    /// Timeout applied to each call into JS; `None` means no timeout.
    timeout: Option<Duration>,
    /// Deadline of the call currently in flight, shared with the interrupt
    /// handler installed by `set_timeout`.
    deadline: Arc<atomic_time::AtomicOptionInstant>,
}
78
79impl Debug for Runtime {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("Runtime")
82 .field("functions", &self.functions.keys())
83 .field("aggregates", &self.aggregates.keys())
84 .field("timeout", &self.timeout)
85 .finish()
86 }
87}
88
/// A registered scalar or table function.
struct Function {
    /// Persistent handle to the exported JS function.
    function: JsFunction,
    /// Arrow field describing the values the function returns.
    return_field: FieldRef,
    /// Options supplied when the function was registered.
    options: FunctionOptions,
}
95
/// A registered aggregate function and the JS callbacks that implement it.
struct Aggregate {
    /// Arrow field describing the aggregate's intermediate state.
    state_field: FieldRef,
    /// Arrow field describing the aggregate's final output.
    output_field: FieldRef,
    /// Exported `create_state` function (required).
    create_state: JsFunction,
    /// Exported `accumulate` function (required).
    accumulate: JsFunction,
    /// Exported `retract` function, if the module provides one.
    retract: Option<JsFunction>,
    /// Exported `finish` function, if the module provides one.
    finish: Option<JsFunction>,
    /// Exported `merge` function, if the module provides one.
    merge: Option<JsFunction>,
    /// Options supplied when the aggregate was registered.
    options: AggregateOptions,
}
107
// NOTE(review): SAFETY - `Function` and `Aggregate` hold `Persistent` JS
// handles. These impls assert they may be sent to and shared between threads;
// presumably this is sound because the handles are only restored inside
// `async_with!` on the owning context - confirm.
unsafe impl Send for Function {}
unsafe impl Sync for Function {}
unsafe impl Send for Aggregate {}
unsafe impl Sync for Aggregate {}

/// A JS function handle that can be stored outside a context borrow; it is
/// turned back into a callable with `restore` before each use.
type JsFunction = Persistent<rquickjs::Function<'static>>;

// NOTE(review): SAFETY - asserts that `Runtime` as a whole may be sent to and
// shared between threads; presumably relies on every JS access going through
// `self.context` - confirm.
unsafe impl Send for Runtime {}
unsafe impl Sync for Runtime {}
123
124#[derive(Debug, Clone, Default)]
126pub struct FunctionOptions {
127 pub call_mode: CallMode,
129 pub is_async: bool,
131 pub is_batched: bool,
133 pub handler: Option<String>,
136}
137
138impl FunctionOptions {
139 pub fn return_null_on_null_input(mut self) -> Self {
142 self.call_mode = CallMode::ReturnNullOnNullInput;
143 self
144 }
145
146 pub fn async_mode(mut self) -> Self {
148 self.is_async = true;
149 self
150 }
151
152 pub fn batched(mut self) -> Self {
154 self.is_batched = true;
155 self
156 }
157
158 pub fn handler(mut self, handler: impl Into<String>) -> Self {
160 self.handler = Some(handler.into());
161 self
162 }
163}
164
165#[derive(Debug, Clone, Default)]
167pub struct AggregateOptions {
168 pub call_mode: CallMode,
170 pub is_async: bool,
172}
173
174impl AggregateOptions {
175 pub fn return_null_on_null_input(mut self) -> Self {
178 self.call_mode = CallMode::ReturnNullOnNullInput;
179 self
180 }
181
182 pub fn async_mode(mut self) -> Self {
184 self.is_async = true;
185 self
186 }
187}
188
189impl Runtime {
190 pub async fn new() -> Result<Self> {
202 let runtime = AsyncRuntime::new().context("failed to create quickjs runtime")?;
203 let context = AsyncContext::custom::<(Base, All)>(&runtime)
204 .await
205 .context("failed to create quickjs context")?;
206
207 Ok(Self {
208 functions: HashMap::new(),
209 aggregates: HashMap::new(),
210 runtime,
211 context,
212 timeout: None,
213 deadline: Default::default(),
214 converter: jsarrow::Converter::new(),
215 })
216 }
217
218 pub async fn set_memory_limit(&self, limit: Option<usize>) {
230 self.runtime.set_memory_limit(limit.unwrap_or(0)).await;
231 }
232
233 pub async fn set_timeout(&mut self, timeout: Option<Duration>) {
246 self.timeout = timeout;
247 if timeout.is_some() {
248 let deadline = self.deadline.clone();
249 self.runtime
250 .set_interrupt_handler(Some(Box::new(move || {
251 if let Some(deadline) = deadline.load(Ordering::Relaxed) {
252 return deadline <= Instant::now();
253 }
254 false
255 })))
256 .await;
257 } else {
258 self.runtime.set_interrupt_handler(None).await;
259 }
260 }
261
/// Returns the underlying QuickJS async runtime.
pub fn inner(&self) -> &AsyncRuntime {
    &self.runtime
}
266
/// Returns a mutable reference to the Arrow/JS value converter so callers
/// can adjust it.
pub fn converter_mut(&mut self) -> &mut jsarrow::Converter {
    &mut self.converter
}
271
272 pub async fn add_function(
329 &mut self,
330 name: &str,
331 return_type: impl IntoField + Send,
332 code: &str,
333 options: FunctionOptions,
334 ) -> Result<()> {
335 let function = async_with!(self.context => |ctx| {
336 let (module, _) = Module::declare(ctx.clone(), name, code)
337 .map_err(|e| check_exception(e, &ctx))
338 .context("failed to declare module")?
339 .eval()
340 .map_err(|e| check_exception(e, &ctx))
341 .context("failed to evaluate module")?;
342 let function = Self::get_function(&ctx, &module, options.handler.as_deref().unwrap_or(name))?;
343 Ok(Function {
344 function,
345 return_field: return_type.into_field(name).into(),
346 options,
347 }) as Result<Function>
348 })
349 .await?;
350 self.functions.insert(name.to_string(), function);
351 Ok(())
352 }
353
354 fn get_function<'a>(
356 ctx: &Ctx<'a>,
357 module: &Module<'a, Evaluated>,
358 name: &str,
359 ) -> Result<JsFunction> {
360 let function: rquickjs::Function = module.get(name).with_context(|| {
361 format!("function \"{name}\" not found. HINT: make sure the function is exported")
362 })?;
363 Ok(Persistent::save(ctx, function))
364 }
365
366 pub async fn add_aggregate(
424 &mut self,
425 name: &str,
426 state_type: impl IntoField + Send,
427 output_type: impl IntoField + Send,
428 code: &str,
429 options: AggregateOptions,
430 ) -> Result<()> {
431 let aggregate = async_with!(self.context => |ctx| {
432 let (module, _) = Module::declare(ctx.clone(), name, code)
433 .map_err(|e| check_exception(e, &ctx))
434 .context("failed to declare module")?
435 .eval()
436 .map_err(|e| check_exception(e, &ctx))
437 .context("failed to evaluate module")?;
438 Ok(Aggregate {
439 state_field: state_type.into_field(name).into(),
440 output_field: output_type.into_field(name).into(),
441 create_state: Self::get_function(&ctx, &module, "create_state")?,
442 accumulate: Self::get_function(&ctx, &module, "accumulate")?,
443 retract: Self::get_function(&ctx, &module, "retract").ok(),
444 finish: Self::get_function(&ctx, &module, "finish").ok(),
445 merge: Self::get_function(&ctx, &module, "merge").ok(),
446 options,
447 }) as Result<Aggregate>
448 })
449 .await?;
450
451 if aggregate.finish.is_none() && aggregate.state_field != aggregate.output_field {
452 bail!("`output_type` must be the same as `state_type` when `finish` is not defined");
453 }
454 self.aggregates.insert(name.to_string(), aggregate);
455 Ok(())
456 }
457
458 #[doc = include_str!("doc_create_function.txt")]
465 pub async fn call(&self, name: &str, input: &RecordBatch) -> Result<RecordBatch> {
481 let function = self.functions.get(name).context("function not found")?;
482
483 async_with!(self.context => |ctx| {
484 if function.options.is_batched {
485 self.call_batched_function(&ctx, function, input).await
486 } else {
487 self.call_non_batched_function(&ctx, function, input).await
488 }
489 })
490 .await
491 }
492
493 async fn call_non_batched_function(
494 &self,
495 ctx: &Ctx<'_>,
496 function: &Function,
497 input: &RecordBatch,
498 ) -> Result<RecordBatch> {
499 let js_function = function.function.clone().restore(ctx)?;
500
501 let mut results = Vec::with_capacity(input.num_rows());
502 let mut row = Vec::with_capacity(input.num_columns());
503 for i in 0..input.num_rows() {
504 row.clear();
505 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
506 let val = self
507 .converter
508 .get_jsvalue(ctx, field, column, i)
509 .context("failed to get jsvalue from arrow array")?;
510
511 row.push(val);
512 }
513 if function.options.call_mode == CallMode::ReturnNullOnNullInput
514 && row.iter().any(|v| v.is_null())
515 {
516 results.push(Value::new_null(ctx.clone()));
517 continue;
518 }
519 let mut args = Args::new(ctx.clone(), row.len());
520 args.push_args(row.drain(..))?;
521 let result = self
522 .call_user_fn(ctx, &js_function, args, function.options.is_async)
523 .await
524 .context("failed to call function")?;
525 results.push(result);
526 }
527
528 let array = self
529 .converter
530 .build_array(&function.return_field, ctx, results)
531 .context("failed to build arrow array from return values")?;
532 let schema = Schema::new(vec![function.return_field.clone()]);
533 Ok(RecordBatch::try_new(Arc::new(schema), vec![array])?)
534 }
535
536 async fn call_batched_function(
537 &self,
538 ctx: &Ctx<'_>,
539 function: &Function,
540 input: &RecordBatch,
541 ) -> Result<RecordBatch> {
542 let js_function = function.function.clone().restore(ctx)?;
543
544 let mut js_columns = Vec::with_capacity(input.num_columns());
545 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
546 let mut js_values = Vec::with_capacity(input.num_rows());
547 for i in 0..input.num_rows() {
548 let val = self
549 .converter
550 .get_jsvalue(ctx, field, column, i)
551 .context("failed to get jsvalue from arrow array")?;
552 js_values.push(val);
553 }
554 js_columns.push(js_values);
555 }
556
557 let result = match function.options.call_mode {
558 CallMode::CalledOnNullInput => {
559 let mut args = Args::new(ctx.clone(), input.num_columns());
560 for js_values in js_columns {
561 let js_array = js_values.into_iter().collect_js::<JsArray>(ctx)?;
562 args.push_arg(js_array)?;
563 }
564 self.call_user_fn(ctx, &js_function, args, function.options.is_async)
565 .await
566 .context("failed to call function")?
567 }
568 CallMode::ReturnNullOnNullInput => {
569 let n_cols = input.num_columns();
572 let n_rows = input.num_rows();
573
574 let bitmap: Vec<bool> = (0..n_rows)
576 .map(|row_idx| {
577 let has_null = (0..n_cols).any(|j| js_columns[j][row_idx].is_null());
578 !has_null
579 })
580 .collect();
581
582 let mut filtered_columns = Vec::with_capacity(n_cols);
584 for js_values in js_columns {
585 let filtered_js_values: Vec<_> = js_values
586 .into_iter()
587 .zip(bitmap.iter())
588 .filter(|(_, b)| **b)
589 .map(|(v, _)| v)
590 .collect();
591 filtered_columns.push(filtered_js_values);
592 }
593
594 let mut args = Args::new(ctx.clone(), filtered_columns.len());
596 for js_values in filtered_columns {
597 let js_array = js_values.into_iter().collect_js::<JsArray>(ctx)?;
598 args.push_arg(js_array)?;
599 }
600 let filtered_result: Vec<_> = self
601 .call_user_fn(ctx, &js_function, args, function.options.is_async)
602 .await
603 .context("failed to call function")?;
604 let mut iter = filtered_result.into_iter();
605
606 let mut result = Vec::with_capacity(n_rows);
608 for b in bitmap.iter() {
609 if *b {
610 let v = iter.next().expect("filtered result length mismatch");
611 result.push(v);
612 } else {
613 result.push(Value::new_null(ctx.clone()));
614 }
615 }
616 assert!(iter.next().is_none(), "filtered result length mismatch");
617 result
618 }
619 };
620 let array = self
621 .converter
622 .build_array(&function.return_field, ctx, result)?;
623 let schema = Schema::new(vec![function.return_field.clone()]);
624 Ok(RecordBatch::try_new(Arc::new(schema), vec![array])?)
625 }
626
627 #[doc = include_str!("doc_create_function.txt")]
634 pub fn call_table_function<'a>(
656 &'a self,
657 name: &'a str,
658 input: &'a RecordBatch,
659 chunk_size: usize,
660 ) -> Result<RecordBatchIter<'a>> {
661 assert!(chunk_size > 0);
662 let function = self.functions.get(name).context("function not found")?;
663 if function.options.is_batched {
664 bail!("table function does not support batched mode");
665 }
666
667 Ok(RecordBatchIter {
669 rt: self,
670 input,
671 function,
672 schema: Arc::new(Schema::new(vec![
673 Arc::new(Field::new("row", DataType::Int32, false)),
674 function.return_field.clone(),
675 ])),
676 chunk_size,
677 row: 0,
678 generator: None,
679 converter: &self.converter,
680 })
681 }
682
683 #[doc = include_str!("doc_create_aggregate.txt")]
689 pub async fn create_state(&self, name: &str) -> Result<ArrayRef> {
694 let aggregate = self.aggregates.get(name).context("function not found")?;
695 let state = async_with!(self.context => |ctx| {
696 let create_state = aggregate.create_state.clone().restore(&ctx)?;
697 let state = self
698 .call_user_fn(&ctx, &create_state, Args::new(ctx.clone(), 0), aggregate.options.is_async)
699 .await
700 .context("failed to call create_state")?;
701 let state = self
702 .converter
703 .build_array(&aggregate.state_field, &ctx, vec![state])?;
704 Ok(state) as Result<_>
705 })
706 .await?;
707 Ok(state)
708 }
709
710 #[doc = include_str!("doc_create_aggregate.txt")]
716 pub async fn accumulate(
727 &self,
728 name: &str,
729 state: &dyn Array,
730 input: &RecordBatch,
731 ) -> Result<ArrayRef> {
732 let aggregate = self.aggregates.get(name).context("function not found")?;
733 let new_state = async_with!(self.context => |ctx| {
735 let accumulate = aggregate.accumulate.clone().restore(&ctx)?;
736 let mut state = self
737 .converter
738 .get_jsvalue(&ctx, &aggregate.state_field, state, 0)?;
739
740 let mut row = Vec::with_capacity(1 + input.num_columns());
741 for i in 0..input.num_rows() {
742 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput
743 && input.columns().iter().any(|column| column.is_null(i))
744 {
745 continue;
746 }
747 row.clear();
748 row.push(state.clone());
749 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
750 let pyobj = self.converter.get_jsvalue(&ctx, field, column, i)?;
751 row.push(pyobj);
752 }
753 let mut args = Args::new(ctx.clone(), row.len());
754 args.push_args(row.drain(..))?;
755 state = self
756 .call_user_fn(&ctx, &accumulate, args, aggregate.options.is_async)
757 .await
758 .context("failed to call accumulate")?;
759 }
760 let output = self
761 .converter
762 .build_array(&aggregate.state_field, &ctx, vec![state])?;
763 Ok(output) as Result<_>
764 })
765 .await?;
766 Ok(new_state)
767 }
768
769 #[doc = include_str!("doc_create_aggregate.txt")]
778 pub async fn accumulate_or_retract(
790 &self,
791 name: &str,
792 state: &dyn Array,
793 ops: &BooleanArray,
794 input: &RecordBatch,
795 ) -> Result<ArrayRef> {
796 let aggregate = self.aggregates.get(name).context("function not found")?;
797 let new_state = async_with!(self.context => |ctx| {
799 let accumulate = aggregate.accumulate.clone().restore(&ctx)?;
800 let retract = aggregate
801 .retract
802 .clone()
803 .context("function does not support retraction")?
804 .restore(&ctx)?;
805
806 let mut state = self
807 .converter
808 .get_jsvalue(&ctx, &aggregate.state_field, state, 0)?;
809
810 let mut row = Vec::with_capacity(1 + input.num_columns());
811 for i in 0..input.num_rows() {
812 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput
813 && input.columns().iter().any(|column| column.is_null(i))
814 {
815 continue;
816 }
817 row.clear();
818 row.push(state.clone());
819 for (column, field) in input.columns().iter().zip(input.schema().fields()) {
820 let pyobj = self.converter.get_jsvalue(&ctx, field, column, i)?;
821 row.push(pyobj);
822 }
823 let func = if ops.is_valid(i) && ops.value(i) {
824 &retract
825 } else {
826 &accumulate
827 };
828 let mut args = Args::new(ctx.clone(), row.len());
829 args.push_args(row.drain(..))?;
830 state = self
831 .call_user_fn(&ctx, func, args, aggregate.options.is_async)
832 .await
833 .context("failed to call accumulate or retract")?;
834 }
835 let output = self
836 .converter
837 .build_array(&aggregate.state_field, &ctx, vec![state])?;
838 Ok(output) as Result<_>
839 })
840 .await?;
841 Ok(new_state)
842 }
843
844 #[doc = include_str!("doc_create_aggregate.txt")]
850 pub async fn merge(&self, name: &str, states: &dyn Array) -> Result<ArrayRef> {
857 let aggregate = self.aggregates.get(name).context("function not found")?;
858 let output = async_with!(self.context => |ctx| {
859 let merge = aggregate
860 .merge
861 .clone()
862 .context("merge not found")?
863 .restore(&ctx)?;
864 let mut state = self
865 .converter
866 .get_jsvalue(&ctx, &aggregate.state_field, states, 0)?;
867 for i in 1..states.len() {
868 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput && states.is_null(i) {
869 continue;
870 }
871 let state2 = self
872 .converter
873 .get_jsvalue(&ctx, &aggregate.state_field, states, i)?;
874 let mut args = Args::new(ctx.clone(), 2);
875 args.push_args([state, state2])?;
876 state = self
877 .call_user_fn(&ctx, &merge, args, aggregate.options.is_async)
878 .await
879 .context("failed to call accumulate or retract")?;
880 }
881 let output = self
882 .converter
883 .build_array(&aggregate.state_field, &ctx, vec![state])?;
884 Ok(output) as Result<_>
885 })
886 .await?;
887 Ok(output)
888 }
889
890 #[doc = include_str!("doc_create_aggregate.txt")]
898 pub async fn finish(&self, name: &str, states: &ArrayRef) -> Result<ArrayRef> {
905 let aggregate = self.aggregates.get(name).context("function not found")?;
906 if aggregate.finish.is_none() {
907 return Ok(states.clone());
908 };
909 let output = async_with!(self.context => |ctx| {
910 let finish = aggregate.finish.clone().unwrap().restore(&ctx)?;
911 let mut results = Vec::with_capacity(states.len());
912 for i in 0..states.len() {
913 if aggregate.options.call_mode == CallMode::ReturnNullOnNullInput && states.is_null(i) {
914 results.push(Value::new_null(ctx.clone()));
915 continue;
916 }
917 let state =
918 self.converter
919 .get_jsvalue(&ctx, &aggregate.state_field, states, i)?;
920 let mut args = Args::new(ctx.clone(), 1);
921 args.push_args([state])?;
922 let result = self
923 .call_user_fn(&ctx, &finish, args, aggregate.options.is_async)
924 .await
925 .context("failed to call finish")?;
926 results.push(result);
927 }
928 let output = self
929 .converter
930 .build_array(&aggregate.output_field, &ctx, results)?;
931 Ok(output) as Result<_>
932 })
933 .await?;
934 Ok(output)
935 }
936
937 async fn call_user_fn<'js, T: FromJs<'js>>(
941 &self,
942 ctx: &Ctx<'js>,
943 f: &rquickjs::Function<'js>,
944 args: Args<'js>,
945 is_async: bool,
946 ) -> Result<T> {
947 if is_async {
948 Self::call_user_fn_async(self, ctx, f, args).await
949 } else {
950 Self::call_user_fn_sync(self, ctx, f, args)
951 }
952 }
953
954 async fn call_user_fn_async<'js, T: FromJs<'js>>(
955 &self,
956 ctx: &Ctx<'js>,
957 f: &rquickjs::Function<'js>,
958 args: Args<'js>,
959 ) -> Result<T> {
960 let call_result = if let Some(timeout) = self.timeout {
961 self.deadline
962 .store(Some(Instant::now() + timeout), Ordering::Relaxed);
963 let call_result = f.call_arg::<Promise>(args);
964 self.deadline.store(None, Ordering::Relaxed);
965 call_result
966 } else {
967 f.call_arg::<Promise>(args)
968 };
969 let promise = call_result.map_err(|e| check_exception(e, ctx))?;
970 promise
971 .into_future::<T>()
972 .await
973 .map_err(|e| check_exception(e, ctx))
974 }
975
976 fn call_user_fn_sync<'js, T: FromJs<'js>>(
977 &self,
978 ctx: &Ctx<'js>,
979 f: &rquickjs::Function<'js>,
980 args: Args<'js>,
981 ) -> Result<T> {
982 let result = if let Some(timeout) = self.timeout {
983 self.deadline
984 .store(Some(Instant::now() + timeout), Ordering::Relaxed);
985 let result = f.call_arg(args);
986 self.deadline.store(None, Ordering::Relaxed);
987 result
988 } else {
989 f.call_arg(args)
990 };
991 result.map_err(|e| check_exception(e, ctx))
992 }
993
/// Returns the QuickJS context in which registered functions run.
pub fn context(&self) -> &AsyncContext {
    &self.context
}
997
/// Enables the fetch support from the `fetch` module on this runtime's
/// QuickJS runtime and context.
///
/// Only available with the `javascript-fetch` feature.
#[cfg(feature = "javascript-fetch")]
pub async fn enable_fetch(&self) -> Result<()> {
    fetch::enable_fetch(&self.runtime, &self.context).await
}
1005}
1006
/// Iterator over the output batches of a table function call, created by
/// `Runtime::call_table_function`.
pub struct RecordBatchIter<'a> {
    /// Runtime that owns the function being called.
    rt: &'a Runtime,
    /// Input batch; the function is called once per input row.
    input: &'a RecordBatch,
    /// The table function being called.
    function: &'a Function,
    /// Output schema: the `row` index column followed by the return field.
    schema: SchemaRef,
    /// Maximum number of rows per output batch.
    chunk_size: usize,
    /// Index of the input row currently being processed.
    row: usize,
    /// Generator object for the current row when it was not exhausted by the
    /// previous call to `next`.
    generator: Option<Persistent<Object<'static>>>,
    /// Converter between Arrow array elements and JS values.
    converter: &'a jsarrow::Converter,
}

// NOTE(review): SAFETY - asserts the iterator may be sent across threads even
// though it stores a `Persistent` JS object; presumably sound because the
// object is only restored inside `async_with!` on the runtime's context -
// confirm.
unsafe impl Send for RecordBatchIter<'_> {}
1025
1026impl RecordBatchIter<'_> {
1027 pub fn schema(&self) -> &Schema {
1029 &self.schema
1030 }
1031
1032 pub async fn next(&mut self) -> Result<Option<RecordBatch>> {
1033 if self.row == self.input.num_rows() {
1034 return Ok(None);
1035 }
1036 async_with!(self.rt.context => |ctx| {
1037 let js_function = self.function.function.clone().restore(&ctx)?;
1038 let mut indexes = Int32Builder::with_capacity(self.chunk_size);
1039 let mut results = Vec::with_capacity(self.input.num_rows());
1040 let mut row = Vec::with_capacity(self.input.num_columns());
1041 let mut generator = match self.generator.take() {
1043 Some(generator) => {
1044 let generator_obj = generator.restore(&ctx)?;
1045 let next: rquickjs::Function =
1046 generator_obj.get("next").context("failed to get 'next' method")?;
1047 Some((generator_obj, next))
1048 }
1049 None => None,
1050 };
1051 while self.row < self.input.num_rows() && results.len() < self.chunk_size {
1052 let (generator_obj, next) = if let Some(g) = generator.as_ref() {
1053 g
1054 } else {
1055 row.clear();
1057 for (column, field) in
1058 (self.input.columns().iter()).zip(self.input.schema().fields())
1059 {
1060 let val = self
1061 .converter
1062 .get_jsvalue(&ctx, field, column, self.row)
1063 .context("failed to get jsvalue from arrow array")?;
1064 row.push(val);
1065 }
1066 if self.function.options.call_mode == CallMode::ReturnNullOnNullInput
1067 && row.iter().any(|v| v.is_null())
1068 {
1069 self.row += 1;
1070 continue;
1071 }
1072 let mut args = Args::new(ctx.clone(), row.len());
1073 args.push_args(row.drain(..))?;
1074 let generator_obj: Object = self
1078 .rt
1079 .call_user_fn(&ctx, &js_function, args, false).await
1080 .context("failed to call function")?;
1081 let next: rquickjs::Function =
1082 generator_obj.get("next").context("failed to get 'next' method")?;
1083 let mut args = Args::new(ctx.clone(), 0);
1084 args.this(generator_obj.clone())?;
1085 generator.insert((generator_obj, next))
1086 };
1087 let mut args = Args::new(ctx.clone(), 0);
1088 args.this(generator_obj.clone())?;
1089 let object: Object = self
1090 .rt
1091 .call_user_fn(&ctx, next, args, self.function.options.is_async).await
1092 .context("failed to call next")?;
1093 let value: Value = object.get("value")?;
1094 let done: bool = object.get("done")?;
1095 if done {
1096 self.row += 1;
1097 generator = None;
1098 continue;
1099 }
1100 indexes.append_value(self.row as i32);
1101 results.push(value);
1102 }
1103 self.generator = generator.map(|(generator_obj, _)| Persistent::save(&ctx, generator_obj));
1104
1105 if results.is_empty() {
1106 return Ok(None);
1107 }
1108 let indexes = Arc::new(indexes.finish());
1109 let array = self
1110 .converter
1111 .build_array(&self.function.return_field, &ctx, results)
1112 .context("failed to build arrow array from return values")?;
1113 Ok(Some(RecordBatch::try_new(
1114 self.schema.clone(),
1115 vec![indexes, array],
1116 )?))
1117 })
1118 .await
1119 }
1120}
1121
1122impl Stream for RecordBatchIter<'_> {
1123 type Item = Result<RecordBatch>;
1124 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1125 Box::pin(self.next().map(|v| v.transpose()))
1126 .as_mut()
1127 .poll_unpin(cx)
1128 }
1129}
1130
1131pub(crate) fn check_exception(err: rquickjs::Error, ctx: &Ctx) -> anyhow::Error {
1133 match err {
1134 rquickjs::Error::Exception => {
1135 anyhow!("exception generated by QuickJS: {:?}", ctx.catch())
1136 }
1137 e => e.into(),
1138 }
1139}