typeline_core/operators/utils/
basic_generator.rs

1use crate::{
2    job::JobData,
3    liveness_analysis::OperatorLivenessOutput,
4    operators::{
5        operator::{Operator, OperatorName, TransformInstatiation},
6        transform::{Transform, TransformId},
7    },
8    record_data::{
9        action_buffer::ActorId, group_track::GroupTrackIterRef,
10        iter_hall::FieldIterId,
11    },
12};
13
14use super::generator_transform_update::{
15    handle_generator_transform_update, GeneratorMode, GeneratorSequence,
16};
17
18pub trait BasicGenerator: Send + Sync {
19    type Gen: GeneratorSequence + Send;
20    fn default_name(&self) -> OperatorName;
21    fn debug_op_name(&self) -> crate::operators::operator::OperatorName {
22        self.default_name()
23    }
24    fn generator_mode(&self) -> GeneratorMode;
25    fn create_generator(&self) -> Self::Gen;
26    fn update_variable_liveness(
27        &self,
28        _sess: &crate::context::SessionData,
29        _ld: &mut crate::liveness_analysis::LivenessData,
30        _op_offset_after_last_write: crate::operators::operator::OffsetInChain,
31        _op_id: crate::operators::operator::OperatorId,
32        _bb_id: crate::liveness_analysis::BasicBlockId,
33        _input_field: crate::liveness_analysis::OpOutputIdx,
34        output: &mut OperatorLivenessOutput,
35    ) {
36        output.flags.non_stringified_input_access = false;
37        output.flags.may_dup_or_drop = true;
38    }
39
40    fn on_liveness_computed(
41        &mut self,
42        _sess: &mut crate::context::SessionData,
43        _ld: &crate::liveness_analysis::LivenessData,
44        _op_id: crate::operators::operator::OperatorId,
45    ) {
46    }
47}
48
/// Thin wrapper around a generator description.
///
/// The `Operator` implementation for this type requires the wrapped value
/// to implement `BasicGenerator`.
pub struct BasicGeneratorWrapper<G> {
    /// The wrapped generator description.
    base: G,
}
52
53pub struct BasicGeneratorTransform<'a, Op: BasicGenerator> {
54    op: &'a BasicGeneratorWrapper<Op>,
55    input_iter: FieldIterId,
56    group_track_iter: GroupTrackIterRef,
57    actor_id: ActorId,
58    generator: Op::Gen,
59}
60
61impl<'a, Op: BasicGenerator> Transform<'a>
62    for BasicGeneratorTransform<'a, Op>
63{
64    fn update(&mut self, jd: &mut JobData<'a>, tf_id: TransformId) {
65        handle_generator_transform_update(
66            jd,
67            tf_id,
68            self.input_iter,
69            self.actor_id,
70            self.group_track_iter,
71            &mut self.generator,
72            self.op.base.generator_mode(),
73        )
74    }
75}
76
77impl<T: BasicGenerator> Operator for BasicGeneratorWrapper<T> {
78    fn default_name(&self) -> OperatorName {
79        BasicGenerator::default_name(&self.base)
80    }
81
82    fn debug_op_name(&self) -> crate::operators::operator::OperatorName {
83        BasicGenerator::default_name(&self.base)
84    }
85
86    fn output_count(
87        &self,
88        _sess: &crate::context::SessionData,
89        _op_id: crate::operators::operator::OperatorId,
90    ) -> usize {
91        1
92    }
93
94    fn has_dynamic_outputs(
95        &self,
96        _sess: &crate::context::SessionData,
97        _op_id: crate::operators::operator::OperatorId,
98    ) -> bool {
99        false
100    }
101
102    fn build_transforms<'a>(
103        &'a self,
104        job: &mut crate::job::Job<'a>,
105        tf_state: &mut crate::operators::transform::TransformState,
106        _op_id: crate::operators::operator::OperatorId,
107        _prebound_outputs: &crate::operators::operator::PreboundOutputsMap,
108    ) -> TransformInstatiation<'a> {
109        let jd = &mut job.job_data;
110        let actor_id = jd.add_actor_for_tf_state(tf_state);
111        let input_iter = jd.claim_iter_for_tf_state(tf_state);
112        let group_track_iter =
113            jd.claim_group_track_iter_for_tf_state(tf_state);
114        TransformInstatiation::Single(Box::new(BasicGeneratorTransform {
115            op: self,
116            input_iter,
117            group_track_iter,
118            actor_id,
119            generator: self.base.create_generator(),
120        }))
121    }
122
123    fn output_field_kind(
124        &self,
125        _sess: &crate::context::SessionData,
126        _op_id: crate::operators::operator::OperatorId,
127    ) -> crate::operators::operator::OutputFieldKind {
128        crate::operators::operator::OutputFieldKind::Unique
129    }
130
131    fn update_variable_liveness(
132        &self,
133        sess: &crate::context::SessionData,
134        ld: &mut crate::liveness_analysis::LivenessData,
135        op_offset_after_last_write: crate::operators::operator::OffsetInChain,
136        op_id: crate::operators::operator::OperatorId,
137        bb_id: crate::liveness_analysis::BasicBlockId,
138        input_field: crate::liveness_analysis::OpOutputIdx,
139        output: &mut OperatorLivenessOutput,
140    ) {
141        self.base.update_variable_liveness(
142            sess,
143            ld,
144            op_offset_after_last_write,
145            op_id,
146            bb_id,
147            input_field,
148            output,
149        )
150    }
151
152    fn on_liveness_computed(
153        &mut self,
154        sess: &mut crate::context::SessionData,
155        ld: &crate::liveness_analysis::LivenessData,
156        op_id: crate::operators::operator::OperatorId,
157    ) {
158        self.base.on_liveness_computed(sess, ld, op_id)
159    }
160}
161
162impl<T: BasicGenerator + 'static> BasicGeneratorWrapper<T> {
163    pub fn new(base: T) -> Self {
164        Self { base }
165    }
166    pub fn new_operator(base: T) -> Box<dyn Operator> {
167        Box::new(Self::new(base))
168    }
169}