reifydb_core/actors/
flow.rs1use std::collections::BTreeMap;
5
6use reifydb_runtime::actor::system::ActorHandle;
7use reifydb_type::{Result, value::datetime::DateTime};
8
9use super::pending::Pending;
10use crate::{
11 common::CommitVersion,
12 encoded::shape::RowShape,
13 interface::{catalog::flow::FlowId, cdc::Cdc, change::Change},
14};
15
16#[derive(Clone, Debug)]
17pub struct FlowInstruction {
18 pub flow_id: FlowId,
19
20 pub to_version: CommitVersion,
21
22 pub changes: Vec<Change>,
23}
24
25impl FlowInstruction {
26 pub fn new(flow_id: FlowId, to_version: CommitVersion, changes: Vec<Change>) -> Self {
27 Self {
28 flow_id,
29 to_version,
30 changes,
31 }
32 }
33}
34
35#[derive(Clone, Debug)]
36pub struct WorkerBatch {
37 pub state_version: CommitVersion,
38
39 pub instructions: Vec<FlowInstruction>,
40}
41
42impl WorkerBatch {
43 pub fn new(state_version: CommitVersion) -> Self {
44 Self {
45 state_version,
46 instructions: Vec::new(),
47 }
48 }
49
50 pub fn add_instruction(&mut self, instruction: FlowInstruction) {
51 self.instructions.push(instruction);
52 }
53}
54
55pub enum FlowResponse {
56 Success {
57 pending: Pending,
58 pending_shapes: Vec<RowShape>,
59 view_changes: Vec<Change>,
60 },
61
62 Error(String),
63}
64
65pub type FlowHandle = ActorHandle<FlowMessage>;
66
67pub enum FlowMessage {
68 Process {
69 batch: WorkerBatch,
70 reply: Box<dyn FnOnce(FlowResponse) + Send>,
71 },
72
73 Register {
74 flow_id: FlowId,
75 reply: Box<dyn FnOnce(FlowResponse) + Send>,
76 },
77
78 Tick {
79 flow_ids: Vec<FlowId>,
80 timestamp: DateTime,
81 state_version: CommitVersion,
82 reply: Box<dyn FnOnce(FlowResponse) + Send>,
83 },
84 Rebalance {
85 flow_ids: Vec<FlowId>,
86 reply: Box<dyn FnOnce(FlowResponse) + Send>,
87 },
88}
89
90pub enum PoolResponse {
91 Success {
92 pending: Pending,
93 pending_shapes: Vec<RowShape>,
94 view_changes: Vec<Change>,
95 },
96
97 RegisterSuccess,
98
99 Error(String),
100}
101
102pub type FlowPoolHandle = ActorHandle<FlowPoolMessage>;
103
104pub enum FlowPoolMessage {
105 RegisterFlow {
106 flow_id: FlowId,
107 reply: Box<dyn FnOnce(PoolResponse) + Send>,
108 },
109
110 Submit {
111 batches: BTreeMap<usize, WorkerBatch>,
112 reply: Box<dyn FnOnce(PoolResponse) + Send>,
113 },
114
115 SubmitToWorker {
116 worker_id: usize,
117 batch: WorkerBatch,
118 reply: Box<dyn FnOnce(PoolResponse) + Send>,
119 },
120
121 Tick {
122 ticks: BTreeMap<usize, Vec<FlowId>>,
123 timestamp: DateTime,
124 state_version: CommitVersion,
125 reply: Box<dyn FnOnce(PoolResponse) + Send>,
126 },
127 Rebalance {
128 assignments: BTreeMap<usize, Vec<FlowId>>,
129 reply: Box<dyn FnOnce(PoolResponse) + Send>,
130 },
131
132 WorkerReply {
133 worker_id: usize,
134 response: FlowResponse,
135 },
136}
137
138pub type FlowCoordinatorHandle = ActorHandle<FlowCoordinatorMessage>;
139
140pub enum FlowCoordinatorMessage {
141 Consume {
142 cdcs: Vec<Cdc>,
143 current_version: CommitVersion,
144 reply: Box<dyn FnOnce(Result<()>) + Send>,
145 },
146
147 PoolReply(PoolResponse),
148
149 Tick,
150}