typeline_core/operators/
key.rs

1use crate::{
2    chain::ChainId,
3    cli::{
4        call_expr::{Argument, CallExpr, Span},
5        parse_operator_data,
6    },
7    liveness_analysis::OperatorCallEffect,
8    options::session_setup::SessionSetupData,
9    typeline_error::TypelineError,
10    utils::{indexing_type::IndexingType, string_store::StringStoreEntry},
11};
12
13use super::{
14    errors::OperatorCreationError,
15    nop::OpNop,
16    operator::{
17        OffsetInAggregation, Operator, OperatorDataId, OperatorId,
18        OperatorOffsetInChain, OutputFieldKind,
19    },
20    utils::nested_op::{setup_op_outputs_for_nested_op, NestedOp},
21};
22
23pub struct OpKey {
24    pub key: String,
25    pub key_interned: Option<StringStoreEntry>,
26    pub nested_op: Option<NestedOp>,
27}
28
29pub fn parse_op_key(
30    sess: &mut SessionSetupData,
31    mut arg: Argument,
32) -> Result<Box<dyn Operator>, TypelineError> {
33    let expr = CallExpr::from_argument_mut(&mut arg)?;
34    let op_name = expr.op_name;
35
36    if expr.args.len() < 2 {
37        return Err(OperatorCreationError::new(
38            "missing label argument for operator `key`",
39            expr.span,
40        )
41        .into());
42    }
43
44    let key_span = expr.args[0].span;
45
46    let key = std::mem::take(&mut expr.args[0].value)
47        .into_maybe_text()
48        .ok_or_else(|| expr.error_positional_arg_not_plaintext(key_span))?;
49
50    let key = key
51        .into_text()
52        .ok_or_else(|| expr.error_arg_invalid_utf8(op_name, key_span))?;
53
54    let mut nested_op = None;
55    if let Some(arg) = expr.args.get_mut(1) {
56        let span = arg.span;
57        let op = parse_operator_data(sess, std::mem::take(arg))?;
58        nested_op = Some(NestedOp::Operator(Box::new((op, span))));
59    }
60
61    if expr.args.len() > 2 {
62        return Err(OperatorCreationError::new(
63            "operator key only accepts two arguments`",
64            expr.args[2].span,
65        )
66        .into());
67    }
68
69    Ok(Box::new(OpKey {
70        key,
71        key_interned: None,
72        nested_op,
73    }))
74}
75
76pub fn create_op_key(key: String) -> Box<dyn Operator> {
77    Box::new(OpKey {
78        key,
79        key_interned: None,
80        nested_op: None,
81    })
82}
83
84pub fn create_op_key_with_op(
85    key: String,
86    op: Box<dyn Operator>,
87) -> Box<dyn Operator> {
88    Box::new(OpKey {
89        key,
90        key_interned: None,
91        nested_op: Some(NestedOp::Operator(Box::new((op, Span::Generated)))),
92    })
93}
94
95impl Operator for OpKey {
96    fn setup(
97        &mut self,
98        sess: &mut SessionSetupData,
99        op_data_id: OperatorDataId,
100        chain_id: ChainId,
101        offset_in_chain: OperatorOffsetInChain,
102        span: Span,
103    ) -> Result<OperatorId, TypelineError> {
104        self.key_interned = Some(sess.string_store.intern_cloned(&self.key));
105        let op_id = sess.add_op(op_data_id, chain_id, offset_in_chain, span);
106        let Some(nested_op) = &mut self.nested_op else {
107            return Ok(op_id);
108        };
109        let NestedOp::Operator(op_span) = nested_op else {
110            panic!("operator was already set up");
111        };
112        let (sub_op, span) = *std::mem::replace(
113            op_span,
114            Box::new((Box::new(OpNop::default()), Span::default())),
115        );
116        let sub_op_id = sess.setup_op_from_data(
117            sub_op,
118            sess.curr_chain,
119            OperatorOffsetInChain::AggregationMember(
120                op_id,
121                OffsetInAggregation::ZERO,
122            ),
123            span,
124        )?;
125        self.nested_op = Some(NestedOp::SetUp(sub_op_id));
126
127        Ok(op_id)
128    }
129    fn update_bb_for_op(
130        &self,
131        sess: &crate::context::SessionData,
132        ld: &mut crate::liveness_analysis::LivenessData,
133        _op_id: OperatorId,
134        op_n: super::operator::OffsetInChain,
135        cn: &crate::chain::Chain,
136        bb_id: crate::liveness_analysis::BasicBlockId,
137    ) -> bool {
138        let Some(nested_op) = &self.nested_op else {
139            return false;
140        };
141        let &NestedOp::SetUp(sub_op_id) = nested_op else {
142            unreachable!()
143        };
144        ld.update_bb_for_op(sess, sub_op_id, op_n, cn, bb_id)
145    }
146
147    fn default_name(&self) -> super::operator::OperatorName {
148        "key".into()
149    }
150
151    fn debug_op_name(&self) -> super::operator::OperatorName {
152        let Some(nested) = &self.nested_op else {
153            return self.default_name();
154        };
155        match nested {
156            NestedOp::Operator(nested_op) => {
157                format!(
158                    "[ key '{}' {} ]",
159                    self.key,
160                    nested_op.0.debug_op_name()
161                )
162            }
163            NestedOp::SetUp(op_id) => {
164                format!("[ key '{}' <op {op_id:02}> ]", self.key)
165            }
166        }
167        .into()
168    }
169
170    fn update_variable_liveness(
171        &self,
172        sess: &crate::context::SessionData,
173        ld: &mut crate::liveness_analysis::LivenessData,
174        op_offset_after_last_write: super::operator::OffsetInChain,
175        _op_id: OperatorId,
176        bb_id: crate::liveness_analysis::BasicBlockId,
177        input_field: crate::liveness_analysis::OpOutputIdx,
178        output: &mut crate::liveness_analysis::OperatorLivenessOutput,
179    ) {
180        if let Some(NestedOp::SetUp(nested_op_id)) = self.nested_op {
181            sess.operator_data[sess.op_data_id(nested_op_id)]
182                .update_variable_liveness(
183                    sess,
184                    ld,
185                    op_offset_after_last_write,
186                    nested_op_id,
187                    bb_id,
188                    input_field,
189                    output,
190                );
191        }
192
193        let var_id = ld.var_names[&self.key_interned.unwrap()];
194        ld.vars_to_op_outputs_map[var_id] = output.primary_output;
195        ld.op_outputs[output.primary_output]
196            .field_references
197            .push(input_field);
198        if let Some(prev_tgt) = ld.key_aliases_map.insert(var_id, input_field)
199        {
200            ld.apply_var_remapping(var_id, prev_tgt);
201        }
202        output.primary_output = input_field;
203        output.call_effect = OperatorCallEffect::NoCall;
204    }
205
206    fn output_field_kind(
207        &self,
208        sess: &crate::context::SessionData,
209        _op_id: OperatorId,
210    ) -> super::operator::OutputFieldKind {
211        let Some(nested) = &self.nested_op else {
212            return OutputFieldKind::SameAsInput;
213        };
214        let &NestedOp::SetUp(op_id) = nested else {
215            unreachable!()
216        };
217        sess.operator_data[sess.op_data_id(op_id)]
218            .output_field_kind(sess, op_id)
219    }
220
221    fn register_output_var_names(
222        &self,
223        ld: &mut crate::liveness_analysis::LivenessData,
224        sess: &crate::context::SessionData,
225        _op_id: OperatorId,
226    ) {
227        ld.add_var_name(self.key_interned.unwrap());
228        if let Some(NestedOp::SetUp(op_id)) = self.nested_op {
229            sess.operator_data[sess.op_data_id(op_id)]
230                .register_output_var_names(ld, sess, op_id);
231        }
232    }
233
234    fn output_count(
235        &self,
236        sess: &crate::context::SessionData,
237        _op_id: OperatorId,
238    ) -> usize {
239        let Some(nested) = &self.nested_op else {
240            return 0;
241        };
242        let &NestedOp::SetUp(op_id) = nested else {
243            unreachable!()
244        };
245        sess.operator_data[sess.op_data_id(op_id)].output_count(sess, op_id)
246    }
247
248    fn aggregation_member(
249        &self,
250        agg_offset: OffsetInAggregation,
251    ) -> Option<OperatorId> {
252        if agg_offset != OffsetInAggregation::ZERO {
253            return None;
254        }
255        if let Some(NestedOp::SetUp(op_id)) = self.nested_op {
256            return Some(op_id);
257        }
258        None
259    }
260
261    fn on_liveness_computed(
262        &mut self,
263        sess: &mut crate::context::SessionData,
264        ld: &crate::liveness_analysis::LivenessData,
265        _op_id: OperatorId,
266    ) {
267        if let Some(NestedOp::SetUp(op_id)) = self.nested_op {
268            let op_data_id = sess.op_data_id(op_id);
269            let mut op_data = std::mem::replace(
270                &mut sess.operator_data[op_data_id],
271                Box::new(OpNop::default()),
272            );
273            op_data.on_liveness_computed(sess, ld, op_id);
274            sess.operator_data[op_data_id] = op_data;
275        }
276    }
277
278    fn assign_op_outputs(
279        &mut self,
280        sess: &mut crate::context::SessionData,
281        ld: &mut crate::liveness_analysis::LivenessData,
282        op_id: OperatorId,
283        output_count: &mut crate::liveness_analysis::OpOutputIdx,
284    ) {
285        if let Some(nested_op) = &self.nested_op {
286            setup_op_outputs_for_nested_op(
287                nested_op,
288                sess,
289                ld,
290                op_id,
291                output_count,
292            );
293            return;
294        }
295        let op_base = &mut sess.operator_bases[op_id];
296        op_base.outputs_start = *output_count;
297        op_base.outputs_end = op_base.outputs_start;
298    }
299
300    fn has_dynamic_outputs(
301        &self,
302        sess: &crate::context::SessionData,
303        _op_id: OperatorId,
304    ) -> bool {
305        let Some(nested) = &self.nested_op else {
306            return false;
307        };
308        let &NestedOp::SetUp(op_id) = nested else {
309            unreachable!()
310        };
311        sess.operator_data[sess.op_data_id(op_id)]
312            .has_dynamic_outputs(sess, op_id)
313    }
314
315    fn build_transforms<'a>(
316        &'a self,
317        _job: &mut crate::job::Job<'a>,
318        _tf_state: &mut super::transform::TransformState,
319        _op_id: OperatorId,
320        _prebound_outputs: &super::operator::PreboundOutputsMap,
321    ) -> super::operator::TransformInstatiation<'a> {
322        unreachable!()
323    }
324
325    fn as_any(&self) -> Option<&dyn std::any::Any> {
326        Some(self)
327    }
328
329    fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
330        Some(self)
331    }
332}