// reifydb_core/actors/flow.rs
use std::collections::BTreeMap;

use reifydb_runtime::actor::system::ActorHandle;
use reifydb_type::{Result, value::datetime::DateTime};

use super::pending::Pending;
use crate::{
    common::CommitVersion,
    encoded::shape::RowShape,
    interface::{catalog::flow::FlowId, cdc::Cdc, change::Change},
};

16#[derive(Clone, Debug)]
21pub struct FlowInstruction {
22 pub flow_id: FlowId,
24 pub to_version: CommitVersion,
26 pub changes: Vec<Change>,
29}
30
31impl FlowInstruction {
32 pub fn new(flow_id: FlowId, to_version: CommitVersion, changes: Vec<Change>) -> Self {
34 Self {
35 flow_id,
36 to_version,
37 changes,
38 }
39 }
40}
41
42#[derive(Clone, Debug)]
47pub struct WorkerBatch {
48 pub state_version: CommitVersion,
51 pub instructions: Vec<FlowInstruction>,
54}
55
56impl WorkerBatch {
57 pub fn new(state_version: CommitVersion) -> Self {
59 Self {
60 state_version,
61 instructions: Vec::new(),
62 }
63 }
64
65 pub fn add_instruction(&mut self, instruction: FlowInstruction) {
67 self.instructions.push(instruction);
68 }
69}
70
71pub enum FlowResponse {
73 Success {
75 pending: Pending,
76 pending_shapes: Vec<RowShape>,
77 view_changes: Vec<Change>,
78 },
79 Error(String),
81}
82
83pub type FlowHandle = ActorHandle<FlowMessage>;
85
86pub enum FlowMessage {
88 Process {
90 batch: WorkerBatch,
91 reply: Box<dyn FnOnce(FlowResponse) + Send>,
92 },
93 Register {
95 flow_id: FlowId,
96 reply: Box<dyn FnOnce(FlowResponse) + Send>,
97 },
98 Tick {
100 flow_ids: Vec<FlowId>,
101 timestamp: DateTime,
102 state_version: CommitVersion,
103 reply: Box<dyn FnOnce(FlowResponse) + Send>,
104 },
105 Rebalance {
106 flow_ids: Vec<FlowId>,
107 reply: Box<dyn FnOnce(FlowResponse) + Send>,
108 },
109}
110
111pub enum PoolResponse {
113 Success {
115 pending: Pending,
116 pending_shapes: Vec<RowShape>,
117 view_changes: Vec<Change>,
118 },
119 RegisterSuccess,
121 Error(String),
123}
124
125pub type FlowPoolHandle = ActorHandle<FlowPoolMessage>;
127
128pub enum FlowPoolMessage {
130 RegisterFlow {
132 flow_id: FlowId,
133 reply: Box<dyn FnOnce(PoolResponse) + Send>,
134 },
135 Submit {
137 batches: BTreeMap<usize, WorkerBatch>,
138 reply: Box<dyn FnOnce(PoolResponse) + Send>,
139 },
140 SubmitToWorker {
142 worker_id: usize,
143 batch: WorkerBatch,
144 reply: Box<dyn FnOnce(PoolResponse) + Send>,
145 },
146 Tick {
148 ticks: BTreeMap<usize, Vec<FlowId>>,
149 timestamp: DateTime,
150 state_version: CommitVersion,
151 reply: Box<dyn FnOnce(PoolResponse) + Send>,
152 },
153 Rebalance {
154 assignments: BTreeMap<usize, Vec<FlowId>>,
155 reply: Box<dyn FnOnce(PoolResponse) + Send>,
156 },
157 WorkerReply {
159 worker_id: usize,
160 response: FlowResponse,
161 },
162}
163
164pub type FlowCoordinatorHandle = ActorHandle<FlowCoordinatorMessage>;
166
167pub enum FlowCoordinatorMessage {
169 Consume {
171 cdcs: Vec<Cdc>,
172 current_version: CommitVersion,
173 reply: Box<dyn FnOnce(Result<()>) + Send>,
174 },
175 PoolReply(PoolResponse),
177 Tick,
179}