typeline_ext_utils/
exec.rs

1use std::{
2    ffi::{OsStr, OsString},
3    io::{ErrorKind, Read},
4    os::unix::process::ExitStatusExt,
5    process::{Child, Command, ExitStatus, Stdio},
6    sync::Arc,
7};
8
9use bstr::ByteSlice;
10use metamatch::metamatch;
11use mio::{
12    unix::pipe::{Receiver, Sender},
13    Events, Poll,
14};
15use typeline_core::{
16    chain::ChainId,
17    cli::{
18        call_expr::{CallExpr, ParsedArgValue, Span},
19        CliArgumentError,
20    },
21    context::SessionData,
22    index_newtype,
23    job::{Job, JobData},
24    liveness_analysis::{
25        BasicBlockId, LivenessData, OpOutputIdx, OperatorLivenessOutput,
26    },
27    operators::{
28        errors::{OperatorApplicationError, OperatorCreationError},
29        format::{
30            access_format_key_refs, parse_format_string, FormatKey,
31            FormatKeyRefData, FormatKeyRefId, FormatPart, FormatPartIndex,
32        },
33        operator::{
34            OffsetInChain, Operator, OperatorDataId, OperatorId,
35            OperatorOffsetInChain, PreboundOutputsMap, TransformInstatiation,
36        },
37        transform::{Transform, TransformId, TransformState},
38    },
39    options::{
40        chain_settings::{
41            SettingStreamBufferSize, SettingStreamSizeThreshold,
42        },
43        session_setup::SessionSetupData,
44    },
45    record_data::{
46        action_buffer::ActorRef,
47        bytes_insertion_stream::BytesInsertionStream,
48        field::{FieldId, FieldIterRef},
49        field_data::{FieldData, FieldValueRepr},
50        field_data_ref::FieldDataRef,
51        field_value::FieldValueKind,
52        field_value_ref::FieldValueSlice,
53        formattable::RealizedFormatKey,
54        iter::{
55            field_iter::FieldIter,
56            field_iterator::FieldIterOpts,
57            field_value_slice_iter::FieldValueRangeIter,
58            iter_adapters::UnfoldIterRunLength,
59            ref_iter::{
60                AutoDerefIter, RefAwareBytesBufferIter,
61                RefAwareFieldValueRangeIter, RefAwareInlineBytesIter,
62                RefAwareInlineTextIter, RefAwareTextBufferIter,
63                RefAwareUnfoldIterRunLength,
64            },
65        },
66        iter_hall::IterKind,
67        match_set::MatchSetManager,
68        push_interface::PushInterface,
69        stream_value::{
70            StreamValue, StreamValueBufferMode, StreamValueData,
71            StreamValueDataOffset, StreamValueDataType, StreamValueId,
72            StreamValueManager, StreamValueUpdate,
73        },
74        varying_type_inserter::VaryingTypeInserter,
75    },
76    typeline_error::TypelineError,
77    utils::{
78        index_vec::IndexVec,
79        indexing_type::IndexingType,
80        int_string_conversions::{f64_to_str, i64_to_str},
81        maybe_text::MaybeText,
82        string_store::{StringStoreEntry, INVALID_STRING_STORE_ENTRY},
83        universe::{CountedUniverse, Universe},
84    },
85};
86
// Declares `ExecArgIdx`, a `u32`-backed index type generated by
// `index_newtype!`. It is the key type of `OpExec::fmt_arg_part_ends`.
index_newtype! {
    pub struct ExecArgIdx(u32);
}
90
/// User-facing options of the `exec` operator.
#[derive(Default, Clone)]
pub struct OpExecOpts {
    /// When set, `build_transforms` claims an iterator over the transform's
    /// input field and `setup_proc` spawns the child with a piped stdin;
    /// otherwise the child's stdin is `Stdio::null()`.
    propagate_input: bool,
    // TODO: more stuff
}
96
/// Operator data of `exec`: the parsed format parts describing the command
/// line plus the names of the extra fields the operator publishes.
pub struct OpExec {
    /// Literal and key parts of all arguments, stored back to back.
    fmt_parts: IndexVec<FormatPartIndex, FormatPart>,
    /// Field references used by the format keys; `setup` fills in their
    /// interned names.
    refs: IndexVec<FormatKeyRefId, FormatKeyRefData>,
    /// Per-argument index into `fmt_parts`.
    /// NOTE(review): presumably the (exclusive) end of each argument's
    /// parts - confirm against the code that builds this operator.
    fmt_arg_part_ends: IndexVec<ExecArgIdx, FormatPartIndex>,
    /// Interned `"stderr"`, assigned in `setup`; `build_transforms` binds
    /// the stderr field to this name in the active scope.
    stderr_field_name: StringStoreEntry,
    /// Interned `"exit_code"`, assigned in `setup`; `build_transforms` binds
    /// the exit code field to this name in the active scope.
    exit_code_field_name: StringStoreEntry,
    /// Options this operator was configured with.
    opts: OpExecOpts,
}
105
/// Everything accumulated for one command before it is spawned.
struct CommandArgs {
    /// Stream value that is written to the child's stdin, if any. Filled by
    /// `push_text` / `push_bytes` for argument slot 0, or taken over from a
    /// stream value found in the input.
    stdin_sv: Option<StreamValueId>,
    /// `setup_proc_streams` only subscribes to `stdin_sv` when this is
    /// `false`; it is cleared when `stdin_sv` comes from an input stream
    /// value.
    fake_stdin: bool,
    /// `args[0]` is the program, `args[1..]` its arguments (see
    /// `setup_proc`).
    args: Vec<OsString>,
    /// Error recorded for this command via `push_error`.
    error: Option<OperatorApplicationError>,
}
112
/// State of the pipe feeding a running command's stdin.
struct InStream {
    /// Write end of the child's stdin. `None` if the child has no piped
    /// stdin, or after `setup_proc_streams` wrote all data of a finished
    /// stream value and dropped the sender.
    sender: Option<Sender>,
    /// Poll token under which `sender` is registered, if it is registered.
    token: Option<CommandOutputTokenId>,
    /// Stream value supplying the stdin data, if any.
    sv_id: Option<StreamValueId>,
    /// Position in `sv_id`'s data reached by the initial copy in
    /// `setup_proc_streams`.
    stdin_offset: StreamValueDataOffset,
}
119
/// State of one output pipe (stdout or stderr) of a running command.
struct OutStream {
    /// Read end of the child's pipe.
    receiver: Option<Receiver>,
    /// Poll token under which `receiver` is registered.
    token: Option<CommandOutputTokenId>,
    /// Stream value that receives the data read from the pipe.
    sv_id: Option<StreamValueId>,
    /// Starts out `false`; `read_out_stream` passes its negation to
    /// `data_inserter` and then sets it to `true`.
    sv_appended: bool,
}
126
// Positions within `RunningCommand::out_streams`; `setup_proc` builds that
// array as `[stdout, stderr]`.
const STDOUT_IDX: usize = 0;
const STDERR_IDX: usize = 1;
129
/// A spawned child process together with its stream bookkeeping.
struct RunningCommand {
    /// Handle of the spawned child.
    proc: Child,
    /// Stdin side of the child.
    in_stream: InStream,
    /// Output pipes, indexed by `STDOUT_IDX` / `STDERR_IDX`.
    out_streams: [OutStream; 2],
    /// Stream value that was pushed into the exit code field for this
    /// command.
    exit_code_sv_id: StreamValueId,
    /// Set to `true` by `setup_proc` when it queues the command in
    /// `TfExec::commands_to_poll`.
    poll_requested: bool,
}
// Key type of `TfExec::running_commands`.
index_newtype! {
    struct RunningCommandIdx(u32);
}
// Raw token value handed to `mio::Token` when registering a pipe; also the
// key type of `TfExec::token_universe`.
type CommandOutputTokenId = usize;
141
/// Identifies which pipe of which running command a poll token refers to.
#[derive(Clone, Copy)]
enum CommandStreamIdx {
    /// The stdin pipe of the given command.
    Stdin(RunningCommandIdx),
    /// The stdout pipe of the given command.
    Stdout(RunningCommandIdx),
    /// The stderr pipe of the given command.
    Stderr(RunningCommandIdx),
}
148
/// Transform state of the `exec` operator.
pub struct TfExec<'a> {
    /// Operator this transform was instantiated from.
    op: &'a OpExec,
    /// Maps every claimed poll token to the command pipe it was registered
    /// for.
    token_universe: Universe<usize, CommandStreamIdx>,
    /// One field iterator per entry of `op.refs`, in the same order.
    iters: IndexVec<FormatKeyRefId, FieldIterRef>,
    /// Arguments (and stdin / error state) collected per command.
    command_args: Vec<CommandArgs>,
    /// Commands that have been spawned and are still tracked.
    running_commands: CountedUniverse<RunningCommandIdx, RunningCommand>,
    /// Field bound to `op.stderr_field_name` in `build_transforms`.
    stderr_field: FieldId,
    /// Field bound to `op.exit_code_field_name` in `build_transforms`.
    exit_code_field: FieldId,
    /// Iterator over the input field; only present when
    /// `op.opts.propagate_input` is set.
    input_iter: Option<FieldIterRef>,
    /// Value of `SettingStreamBufferSize`; handed to every `data_inserter`
    /// call.
    stream_buffer_size: usize,
    /// Value of `SettingStreamSizeThreshold`; upper bound for the initial
    /// read of stdout / stderr in `setup_proc_streams`.
    stream_buffer_threshold: usize,
    /// Poll instance all child pipes are registered with.
    poll: Poll,
    /// Event buffer for `poll` (created with capacity 64).
    /// NOTE(review): presumably an `Option` so it can be taken out while
    /// polling - confirm against the polling code.
    events: Option<Events>,
    /// Commands queued for polling; `setup_proc` adds every command it
    /// starts.
    commands_to_poll: Vec<RunningCommandIdx>,
}
164
165impl CommandStreamIdx {
166    fn id(&self) -> RunningCommandIdx {
167        match *self {
168            CommandStreamIdx::Stdin(id) => id,
169            CommandStreamIdx::Stdout(id) => id,
170            CommandStreamIdx::Stderr(id) => id,
171        }
172    }
173}
174
175impl Operator for OpExec {
176    fn default_name(
177        &self,
178    ) -> typeline_core::operators::operator::OperatorName {
179        "exec".into()
180    }
181
182    fn output_count(&self, _sess: &SessionData, _op_id: OperatorId) -> usize {
183        2
184    }
185
186    fn setup(
187        &mut self,
188        sess: &mut SessionSetupData,
189        op_data_id: OperatorDataId,
190        chain_id: ChainId,
191        offset_in_chain: OperatorOffsetInChain,
192        span: Span,
193    ) -> Result<OperatorId, TypelineError> {
194        for r in &mut self.refs {
195            r.name_interned =
196                r.name.as_ref().map(|n| sess.string_store.intern_cloned(n));
197        }
198        self.stderr_field_name = sess.string_store.intern_static("stderr");
199        self.exit_code_field_name =
200            sess.string_store.intern_cloned("exit_code");
201        Ok(sess.add_op(op_data_id, chain_id, offset_in_chain, span))
202    }
203
204    fn has_dynamic_outputs(
205        &self,
206        _sess: &SessionData,
207        _op_id: OperatorId,
208    ) -> bool {
209        false
210    }
211
212    fn update_variable_liveness(
213        &self,
214        sess: &SessionData,
215        ld: &mut LivenessData,
216        op_offset_after_last_write: OffsetInChain,
217        op_id: OperatorId,
218        _bb_id: BasicBlockId,
219        _input_field: OpOutputIdx,
220        output: &mut OperatorLivenessOutput,
221    ) {
222        output.flags.may_dup_or_drop = false;
223        // might be set to true again in the loop below
224        output.flags.non_stringified_input_access = false;
225        output.flags.input_accessed = false;
226        for p in &self.fmt_parts {
227            match p {
228                FormatPart::ByteLiteral(_) | FormatPart::TextLiteral(_) => (),
229                FormatPart::Key(fk) => {
230                    access_format_key_refs(
231                        fk,
232                        sess,
233                        ld,
234                        &self.refs,
235                        op_id,
236                        op_offset_after_last_write,
237                        &mut output.flags,
238                    );
239                }
240            }
241        }
242    }
243
244    fn build_transforms<'a>(
245        &'a self,
246        job: &mut Job,
247        tf_state: &mut TransformState,
248        _op_id: OperatorId,
249        _prebound_outputs: &PreboundOutputsMap,
250    ) -> TransformInstatiation<'a> {
251        let jd = &mut job.job_data;
252        let ms = &jd.match_set_mgr.match_sets[tf_state.match_set_id];
253        let scope_id = ms.active_scope;
254        let next_actor = ms.action_buffer.borrow().peek_next_actor_id();
255        let mut iters = IndexVec::new();
256
257        let iter_kind =
258            IterKind::Transform(jd.tf_mgr.transforms.peek_claim_id());
259        for r in &self.refs {
260            let field_id = if let Some(name) = r.name_interned {
261                jd.scope_mgr
262                    .lookup_field(ms.active_scope, name)
263                    .unwrap_or(ms.dummy_field)
264            } else {
265                tf_state.input_field
266            };
267            iters.push(FieldIterRef {
268                field_id,
269                iter_id: jd
270                    .field_mgr
271                    .claim_iter(field_id, next_actor, iter_kind),
272            });
273        }
274
275        let stderr_field = jd.field_mgr.add_field(
276            &jd.match_set_mgr,
277            tf_state.match_set_id,
278            ActorRef::Unconfirmed(next_actor),
279        );
280        jd.scope_mgr.insert_field_name(
281            scope_id,
282            self.stderr_field_name,
283            stderr_field,
284        );
285        let exit_code_field = jd.field_mgr.add_field(
286            &jd.match_set_mgr,
287            tf_state.match_set_id,
288            ActorRef::Unconfirmed(next_actor),
289        );
290        jd.scope_mgr.insert_field_name(
291            scope_id,
292            self.exit_code_field_name,
293            exit_code_field,
294        );
295
296        let stream_buffer_size = jd
297            .get_scope_setting_or_default::<SettingStreamBufferSize>(scope_id);
298
299        let stream_buffer_threshold = jd
300            .get_scope_setting_or_default::<SettingStreamSizeThreshold>(
301                scope_id,
302            );
303
304        let input_iter = if self.opts.propagate_input {
305            Some(FieldIterRef {
306                iter_id: jd.field_mgr.claim_iter(
307                    tf_state.input_field,
308                    next_actor,
309                    iter_kind,
310                ),
311                field_id: tf_state.input_field,
312            })
313        } else {
314            None
315        };
316
317        TransformInstatiation::Single(Box::new(TfExec {
318            op: self,
319            iters,
320            token_universe: Universe::default(),
321            stderr_field,
322            command_args: Vec::new(),
323            running_commands: CountedUniverse::default(),
324            input_iter,
325            exit_code_field,
326            stream_buffer_size,
327            stream_buffer_threshold,
328            poll: Poll::new().unwrap(),
329            events: Some(Events::with_capacity(64)),
330            commands_to_poll: Vec::new(),
331        }))
332    }
333}
334
/// Result of wiring up a freshly spawned child process.
enum ProcessStart {
    /// The process has to be tracked further; its output is delivered
    /// through stream values.
    Running {
        /// Stdin state of the child.
        stdin: InStream,
        /// Stdout state of the child.
        stdout: OutStream,
        /// Stderr state of the child.
        stderr: OutStream,
        /// Stream value reserved for the exit code.
        exit_stream: StreamValueId,
    },
    /// The process had already exited; its output and exit status were
    /// inserted directly, so there is nothing left to track.
    Done(ExitStatus),
}
344
345impl<'a> TfExec<'a> {
346    fn push_text(
347        &mut self,
348        sv_mgr: &mut StreamValueManager,
349        cmd_idx: usize,
350        arg_idx: usize,
351        text: &str,
352    ) {
353        if arg_idx == 0 {
354            // TODO: maybe consider converting the encoding on non unix?
355            let sv_id = &mut self.command_args[cmd_idx].stdin_sv;
356            if let Some(stdin_sv) = *sv_id {
357                sv_mgr.stream_values[stdin_sv]
358                    .data_inserter(stdin_sv, self.stream_buffer_size, false)
359                    .append_text_copy(text);
360            } else {
361                *sv_id = Some(sv_mgr.stream_values.claim_with_value(
362                    StreamValue::from_data(
363                        Some(StreamValueDataType::MaybeText),
364                        StreamValueData::from_string(text.to_owned()),
365                        StreamValueBufferMode::Stream,
366                        true,
367                    ),
368                ));
369            }
370            return;
371        }
372        self.command_args[cmd_idx].args[arg_idx - 1].push(text);
373    }
374    #[cfg_attr(target_family = "unix", allow(unused))]
375    fn push_bytes(
376        &mut self,
377        sv_mgr: &mut StreamValueManager,
378        op_id: OperatorId,
379        cmd_idx: usize,
380        mut arg_idx: usize,
381        data: &[u8],
382    ) {
383        if arg_idx == 0 {
384            let stdin_sv =
385                if let Some(stdin_sv) = self.command_args[cmd_idx].stdin_sv {
386                    stdin_sv
387                } else {
388                    sv_mgr.stream_values.claim_with_value(
389                        StreamValue::new_empty(
390                            Some(StreamValueDataType::Bytes),
391                            StreamValueBufferMode::Stream,
392                        ),
393                    )
394                };
395            sv_mgr.stream_values[stdin_sv]
396                .data_inserter(stdin_sv, self.stream_buffer_size, false)
397                .append_bytes_copy(data);
398            return;
399        }
400        arg_idx -= 1;
401        #[cfg(target_family = "unix")]
402        {
403            let os_str =
404                <OsStr as std::os::unix::ffi::OsStrExt>::from_bytes(data);
405            self.command_args[cmd_idx].args[arg_idx].push(os_str);
406            return;
407        }
408        #[allow(unreachable_code)]
409        {
410            let cmd_args = &mut self.command_args[cmd_idx];
411            if cmd_args.error.is_some() {
412                return;
413            }
414            match <[u8] as bstr::ByteSlice>::to_os_str(data) {
415                Ok(data) => {
416                    cmd_args.args[arg_idx].push(data);
417                }
418                Err(e) => {
419                    let err = if arg_idx == 0 {
420                        OperatorApplicationError::new(
421                            "invalid utf-8 in program name",
422                            op_id,
423                        )
424                    } else {
425                        OperatorApplicationError::new_s(
426                            format!("invalid utf-8 in cli argument {arg_idx}"),
427                            op_id,
428                        )
429                    };
430                    cmd_args.error = Some(err);
431                }
432            }
433        }
434    }
435    fn push_error(&mut self, cmd_idx: usize, err: OperatorApplicationError) {
436        self.command_args[cmd_idx].error = Some(err)
437    }
/// Distributes the values produced by `iter` over consecutive entries of
/// `self.command_args`, starting at `cmd_offset`.
///
/// Every value, repeated according to its run length, contributes to slot
/// `arg_idx` of exactly one command; the command index advances by one per
/// value. Values that cannot be used are turned into an error on the
/// affected command.
///
/// # Panics
/// Hits `todo!()` for `BigInt` / `BigRational` values and for stream
/// values in any slot other than `0`.
fn add_iter_to_command_arg<R: FieldDataRef>(
    &mut self,
    sv_mgr: &mut StreamValueManager,
    msm: &MatchSetManager,
    op_id: OperatorId,
    cmd_offset: usize,
    arg_idx: usize,
    _fmt_key: &FormatKey,
    iter: &mut AutoDerefIter<FieldIter<R>>,
) {
    let mut cmd_idx = cmd_offset;
    while let Some(range) =
        iter.typed_range_fwd(msm, usize::MAX, FieldIterOpts::default())
    {
        metamatch!(match range.base.data {
            // text is forwarded as is
            #[expand((REP, ITER) in [
                (TextInline, RefAwareInlineTextIter),
                (TextBuffer, RefAwareTextBufferIter),
            ])]
            FieldValueSlice::REP(text) => {
                for v in ITER::from_range(&range, text).unfold_rl() {
                    self.push_text(sv_mgr, cmd_idx, arg_idx, v);
                    cmd_idx += 1;
                }
            }
            // raw bytes are forwarded as is
            #[expand((REP, ITER) in [
                (BytesInline, RefAwareInlineBytesIter),
                (BytesBuffer, RefAwareBytesBufferIter),
            ])]
            FieldValueSlice::REP(bytes) => {
                for v in ITER::from_range(&range, bytes).unfold_rl() {
                    self.push_bytes(sv_mgr, op_id, cmd_idx, arg_idx, v);
                    cmd_idx += 1;
                }
            }

            // numbers are stringified once per run and pushed as bytes
            #[expand((REP, CONV_FN) in [
                (Int, i64_to_str(false, *v)),
                (Float, f64_to_str(*v)),
            ])]
            FieldValueSlice::REP(ints) => {
                for (v, rl) in
                    FieldValueRangeIter::from_range(&range, ints)
                {
                    let v = CONV_FN;
                    for _ in 0..rl {
                        self.push_bytes(
                            sv_mgr,
                            op_id,
                            cmd_idx,
                            arg_idx,
                            v.as_bytes(),
                        );
                        cmd_idx += 1;
                    }
                }
            }
            // not implemented yet
            FieldValueSlice::BigInt(_)
            | FieldValueSlice::BigRational(_) => {
                todo!();
            }

            // custom types are formatted once per run; a formatting failure
            // becomes an error on every command of that run
            FieldValueSlice::Custom(custom_types) => {
                for (v, rl) in RefAwareFieldValueRangeIter::from_range(
                    &range,
                    custom_types,
                ) {
                    // TODO //PERF
                    let mut buf = MaybeText::default();

                    match v.format_raw(
                        &mut buf,
                        &RealizedFormatKey::default(),
                    ) {
                        Err(e) => {
                            let op =  OperatorApplicationError::new_s(
                                    format!(
                                        "failed to stringify custom type '{}': {e}",
                                        v.type_name()
                                    ),
                                    op_id,
                                );
                            for _ in 0..rl {
                                self.push_error(cmd_idx, op.clone());
                                cmd_idx += 1;
                            }
                        }

                        Ok(()) => {
                            for _ in 0..rl {
                                // push as text when the buffer offers a
                                // `str` view, as bytes otherwise
                                if let Some(text) = buf.as_str() {
                                    self.push_text(
                                        sv_mgr, cmd_idx, arg_idx, text,
                                    );
                                } else {
                                    self.push_bytes(
                                        sv_mgr,
                                        op_id,
                                        cmd_idx,
                                        arg_idx,
                                        buf.as_bytes(),
                                    );
                                }
                                cmd_idx += 1;
                            }
                        }
                    }
                }
            }
            // errors in the input become the error of the command
            FieldValueSlice::Error(errs) => {
                for e in
                    RefAwareFieldValueRangeIter::from_range(&range, errs)
                        .unfold_rl()
                {
                    self.push_error(cmd_idx, e.clone());
                    cmd_idx += 1;
                }
            }
            // a stream value in the stdin slot is used directly as the
            // command's stdin and marked as not fake
            FieldValueSlice::StreamValueId(svs) => {
                if arg_idx == 0 {
                    for &sv in RefAwareFieldValueRangeIter::from_range(
                        &range, svs,
                    )
                    .unfold_rl()
                    {
                        let ca = &mut self.command_args[cmd_idx];
                        ca.stdin_sv = Some(sv);
                        ca.fake_stdin = false;
                        cmd_idx += 1;
                    }
                } else {
                    todo!()
                }
            }

            // unsupported kinds: one error per field of the range
            #[expand(REP in [Null, Undefined, Array, Object, Argument, OpDecl])]
            FieldValueSlice::REP(_) => {
                let e = OperatorApplicationError::new_s(
                    format!(
                        "unsupported input type {}",
                        FieldValueRepr::REP.to_str()
                    ),
                    op_id,
                );
                for _ in 0..range.base.field_count {
                    self.push_error(cmd_idx, e.clone());
                    cmd_idx += 1;
                }
            }
            // NOTE(review): presumably field references are already
            // resolved by the `AutoDerefIter` - confirm.
            FieldValueSlice::FieldReference(_)
            | FieldValueSlice::SlicedFieldReference(_) => unreachable!(),
        })
    }
}
592
593    fn push_args_from_field(
594        &mut self,
595        jd: &mut JobData,
596        iter_ref: FieldIterRef,
597        op_id: OperatorId,
598        arg_idx: usize,
599        fmt_key: &FormatKey,
600    ) {
601        let field = jd
602            .field_mgr
603            .get_cow_field_ref(&jd.match_set_mgr, iter_ref.field_id);
604        let iter = jd.field_mgr.lookup_iter(
605            iter_ref.field_id,
606            &field,
607            iter_ref.iter_id,
608        );
609        let mut iter =
610            AutoDerefIter::new(&jd.field_mgr, iter_ref.field_id, iter);
611        self.add_iter_to_command_arg(
612            &mut jd.sv_mgr,
613            &jd.match_set_mgr,
614            op_id,
615            0,
616            arg_idx,
617            fmt_key,
618            &mut iter,
619        );
620        jd.field_mgr.store_iter(
621            iter_ref.field_id,
622            iter_ref.iter_id,
623            iter.into_base_iter(),
624        );
625    }
626
627    fn insert_out_stream(
628        &mut self,
629        sv_mgr: &mut StreamValueManager,
630        stdout_bis: BytesInsertionStream,
631    ) -> StreamValueId {
632        let stream = sv_mgr.claim_stream_value(StreamValue::new_empty(
633            Some(StreamValueDataType::Bytes),
634            StreamValueBufferMode::Stream,
635        ));
636        sv_mgr.stream_values[stream]
637            .data_inserter(stream, self.stream_buffer_size, false)
638            .append_bytes_copy(stdout_bis.get_inserted_data());
639
640        stdout_bis.abort();
641
642        stream
643    }
644
645    #[cfg(target_family = "unix")]
646    fn setup_proc_streams(
647        &mut self,
648        tf_id: TransformId,
649        sv_mgr: &mut StreamValueManager,
650        proc: &mut Child,
651        stdin_sv: Option<StreamValueId>,
652        fake_stdin_sv: bool,
653        stdout_inserter: &mut VaryingTypeInserter<&mut FieldData>,
654        stderr_inserter: &mut VaryingTypeInserter<&mut FieldData>,
655        exit_code_inserter: &mut VaryingTypeInserter<&mut FieldData>,
656        command_idx: RunningCommandIdx,
657    ) -> Result<ProcessStart, std::io::Error> {
658        use std::io::ErrorKind;
659
660        use mio::{Interest, Token};
661        use typeline_core::record_data::stream_value::StreamValueDataOffset;
662
663        let sbt = self.stream_buffer_threshold as u64;
664
665        let mut stdin = proc.stdin.take().map(mio::unix::pipe::Sender::from);
666
667        let mut token_stdin = None;
668        let mut stdin_offset = StreamValueDataOffset::default();
669
670        if let Some(input_sv) = stdin_sv {
671            if !fake_stdin_sv {
672                sv_mgr.subscribe_to_stream_value(
673                    input_sv,
674                    tf_id,
675                    command_idx.into_usize(),
676                    false,
677                    true,
678                )
679            }
680
681            let sv = &mut sv_mgr.stream_values[input_sv];
682            let mut iter = sv.data_iter(StreamValueDataOffset {
683                values_consumed: 0,
684                current_value_offset: 0,
685            });
686
687            match std::io::copy(&mut iter, stdin.as_mut().unwrap()) {
688                Ok(_) => {
689                    if iter.is_end() && sv.done {
690                        stdin.take();
691                    }
692                }
693                Err(e) if e.kind() == ErrorKind::WouldBlock => (),
694                Err(e) => return Err(e),
695            };
696
697            stdin_offset = iter.get_offset_at_end();
698        }
699
700        if let Some(stdin) = stdin.as_mut() {
701            let stdin_tok = self
702                .token_universe
703                .claim_with_value(CommandStreamIdx::Stdin(command_idx));
704            token_stdin = Some(stdin_tok);
705            self.poll.registry().register(
706                stdin,
707                Token(stdin_tok),
708                Interest::WRITABLE,
709            )?;
710        }
711
712        let mut stdout =
713            mio::unix::pipe::Receiver::from(proc.stdout.take().unwrap());
714        stdout.set_nonblocking(true)?;
715
716        let mut stderr =
717            mio::unix::pipe::Receiver::from(proc.stderr.take().unwrap());
718        stderr.set_nonblocking(true)?;
719
720        let mut stdout_bis = stdout_inserter.bytes_insertion_stream(1);
721        let stdout_bytes =
722            match std::io::copy(&mut (&mut stdout).take(sbt), &mut stdout_bis)
723            {
724                Ok(n) => n,
725                Err(e) if e.kind() == ErrorKind::WouldBlock => 0,
726                Err(e) => return Err(e),
727            };
728
729        let mut stderr_bis = stderr_inserter.bytes_insertion_stream(1);
730
731        let stderr_bytes =
732            match std::io::copy(&mut (&mut stderr).take(sbt), &mut stderr_bis)
733            {
734                Ok(n) => n,
735                Err(e) if e.kind() == ErrorKind::WouldBlock => 0,
736                Err(e) => return Err(e),
737            };
738
739        if stdout_bytes != sbt || stderr_bytes != sbt {
740            if let Some(exit_code) = proc.try_wait()? {
741                stdout_bis.commit();
742                stderr_bis.commit();
743                exit_code_inserter.push_int(
744                    exit_code.into_raw() as i64,
745                    1,
746                    true,
747                    false,
748                );
749                return Ok(ProcessStart::Done(exit_code));
750            }
751        }
752
753        let stdout_token = self
754            .token_universe
755            .claim_with_value(CommandStreamIdx::Stdout(command_idx));
756
757        self.poll.registry().register(
758            &mut stdout,
759            Token(stdout_token),
760            Interest::READABLE,
761        )?;
762
763        let stderr_token = self
764            .token_universe
765            .claim_with_value(CommandStreamIdx::Stderr(command_idx));
766        self.poll.registry().register(
767            &mut stderr,
768            Token(stderr_token),
769            Interest::READABLE,
770        )?;
771
772        let stdout_stream = self.insert_out_stream(sv_mgr, stdout_bis);
773        let stderr_stream = self.insert_out_stream(sv_mgr, stderr_bis);
774        let exit_stream = sv_mgr.claim_stream_value(StreamValue::new_empty(
775            Some(StreamValueDataType::SingleValue(FieldValueKind::Int)),
776            StreamValueBufferMode::Stream,
777        ));
778
779        Ok(ProcessStart::Running {
780            stdin: InStream {
781                sender: stdin,
782                token: token_stdin,
783                sv_id: stdin_sv,
784                stdin_offset,
785            },
786            stdout: OutStream {
787                receiver: Some(stdout),
788                token: Some(stdout_token),
789                sv_id: Some(stdout_stream),
790                sv_appended: false,
791            },
792            stderr: OutStream {
793                receiver: Some(stderr),
794                token: Some(stderr_token),
795                sv_id: Some(stderr_stream),
796                sv_appended: false,
797            },
798            exit_stream,
799        })
800    }
/// Fallback for targets without unix pipes: reserves the stream values for
/// stdout, stderr and the exit code and reports the process as running.
///
/// The stream value ids are *not* pushed into the inserters here, because
/// `setup_proc` already does that for every `ProcessStart::Running`.
/// (Previously this function pushed the stderr stream twice and never
/// pushed the exit code stream.) No pipes are attached, so the stream
/// states carry neither a sender / receiver nor a poll token.
///
/// NOTE(review): `setup_proc` currently calls `setup_proc_streams`, which
/// only exists on unix - this function still has to be wired in.
#[cfg(not(target_family = "unix"))]
// the inserters are kept for signature compatibility only, see above
#[allow(unused_variables)]
fn setup_out_streams(
    &mut self,
    sv_mgr: &mut StreamValueManager,
    _proc: &mut Child,
    stdout_inserter: &mut VaryingTypeInserter<&mut FieldData>,
    stderr_inserter: &mut VaryingTypeInserter<&mut FieldData>,
    _exit_code_inserter: &mut VaryingTypeInserter<&mut FieldData>,
    _command_idx: RunningCommandIdx,
) -> Result<ProcessStart, std::io::Error> {
    let stdout_stream = sv_mgr.claim_stream_value(StreamValue::new_empty(
        Some(StreamValueDataType::Bytes),
        StreamValueBufferMode::Stream,
    ));

    let stderr_stream = sv_mgr.claim_stream_value(StreamValue::new_empty(
        Some(StreamValueDataType::Bytes),
        StreamValueBufferMode::Stream,
    ));

    let exit_stream = sv_mgr.claim_stream_value(StreamValue::new_empty(
        Some(StreamValueDataType::SingleValue(FieldValueKind::Int)),
        StreamValueBufferMode::Stream,
    ));

    Ok(ProcessStart::Running {
        stdin: InStream {
            sender: None,
            token: None,
            sv_id: None,
            stdin_offset: StreamValueDataOffset::default(),
        },
        stdout: OutStream {
            receiver: None,
            token: None,
            sv_id: Some(stdout_stream),
            sv_appended: false,
        },
        stderr: OutStream {
            receiver: None,
            token: None,
            sv_id: Some(stderr_stream),
            sv_appended: false,
        },
        exit_stream,
    })
}
835
836    fn setup_proc(
837        &mut self,
838        tf_id: TransformId,
839        sv_mgr: &mut StreamValueManager,
840        cmd_idx: usize,
841        stderr_inserter: &mut VaryingTypeInserter<&mut FieldData>,
842        stdout_inserter: &mut VaryingTypeInserter<&mut FieldData>,
843        exit_code_inserter: &mut VaryingTypeInserter<&mut FieldData>,
844    ) -> Result<(), std::io::Error> {
845        let (mut proc, in_sv, fake_in_sv) = {
846            let ca = &mut self.command_args[cmd_idx];
847            let proc = Command::new(&ca.args[0])
848                .args(&ca.args[1..])
849                .stdin(if self.op.opts.propagate_input {
850                    Stdio::piped()
851                } else {
852                    Stdio::null()
853                })
854                .stdout(Stdio::piped())
855                .stderr(Stdio::piped())
856                .spawn()?;
857            (proc, ca.stdin_sv, ca.fake_stdin)
858        };
859
860        let res = self.setup_proc_streams(
861            tf_id,
862            sv_mgr,
863            &mut proc,
864            in_sv,
865            fake_in_sv,
866            stdout_inserter,
867            stderr_inserter,
868            exit_code_inserter,
869            self.running_commands.peek_claim_id(),
870        );
871
872        match res {
873            Err(e) => {
874                // last resort to not leave behind a mess.
875                // should only happen in case of a very unfortunately timed
876                // interrupt or something
877                let _ = proc.kill();
878                return Err(e);
879            }
880            Ok(ProcessStart::Running {
881                stdin,
882                stdout,
883                stderr,
884                exit_stream,
885            }) => {
886                stdout_inserter.push_stream_value_id(
887                    stdout.sv_id.unwrap(),
888                    1,
889                    true,
890                    false,
891                );
892                stderr_inserter.push_stream_value_id(
893                    stderr.sv_id.unwrap(),
894                    1,
895                    true,
896                    false,
897                );
898                exit_code_inserter.push_stream_value_id(
899                    exit_stream,
900                    1,
901                    true,
902                    false,
903                );
904
905                let cmd_idx =
906                    self.running_commands.claim_with_value(RunningCommand {
907                        proc,
908                        in_stream: stdin,
909                        out_streams: [stdout, stderr],
910                        exit_code_sv_id: exit_stream,
911                        poll_requested: true,
912                    });
913                // initial poll in case this command completed
914                // before we managed to register the streams
915                self.commands_to_poll.push(cmd_idx);
916            }
917            Ok(ProcessStart::Done(_status)) => (),
918        }
919        Ok(())
920    }
921
922    fn read_out_stream(
923        &mut self,
924        sv_mgr: &mut StreamValueManager,
925        cmd_id: RunningCommandIdx,
926        out_stream_idx: usize,
927        op_id: OperatorId,
928        read_to_end: bool,
929    ) {
930        let cmd = &mut self.running_commands[cmd_id];
931        let out_stream = &mut cmd.out_streams[out_stream_idx];
932        let Some(sv_id) = out_stream.sv_id else {
933            return;
934        };
935        let mut sv = sv_mgr.stream_values[sv_id].data_inserter(
936            sv_id,
937            self.stream_buffer_size,
938            !out_stream.sv_appended,
939        );
940        out_stream.sv_appended = true;
941        let receiver = out_stream.receiver.as_mut().unwrap();
942        let res = sv.with_bytes_buffer(|buf| {
943            if read_to_end {
944                receiver.read_to_end(buf)
945            } else {
946                receiver
947                    .take(self.stream_buffer_size as u64)
948                    .read_to_end(buf)
949            }
950        });
951
952        if let Err(e) = &res {
953            if e.kind() != ErrorKind::WouldBlock {
954                let _ = self.poll.registry().deregister(receiver);
955                self.token_universe
956                    .release(out_stream.token.take().unwrap());
957                out_stream.receiver.take();
958
959                let e = Arc::new(OperatorApplicationError::new_s(
960                    format!(
961                        "{} communication failed with I/O Error: {e}",
962                        match out_stream_idx {
963                            STDOUT_IDX => "stderr",
964                            STDERR_IDX => "stdout",
965                            _ => unreachable!(),
966                        }
967                    ),
968                    op_id,
969                ));
970                sv.set_error(e);
971                drop(sv);
972                sv_mgr.inform_stream_value_subscribers(sv_id);
973                sv_mgr.drop_field_value_subscription(sv_id, None);
974                out_stream.sv_id = None;
975            }
976        }
977    }
978
979    fn handle_input_stream(
980        &mut self,
981        tf_id: TransformId,
982        sv_mgr: &mut StreamValueManager<'a>,
983        cmd_id: RunningCommandIdx,
984    ) -> Result<(), std::io::Error> {
985        let stream = &mut self.running_commands[cmd_id].in_stream;
986        let mut iter = sv_mgr.stream_values[stream.sv_id.unwrap()]
987            .data_iter(stream.stdin_offset);
988        match std::io::copy(&mut iter, stream.sender.as_mut().unwrap()) {
989            Ok(_) => (),
990            Err(e) if e.kind() == ErrorKind::WouldBlock => (),
991            Err(e) => {
992                stream.sender.take();
993                return Err(e);
994            }
995        }
996        stream.stdin_offset = iter.get_offset_at_end();
997        if iter.is_end() {
998            let _ = self
999                .poll
1000                .registry()
1001                .deregister(&mut stream.sender.take().unwrap());
1002            self.token_universe.release(stream.token.take().unwrap());
1003            sv_mgr.drop_field_value_subscription(
1004                stream.sv_id.take().unwrap(),
1005                Some(tf_id),
1006            )
1007        } else {
1008            sv_mgr.stream_values[stream.sv_id.unwrap()]
1009                .set_subscriber_data_offset(tf_id, stream.stdin_offset);
1010        }
1011        Ok(())
1012    }
1013
1014    fn propagate_process_failure(
1015        &mut self,
1016        jd: &mut JobData<'a>,
1017        cmd_id: RunningCommandIdx,
1018        op_id: OperatorId,
1019        e: std::io::Error,
1020    ) {
1021        let cmd = &mut self.running_commands[cmd_id];
1022        let e = Arc::new(OperatorApplicationError::new_s(
1023            format!("program communication failed with I/O Error: {e}"),
1024            op_id,
1025        ));
1026        if let Some(stdin_sv_id) = cmd.in_stream.sv_id {
1027            jd.sv_mgr.stream_values[stdin_sv_id].set_error(e.clone());
1028            jd.sv_mgr.drop_field_value_subscription(stdin_sv_id, None);
1029            if let Some(mut sender) = cmd.in_stream.sender.take() {
1030                let _ = self.poll.registry().deregister(&mut sender);
1031                self.token_universe.release(cmd.in_stream.token.unwrap());
1032            }
1033        }
1034        for out_stream_idx in 0..cmd.out_streams.len() {
1035            let out_stream = &mut cmd.out_streams[out_stream_idx];
1036            if let Some(sv_id) = out_stream.sv_id {
1037                jd.sv_mgr.stream_values[sv_id].set_error(e.clone());
1038                jd.sv_mgr.inform_stream_value_subscribers(sv_id);
1039                jd.sv_mgr.drop_field_value_subscription(sv_id, None);
1040            }
1041            if let Some(mut rec) = out_stream.receiver.take() {
1042                let _ = self.poll.registry().deregister(&mut rec);
1043                self.token_universe
1044                    .release(out_stream.token.take().unwrap());
1045            }
1046        }
1047        jd.sv_mgr.stream_values[cmd.exit_code_sv_id].set_error(e);
1048    }
1049}
1050
1051impl<'a> Transform<'a> for TfExec<'a> {
1052    fn update(
1053        &mut self,
1054        jd: &mut JobData,
1055        tf_id: typeline_core::operators::transform::TransformId,
1056    ) {
1057        let (batch_size, ps) = jd.tf_mgr.claim_batch(tf_id);
1058        let tf = &jd.tf_mgr.transforms[tf_id];
1059        let output_field_id = tf.output_field;
1060        let op_id = tf.op_id.unwrap();
1061
1062        for _ in 0..batch_size {
1063            let command_args = CommandArgs {
1064                stdin_sv: None,
1065                fake_stdin: true,
1066                args: vec![OsString::new(); self.op.fmt_arg_part_ends.len()],
1067                error: None,
1068            };
1069            self.command_args.push(command_args);
1070        }
1071
1072        if let Some(iter) = self.input_iter {
1073            self.push_args_from_field(
1074                jd,
1075                iter,
1076                op_id,
1077                0,
1078                &FormatKey::default(),
1079            );
1080        }
1081
1082        let mut part_idx = FormatPartIndex::zero();
1083
1084        for (arg_idx, &part_end) in
1085            self.op.fmt_arg_part_ends.iter().enumerate()
1086        {
1087            for fmt_part in &self.op.fmt_parts[part_idx..part_end] {
1088                match fmt_part {
1089                    FormatPart::ByteLiteral(bytes) => {
1090                        for i in 0..batch_size {
1091                            self.push_bytes(
1092                                &mut jd.sv_mgr,
1093                                op_id,
1094                                i,
1095                                arg_idx + 1,
1096                                bytes,
1097                            );
1098                        }
1099                    }
1100                    FormatPart::TextLiteral(text) => {
1101                        for i in 0..batch_size {
1102                            self.push_text(
1103                                &mut jd.sv_mgr,
1104                                i,
1105                                arg_idx + 1,
1106                                text,
1107                            );
1108                        }
1109                    }
1110                    FormatPart::Key(fmt_key) => {
1111                        let iter_ref = self.iters[fmt_key.ref_idx];
1112                        self.push_args_from_field(
1113                            jd,
1114                            iter_ref,
1115                            op_id,
1116                            arg_idx + 1,
1117                            fmt_key,
1118                        );
1119                    }
1120                }
1121            }
1122            part_idx = part_end;
1123        }
1124
1125        let mut stdout_field =
1126            jd.field_mgr.fields[output_field_id].borrow_mut();
1127        let mut stdout_inserter =
1128            stdout_field.iter_hall.varying_type_inserter();
1129
1130        let mut stderr_field =
1131            jd.field_mgr.fields[self.stderr_field].borrow_mut();
1132        let mut stderr_inserter =
1133            stderr_field.iter_hall.varying_type_inserter();
1134
1135        let mut exit_code_field =
1136            jd.field_mgr.fields[self.exit_code_field].borrow_mut();
1137        let mut exit_code_inserter =
1138            exit_code_field.iter_hall.varying_type_inserter();
1139
1140        for cmd_idx in 0..batch_size {
1141            if let Some(e) = self.command_args[cmd_idx].error.take() {
1142                stderr_inserter.push_error(e.clone(), 1, true, false);
1143                stdout_inserter.push_error(e.clone(), 1, true, false);
1144                exit_code_inserter.push_error(e, 1, true, false);
1145                continue;
1146            }
1147            match self.setup_proc(
1148                tf_id,
1149                &mut jd.sv_mgr,
1150                cmd_idx,
1151                &mut stderr_inserter,
1152                &mut stdout_inserter,
1153                &mut exit_code_inserter,
1154            ) {
1155                Ok(()) => (),
1156                Err(e) => {
1157                    let e = OperatorApplicationError::new_s(
1158                        format!(
1159                            "failed to execute `{}`: {}",
1160                            self.command_args[cmd_idx].args[0]
1161                                .to_string_lossy(),
1162                            e,
1163                        ),
1164                        op_id,
1165                    );
1166                    stderr_inserter.push_error(e.clone(), 1, true, false);
1167                    stdout_inserter.push_error(e.clone(), 1, true, false);
1168                    exit_code_inserter.push_error(e, 1, true, false);
1169                }
1170            }
1171        }
1172
1173        if !self.running_commands.is_empty() {
1174            jd.tf_mgr.make_stream_producer(tf_id);
1175        }
1176
1177        jd.tf_mgr.submit_batch_ready_for_more(tf_id, batch_size, ps);
1178
1179        self.command_args.clear();
1180    }
1181
1182    #[cfg(target_family = "unix")]
1183    fn stream_producer_update(
1184        &mut self,
1185        jd: &mut JobData<'a>,
1186        tf_id: TransformId,
1187    ) {
1188        use std::time::Duration;
1189
1190        use typeline_core::record_data::{
1191            field_value::FieldValue, stream_value::StreamValueData,
1192        };
1193
1194        let op_id = jd.tf_mgr.transforms[tf_id].op_id.unwrap();
1195
1196        let mut events = self.events.take().unwrap();
1197        self.poll
1198            .poll(&mut events, Some(Duration::from_millis(1)))
1199            .expect("poll error");
1200
1201        for e in &events {
1202            let token = e.token();
1203            let cmd_output = self.token_universe[token.0];
1204            let cmd_id = cmd_output.id();
1205            let cmd = &mut self.running_commands[cmd_id];
1206            if !cmd.poll_requested {
1207                self.commands_to_poll.push(cmd_id);
1208                cmd.poll_requested = true;
1209            }
1210            let out_stream_idx = match cmd_output {
1211                CommandStreamIdx::Stdout(_) => STDOUT_IDX,
1212                CommandStreamIdx::Stderr(_) => STDERR_IDX,
1213                CommandStreamIdx::Stdin(_) => {
1214                    if let Err(e) =
1215                        self.handle_input_stream(tf_id, &mut jd.sv_mgr, cmd_id)
1216                    {
1217                        self.propagate_process_failure(jd, cmd_id, op_id, e);
1218                    }
1219                    continue;
1220                }
1221            };
1222            cmd.out_streams[out_stream_idx].sv_appended = true;
1223            self.read_out_stream(
1224                &mut jd.sv_mgr,
1225                cmd_id,
1226                out_stream_idx,
1227                op_id,
1228                false,
1229            );
1230        }
1231        events.clear();
1232        self.events = Some(events);
1233
1234        // HACK: this works around the fact that sometimes
1235        // we don't get any epoll results despite the process
1236        // ending
1237        // TODO: figure out a better way to do this
1238        for (idx, cmd) in self.running_commands.iter_enumerated_mut() {
1239            if !cmd.poll_requested {
1240                cmd.poll_requested = true;
1241                self.commands_to_poll.push(idx);
1242            }
1243        }
1244
1245        while let Some(cmd_id) = self.commands_to_poll.pop() {
1246            let cmd = &mut self.running_commands[cmd_id];
1247            cmd.poll_requested = false;
1248            let mut done = false;
1249            match cmd.proc.try_wait() {
1250                Ok(None) => {}
1251                Ok(Some(status)) => {
1252                    jd.sv_mgr.stream_values[cmd.exit_code_sv_id]
1253                        .data_inserter(
1254                            cmd.exit_code_sv_id,
1255                            self.stream_buffer_size,
1256                            true,
1257                        )
1258                        .append(StreamValueData::Single(FieldValue::Int(
1259                            status.into_raw() as i64,
1260                        )));
1261
1262                    for out_stream_idx in 0..cmd.out_streams.len() {
1263                        self.read_out_stream(
1264                            &mut jd.sv_mgr,
1265                            cmd_id,
1266                            out_stream_idx,
1267                            op_id,
1268                            true,
1269                        );
1270                    }
1271                    done = true;
1272                }
1273                Err(e) => {
1274                    self.propagate_process_failure(jd, cmd_id, op_id, e);
1275                    done = true;
1276                }
1277            }
1278            let cmd = &mut self.running_commands[cmd_id];
1279            for out_stream_idx in 0..cmd.out_streams.len() {
1280                let os = &mut cmd.out_streams[out_stream_idx];
1281                let appended = os.sv_appended;
1282                os.sv_appended = false;
1283                if let Some(sv_id) = os.sv_id {
1284                    if done {
1285                        jd.sv_mgr.stream_values[sv_id].mark_done();
1286                    }
1287                    if done || appended {
1288                        jd.sv_mgr.inform_stream_value_subscribers(sv_id);
1289                    }
1290                    if done {
1291                        jd.sv_mgr.drop_field_value_subscription(sv_id, None);
1292                        if let Some(token) = os.token {
1293                            self.token_universe.release(token);
1294                        }
1295                        if let Some(mut stream) = os.receiver.take() {
1296                            let _ =
1297                                self.poll.registry().deregister(&mut stream);
1298                        }
1299                    }
1300                }
1301            }
1302            if !done {
1303                continue;
1304            }
1305            if let Some(token) = cmd.in_stream.token {
1306                self.token_universe.release(token);
1307            }
1308            if let Some(mut stream) = cmd.in_stream.sender.take() {
1309                let _ = self.poll.registry().deregister(&mut stream);
1310            }
1311
1312            let cmd = &mut self.running_commands[cmd_id];
1313            jd.sv_mgr
1314                .inform_stream_value_subscribers(cmd.exit_code_sv_id);
1315            jd.sv_mgr
1316                .drop_field_value_subscription(cmd.exit_code_sv_id, None);
1317
1318            self.running_commands.release(cmd_id);
1319        }
1320
1321        if !self.running_commands.is_empty() {
1322            jd.tf_mgr.make_stream_producer(tf_id);
1323        }
1324    }
1325
1326    #[cfg(not(target_family = "unix"))]
1327    fn stream_producer_update(
1328        &mut self,
1329        jd: &mut JobData<'a>,
1330        tf_id: TransformId,
1331    ) {
1332        let op_id = jd.tf_mgr.transforms[tf_id].op_id.unwrap();
1333        for cmd_id in self.running_commands.next_index_phys().range_from_zero()
1334        {
1335            let Some(cmd) = &mut self.running_commands.get_mut(cmd_id) else {
1336                continue;
1337            };
1338            let (stdout_sv, stderr_sv, exit_code_sv) =
1339                jd.sv_mgr.stream_values.three_distinct_mut(
1340                    cmd.stdout_sv_id,
1341                    cmd.stderr_sv_id,
1342                    cmd.exit_code_sv_id,
1343                );
1344
1345            let mut stdout_inserter = stdout_sv.data_inserter(
1346                cmd.stdout_sv_id,
1347                self.stream_buffer_size,
1348                true,
1349            );
1350            let mut stderr_inserter = stderr_sv.data_inserter(
1351                cmd.stderr_sv_id,
1352                self.stream_buffer_size,
1353                true,
1354            );
1355            let mut exit_code_inserter = exit_code_sv.data_inserter(
1356                cmd.exit_code_sv_id,
1357                self.stream_buffer_size,
1358                true,
1359            );
1360
1361            let mut res = Ok(None);
1362
1363            let mut any_stream_running = false;
1364
1365            if let Some(stdin) = &mut cmd.proc.stdin {
1366                if cmd.stdin_data.len() > cmd.stdin_offset {
1367                    match stdin.write(&cmd.stdin_data[cmd.stdin_offset..]) {
1368                        Ok(0) => {
1369                            cmd.proc.stdin.take();
1370                        }
1371                        Ok(n) => {
1372                            any_stream_running = true;
1373                            cmd.stdin_offset += n;
1374                        }
1375                        Err(e) => {
1376                            res = Err(e);
1377                        }
1378                    }
1379                }
1380            }
1381            if res.is_ok() {
1382                if let Some(stdout) = &mut cmd.proc.stdout {
1383                    res = match stdout_inserter.with_bytes_buffer(|buf| {
1384                        stdout
1385                            .take(self.stream_buffer_size as u64)
1386                            .read_to_end(buf)
1387                    }) {
1388                        Ok(0) => {
1389                            cmd.proc.stdout.take();
1390                            Ok(None)
1391                        }
1392                        Ok(_n) => {
1393                            any_stream_running = true;
1394                            Ok(None)
1395                        }
1396                        Err(e) => Err(e),
1397                    }
1398                }
1399            }
1400            if res.is_ok() {
1401                if let Some(stderr) = &mut cmd.proc.stderr {
1402                    res = match stderr_inserter.with_bytes_buffer(|buf| {
1403                        stderr
1404                            .take(self.stream_buffer_size as u64)
1405                            .read_to_end(buf)
1406                    }) {
1407                        Ok(0) => {
1408                            cmd.proc.stderr.take();
1409                            Ok(None)
1410                        }
1411                        Ok(_n) => {
1412                            any_stream_running = true;
1413                            Ok(None)
1414                        }
1415                        Err(e) => Err(e),
1416                    }
1417                }
1418            }
1419            if !any_stream_running && res.is_ok() {
1420                res = cmd.proc.try_wait();
1421            }
1422            let mut res = res.map_err(|e| {
1423                OperatorApplicationError::new_s(
1424                    format!(
1425                        "program communication failed with I/O Error: {e}"
1426                    ),
1427                    op_id,
1428                )
1429            });
1430            let mut done = false;
1431            if let Ok(Some(code)) = res {
1432                if !code.success() {
1433                    res = Err(OperatorApplicationError::new_s(
1434                        format!("program exited with code {code}"),
1435                        op_id,
1436                    ));
1437                }
1438                done = true;
1439            }
1440            if let Err(e) = res {
1441                let e = Some(Arc::new(e));
1442                stdout_inserter.propagate_error(&e);
1443                stderr_inserter.propagate_error(&e);
1444                exit_code_inserter.propagate_error(&e);
1445                done = true;
1446            }
1447            if done {
1448                for ins in [
1449                    &mut stdout_inserter,
1450                    &mut stderr_inserter,
1451                    &mut exit_code_inserter,
1452                ] {
1453                    ins.stream_value().mark_done();
1454                }
1455            }
1456            drop(stdout_inserter);
1457            drop(stderr_inserter);
1458            drop(exit_code_inserter);
1459            for sv_id in
1460                [cmd.stdout_sv_id, cmd.stderr_sv_id, cmd.exit_code_sv_id]
1461            {
1462                jd.sv_mgr.inform_stream_value_subscribers(sv_id);
1463                if done {
1464                    jd.sv_mgr.drop_field_value_subscription(sv_id, None);
1465                }
1466            }
1467            if done {
1468                self.running_commands.release(cmd_id);
1469            }
1470        }
1471        if !self.running_commands.is_empty() {
1472            jd.tf_mgr.make_stream_producer(tf_id);
1473        }
1474    }
1475
1476    fn handle_stream_value_update(
1477        &mut self,
1478        jd: &mut JobData<'a>,
1479        svu: StreamValueUpdate,
1480    ) {
1481        let cmd_id = RunningCommandIdx::from_usize(svu.custom);
1482        if let Err(e) =
1483            self.handle_input_stream(svu.tf_id, &mut jd.sv_mgr, cmd_id)
1484        {
1485            let op_id = jd.tf_mgr.transforms[svu.tf_id].op_id.unwrap();
1486            self.propagate_process_failure(jd, cmd_id, op_id, e);
1487        }
1488    }
1489}
1490
1491fn append_exec_arg(
1492    arg_idx: usize,
1493    value: &[u8],
1494    span: Span,
1495    refs: &mut IndexVec<FormatKeyRefId, FormatKeyRefData>,
1496    parts: &mut IndexVec<FormatPartIndex, FormatPart>,
1497    fmt_arg_part_ends: &mut IndexVec<ExecArgIdx, FormatPartIndex>,
1498) -> Result<(), CliArgumentError> {
1499    parse_format_string(value.as_bytes(), refs, parts).map_err(
1500        |(i, msg)| {
1501            CliArgumentError::new_s(
1502                format!("exec format string arg {arg_idx} offset {i}: {msg}",),
1503                span,
1504            )
1505        },
1506    )?;
1507    fmt_arg_part_ends.push(parts.next_idx());
1508    Ok(())
1509}
1510
1511pub fn parse_op_exec(
1512    expr: &CallExpr,
1513) -> Result<Box<dyn Operator>, TypelineError> {
1514    let mut parts = IndexVec::new();
1515    let mut refs = IndexVec::new();
1516    let mut fmt_arg_part_ends = IndexVec::new();
1517    let mut opts = OpExecOpts::default();
1518    for arg in expr.parsed_args_iter() {
1519        match arg.value {
1520            ParsedArgValue::Flag(flag) => {
1521                if flag == "-i" {
1522                    opts.propagate_input = true;
1523                    continue;
1524                }
1525                return Err(expr
1526                    .error_flag_unsupported(flag, arg.span)
1527                    .into());
1528            }
1529            ParsedArgValue::NamedArg { key, .. } => {
1530                return Err(expr
1531                    .error_named_args_unsupported(key, arg.span)
1532                    .into());
1533            }
1534            ParsedArgValue::PositionalArg { idx, value, .. } => {
1535                let Some(value) = value.text_or_bytes() else {
1536                    return Err(expr
1537                        .error_non_primitive_arg_unsupported(arg.span)
1538                        .into());
1539                };
1540                append_exec_arg(
1541                    idx,
1542                    value,
1543                    arg.span,
1544                    &mut refs,
1545                    &mut parts,
1546                    &mut fmt_arg_part_ends,
1547                )?;
1548            }
1549        }
1550    }
1551
1552    Ok(Box::new(OpExec {
1553        fmt_parts: parts,
1554        refs,
1555        fmt_arg_part_ends,
1556        opts,
1557        stderr_field_name: INVALID_STRING_STORE_ENTRY,
1558        exit_code_field_name: INVALID_STRING_STORE_ENTRY,
1559    }))
1560}
1561
1562pub fn create_op_exec_from_strings<'a>(
1563    args: impl IntoIterator<Item = impl Into<&'a str>>,
1564) -> Result<Box<dyn Operator>, OperatorCreationError> {
1565    create_op_exec_with_opts_from_strings(OpExecOpts::default(), args)
1566}
1567
1568pub fn create_op_exec_with_opts_from_strings<'a>(
1569    opts: OpExecOpts,
1570    args: impl IntoIterator<Item = impl Into<&'a str>>,
1571) -> Result<Box<dyn Operator>, OperatorCreationError> {
1572    let mut parts = IndexVec::new();
1573    let mut refs = IndexVec::new();
1574    let mut fmt_arg_part_ends = IndexVec::new();
1575    for (idx, value) in args.into_iter().enumerate() {
1576        append_exec_arg(
1577            idx,
1578            value.into().as_bytes(),
1579            Span::Generated,
1580            &mut refs,
1581            &mut parts,
1582            &mut fmt_arg_part_ends,
1583        )?;
1584    }
1585
1586    Ok(Box::new(OpExec {
1587        fmt_parts: parts,
1588        refs,
1589        fmt_arg_part_ends,
1590        opts,
1591        stderr_field_name: INVALID_STRING_STORE_ENTRY,
1592        exit_code_field_name: INVALID_STRING_STORE_ENTRY,
1593    }))
1594}