1use std::mem;
5
6use pending::{Pending, PendingWrite};
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{common::CommitVersion, interface::change::Change, testing::TestingContext};
9use reifydb_transaction::{
10 interceptor::{
11 WithInterceptors,
12 chain::InterceptorChain as Chain,
13 interceptors::Interceptors,
14 namespace::{
15 NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
16 NamespacePreUpdateInterceptor,
17 },
18 ringbuffer::{
19 RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
20 RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
21 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
22 },
23 ringbuffer_def::{
24 RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
25 RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
26 },
27 table::{
28 TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
29 TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
30 },
31 table_def::{
32 TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
33 TableDefPreUpdateInterceptor,
34 },
35 transaction::{PostCommitInterceptor, PreCommitInterceptor},
36 view::{
37 ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
38 ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
39 },
40 view_def::{
41 ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
42 ViewDefPreUpdateInterceptor,
43 },
44 },
45 multi::transaction::read::MultiReadTransaction,
46 transaction::admin::AdminTransaction,
47};
48use tracing::instrument;
49
50pub mod pending;
51pub mod range;
52pub mod read;
53pub mod state;
54pub mod write;
55
56pub enum FlowTransaction {
107 Deferred {
110 version: CommitVersion,
111 pending: Pending,
112 primitive_query: MultiReadTransaction,
113 state_query: MultiReadTransaction,
114 catalog: Catalog,
115 interceptors: Interceptors,
116 testing: Option<TestingContext>,
117 },
118
119 Transactional {
122 version: CommitVersion,
123 pending: Pending,
124 base_pending: Pending,
126 primitive_query: MultiReadTransaction,
127 state_query: MultiReadTransaction,
128 catalog: Catalog,
129 interceptors: Interceptors,
130 testing: Option<TestingContext>,
131 },
132}
133
134impl FlowTransaction {
135 #[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
140 pub fn deferred(
141 parent: &AdminTransaction,
142 version: CommitVersion,
143 catalog: Catalog,
144 interceptors: Interceptors,
145 ) -> Self {
146 let mut primitive_query = parent.multi.begin_query().unwrap();
147 primitive_query.read_as_of_version_inclusive(version);
148
149 let state_query = parent.multi.begin_query().unwrap();
150 Self::Deferred {
151 version,
152 pending: Pending::new(),
153 primitive_query,
154 state_query,
155 catalog,
156 interceptors,
157 testing: None,
158 }
159 }
160
161 pub fn deferred_from_parts(
165 version: CommitVersion,
166 pending: Pending,
167 primitive_query: MultiReadTransaction,
168 state_query: MultiReadTransaction,
169 catalog: Catalog,
170 interceptors: Interceptors,
171 ) -> Self {
172 Self::Deferred {
173 version,
174 pending,
175 primitive_query,
176 state_query,
177 catalog,
178 interceptors,
179 testing: None,
180 }
181 }
182
183 pub fn transactional(
189 version: CommitVersion,
190 pending: Pending,
191 base_pending: Pending,
192 primitive_query: MultiReadTransaction,
193 state_query: MultiReadTransaction,
194 catalog: Catalog,
195 interceptors: Interceptors,
196 testing: Option<TestingContext>,
197 ) -> Self {
198 Self::Transactional {
199 version,
200 pending,
201 base_pending,
202 primitive_query,
203 state_query,
204 catalog,
205 interceptors,
206 testing,
207 }
208 }
209
210 pub fn version(&self) -> CommitVersion {
212 match self {
213 Self::Deferred {
214 version,
215 ..
216 } => *version,
217 Self::Transactional {
218 version,
219 ..
220 } => *version,
221 }
222 }
223
224 pub fn take_pending(&mut self) -> Pending {
226 match self {
227 Self::Deferred {
228 pending,
229 ..
230 } => mem::take(pending),
231 Self::Transactional {
232 pending,
233 ..
234 } => mem::take(pending),
235 }
236 }
237
238 #[cfg(test)]
240 pub fn pending(&self) -> &Pending {
241 match self {
242 Self::Deferred {
243 pending,
244 ..
245 } => pending,
246 Self::Transactional {
247 pending,
248 ..
249 } => pending,
250 }
251 }
252
253 pub fn take_view_changes(&mut self) -> pending::ViewChanges {
255 match self {
256 Self::Deferred {
257 pending,
258 ..
259 } => pending.take_view_changes(),
260 Self::Transactional {
261 pending,
262 ..
263 } => pending.take_view_changes(),
264 }
265 }
266
267 pub fn push_view_change(&mut self, change: Change) {
269 match self {
270 Self::Deferred {
271 pending,
272 ..
273 } => pending.push_view_change(change),
274 Self::Transactional {
275 pending,
276 ..
277 } => pending.push_view_change(change),
278 }
279 }
280
281 pub fn update_version(&mut self, new_version: CommitVersion) {
283 match self {
284 Self::Deferred {
285 version,
286 primitive_query,
287 ..
288 } => {
289 *version = new_version;
290 primitive_query.read_as_of_version_inclusive(new_version);
291 }
292 Self::Transactional {
293 version,
294 primitive_query,
295 ..
296 } => {
297 *version = new_version;
298 primitive_query.read_as_of_version_inclusive(new_version);
299 }
300 }
301 }
302
303 pub fn catalog(&self) -> &Catalog {
305 match self {
306 Self::Deferred {
307 catalog,
308 ..
309 } => catalog,
310 Self::Transactional {
311 catalog,
312 ..
313 } => catalog,
314 }
315 }
316
317 pub fn testing_mut(&mut self) -> Option<&mut TestingContext> {
319 match self {
320 Self::Deferred {
321 testing,
322 ..
323 } => testing.as_mut(),
324 Self::Transactional {
325 testing,
326 ..
327 } => testing.as_mut(),
328 }
329 }
330
331 pub fn take_testing(&mut self) -> Option<TestingContext> {
333 match self {
334 Self::Deferred {
335 testing,
336 ..
337 } => testing.take(),
338 Self::Transactional {
339 testing,
340 ..
341 } => testing.take(),
342 }
343 }
344}
345
346macro_rules! interceptor_method {
347 ($method:ident, $field:ident, $trait_name:ident) => {
348 fn $method(&mut self) -> &mut Chain<dyn $trait_name + Send + Sync> {
349 match self {
350 Self::Deferred {
351 interceptors,
352 ..
353 } => &mut interceptors.$field,
354 Self::Transactional {
355 interceptors,
356 ..
357 } => &mut interceptors.$field,
358 }
359 }
360 };
361}
362
363impl WithInterceptors for FlowTransaction {
364 interceptor_method!(table_pre_insert_interceptors, table_pre_insert, TablePreInsertInterceptor);
365 interceptor_method!(table_post_insert_interceptors, table_post_insert, TablePostInsertInterceptor);
366 interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
367 interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
368 interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);
369 interceptor_method!(table_post_delete_interceptors, table_post_delete, TablePostDeleteInterceptor);
370
371 interceptor_method!(ringbuffer_pre_insert_interceptors, ringbuffer_pre_insert, RingBufferPreInsertInterceptor);
372 interceptor_method!(
373 ringbuffer_post_insert_interceptors,
374 ringbuffer_post_insert,
375 RingBufferPostInsertInterceptor
376 );
377 interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
378 interceptor_method!(
379 ringbuffer_post_update_interceptors,
380 ringbuffer_post_update,
381 RingBufferPostUpdateInterceptor
382 );
383 interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);
384 interceptor_method!(
385 ringbuffer_post_delete_interceptors,
386 ringbuffer_post_delete,
387 RingBufferPostDeleteInterceptor
388 );
389
390 interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
391 interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);
392
393 interceptor_method!(namespace_post_create_interceptors, namespace_post_create, NamespacePostCreateInterceptor);
394 interceptor_method!(namespace_pre_update_interceptors, namespace_pre_update, NamespacePreUpdateInterceptor);
395 interceptor_method!(namespace_post_update_interceptors, namespace_post_update, NamespacePostUpdateInterceptor);
396 interceptor_method!(namespace_pre_delete_interceptors, namespace_pre_delete, NamespacePreDeleteInterceptor);
397
398 interceptor_method!(table_def_post_create_interceptors, table_def_post_create, TableDefPostCreateInterceptor);
399 interceptor_method!(table_def_pre_update_interceptors, table_def_pre_update, TableDefPreUpdateInterceptor);
400 interceptor_method!(table_def_post_update_interceptors, table_def_post_update, TableDefPostUpdateInterceptor);
401 interceptor_method!(table_def_pre_delete_interceptors, table_def_pre_delete, TableDefPreDeleteInterceptor);
402
403 interceptor_method!(view_pre_insert_interceptors, view_pre_insert, ViewPreInsertInterceptor);
404 interceptor_method!(view_post_insert_interceptors, view_post_insert, ViewPostInsertInterceptor);
405 interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
406 interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
407 interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);
408 interceptor_method!(view_post_delete_interceptors, view_post_delete, ViewPostDeleteInterceptor);
409
410 interceptor_method!(view_def_post_create_interceptors, view_def_post_create, ViewDefPostCreateInterceptor);
411 interceptor_method!(view_def_pre_update_interceptors, view_def_pre_update, ViewDefPreUpdateInterceptor);
412 interceptor_method!(view_def_post_update_interceptors, view_def_post_update, ViewDefPostUpdateInterceptor);
413 interceptor_method!(view_def_pre_delete_interceptors, view_def_pre_delete, ViewDefPreDeleteInterceptor);
414
415 interceptor_method!(
416 ringbuffer_def_post_create_interceptors,
417 ringbuffer_def_post_create,
418 RingBufferDefPostCreateInterceptor
419 );
420 interceptor_method!(
421 ringbuffer_def_pre_update_interceptors,
422 ringbuffer_def_pre_update,
423 RingBufferDefPreUpdateInterceptor
424 );
425 interceptor_method!(
426 ringbuffer_def_post_update_interceptors,
427 ringbuffer_def_post_update,
428 RingBufferDefPostUpdateInterceptor
429 );
430 interceptor_method!(
431 ringbuffer_def_pre_delete_interceptors,
432 ringbuffer_def_pre_delete,
433 RingBufferDefPreDeleteInterceptor
434 );
435}