// typeline_core/operators/success_updater.rs

/// Operator that carries no configuration of its own.
///
/// Its `Operator` implementation instantiates a `TfSuccessUpdator`
/// transform, which marks the session as unsuccessful once an error value
/// has been seen on its input field.
#[derive(Default, Clone)]
pub struct OpSuccessUpdator {}
3pub struct TfSuccessUpdator {
4 iter_id: FieldIterId,
5 success: bool,
6}
7
8use crate::{
9 cli::call_expr::CallExpr,
10 job::JobData,
11 record_data::{
12 field_value_ref::FieldValueSlice, iter::field_iterator::FieldIterOpts,
13 iter_hall::FieldIterId,
14 },
15};
16
17use super::{
18 errors::OperatorCreationError,
19 operator::{Operator, OperatorId, OutputFieldKind, TransformInstatiation},
20 transform::{Transform, TransformId, TransformState},
21};
22
23impl Operator for OpSuccessUpdator {
24 fn default_name(&self) -> super::operator::OperatorName {
25 "success_updator".into()
26 }
27
28 fn output_field_kind(
29 &self,
30 _sess: &crate::context::SessionData,
31 _op_id: OperatorId,
32 ) -> OutputFieldKind {
33 OutputFieldKind::SameAsInput
34 }
35
36 fn output_count(
37 &self,
38 _sess: &crate::context::SessionData,
39 _op_id: OperatorId,
40 ) -> usize {
41 0
42 }
43
44 fn has_dynamic_outputs(
45 &self,
46 _sess: &crate::context::SessionData,
47 _op_id: OperatorId,
48 ) -> bool {
49 false
50 }
51
52 fn update_variable_liveness(
53 &self,
54 _sess: &crate::context::SessionData,
55 _ld: &mut crate::liveness_analysis::LivenessData,
56 _op_offset_after_last_write: super::operator::OffsetInChain,
57 _op_id: OperatorId,
58 _bb_id: crate::liveness_analysis::BasicBlockId,
59 _input_field: crate::liveness_analysis::OpOutputIdx,
60 output: &mut crate::liveness_analysis::OperatorLivenessOutput,
61 ) {
62 output.flags.may_dup_or_drop = false;
63 output.flags.non_stringified_input_access = false;
64 }
65
66 fn build_transforms<'a>(
67 &'a self,
68 job: &mut crate::job::Job<'a>,
69 tf_state: &mut TransformState,
70 _op_id: OperatorId,
71 _prebound_outputs: &super::operator::PreboundOutputsMap,
72 ) -> super::operator::TransformInstatiation<'a> {
73 let su = TfSuccessUpdator {
74 iter_id: job.job_data.claim_iter_for_tf_state(tf_state),
75 success: true,
76 };
77 TransformInstatiation::Single(Box::new(su))
78 }
79}
80
81impl Transform<'_> for TfSuccessUpdator {
82 fn update(&mut self, jd: &mut JobData<'_>, tf_id: TransformId) {
83 let (batch_size, ps) = jd.tf_mgr.claim_all(tf_id);
84 let tf = &mut jd.tf_mgr.transforms[tf_id];
85
86 let field = jd
87 .field_mgr
88 .get_cow_field_ref(&jd.match_set_mgr, tf.input_field);
89 let mut iter = jd.field_mgr.get_auto_deref_iter(
90 tf.input_field,
91 &field,
92 self.iter_id,
93 );
94
95 let mut rem = batch_size;
97 while let Some(range) = iter.typed_range_fwd(
98 &jd.match_set_mgr,
99 rem,
100 FieldIterOpts::default(),
101 ) {
102 if matches!(range.base.data, FieldValueSlice::Error(_)) {
103 self.success = false;
104 }
105 rem -= range.base.field_count;
106 }
107
108 jd.tf_mgr.submit_batch(
109 tf_id,
110 batch_size,
111 ps.group_to_truncate,
112 ps.input_done,
113 );
114 if ps.input_done && !self.success {
115 jd.session_data.set_success(false)
116 }
117 }
118}
119
120pub fn parse_op_success_updator(
121 arg: &CallExpr,
122) -> Result<Box<dyn Operator>, OperatorCreationError> {
123 arg.reject_args()?;
124 Ok(create_op_success_updator())
125}
126pub fn create_op_success_updator() -> Box<dyn Operator> {
127 Box::new(OpSuccessUpdator::default())
128}