Skip to main content

reifydb_core/actors/
flow.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::BTreeMap;
5
6use reifydb_runtime::actor::system::ActorHandle;
7use reifydb_type::{Result, value::datetime::DateTime};
8
9use super::pending::Pending;
10use crate::{
11	common::CommitVersion,
12	encoded::shape::RowShape,
13	interface::{catalog::flow::FlowId, cdc::Cdc, change::Change},
14};
15
16/// Instructions for processing a single flow within a batch.
17///
18/// Contains all the changes relevant to a specific flow for a version range,
19/// along with the version bounds for checkpoint validation.
20#[derive(Clone, Debug)]
21pub struct FlowInstruction {
22	/// The flow to process these changes for
23	pub flow_id: FlowId,
24	/// Start of version range (exclusive)
25	pub to_version: CommitVersion,
26	/// The actual changes to process, filtered to only those relevant to this flow.
27	/// Changes maintain their original CDC sequence order.
28	pub changes: Vec<Change>,
29}
30
31impl FlowInstruction {
32	/// Create a new flow instruction.
33	pub fn new(flow_id: FlowId, to_version: CommitVersion, changes: Vec<Change>) -> Self {
34		Self {
35			flow_id,
36			to_version,
37			changes,
38		}
39	}
40}
41
42/// A batch of instructions for a single worker.
43///
44/// Contains instructions for multiple flows, all of which are assigned to the same worker
45/// via hash partitioning (flow_id % num_workers).
46#[derive(Clone, Debug)]
47pub struct WorkerBatch {
48	/// The version to use for reading flow state.
49	/// This is constant for the entire CDC batch being processed.
50	pub state_version: CommitVersion,
51	/// Instructions for each flow assigned to this worker.
52	/// Each flow appears at most once in this list.
53	pub instructions: Vec<FlowInstruction>,
54}
55
56impl WorkerBatch {
57	/// Create a new empty worker batch.
58	pub fn new(state_version: CommitVersion) -> Self {
59		Self {
60			state_version,
61			instructions: Vec::new(),
62		}
63	}
64
65	/// Add an instruction to this batch.
66	pub fn add_instruction(&mut self, instruction: FlowInstruction) {
67		self.instructions.push(instruction);
68	}
69}
70
71/// Response from a flow worker actor.
72pub enum FlowResponse {
73	/// Operation succeeded with pending writes, pending shapes, and view changes
74	Success {
75		pending: Pending,
76		pending_shapes: Vec<RowShape>,
77		view_changes: Vec<Change>,
78	},
79	/// Operation failed with error message
80	Error(String),
81}
82
83/// Handle to the flow worker actor.
84pub type FlowHandle = ActorHandle<FlowMessage>;
85
86/// Messages for the flow worker actor.
87pub enum FlowMessage {
88	/// Process a batch of flow instructions
89	Process {
90		batch: WorkerBatch,
91		reply: Box<dyn FnOnce(FlowResponse) + Send>,
92	},
93	/// Register a new flow by ID (worker looks up the FlowDag from the registry)
94	Register {
95		flow_id: FlowId,
96		reply: Box<dyn FnOnce(FlowResponse) + Send>,
97	},
98	/// Process periodic tick for time-based maintenance
99	Tick {
100		flow_ids: Vec<FlowId>,
101		timestamp: DateTime,
102		state_version: CommitVersion,
103		reply: Box<dyn FnOnce(FlowResponse) + Send>,
104	},
105	Rebalance {
106		flow_ids: Vec<FlowId>,
107		reply: Box<dyn FnOnce(FlowResponse) + Send>,
108	},
109}
110
111/// Response from the flow pool actor.
112pub enum PoolResponse {
113	/// Operation succeeded with pending writes, pending shapes, and view changes
114	Success {
115		pending: Pending,
116		pending_shapes: Vec<RowShape>,
117		view_changes: Vec<Change>,
118	},
119	/// Registration succeeded
120	RegisterSuccess,
121	/// Operation failed with error message
122	Error(String),
123}
124
125/// Handle to the flow pool actor.
126pub type FlowPoolHandle = ActorHandle<FlowPoolMessage>;
127
128/// Messages for the flow pool actor.
129pub enum FlowPoolMessage {
130	/// Register a new flow by ID (routes to appropriate worker)
131	RegisterFlow {
132		flow_id: FlowId,
133		reply: Box<dyn FnOnce(PoolResponse) + Send>,
134	},
135	/// Submit batches to multiple workers
136	Submit {
137		batches: BTreeMap<usize, WorkerBatch>,
138		reply: Box<dyn FnOnce(PoolResponse) + Send>,
139	},
140	/// Submit to a specific worker
141	SubmitToWorker {
142		worker_id: usize,
143		batch: WorkerBatch,
144		reply: Box<dyn FnOnce(PoolResponse) + Send>,
145	},
146	/// Process periodic tick for time-based maintenance
147	Tick {
148		ticks: BTreeMap<usize, Vec<FlowId>>,
149		timestamp: DateTime,
150		state_version: CommitVersion,
151		reply: Box<dyn FnOnce(PoolResponse) + Send>,
152	},
153	Rebalance {
154		assignments: BTreeMap<usize, Vec<FlowId>>,
155		reply: Box<dyn FnOnce(PoolResponse) + Send>,
156	},
157	/// Async reply from a FlowActor worker
158	WorkerReply {
159		worker_id: usize,
160		response: FlowResponse,
161	},
162}
163
164/// Handle to the flow coordinator actor.
165pub type FlowCoordinatorHandle = ActorHandle<FlowCoordinatorMessage>;
166
167/// Messages for the flow coordinator actor.
168pub enum FlowCoordinatorMessage {
169	/// Consume CDC events and process them through flows
170	Consume {
171		cdcs: Vec<Cdc>,
172		current_version: CommitVersion,
173		reply: Box<dyn FnOnce(Result<()>) + Send>,
174	},
175	/// Async reply from PoolActor
176	PoolReply(PoolResponse),
177	/// Periodic tick for time-based maintenance
178	Tick,
179}