1use std::mem;
5
6use pending::{Pending, PendingWrite};
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{common::CommitVersion, interface::change::Change};
9use reifydb_transaction::{
10 interceptor::{
11 WithInterceptors,
12 chain::InterceptorChain as Chain,
13 interceptors::Interceptors,
14 namespace_def::{
15 NamespaceDefPostCreateInterceptor, NamespaceDefPostUpdateInterceptor,
16 NamespaceDefPreDeleteInterceptor, NamespaceDefPreUpdateInterceptor,
17 },
18 ringbuffer::{
19 RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
20 RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
21 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
22 },
23 ringbuffer_def::{
24 RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
25 RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
26 },
27 table::{
28 TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
29 TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
30 },
31 table_def::{
32 TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
33 TableDefPreUpdateInterceptor,
34 },
35 transaction::{PostCommitInterceptor, PreCommitInterceptor},
36 view::{
37 ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
38 ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
39 },
40 view_def::{
41 ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
42 ViewDefPreUpdateInterceptor,
43 },
44 },
45 multi::transaction::read::MultiReadTransaction,
46 transaction::admin::AdminTransaction,
47};
48use tracing::instrument;
49
50pub mod pending;
51pub mod range;
52pub mod read;
53pub mod state;
54pub mod write;
55
56pub enum FlowTransaction {
107 Deferred {
110 version: CommitVersion,
111 pending: Pending,
112 primitive_query: MultiReadTransaction,
113 state_query: MultiReadTransaction,
114 catalog: Catalog,
115 interceptors: Interceptors,
116 },
117
118 Transactional {
121 version: CommitVersion,
122 pending: Pending,
123 base_pending: Pending,
125 primitive_query: MultiReadTransaction,
126 state_query: MultiReadTransaction,
127 catalog: Catalog,
128 interceptors: Interceptors,
129 },
130}
131
132impl FlowTransaction {
133 #[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
138 pub fn deferred(
139 parent: &AdminTransaction,
140 version: CommitVersion,
141 catalog: Catalog,
142 interceptors: Interceptors,
143 ) -> Self {
144 let mut primitive_query = parent.multi.begin_query().unwrap();
145 primitive_query.read_as_of_version_inclusive(version);
146
147 let state_query = parent.multi.begin_query().unwrap();
148 Self::Deferred {
149 version,
150 pending: Pending::new(),
151 primitive_query,
152 state_query,
153 catalog,
154 interceptors,
155 }
156 }
157
158 pub fn deferred_from_parts(
162 version: CommitVersion,
163 pending: Pending,
164 primitive_query: MultiReadTransaction,
165 state_query: MultiReadTransaction,
166 catalog: Catalog,
167 interceptors: Interceptors,
168 ) -> Self {
169 Self::Deferred {
170 version,
171 pending,
172 primitive_query,
173 state_query,
174 catalog,
175 interceptors,
176 }
177 }
178
179 pub fn transactional(
185 version: CommitVersion,
186 pending: Pending,
187 base_pending: Pending,
188 primitive_query: MultiReadTransaction,
189 state_query: MultiReadTransaction,
190 catalog: Catalog,
191 interceptors: Interceptors,
192 ) -> Self {
193 Self::Transactional {
194 version,
195 pending,
196 base_pending,
197 primitive_query,
198 state_query,
199 catalog,
200 interceptors,
201 }
202 }
203
204 pub fn version(&self) -> CommitVersion {
206 match self {
207 Self::Deferred {
208 version,
209 ..
210 } => *version,
211 Self::Transactional {
212 version,
213 ..
214 } => *version,
215 }
216 }
217
218 pub fn take_pending(&mut self) -> Pending {
220 match self {
221 Self::Deferred {
222 pending,
223 ..
224 } => mem::take(pending),
225 Self::Transactional {
226 pending,
227 ..
228 } => mem::take(pending),
229 }
230 }
231
232 #[cfg(test)]
234 pub fn pending(&self) -> &Pending {
235 match self {
236 Self::Deferred {
237 pending,
238 ..
239 } => pending,
240 Self::Transactional {
241 pending,
242 ..
243 } => pending,
244 }
245 }
246
247 pub fn take_view_changes(&mut self) -> pending::ViewChanges {
249 match self {
250 Self::Deferred {
251 pending,
252 ..
253 } => pending.take_view_changes(),
254 Self::Transactional {
255 pending,
256 ..
257 } => pending.take_view_changes(),
258 }
259 }
260
261 pub fn push_view_change(&mut self, change: Change) {
263 match self {
264 Self::Deferred {
265 pending,
266 ..
267 } => pending.push_view_change(change),
268 Self::Transactional {
269 pending,
270 ..
271 } => pending.push_view_change(change),
272 }
273 }
274
275 pub fn update_version(&mut self, new_version: CommitVersion) {
277 match self {
278 Self::Deferred {
279 version,
280 primitive_query,
281 ..
282 } => {
283 *version = new_version;
284 primitive_query.read_as_of_version_inclusive(new_version);
285 }
286 Self::Transactional {
287 version,
288 primitive_query,
289 ..
290 } => {
291 *version = new_version;
292 primitive_query.read_as_of_version_inclusive(new_version);
293 }
294 }
295 }
296
297 pub(crate) fn catalog(&self) -> &Catalog {
299 match self {
300 Self::Deferred {
301 catalog,
302 ..
303 } => catalog,
304 Self::Transactional {
305 catalog,
306 ..
307 } => catalog,
308 }
309 }
310}
311
// Expands to one `WithInterceptors` accessor.
//
// Arguments, in order:
//   1. name of the method to generate,
//   2. name of the chain field on `Interceptors` to hand out,
//   3. interceptor trait the returned chain is parameterised over.
//
// The generated method returns a mutable borrow of that chain from whichever
// variant `self` currently is; both variants store an `interceptors` value.
macro_rules! interceptor_method {
    ($accessor:ident, $chain:ident, $interceptor:ident) => {
        fn $accessor(&mut self) -> &mut Chain<dyn $interceptor + Send + Sync> {
            match self {
                Self::Deferred { interceptors, .. }
                | Self::Transactional { interceptors, .. } => &mut interceptors.$chain,
            }
        }
    };
}
328
329impl WithInterceptors for FlowTransaction {
330 interceptor_method!(table_pre_insert_interceptors, table_pre_insert, TablePreInsertInterceptor);
331 interceptor_method!(table_post_insert_interceptors, table_post_insert, TablePostInsertInterceptor);
332 interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
333 interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
334 interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);
335 interceptor_method!(table_post_delete_interceptors, table_post_delete, TablePostDeleteInterceptor);
336
337 interceptor_method!(ringbuffer_pre_insert_interceptors, ringbuffer_pre_insert, RingBufferPreInsertInterceptor);
338 interceptor_method!(
339 ringbuffer_post_insert_interceptors,
340 ringbuffer_post_insert,
341 RingBufferPostInsertInterceptor
342 );
343 interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
344 interceptor_method!(
345 ringbuffer_post_update_interceptors,
346 ringbuffer_post_update,
347 RingBufferPostUpdateInterceptor
348 );
349 interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);
350 interceptor_method!(
351 ringbuffer_post_delete_interceptors,
352 ringbuffer_post_delete,
353 RingBufferPostDeleteInterceptor
354 );
355
356 interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
357 interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);
358
359 interceptor_method!(
360 namespace_def_post_create_interceptors,
361 namespace_def_post_create,
362 NamespaceDefPostCreateInterceptor
363 );
364 interceptor_method!(
365 namespace_def_pre_update_interceptors,
366 namespace_def_pre_update,
367 NamespaceDefPreUpdateInterceptor
368 );
369 interceptor_method!(
370 namespace_def_post_update_interceptors,
371 namespace_def_post_update,
372 NamespaceDefPostUpdateInterceptor
373 );
374 interceptor_method!(
375 namespace_def_pre_delete_interceptors,
376 namespace_def_pre_delete,
377 NamespaceDefPreDeleteInterceptor
378 );
379
380 interceptor_method!(table_def_post_create_interceptors, table_def_post_create, TableDefPostCreateInterceptor);
381 interceptor_method!(table_def_pre_update_interceptors, table_def_pre_update, TableDefPreUpdateInterceptor);
382 interceptor_method!(table_def_post_update_interceptors, table_def_post_update, TableDefPostUpdateInterceptor);
383 interceptor_method!(table_def_pre_delete_interceptors, table_def_pre_delete, TableDefPreDeleteInterceptor);
384
385 interceptor_method!(view_pre_insert_interceptors, view_pre_insert, ViewPreInsertInterceptor);
386 interceptor_method!(view_post_insert_interceptors, view_post_insert, ViewPostInsertInterceptor);
387 interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
388 interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
389 interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);
390 interceptor_method!(view_post_delete_interceptors, view_post_delete, ViewPostDeleteInterceptor);
391
392 interceptor_method!(view_def_post_create_interceptors, view_def_post_create, ViewDefPostCreateInterceptor);
393 interceptor_method!(view_def_pre_update_interceptors, view_def_pre_update, ViewDefPreUpdateInterceptor);
394 interceptor_method!(view_def_post_update_interceptors, view_def_post_update, ViewDefPostUpdateInterceptor);
395 interceptor_method!(view_def_pre_delete_interceptors, view_def_pre_delete, ViewDefPreDeleteInterceptor);
396
397 interceptor_method!(
398 ringbuffer_def_post_create_interceptors,
399 ringbuffer_def_post_create,
400 RingBufferDefPostCreateInterceptor
401 );
402 interceptor_method!(
403 ringbuffer_def_pre_update_interceptors,
404 ringbuffer_def_pre_update,
405 RingBufferDefPreUpdateInterceptor
406 );
407 interceptor_method!(
408 ringbuffer_def_post_update_interceptors,
409 ringbuffer_def_post_update,
410 RingBufferDefPostUpdateInterceptor
411 );
412 interceptor_method!(
413 ringbuffer_def_pre_delete_interceptors,
414 ringbuffer_def_pre_delete,
415 RingBufferDefPreDeleteInterceptor
416 );
417}