typeline_core/operators/
literal.rs

1use metamatch::metamatch;
2use num::{BigInt, BigRational};
3use std::{borrow::Cow, io::BufReader, sync::Arc};
4
5use crate::{
6    cli::call_expr::{Argument, CallExpr, ParsedArgValue, Span},
7    job::JobData,
8    options::{
9        chain_settings::{ChainSetting, SettingUseFloatingPointMath},
10        session_setup::SessionSetupData,
11    },
12    record_data::{
13        array::Array,
14        custom_data::CustomDataBox,
15        field_data::FieldValueRepr,
16        field_value::{FieldValue, FieldValueKind, Object},
17        iter_hall::FieldIterId,
18        push_interface::PushInterface,
19        stream_value::{StreamValue, StreamValueData},
20    },
21    typeline_error::TypelineError,
22    tyson::{parse_tyson, TysonParseError},
23    utils::{cow_to_small_str, cow_to_str},
24};
25
26use super::{
27    errors::{OperatorApplicationError, OperatorCreationError},
28    operator::{Operator, OperatorName, TransformInstatiation},
29    transform::{Transform, TransformId, TransformState},
30    utils::maintain_single_value::{maintain_single_value, ExplicitCount},
31};
32
33#[derive(Clone)]
34pub enum Literal {
35    Bytes(Vec<u8>),
36    StreamBytes(Arc<Vec<u8>>),
37    Text(String),
38    StreamString(Arc<String>),
39    Object(Object),
40    Array(Array),
41    Int(i64),
42    BigInt(BigInt),
43    Float(f64),
44    BigRational(BigRational),
45    Null,
46    Undefined,
47    Error(String),
48    StreamError(String),
49    Custom(CustomDataBox),
50    Argument(Argument),
51}
52
53#[derive(Clone)]
54pub struct OpLiteral {
55    pub data: Literal,
56    pub insert_count: Option<usize>,
57}
58
59pub struct TfLiteral<'a> {
60    data: &'a Literal,
61    explicit_count: Option<ExplicitCount>,
62    value_inserted: bool,
63    iter_id: FieldIterId,
64}
65
66impl Operator for OpLiteral {
67    fn default_name(&self) -> OperatorName {
68        match &self.data {
69            Literal::Null => "null",
70            Literal::Undefined => "undefined",
71            Literal::Text(_) => "str",
72            Literal::StreamString(_) => "~str",
73            Literal::Bytes(_) => "bytes",
74            Literal::StreamBytes(_) => "~bytes",
75            Literal::Error(_) => "error",
76            Literal::StreamError(_) => "~error",
77            Literal::Int(_) => "int",
78            Literal::BigInt(_) => "integer",
79            Literal::Float(_) => "float",
80            Literal::BigRational(_) => "rational",
81            Literal::Object(_) => "object",
82            Literal::Array(_) => "array",
83            Literal::Argument(_) => "argument",
84            Literal::Custom(v) => return cow_to_small_str(v.type_name()),
85        }
86        .into()
87    }
88
89    fn output_count(
90        &self,
91        _sess: &crate::context::SessionData,
92        _op_id: super::operator::OperatorId,
93    ) -> usize {
94        1
95    }
96
97    fn has_dynamic_outputs(
98        &self,
99        _sess: &crate::context::SessionData,
100        _op_id: super::operator::OperatorId,
101    ) -> bool {
102        false
103    }
104
105    fn build_transforms<'a>(
106        &'a self,
107        job: &mut crate::job::Job<'a>,
108        tf_state: &mut TransformState,
109        _op_id: super::operator::OperatorId,
110        _prebound_outputs: &super::operator::PreboundOutputsMap,
111    ) -> super::operator::TransformInstatiation<'a> {
112        let actor_id = job.job_data.add_actor_for_tf_state(tf_state);
113        let iter_id = job.job_data.claim_iter_for_tf_state(tf_state);
114        TransformInstatiation::Single(Box::new(TfLiteral {
115            data: &self.data,
116            explicit_count: self
117                .insert_count
118                .map(|count| ExplicitCount { count, actor_id }),
119            value_inserted: false,
120            iter_id,
121        }))
122    }
123
124    fn update_variable_liveness(
125        &self,
126        _sess: &crate::context::SessionData,
127        _ld: &mut crate::liveness_analysis::LivenessData,
128        _op_offset_after_last_write: super::operator::OffsetInChain,
129        _op_id: super::operator::OperatorId,
130        _bb_id: crate::liveness_analysis::BasicBlockId,
131        _input_field: crate::liveness_analysis::OpOutputIdx,
132        output: &mut crate::liveness_analysis::OperatorLivenessOutput,
133    ) {
134        output.flags.may_dup_or_drop = self.insert_count.is_some();
135        output.flags.input_accessed = false;
136        output.flags.non_stringified_input_access = false;
137    }
138}
139
140pub fn parse_op_literal_zst(
141    expr: &CallExpr,
142    literal: Literal,
143) -> Result<Box<dyn Operator>, TypelineError> {
144    let insert_count = parse_insert_count_reject_value(expr)?;
145    Ok(Box::new(OpLiteral {
146        data: literal,
147        insert_count,
148    }))
149}
150pub fn parse_op_str(
151    sess: &mut SessionSetupData,
152    expr: &CallExpr,
153    stream: bool,
154) -> Result<Box<dyn Operator>, TypelineError> {
155    let (insert_count, value, _value_span) =
156        parse_insert_count_and_value_args_str(sess, expr)?;
157    let value_owned = value.into_owned();
158    Ok(Box::new(OpLiteral {
159        data: if stream {
160            Literal::StreamString(Arc::new(value_owned))
161        } else {
162            Literal::Text(value_owned)
163        },
164        insert_count,
165    }))
166}
167
168pub fn parse_op_error(
169    sess: &mut SessionSetupData,
170    expr: &CallExpr,
171    stream: bool,
172) -> Result<Box<dyn Operator>, TypelineError> {
173    let (insert_count, value, _value_span) =
174        parse_insert_count_and_value_args_str(sess, expr)?;
175    let value_owned = value.into_owned();
176    Ok(Box::new(OpLiteral {
177        data: if stream {
178            Literal::StreamError(value_owned)
179        } else {
180            Literal::Error(value_owned)
181        },
182        insert_count,
183    }))
184}
185
186pub fn parse_insert_count_reject_value(
187    expr: &CallExpr,
188) -> Result<Option<usize>, TypelineError> {
189    let mut insert_count = None;
190    for arg in expr.parsed_args_iter() {
191        match arg.value {
192            ParsedArgValue::NamedArg { key, value } => {
193                if key == "i" || key == "insert_count" {
194                    insert_count =
195                        Some(value.try_cast_int(false).ok_or_else(|| {
196                            expr.error_arg_invalid_int(key, arg.span)
197                        })? as usize);
198                    continue;
199                }
200                return Err(expr
201                    .error_named_arg_unsupported(key, arg.span)
202                    .into());
203            }
204            ParsedArgValue::Flag(flag) => {
205                return Err(expr
206                    .error_flag_unsupported(flag, arg.span)
207                    .into());
208            }
209            ParsedArgValue::PositionalArg { .. } => {
210                return Err(expr
211                    .error_positional_args_unsupported(arg.span)
212                    .into())
213            }
214        }
215    }
216    Ok(insert_count)
217}
218
219pub fn parse_insert_count_and_value_args<'a>(
220    sess: &mut SessionSetupData,
221    expr: &'a CallExpr<'a>,
222) -> Result<(Option<usize>, Cow<'a, [u8]>, Span), TypelineError> {
223    let mut insert_count = None;
224    let mut value = None;
225    for arg in expr.parsed_args_iter_with_bounded_positionals(1, 1) {
226        let arg = arg?;
227        match arg.value {
228            ParsedArgValue::Flag(flag) => {
229                return Err(expr
230                    .error_flag_unsupported(flag, arg.span)
231                    .into());
232            }
233            ParsedArgValue::NamedArg { key, value } => {
234                if key == "i" || key == "insert_count" {
235                    // TODO: error on negative
236                    insert_count =
237                        Some(value.try_cast_int(false).ok_or_else(|| {
238                            expr.error_arg_invalid_int(key, arg.span)
239                        })? as usize);
240                    continue;
241                }
242                return Err(expr
243                    .error_named_arg_unsupported(key, arg.span)
244                    .into());
245            }
246            ParsedArgValue::PositionalArg { arg, .. } => {
247                // TODO: this is stupid
248                value = Some((arg.stringify(sess).into_bytes_cow(), arg.span));
249            }
250        }
251    }
252    let (value, value_span) = value.unwrap();
253    Ok((insert_count, value, value_span))
254}
255
256pub fn parse_insert_count_and_value_args_str<'a>(
257    sess: &mut SessionSetupData,
258    expr: &'a CallExpr<'a>,
259) -> Result<(Option<usize>, Cow<'a, str>, Span), TypelineError> {
260    let (insert_count, value, value_span) =
261        parse_insert_count_and_value_args(sess, expr)?;
262    let value_str = cow_to_str(value)
263        .map_err(|_| expr.error_positional_arg_invalid_utf8(value_span))?;
264
265    Ok((insert_count, value_str, value_span))
266}
267
268pub fn parse_op_int(
269    sess: &mut SessionSetupData,
270    expr: &CallExpr,
271) -> Result<Box<dyn Operator>, TypelineError> {
272    let (insert_count, value, value_span) =
273        parse_insert_count_and_value_args_str(sess, expr)?;
274
275    let data = if let Ok(i) = str::parse::<i64>(&value) {
276        Literal::Int(i)
277    } else {
278        let Ok(big_int) = str::parse::<BigInt>(&value) else {
279            return Err(expr
280                .error_positional_arg_invalid_int(value_span)
281                .into());
282        };
283        Literal::BigInt(big_int)
284    };
285    Ok(Box::new(OpLiteral { data, insert_count }))
286}
287pub fn parse_op_bytes(
288    sess: &mut SessionSetupData,
289    arg: &mut Argument,
290    stream: bool,
291) -> Result<Box<dyn Operator>, TypelineError> {
292    let call_expr = CallExpr::from_argument_mut(arg)?;
293    let (insert_count, value, _value_span) =
294        parse_insert_count_and_value_args(sess, &call_expr)?;
295    Ok(Box::new(OpLiteral {
296        data: if stream {
297            Literal::StreamBytes(Arc::new(value.into_owned()))
298        } else {
299            Literal::Bytes(value.into_owned())
300        },
301        insert_count,
302    }))
303}
304pub fn field_value_to_literal(v: FieldValue) -> Literal {
305    metamatch!(match v {
306        #[expand(REP in [Null, Undefined])]
307        FieldValue::REP => Literal::REP,
308
309        #[expand(REP in [Int, Float, Bytes, Text, Array,  Custom])]
310        FieldValue::REP(v) => Literal::REP(v),
311
312        #[expand(REP in [BigInt, BigRational, Argument, Object])]
313        FieldValue::REP(v) => Literal::REP(*v),
314
315        FieldValue::Error(v) => Literal::Error(v.message().to_owned()),
316
317        #[expand_pattern(REP in [OpDecl, StreamValueId, FieldReference, SlicedFieldReference])]
318        FieldValue::REP(_) => {
319            panic!("{} is not a valid literal", v.kind().to_str())
320        }
321    })
322}
323pub fn parse_op_tyson(
324    sess: &mut SessionSetupData,
325    expr: &CallExpr,
326    affinity: FieldValueKind,
327) -> Result<Box<dyn Operator>, TypelineError> {
328    let (insert_count, value, value_span) =
329        parse_insert_count_and_value_args(sess, expr)?;
330    let value = parse_tyson(
331        BufReader::new(&*value),
332        use_fpm(&mut Some(sess)),
333        Some(&sess.extensions),
334    )
335    .map_err(|e| {
336        OperatorCreationError::new_s(
337            format!(
338                "failed to parse value as {}: {}",
339                affinity.to_str(),
340                match e {
341                    TysonParseError::Io(e) => e.to_string(),
342                    TysonParseError::InvalidSyntax { kind, .. } =>
343                        kind.to_string(),
344                }
345            ),
346            value_span,
347        )
348    })?;
349    let lit = field_value_to_literal(value);
350    Ok(Box::new(OpLiteral {
351        data: lit,
352        insert_count,
353    }))
354}
355
356pub fn use_fpm(sess: &mut Option<&mut SessionSetupData>) -> bool {
357    sess.as_deref_mut()
358        .map(|sess| {
359            sess.get_chain_setting::<SettingUseFloatingPointMath>(
360                sess.curr_chain,
361            )
362        })
363        .unwrap_or(SettingUseFloatingPointMath::DEFAULT)
364}
365
366pub fn build_op_tyson_value(
367    mut sess: Option<&mut SessionSetupData>,
368    value: &[u8],
369    value_span: Span,
370    insert_count: Option<usize>,
371) -> Result<Box<dyn Operator>, TypelineError> {
372    let value = parse_tyson(
373        value,
374        use_fpm(&mut sess),
375        sess.as_ref().map(|sess| &*sess.extensions),
376    )
377    .map_err(|e| {
378        OperatorCreationError::new_s(format!("invalid tyson: {e}"), value_span)
379    })?;
380    let lit = field_value_to_literal(value);
381    Ok(Box::new(OpLiteral {
382        data: lit,
383        insert_count,
384    }))
385}
386
387pub fn parse_op_tyson_value(
388    sess: &mut SessionSetupData,
389    expr: &CallExpr,
390) -> Result<Box<dyn Operator>, TypelineError> {
391    let (insert_count, value, value_span) =
392        parse_insert_count_and_value_args(sess, expr)?;
393    build_op_tyson_value(Some(sess), &value, value_span, insert_count)
394}
395
396pub fn create_op_literal_with_insert_count(
397    data: Literal,
398    insert_count: Option<usize>,
399) -> Box<dyn Operator> {
400    Box::new(OpLiteral { data, insert_count })
401}
402
403pub fn create_op_literal(data: Literal) -> Box<dyn Operator> {
404    create_op_literal_with_insert_count(data, None)
405}
406pub fn create_op_literal_n(
407    data: Literal,
408    insert_count: usize,
409) -> Box<dyn Operator> {
410    create_op_literal_with_insert_count(data, Some(insert_count))
411}
412
413pub fn create_op_error(str: &str) -> Box<dyn Operator> {
414    create_op_literal(Literal::Error(str.to_owned()))
415}
416pub fn create_op_str(str: &str) -> Box<dyn Operator> {
417    create_op_literal(Literal::Text(str.to_owned()))
418}
419pub fn create_op_stream_bytes(v: &[u8]) -> Box<dyn Operator> {
420    create_op_literal(Literal::StreamBytes(Arc::new(v.to_owned())))
421}
422pub fn create_op_stream_str(v: &str) -> Box<dyn Operator> {
423    create_op_literal(Literal::StreamString(Arc::new(v.to_owned())))
424}
425pub fn create_op_bytes(v: &[u8]) -> Box<dyn Operator> {
426    create_op_literal(Literal::Bytes(v.to_owned()))
427}
428pub fn create_op_stream_error(str: &str) -> Box<dyn Operator> {
429    create_op_literal(Literal::StreamError(str.to_owned()))
430}
431pub fn create_op_int(v: i64) -> Box<dyn Operator> {
432    create_op_literal(Literal::Int(v))
433}
434pub fn create_op_int_big(v: BigInt) -> Box<dyn Operator> {
435    create_op_literal(Literal::BigInt(v))
436}
437pub fn create_op_null() -> Box<dyn Operator> {
438    create_op_literal(Literal::Null)
439}
440pub fn create_op_undefined() -> Box<dyn Operator> {
441    create_op_literal(Literal::Undefined)
442}
443pub fn create_op_v(str: &str) -> Result<Box<dyn Operator>, TypelineError> {
444    build_op_tyson_value(None, str.as_bytes(), Span::Generated, None)
445}
446
447pub fn create_op_error_n(str: &str, insert_count: usize) -> Box<dyn Operator> {
448    create_op_literal_n(Literal::Error(str.to_owned()), insert_count)
449}
450pub fn create_op_str_n(str: &str, insert_count: usize) -> Box<dyn Operator> {
451    create_op_literal_n(Literal::Text(str.to_owned()), insert_count)
452}
453pub fn create_op_stream_bytes_n(
454    v: &[u8],
455    insert_count: usize,
456) -> Box<dyn Operator> {
457    create_op_literal_n(
458        Literal::StreamBytes(Arc::new(v.to_owned())),
459        insert_count,
460    )
461}
462pub fn create_op_stream_str_n(
463    v: &str,
464    insert_count: usize,
465) -> Box<dyn Operator> {
466    create_op_literal_n(
467        Literal::StreamString(Arc::new(v.to_owned())),
468        insert_count,
469    )
470}
471pub fn create_op_bytes_n(v: &[u8], insert_count: usize) -> Box<dyn Operator> {
472    create_op_literal_n(Literal::Bytes(v.to_owned()), insert_count)
473}
474pub fn create_op_stream_error_n(
475    str: &str,
476    insert_count: usize,
477) -> Box<dyn Operator> {
478    create_op_literal_n(Literal::StreamError(str.to_owned()), insert_count)
479}
480pub fn create_op_int_n(v: i64, insert_count: usize) -> Box<dyn Operator> {
481    create_op_literal_n(Literal::Int(v), insert_count)
482}
483pub fn create_op_null_n(insert_count: usize) -> Box<dyn Operator> {
484    create_op_literal_n(Literal::Null, insert_count)
485}
486pub fn create_op_success_n(insert_count: usize) -> Box<dyn Operator> {
487    create_op_literal_n(Literal::Undefined, insert_count)
488}
489pub fn create_op_v_n(
490    str: &str,
491    insert_count: usize,
492) -> Result<Box<dyn Operator>, TypelineError> {
493    build_op_tyson_value(
494        None,
495        str.as_bytes(),
496        Span::Generated,
497        Some(insert_count),
498    )
499}
500
501pub fn insert_value(
502    jd: &mut JobData,
503    tf_id: TransformId,
504    lit: &mut TfLiteral,
505) {
506    let tf = &jd.tf_mgr.transforms[tf_id];
507    let op_id = tf.op_id.unwrap();
508    let of_id = jd.tf_mgr.prepare_output_field(
509        &mut jd.field_mgr,
510        &mut jd.match_set_mgr,
511        tf_id,
512    );
513    let mut output_field = jd.field_mgr.fields[of_id].borrow_mut();
514    metamatch!(match lit.data {
515        #[expand(REP in [Null, Undefined])]
516        Literal::REP => {
517            output_field
518                .iter_hall
519                .push_zst(FieldValueRepr::REP, 1, true)
520        }
521
522        #[expand((REP, PUSH_FN, VAL) in [
523            (Int, push_int, *v),
524            (Float, push_float, *v),
525            (Bytes, push_bytes, v),
526            (Text, push_str, v),
527            (Object, push_object, v.clone()),
528            (Array, push_array, v.clone()),
529            (BigInt, push_big_int, v.clone()),
530            (BigRational, push_big_rational, v.clone()),
531            (Custom, push_custom, v.clone()),
532            (Argument, push_fixed_size_type, v.clone()),
533        ])]
534        Literal::REP(v) => {
535            output_field.iter_hall.PUSH_FN(VAL, 1, true, true)
536        }
537
538        #[expand((LIT, DATA) in [(StreamString, Text), (StreamBytes, Bytes)])]
539        Literal::LIT(ss) => {
540            let sv_id = jd.sv_mgr.claim_stream_value(
541                StreamValue::from_data_done(StreamValueData::DATA {
542                    data: ss.clone(),
543                    range: 0..ss.len(),
544                }),
545            );
546            output_field
547                .iter_hall
548                .push_stream_value_id(sv_id, 1, true, false);
549        }
550
551        Literal::Error(e) => output_field.iter_hall.push_error(
552            OperatorApplicationError::new_s(e.clone(), op_id),
553            1,
554            true,
555            false,
556        ),
557
558        Literal::StreamError(ss) => {
559            let sv_id = jd.sv_mgr.claim_stream_value(StreamValue {
560                error: Some(Arc::new(OperatorApplicationError::new_s(
561                    ss.clone(),
562                    op_id,
563                ))),
564                done: true,
565                ..Default::default()
566            });
567            output_field
568                .iter_hall
569                .push_stream_value_id(sv_id, 1, true, false);
570        }
571    })
572}
573
574impl<'a> Transform<'a> for TfLiteral<'a> {
575    fn update(&mut self, jd: &mut JobData<'a>, tf_id: TransformId) {
576        if !self.value_inserted {
577            self.value_inserted = true;
578            insert_value(jd, tf_id, self);
579        }
580        let (batch_size, ps) = maintain_single_value(
581            jd,
582            tf_id,
583            self.explicit_count.as_ref(),
584            self.iter_id,
585        );
586        jd.tf_mgr.submit_batch_ready_for_more(tf_id, batch_size, ps);
587    }
588}