// Source path: reifydb_core/actors/flow.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

//! Actor messages and types for the flow processing subsystem.

use std::collections::BTreeMap;

use reifydb_runtime::actor::system::ActorHandle;
use reifydb_type::{Result, value::datetime::DateTime};

use super::pending::Pending;
use crate::{
	common::CommitVersion,
	encoded::{key::EncodedKey, shape::RowShape},
	interface::{catalog::flow::FlowId, cdc::Cdc, change::Change},
};

18/// Instructions for processing a single flow within a batch.
19///
20/// Contains all the changes relevant to a specific flow for a version range,
21/// along with the version bounds for checkpoint validation.
22#[derive(Clone, Debug)]
23pub struct FlowInstruction {
24	/// The flow to process these changes for
25	pub flow_id: FlowId,
26	/// Start of version range (exclusive)
27	pub to_version: CommitVersion,
28	/// The actual changes to process, filtered to only those relevant to this flow.
29	/// Changes maintain their original CDC sequence order.
30	pub changes: Vec<Change>,
31}
32
33impl FlowInstruction {
34	/// Create a new flow instruction.
35	pub fn new(flow_id: FlowId, to_version: CommitVersion, changes: Vec<Change>) -> Self {
36		Self {
37			flow_id,
38			to_version,
39			changes,
40		}
41	}
42}
43
44/// A batch of instructions for a single worker.
45///
46/// Contains instructions for multiple flows, all of which are assigned to the same worker
47/// via hash partitioning (flow_id % num_workers).
48#[derive(Clone, Debug)]
49pub struct WorkerBatch {
50	/// The version to use for reading flow state.
51	/// This is constant for the entire CDC batch being processed.
52	pub state_version: CommitVersion,
53	/// Instructions for each flow assigned to this worker.
54	/// Each flow appears at most once in this list.
55	pub instructions: Vec<FlowInstruction>,
56}
57
58impl WorkerBatch {
59	/// Create a new empty worker batch.
60	pub fn new(state_version: CommitVersion) -> Self {
61		Self {
62			state_version,
63			instructions: Vec::new(),
64		}
65	}
66
67	/// Add an instruction to this batch.
68	pub fn add_instruction(&mut self, instruction: FlowInstruction) {
69		self.instructions.push(instruction);
70	}
71}
72
73/// Response from a flow worker actor.
74pub enum FlowResponse {
75	/// Operation succeeded with pending writes, pending shapes, and view changes
76	Success {
77		pending: Pending,
78		pending_shapes: Vec<RowShape>,
79		view_changes: Vec<Change>,
80	},
81	/// Operation failed with error message
82	Error(String),
83}
84
85/// Handle to the flow worker actor.
86pub type FlowHandle = ActorHandle<FlowMessage>;
87
88/// Messages for the flow worker actor.
89pub enum FlowMessage {
90	/// Process a batch of flow instructions
91	Process {
92		batch: WorkerBatch,
93		reply: Box<dyn FnOnce(FlowResponse) + Send>,
94	},
95	/// Register a new flow by ID (worker looks up the FlowDag from the registry)
96	Register {
97		flow_id: FlowId,
98		reply: Box<dyn FnOnce(FlowResponse) + Send>,
99	},
100	/// Process periodic tick for time-based maintenance
101	Tick {
102		flow_ids: Vec<FlowId>,
103		timestamp: DateTime,
104		state_version: CommitVersion,
105		reply: Box<dyn FnOnce(FlowResponse) + Send>,
106	},
107	Rebalance {
108		flow_ids: Vec<FlowId>,
109		reply: Box<dyn FnOnce(FlowResponse) + Send>,
110	},
111}
112
113/// Response from the flow pool actor.
114pub enum PoolResponse {
115	/// Operation succeeded with pending writes, pending shapes, and view changes
116	Success {
117		pending: Pending,
118		pending_shapes: Vec<RowShape>,
119		view_changes: Vec<Change>,
120	},
121	/// Registration succeeded
122	RegisterSuccess,
123	/// Operation failed with error message
124	Error(String),
125}
126
127/// Handle to the flow pool actor.
128pub type FlowPoolHandle = ActorHandle<FlowPoolMessage>;
129
130/// Messages for the flow pool actor.
131pub enum FlowPoolMessage {
132	/// Register a new flow by ID (routes to appropriate worker)
133	RegisterFlow {
134		flow_id: FlowId,
135		reply: Box<dyn FnOnce(PoolResponse) + Send>,
136	},
137	/// Submit batches to multiple workers
138	Submit {
139		batches: BTreeMap<usize, WorkerBatch>,
140		reply: Box<dyn FnOnce(PoolResponse) + Send>,
141	},
142	/// Submit to a specific worker
143	SubmitToWorker {
144		worker_id: usize,
145		batch: WorkerBatch,
146		reply: Box<dyn FnOnce(PoolResponse) + Send>,
147	},
148	/// Process periodic tick for time-based maintenance
149	Tick {
150		ticks: BTreeMap<usize, Vec<FlowId>>,
151		timestamp: DateTime,
152		state_version: CommitVersion,
153		reply: Box<dyn FnOnce(PoolResponse) + Send>,
154	},
155	Rebalance {
156		assignments: BTreeMap<usize, Vec<FlowId>>,
157		reply: Box<dyn FnOnce(PoolResponse) + Send>,
158	},
159	/// Async reply from a FlowActor worker
160	WorkerReply {
161		worker_id: usize,
162		response: FlowResponse,
163	},
164}
165
166/// Handle to the flow coordinator actor.
167pub type FlowCoordinatorHandle = ActorHandle<FlowCoordinatorMessage>;
168
169/// Messages for the flow coordinator actor.
170pub enum FlowCoordinatorMessage {
171	/// Consume CDC events and process them through flows
172	Consume {
173		cdcs: Vec<Cdc>,
174		consumer_key: EncodedKey,
175		current_version: CommitVersion,
176		reply: Box<dyn FnOnce(Result<()>) + Send>,
177	},
178	/// Async reply from PoolActor
179	PoolReply(PoolResponse),
180	/// Periodic tick for time-based maintenance
181	Tick,
182}