reifydb_sub_flow/transaction/
mod.rs1use pending::{Pending, PendingWrites};
5use reifydb_catalog::catalog::Catalog;
6use reifydb_core::common::CommitVersion;
7use reifydb_transaction::{
8 interceptor::{
9 WithInterceptors,
10 chain::InterceptorChain as Chain,
11 interceptors::Interceptors,
12 namespace_def::{
13 NamespaceDefPostCreateInterceptor, NamespaceDefPostUpdateInterceptor,
14 NamespaceDefPreDeleteInterceptor, NamespaceDefPreUpdateInterceptor,
15 },
16 ringbuffer::{
17 RingBufferPostDeleteInterceptor, RingBufferPostInsertInterceptor,
18 RingBufferPostUpdateInterceptor, RingBufferPreDeleteInterceptor,
19 RingBufferPreInsertInterceptor, RingBufferPreUpdateInterceptor,
20 },
21 ringbuffer_def::{
22 RingBufferDefPostCreateInterceptor, RingBufferDefPostUpdateInterceptor,
23 RingBufferDefPreDeleteInterceptor, RingBufferDefPreUpdateInterceptor,
24 },
25 table::{
26 TablePostDeleteInterceptor, TablePostInsertInterceptor, TablePostUpdateInterceptor,
27 TablePreDeleteInterceptor, TablePreInsertInterceptor, TablePreUpdateInterceptor,
28 },
29 table_def::{
30 TableDefPostCreateInterceptor, TableDefPostUpdateInterceptor, TableDefPreDeleteInterceptor,
31 TableDefPreUpdateInterceptor,
32 },
33 transaction::{PostCommitInterceptor, PreCommitInterceptor},
34 view::{
35 ViewPostDeleteInterceptor, ViewPostInsertInterceptor, ViewPostUpdateInterceptor,
36 ViewPreDeleteInterceptor, ViewPreInsertInterceptor, ViewPreUpdateInterceptor,
37 },
38 view_def::{
39 ViewDefPostCreateInterceptor, ViewDefPostUpdateInterceptor, ViewDefPreDeleteInterceptor,
40 ViewDefPreUpdateInterceptor,
41 },
42 },
43 multi::transaction::read::MultiReadTransaction,
44 transaction::admin::AdminTransaction,
45};
46use tracing::instrument;
47
48pub mod pending;
49pub mod range;
50pub mod read;
51pub mod state;
52pub mod write;
53
54pub struct FlowTransaction {
149 pub(crate) version: CommitVersion,
156
157 pub(crate) pending: PendingWrites,
162
163 pub(crate) primitive_query: MultiReadTransaction,
168
169 pub(crate) state_query: MultiReadTransaction,
175
176 pub(crate) catalog: Catalog,
178
179 pub(crate) interceptors: Interceptors,
181}
182
183impl FlowTransaction {
184 #[instrument(name = "flow::transaction::new", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
195 pub fn new(
196 parent: &AdminTransaction,
197 version: CommitVersion,
198 catalog: Catalog,
199 interceptors: Interceptors,
200 ) -> Self {
201 let mut primitive_query = parent.multi.begin_query().unwrap();
202 primitive_query.read_as_of_version_inclusive(version);
203
204 let state_query = parent.multi.begin_query().unwrap();
205 Self {
206 version,
207 pending: PendingWrites::new(),
208 primitive_query,
209 state_query,
210 catalog,
211 interceptors,
212 }
213 }
214
215 pub fn update_version(&mut self, new_version: CommitVersion) {
217 self.version = new_version;
218 self.primitive_query.read_as_of_version_inclusive(new_version);
219 }
220
221 pub(crate) fn catalog(&self) -> &Catalog {
223 &self.catalog
224 }
225}
226
227impl WithInterceptors for FlowTransaction {
228 fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync> {
229 &mut self.interceptors.table_pre_insert
230 }
231
232 fn table_post_insert_interceptors(&mut self) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync> {
233 &mut self.interceptors.table_post_insert
234 }
235
236 fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync> {
237 &mut self.interceptors.table_pre_update
238 }
239
240 fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync> {
241 &mut self.interceptors.table_post_update
242 }
243
244 fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync> {
245 &mut self.interceptors.table_pre_delete
246 }
247
248 fn table_post_delete_interceptors(&mut self) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync> {
249 &mut self.interceptors.table_post_delete
250 }
251
252 fn ringbuffer_pre_insert_interceptors(
253 &mut self,
254 ) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync> {
255 &mut self.interceptors.ringbuffer_pre_insert
256 }
257
258 fn ringbuffer_post_insert_interceptors(
259 &mut self,
260 ) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync> {
261 &mut self.interceptors.ringbuffer_post_insert
262 }
263
264 fn ringbuffer_pre_update_interceptors(
265 &mut self,
266 ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync> {
267 &mut self.interceptors.ringbuffer_pre_update
268 }
269
270 fn ringbuffer_post_update_interceptors(
271 &mut self,
272 ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync> {
273 &mut self.interceptors.ringbuffer_post_update
274 }
275
276 fn ringbuffer_pre_delete_interceptors(
277 &mut self,
278 ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync> {
279 &mut self.interceptors.ringbuffer_pre_delete
280 }
281
282 fn ringbuffer_post_delete_interceptors(
283 &mut self,
284 ) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync> {
285 &mut self.interceptors.ringbuffer_post_delete
286 }
287
288 fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync> {
289 &mut self.interceptors.pre_commit
290 }
291
292 fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync> {
293 &mut self.interceptors.post_commit
294 }
295
296 fn namespace_def_post_create_interceptors(
297 &mut self,
298 ) -> &mut Chain<dyn NamespaceDefPostCreateInterceptor + Send + Sync> {
299 &mut self.interceptors.namespace_def_post_create
300 }
301
302 fn namespace_def_pre_update_interceptors(
303 &mut self,
304 ) -> &mut Chain<dyn NamespaceDefPreUpdateInterceptor + Send + Sync> {
305 &mut self.interceptors.namespace_def_pre_update
306 }
307
308 fn namespace_def_post_update_interceptors(
309 &mut self,
310 ) -> &mut Chain<dyn NamespaceDefPostUpdateInterceptor + Send + Sync> {
311 &mut self.interceptors.namespace_def_post_update
312 }
313
314 fn namespace_def_pre_delete_interceptors(
315 &mut self,
316 ) -> &mut Chain<dyn NamespaceDefPreDeleteInterceptor + Send + Sync> {
317 &mut self.interceptors.namespace_def_pre_delete
318 }
319
320 fn table_def_post_create_interceptors(
321 &mut self,
322 ) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync> {
323 &mut self.interceptors.table_def_post_create
324 }
325
326 fn table_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync> {
327 &mut self.interceptors.table_def_pre_update
328 }
329
330 fn table_def_post_update_interceptors(
331 &mut self,
332 ) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync> {
333 &mut self.interceptors.table_def_post_update
334 }
335
336 fn table_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync> {
337 &mut self.interceptors.table_def_pre_delete
338 }
339
340 fn view_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync> {
341 &mut self.interceptors.view_pre_insert
342 }
343
344 fn view_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync> {
345 &mut self.interceptors.view_post_insert
346 }
347
348 fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync> {
349 &mut self.interceptors.view_pre_update
350 }
351
352 fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync> {
353 &mut self.interceptors.view_post_update
354 }
355
356 fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync> {
357 &mut self.interceptors.view_pre_delete
358 }
359
360 fn view_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync> {
361 &mut self.interceptors.view_post_delete
362 }
363
364 fn view_def_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync> {
365 &mut self.interceptors.view_def_post_create
366 }
367
368 fn view_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync> {
369 &mut self.interceptors.view_def_pre_update
370 }
371
372 fn view_def_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync> {
373 &mut self.interceptors.view_def_post_update
374 }
375
376 fn view_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync> {
377 &mut self.interceptors.view_def_pre_delete
378 }
379
380 fn ringbuffer_def_post_create_interceptors(
381 &mut self,
382 ) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync> {
383 &mut self.interceptors.ringbuffer_def_post_create
384 }
385
386 fn ringbuffer_def_pre_update_interceptors(
387 &mut self,
388 ) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync> {
389 &mut self.interceptors.ringbuffer_def_pre_update
390 }
391
392 fn ringbuffer_def_post_update_interceptors(
393 &mut self,
394 ) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync> {
395 &mut self.interceptors.ringbuffer_def_post_update
396 }
397
398 fn ringbuffer_def_pre_delete_interceptors(
399 &mut self,
400 ) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync> {
401 &mut self.interceptors.ringbuffer_def_pre_delete
402 }
403}