typeline_core/operators/
success_updater.rs

/// Operator half of the success updater.
///
/// The struct has no fields; its `Operator` implementation in this file
/// instantiates a single [`TfSuccessUpdator`] transform.
#[derive(Clone, Default)]
pub struct OpSuccessUpdator {}
/// Transform half of the success updater.
///
/// Tracks whether an error value has been seen in the transform's input
/// field across all `update` calls so far.
pub struct TfSuccessUpdator {
    // Iterator id claimed in `build_transforms`; handed to
    // `get_auto_deref_iter` on every `update` call.
    iter_id: FieldIterId,
    // Starts out `true` and is set to `false` as soon as a range holding
    // `FieldValueSlice::Error` is encountered. It is never set back.
    success: bool,
}
7
8use crate::{
9    cli::call_expr::CallExpr,
10    job::JobData,
11    record_data::{
12        field_value_ref::FieldValueSlice, iter::field_iterator::FieldIterOpts,
13        iter_hall::FieldIterId,
14    },
15};
16
17use super::{
18    errors::OperatorCreationError,
19    operator::{Operator, OperatorId, OutputFieldKind, TransformInstatiation},
20    transform::{Transform, TransformId, TransformState},
21};
22
23impl Operator for OpSuccessUpdator {
24    fn default_name(&self) -> super::operator::OperatorName {
25        "success_updator".into()
26    }
27
28    fn output_field_kind(
29        &self,
30        _sess: &crate::context::SessionData,
31        _op_id: OperatorId,
32    ) -> OutputFieldKind {
33        OutputFieldKind::SameAsInput
34    }
35
36    fn output_count(
37        &self,
38        _sess: &crate::context::SessionData,
39        _op_id: OperatorId,
40    ) -> usize {
41        0
42    }
43
44    fn has_dynamic_outputs(
45        &self,
46        _sess: &crate::context::SessionData,
47        _op_id: OperatorId,
48    ) -> bool {
49        false
50    }
51
52    fn update_variable_liveness(
53        &self,
54        _sess: &crate::context::SessionData,
55        _ld: &mut crate::liveness_analysis::LivenessData,
56        _op_offset_after_last_write: super::operator::OffsetInChain,
57        _op_id: OperatorId,
58        _bb_id: crate::liveness_analysis::BasicBlockId,
59        _input_field: crate::liveness_analysis::OpOutputIdx,
60        output: &mut crate::liveness_analysis::OperatorLivenessOutput,
61    ) {
62        output.flags.may_dup_or_drop = false;
63        output.flags.non_stringified_input_access = false;
64    }
65
66    fn build_transforms<'a>(
67        &'a self,
68        job: &mut crate::job::Job<'a>,
69        tf_state: &mut TransformState,
70        _op_id: OperatorId,
71        _prebound_outputs: &super::operator::PreboundOutputsMap,
72    ) -> super::operator::TransformInstatiation<'a> {
73        let su = TfSuccessUpdator {
74            iter_id: job.job_data.claim_iter_for_tf_state(tf_state),
75            success: true,
76        };
77        TransformInstatiation::Single(Box::new(su))
78    }
79}
80
81impl Transform<'_> for TfSuccessUpdator {
82    fn update(&mut self, jd: &mut JobData<'_>, tf_id: TransformId) {
83        let (batch_size, ps) = jd.tf_mgr.claim_all(tf_id);
84        let tf = &mut jd.tf_mgr.transforms[tf_id];
85
86        let field = jd
87            .field_mgr
88            .get_cow_field_ref(&jd.match_set_mgr, tf.input_field);
89        let mut iter = jd.field_mgr.get_auto_deref_iter(
90            tf.input_field,
91            &field,
92            self.iter_id,
93        );
94
95        // PERF: we could optimize this
96        let mut rem = batch_size;
97        while let Some(range) = iter.typed_range_fwd(
98            &jd.match_set_mgr,
99            rem,
100            FieldIterOpts::default(),
101        ) {
102            if matches!(range.base.data, FieldValueSlice::Error(_)) {
103                self.success = false;
104            }
105            rem -= range.base.field_count;
106        }
107
108        jd.tf_mgr.submit_batch(
109            tf_id,
110            batch_size,
111            ps.group_to_truncate,
112            ps.input_done,
113        );
114        if ps.input_done && !self.success {
115            jd.session_data.set_success(false)
116        }
117    }
118}
119
120pub fn parse_op_success_updator(
121    arg: &CallExpr,
122) -> Result<Box<dyn Operator>, OperatorCreationError> {
123    arg.reject_args()?;
124    Ok(create_op_success_updator())
125}
126pub fn create_op_success_updator() -> Box<dyn Operator> {
127    Box::new(OpSuccessUpdator::default())
128}