1use std::mem;
5
6use pending::{Pending, PendingWrite};
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{
9 common::CommitVersion,
10 interface::{
11 catalog::schema::SchemaId,
12 change::{Change, ChangeOrigin, Diff},
13 },
14};
15use reifydb_transaction::{
16 change_accumulator::ChangeAccumulator,
17 interceptor::{
18 WithInterceptors,
19 authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
20 chain::InterceptorChain as Chain,
21 dictionary::{
22 DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
23 DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
24 },
25 dictionary_row::{
26 DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
27 DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
28 DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
29 },
30 granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
31 identity::{
32 IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
33 IdentityPreUpdateInterceptor,
34 },
35 interceptors::Interceptors,
36 namespace::{
37 NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
38 NamespacePreUpdateInterceptor,
39 },
40 ringbuffer::{
41 RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
42 RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
43 },
44 ringbuffer_row::{
45 RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
46 RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
47 RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
48 },
49 role::{
50 RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
51 RolePreUpdateInterceptor,
52 },
53 series::{
54 SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
55 SeriesPreUpdateInterceptor,
56 },
57 series_row::{
58 SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
59 SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
60 },
61 table::{
62 TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
63 TablePreUpdateInterceptor,
64 },
65 table_row::{
66 TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
67 TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
68 },
69 transaction::{PostCommitInterceptor, PreCommitInterceptor},
70 view::{
71 ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
72 ViewPreUpdateInterceptor,
73 },
74 view_row::{
75 ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
76 ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
77 },
78 },
79 multi::transaction::read::MultiReadTransaction,
80 transaction::admin::AdminTransaction,
81};
82use tracing::instrument;
83
84pub mod pending;
85pub mod range;
86pub mod read;
87pub mod state;
88pub mod write;
89
/// State shared by both [`FlowTransaction`] variants.
///
/// Every field is public; the methods on [`FlowTransaction`] reach this struct
/// through its private `inner` / `inner_mut` accessors.
pub struct FlowTransactionInner {
	/// Commit version of this transaction. `FlowTransaction::deferred` and
	/// `FlowTransaction::update_version` pin `primitive_query` to it.
	pub version: CommitVersion,
	/// Pending set owned by this transaction; handed out (and reset to its
	/// default) by `FlowTransaction::take_pending`.
	pub pending: Pending,
	/// Read transaction that `deferred` / `update_version` restrict to `version`
	/// (inclusive).
	pub primitive_query: MultiReadTransaction,
	/// Second read transaction. `deferred` opens it without a version pin and
	/// `update_version` never touches it.
	/// NOTE(review): presumably used for operator state reads - confirm.
	pub state_query: MultiReadTransaction,
	/// Catalog handle exposed through `FlowTransaction::catalog`.
	pub catalog: Catalog,
	/// Interceptor chains returned by the `WithInterceptors` implementation.
	pub interceptors: Interceptors,
	/// Collects `(SchemaId, Diff)` pairs recorded by
	/// `FlowTransaction::track_flow_change`; every constructor starts it empty.
	pub accumulator: ChangeAccumulator,
}
100
/// A flow transaction in one of two modes. Both variants wrap the same
/// [`FlowTransactionInner`]; `Transactional` additionally stores a second
/// pending set.
pub enum FlowTransaction {
	/// Built by `FlowTransaction::deferred` or
	/// `FlowTransaction::deferred_from_parts`.
	Deferred {
		/// Shared transaction state.
		inner: FlowTransactionInner,
	},

