Skip to main content

reifydb_core/actors/
flow.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::BTreeMap;
5
6use reifydb_runtime::actor::system::ActorHandle;
7use reifydb_type::{Result, value::datetime::DateTime};
8
9use super::pending::Pending;
10use crate::{
11	common::CommitVersion,
12	encoded::shape::RowShape,
13	interface::{catalog::flow::FlowId, cdc::Cdc, change::Change},
14};
15
16#[derive(Clone, Debug)]
17pub struct FlowInstruction {
18	pub flow_id: FlowId,
19
20	pub to_version: CommitVersion,
21
22	pub changes: Vec<Change>,
23}
24
25impl FlowInstruction {
26	pub fn new(flow_id: FlowId, to_version: CommitVersion, changes: Vec<Change>) -> Self {
27		Self {
28			flow_id,
29			to_version,
30			changes,
31		}
32	}
33}
34
35#[derive(Clone, Debug)]
36pub struct WorkerBatch {
37	pub state_version: CommitVersion,
38
39	pub instructions: Vec<FlowInstruction>,
40}
41
42impl WorkerBatch {
43	pub fn new(state_version: CommitVersion) -> Self {
44		Self {
45			state_version,
46			instructions: Vec::new(),
47		}
48	}
49
50	pub fn add_instruction(&mut self, instruction: FlowInstruction) {
51		self.instructions.push(instruction);
52	}
53}
54
55pub enum FlowResponse {
56	Success {
57		pending: Pending,
58		pending_shapes: Vec<RowShape>,
59		view_changes: Vec<Change>,
60	},
61
62	Error(String),
63}
64
65pub type FlowHandle = ActorHandle<FlowMessage>;
66
67pub enum FlowMessage {
68	Process {
69		batch: WorkerBatch,
70		reply: Box<dyn FnOnce(FlowResponse) + Send>,
71	},
72
73	Register {
74		flow_id: FlowId,
75		reply: Box<dyn FnOnce(FlowResponse) + Send>,
76	},
77
78	Tick {
79		flow_ids: Vec<FlowId>,
80		timestamp: DateTime,
81		state_version: CommitVersion,
82		reply: Box<dyn FnOnce(FlowResponse) + Send>,
83	},
84	Rebalance {
85		flow_ids: Vec<FlowId>,
86		reply: Box<dyn FnOnce(FlowResponse) + Send>,
87	},
88}
89
90pub enum PoolResponse {
91	Success {
92		pending: Pending,
93		pending_shapes: Vec<RowShape>,
94		view_changes: Vec<Change>,
95	},
96
97	RegisterSuccess,
98
99	Error(String),
100}
101
102pub type FlowPoolHandle = ActorHandle<FlowPoolMessage>;
103
104pub enum FlowPoolMessage {
105	RegisterFlow {
106		flow_id: FlowId,
107		reply: Box<dyn FnOnce(PoolResponse) + Send>,
108	},
109
110	Submit {
111		batches: BTreeMap<usize, WorkerBatch>,
112		reply: Box<dyn FnOnce(PoolResponse) + Send>,
113	},
114
115	SubmitToWorker {
116		worker_id: usize,
117		batch: WorkerBatch,
118		reply: Box<dyn FnOnce(PoolResponse) + Send>,
119	},
120
121	Tick {
122		ticks: BTreeMap<usize, Vec<FlowId>>,
123		timestamp: DateTime,
124		state_version: CommitVersion,
125		reply: Box<dyn FnOnce(PoolResponse) + Send>,
126	},
127	Rebalance {
128		assignments: BTreeMap<usize, Vec<FlowId>>,
129		reply: Box<dyn FnOnce(PoolResponse) + Send>,
130	},
131
132	WorkerReply {
133		worker_id: usize,
134		response: FlowResponse,
135	},
136}
137
138pub type FlowCoordinatorHandle = ActorHandle<FlowCoordinatorMessage>;
139
140pub enum FlowCoordinatorMessage {
141	Consume {
142		cdcs: Vec<Cdc>,
143		current_version: CommitVersion,
144		reply: Box<dyn FnOnce(Result<()>) + Send>,
145	},
146
147	PoolReply(PoolResponse),
148
149	Tick,
150}