reifydb_core/actors/
flow.rs1use std::collections::BTreeMap;
7
8use reifydb_runtime::actor::system::ActorHandle;
9use reifydb_type::{Result, value::datetime::DateTime};
10
11use super::pending::Pending;
12use crate::{
13 common::CommitVersion,
14 encoded::{key::EncodedKey, shape::RowShape},
15 interface::{catalog::flow::FlowId, cdc::Cdc, change::Change},
16};
17
18#[derive(Clone, Debug)]
23pub struct FlowInstruction {
24 pub flow_id: FlowId,
26 pub to_version: CommitVersion,
28 pub changes: Vec<Change>,
31}
32
33impl FlowInstruction {
34 pub fn new(flow_id: FlowId, to_version: CommitVersion, changes: Vec<Change>) -> Self {
36 Self {
37 flow_id,
38 to_version,
39 changes,
40 }
41 }
42}
43
44#[derive(Clone, Debug)]
49pub struct WorkerBatch {
50 pub state_version: CommitVersion,
53 pub instructions: Vec<FlowInstruction>,
56}
57
58impl WorkerBatch {
59 pub fn new(state_version: CommitVersion) -> Self {
61 Self {
62 state_version,
63 instructions: Vec::new(),
64 }
65 }
66
67 pub fn add_instruction(&mut self, instruction: FlowInstruction) {
69 self.instructions.push(instruction);
70 }
71}
72
73pub enum FlowResponse {
75 Success {
77 pending: Pending,
78 pending_shapes: Vec<RowShape>,
79 view_changes: Vec<Change>,
80 },
81 Error(String),
83}
84
85pub type FlowHandle = ActorHandle<FlowMessage>;
87
88pub enum FlowMessage {
90 Process {
92 batch: WorkerBatch,
93 reply: Box<dyn FnOnce(FlowResponse) + Send>,
94 },
95 Register {
97 flow_id: FlowId,
98 reply: Box<dyn FnOnce(FlowResponse) + Send>,
99 },
100 Tick {
102 flow_ids: Vec<FlowId>,
103 timestamp: DateTime,
104 state_version: CommitVersion,
105 reply: Box<dyn FnOnce(FlowResponse) + Send>,
106 },
107 Rebalance {
108 flow_ids: Vec<FlowId>,
109 reply: Box<dyn FnOnce(FlowResponse) + Send>,
110 },
111}
112
113pub enum PoolResponse {
115 Success {
117 pending: Pending,
118 pending_shapes: Vec<RowShape>,
119 view_changes: Vec<Change>,
120 },
121 RegisterSuccess,
123 Error(String),
125}
126
127pub type FlowPoolHandle = ActorHandle<FlowPoolMessage>;
129
130pub enum FlowPoolMessage {
132 RegisterFlow {
134 flow_id: FlowId,
135 reply: Box<dyn FnOnce(PoolResponse) + Send>,
136 },
137 Submit {
139 batches: BTreeMap<usize, WorkerBatch>,
140 reply: Box<dyn FnOnce(PoolResponse) + Send>,
141 },
142 SubmitToWorker {
144 worker_id: usize,
145 batch: WorkerBatch,
146 reply: Box<dyn FnOnce(PoolResponse) + Send>,
147 },
148 Tick {
150 ticks: BTreeMap<usize, Vec<FlowId>>,
151 timestamp: DateTime,
152 state_version: CommitVersion,
153 reply: Box<dyn FnOnce(PoolResponse) + Send>,
154 },
155 Rebalance {
156 assignments: BTreeMap<usize, Vec<FlowId>>,
157 reply: Box<dyn FnOnce(PoolResponse) + Send>,
158 },
159 WorkerReply {
161 worker_id: usize,
162 response: FlowResponse,
163 },
164}
165
166pub type FlowCoordinatorHandle = ActorHandle<FlowCoordinatorMessage>;
168
169pub enum FlowCoordinatorMessage {
171 Consume {
173 cdcs: Vec<Cdc>,
174 consumer_key: EncodedKey,
175 current_version: CommitVersion,
176 reply: Box<dyn FnOnce(Result<()>) + Send>,
177 },
178 PoolReply(PoolResponse),
180 Tick,
182}