	/// Built by `FlowTransaction::transactional`.
	Transactional {
		/// Shared transaction state.
		inner: FlowTransactionInner,
		/// Extra pending set supplied at construction and kept next to
		/// `inner.pending`.
		/// NOTE(review): presumably the uncommitted writes of the enclosing
		/// transaction - confirm against the read path.
		base_pending: Pending,
	},
}
166
167impl FlowTransaction {
168 fn inner(&self) -> &FlowTransactionInner {
169 match self {
170 Self::Deferred {
171 inner,
172 ..
173 }
174 | Self::Transactional {
175 inner,
176 ..
177 } => inner,
178 }
179 }
180
181 fn inner_mut(&mut self) -> &mut FlowTransactionInner {
182 match self {
183 Self::Deferred {
184 inner,
185 ..
186 }
187 | Self::Transactional {
188 inner,
189 ..
190 } => inner,
191 }
192 }
193
194 #[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
199 pub fn deferred(
200 parent: &AdminTransaction,
201 version: CommitVersion,
202 catalog: Catalog,
203 interceptors: Interceptors,
204 ) -> Self {
205 let mut primitive_query = parent.multi.begin_query().unwrap();
206 primitive_query.read_as_of_version_inclusive(version);
207
208 let state_query = parent.multi.begin_query().unwrap();
209 Self::Deferred {
210 inner: FlowTransactionInner {
211 version,
212 pending: Pending::new(),
213 primitive_query,
214 state_query,
215 catalog,
216 interceptors,
217 accumulator: ChangeAccumulator::new(),
218 },
219 }
220 }
221
222 pub fn deferred_from_parts(
226 version: CommitVersion,
227 pending: Pending,
228 primitive_query: MultiReadTransaction,
229 state_query: MultiReadTransaction,
230 catalog: Catalog,
231 interceptors: Interceptors,
232 ) -> Self {
233 Self::Deferred {
234 inner: FlowTransactionInner {
235 version,
236 pending,
237 primitive_query,
238 state_query,
239 catalog,
240 interceptors,
241 accumulator: ChangeAccumulator::new(),
242 },
243 }
244 }
245
246 pub fn transactional(
252 version: CommitVersion,
253 pending: Pending,
254 base_pending: Pending,
255 primitive_query: MultiReadTransaction,
256 state_query: MultiReadTransaction,
257 catalog: Catalog,
258 interceptors: Interceptors,
259 ) -> Self {
260 Self::Transactional {
261 inner: FlowTransactionInner {
262 version,
263 pending,
264 primitive_query,
265 state_query,
266 catalog,
267 interceptors,
268 accumulator: ChangeAccumulator::new(),
269 },
270 base_pending,
271 }
272 }
273
274 pub fn version(&self) -> CommitVersion {
276 self.inner().version
277 }
278
279 pub fn take_pending(&mut self) -> Pending {
281 mem::take(&mut self.inner_mut().pending)
282 }
283
284 pub fn track_flow_change(&mut self, change: Change) {
286 if let ChangeOrigin::Schema(id) = change.origin {
287 for diff in change.diffs {
288 self.inner_mut().accumulator.track(id, diff);
289 }
290 }
291 }
292
293 pub fn take_accumulator_entries(&mut self) -> Vec<(SchemaId, Diff)> {
295 let acc = &mut self.inner_mut().accumulator;
296 let entries: Vec<_> = acc.entries_from(0).to_vec();
297 acc.clear();
298 entries
299 }
300
301 #[cfg(test)]
303 pub fn pending(&self) -> &Pending {
304 &self.inner().pending
305 }
306
307 pub fn update_version(&mut self, new_version: CommitVersion) {
309 let inner = self.inner_mut();
310 inner.version = new_version;
311 inner.primitive_query.read_as_of_version_inclusive(new_version);
312 }
313
314 pub fn catalog(&self) -> &Catalog {
316 &self.inner().catalog
317 }
318}
319
/// Expands to a single accessor method named `$accessor` that returns a
/// mutable borrow of the `$chain_field` chain inside
/// `self.inner_mut().interceptors`, typed as a chain of
/// `dyn $interceptor + Send + Sync`.
macro_rules! interceptor_method {
	($accessor:ident, $chain_field:ident, $interceptor:ident) => {
		fn $accessor(&mut self) -> &mut Chain<dyn $interceptor + Send + Sync> {
			let interceptors = &mut self.inner_mut().interceptors;
			&mut interceptors.$chain_field
		}
	};
}
327
328impl WithInterceptors for FlowTransaction {
329 interceptor_method!(table_row_pre_insert_interceptors, table_row_pre_insert, TableRowPreInsertInterceptor);
330 interceptor_method!(table_row_post_insert_interceptors, table_row_post_insert, TableRowPostInsertInterceptor);
331 interceptor_method!(table_row_pre_update_interceptors, table_row_pre_update, TableRowPreUpdateInterceptor);
332 interceptor_method!(table_row_post_update_interceptors, table_row_post_update, TableRowPostUpdateInterceptor);
333 interceptor_method!(table_row_pre_delete_interceptors, table_row_pre_delete, TableRowPreDeleteInterceptor);
334 interceptor_method!(table_row_post_delete_interceptors, table_row_post_delete, TableRowPostDeleteInterceptor);
335
336 interceptor_method!(
337 ringbuffer_row_pre_insert_interceptors,
338 ringbuffer_row_pre_insert,
339 RingBufferRowPreInsertInterceptor
340 );
341 interceptor_method!(
342 ringbuffer_row_post_insert_interceptors,
343 ringbuffer_row_post_insert,
344 RingBufferRowPostInsertInterceptor
345 );
346 interceptor_method!(
347 ringbuffer_row_pre_update_interceptors,
348 ringbuffer_row_pre_update,
349 RingBufferRowPreUpdateInterceptor
350 );
351 interceptor_method!(
352 ringbuffer_row_post_update_interceptors,
353 ringbuffer_row_post_update,
354 RingBufferRowPostUpdateInterceptor
355 );
356 interceptor_method!(
357 ringbuffer_row_pre_delete_interceptors,
358 ringbuffer_row_pre_delete,
359 RingBufferRowPreDeleteInterceptor
360 );
361 interceptor_method!(
362 ringbuffer_row_post_delete_interceptors,
363 ringbuffer_row_post_delete,
364 RingBufferRowPostDeleteInterceptor
365 );
366
367 interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
368 interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);
369
370 interceptor_method!(namespace_post_create_interceptors, namespace_post_create, NamespacePostCreateInterceptor);
371 interceptor_method!(namespace_pre_update_interceptors, namespace_pre_update, NamespacePreUpdateInterceptor);
372 interceptor_method!(namespace_post_update_interceptors, namespace_post_update, NamespacePostUpdateInterceptor);
373 interceptor_method!(namespace_pre_delete_interceptors, namespace_pre_delete, NamespacePreDeleteInterceptor);
374
375 interceptor_method!(table_post_create_interceptors, table_post_create, TablePostCreateInterceptor);
376 interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
377 interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
378 interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);
379
380 interceptor_method!(view_row_pre_insert_interceptors, view_row_pre_insert, ViewRowPreInsertInterceptor);
381 interceptor_method!(view_row_post_insert_interceptors, view_row_post_insert, ViewRowPostInsertInterceptor);
382 interceptor_method!(view_row_pre_update_interceptors, view_row_pre_update, ViewRowPreUpdateInterceptor);
383 interceptor_method!(view_row_post_update_interceptors, view_row_post_update, ViewRowPostUpdateInterceptor);
384 interceptor_method!(view_row_pre_delete_interceptors, view_row_pre_delete, ViewRowPreDeleteInterceptor);
385 interceptor_method!(view_row_post_delete_interceptors, view_row_post_delete, ViewRowPostDeleteInterceptor);
386
387 interceptor_method!(view_post_create_interceptors, view_post_create, ViewPostCreateInterceptor);
388 interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
389 interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
390 interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);
391
392 interceptor_method!(
393 ringbuffer_post_create_interceptors,
394 ringbuffer_post_create,
395 RingBufferPostCreateInterceptor
396 );
397 interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
398 interceptor_method!(
399 ringbuffer_post_update_interceptors,
400 ringbuffer_post_update,
401 RingBufferPostUpdateInterceptor
402 );
403 interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);
404
405 interceptor_method!(
406 dictionary_row_pre_insert_interceptors,
407 dictionary_row_pre_insert,
408 DictionaryRowPreInsertInterceptor
409 );
410 interceptor_method!(
411 dictionary_row_post_insert_interceptors,
412 dictionary_row_post_insert,
413 DictionaryRowPostInsertInterceptor
414 );
415 interceptor_method!(
416 dictionary_row_pre_update_interceptors,
417 dictionary_row_pre_update,
418 DictionaryRowPreUpdateInterceptor
419 );
420 interceptor_method!(
421 dictionary_row_post_update_interceptors,
422 dictionary_row_post_update,
423 DictionaryRowPostUpdateInterceptor
424 );
425 interceptor_method!(
426 dictionary_row_pre_delete_interceptors,
427 dictionary_row_pre_delete,
428 DictionaryRowPreDeleteInterceptor
429 );
430 interceptor_method!(
431 dictionary_row_post_delete_interceptors,
432 dictionary_row_post_delete,
433 DictionaryRowPostDeleteInterceptor
434 );
435
436 interceptor_method!(
437 dictionary_post_create_interceptors,
438 dictionary_post_create,
439 DictionaryPostCreateInterceptor
440 );
441 interceptor_method!(dictionary_pre_update_interceptors, dictionary_pre_update, DictionaryPreUpdateInterceptor);
442 interceptor_method!(
443 dictionary_post_update_interceptors,
444 dictionary_post_update,
445 DictionaryPostUpdateInterceptor
446 );
447 interceptor_method!(dictionary_pre_delete_interceptors, dictionary_pre_delete, DictionaryPreDeleteInterceptor);
448
449 interceptor_method!(series_row_pre_insert_interceptors, series_row_pre_insert, SeriesRowPreInsertInterceptor);
450 interceptor_method!(
451 series_row_post_insert_interceptors,
452 series_row_post_insert,
453 SeriesRowPostInsertInterceptor
454 );
455 interceptor_method!(series_row_pre_update_interceptors, series_row_pre_update, SeriesRowPreUpdateInterceptor);
456 interceptor_method!(
457 series_row_post_update_interceptors,
458 series_row_post_update,
459 SeriesRowPostUpdateInterceptor
460 );
461 interceptor_method!(series_row_pre_delete_interceptors, series_row_pre_delete, SeriesRowPreDeleteInterceptor);
462 interceptor_method!(
463 series_row_post_delete_interceptors,
464 series_row_post_delete,
465 SeriesRowPostDeleteInterceptor
466 );
467
468 interceptor_method!(series_post_create_interceptors, series_post_create, SeriesPostCreateInterceptor);
469 interceptor_method!(series_pre_update_interceptors, series_pre_update, SeriesPreUpdateInterceptor);
470 interceptor_method!(series_post_update_interceptors, series_post_update, SeriesPostUpdateInterceptor);
471 interceptor_method!(series_pre_delete_interceptors, series_pre_delete, SeriesPreDeleteInterceptor);
472 interceptor_method!(identity_post_create_interceptors, identity_post_create, IdentityPostCreateInterceptor);
473 interceptor_method!(identity_pre_update_interceptors, identity_pre_update, IdentityPreUpdateInterceptor);
474 interceptor_method!(identity_post_update_interceptors, identity_post_update, IdentityPostUpdateInterceptor);
475 interceptor_method!(identity_pre_delete_interceptors, identity_pre_delete, IdentityPreDeleteInterceptor);
476 interceptor_method!(role_post_create_interceptors, role_post_create, RolePostCreateInterceptor);
477 interceptor_method!(role_pre_update_interceptors, role_pre_update, RolePreUpdateInterceptor);
478 interceptor_method!(role_post_update_interceptors, role_post_update, RolePostUpdateInterceptor);
479 interceptor_method!(role_pre_delete_interceptors, role_pre_delete, RolePreDeleteInterceptor);
480 interceptor_method!(
481 granted_role_post_create_interceptors,
482 granted_role_post_create,
483 GrantedRolePostCreateInterceptor
484 );
485 interceptor_method!(
486 granted_role_pre_delete_interceptors,
487 granted_role_pre_delete,
488 GrantedRolePreDeleteInterceptor
489 );
490 interceptor_method!(
491 authentication_post_create_interceptors,
492 authentication_post_create,
493 AuthenticationPostCreateInterceptor
494 );
495 interceptor_method!(
496 authentication_pre_delete_interceptors,
497 authentication_pre_delete,
498 AuthenticationPreDeleteInterceptor
499 );
500}