reifydb_transaction/multi/
multi.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::{
6	CommitVersion, EncodedKey, EncodedKeyRange, Error,
7	event::EventBus,
8	interface::{
9		MultiVersionBatch, MultiVersionCommandTransaction, MultiVersionQueryTransaction,
10		MultiVersionTransaction, MultiVersionValues, TransactionId, WithEventBus,
11	},
12	value::encoded::EncodedValues,
13};
14
15use crate::multi::transaction::{CommandTransaction, QueryTransaction, TransactionMulti};
16
17impl WithEventBus for TransactionMulti {
18	fn event_bus(&self) -> &EventBus {
19		&self.event_bus
20	}
21}
22
23#[async_trait]
24impl MultiVersionTransaction for TransactionMulti {
25	type Query = QueryTransaction;
26	type Command = CommandTransaction;
27
28	async fn begin_query(&self) -> Result<Self::Query, Error> {
29		TransactionMulti::begin_query(self).await
30	}
31
32	async fn begin_command(&self) -> Result<Self::Command, Error> {
33		TransactionMulti::begin_command(self).await
34	}
35}
36
37#[async_trait]
38impl MultiVersionQueryTransaction for QueryTransaction {
39	fn version(&self) -> CommitVersion {
40		self.tm.version()
41	}
42
43	fn id(&self) -> TransactionId {
44		self.tm.id()
45	}
46
47	async fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
48		Ok(QueryTransaction::get(self, key).await?.map(|tv| MultiVersionValues {
49			key: tv.key().clone(),
50			values: tv.values().clone(),
51			version: tv.version(),
52		}))
53	}
54
55	async fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
56		QueryTransaction::contains_key(self, key).await
57	}
58
59	async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<MultiVersionBatch, Error> {
60		let batch = QueryTransaction::range_batch(self, range, batch_size).await?;
61		Ok(MultiVersionBatch {
62			items: batch
63				.items
64				.into_iter()
65				.map(|mv| MultiVersionValues {
66					key: mv.key,
67					values: mv.values,
68					version: mv.version,
69				})
70				.collect(),
71			has_more: batch.has_more,
72		})
73	}
74
75	async fn range_rev_batch(
76		&mut self,
77		range: EncodedKeyRange,
78		batch_size: u64,
79	) -> Result<MultiVersionBatch, Error> {
80		let batch = QueryTransaction::range_rev_batch(self, range, batch_size).await?;
81		Ok(MultiVersionBatch {
82			items: batch
83				.items
84				.into_iter()
85				.map(|mv| MultiVersionValues {
86					key: mv.key,
87					values: mv.values,
88					version: mv.version,
89				})
90				.collect(),
91			has_more: batch.has_more,
92		})
93	}
94
95	async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
96		QueryTransaction::read_as_of_version_exclusive(self, version);
97		Ok(())
98	}
99}
100
101#[async_trait]
102impl MultiVersionQueryTransaction for CommandTransaction {
103	fn version(&self) -> CommitVersion {
104		self.tm.version()
105	}
106
107	fn id(&self) -> TransactionId {
108		self.tm.id()
109	}
110
111	async fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
112		Ok(CommandTransaction::get(self, key).await?.map(|tv| MultiVersionValues {
113			key: tv.key().clone(),
114			values: tv.values().clone(),
115			version: tv.version(),
116		}))
117	}
118
119	async fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
120		CommandTransaction::contains_key(self, key).await
121	}
122
123	async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<MultiVersionBatch, Error> {
124		let batch = CommandTransaction::range_batch(self, range, batch_size).await?;
125		Ok(MultiVersionBatch {
126			items: batch.items,
127			has_more: batch.has_more,
128		})
129	}
130
131	async fn range_rev_batch(
132		&mut self,
133		range: EncodedKeyRange,
134		batch_size: u64,
135	) -> Result<MultiVersionBatch, Error> {
136		let batch = CommandTransaction::range_rev_batch(self, range, batch_size).await?;
137		Ok(MultiVersionBatch {
138			items: batch.items,
139			has_more: batch.has_more,
140		})
141	}
142
143	async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
144		CommandTransaction::read_as_of_version_exclusive(self, version);
145		Ok(())
146	}
147}
148
149#[async_trait]
150impl MultiVersionCommandTransaction for CommandTransaction {
151	async fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<(), Error> {
152		CommandTransaction::set(self, key, values)?;
153		Ok(())
154	}
155
156	async fn remove(&mut self, key: &EncodedKey) -> Result<(), Error> {
157		CommandTransaction::remove(self, key)?;
158		Ok(())
159	}
160
161	async fn commit(&mut self) -> Result<CommitVersion, Error> {
162		let version = CommandTransaction::commit(self).await?;
163		Ok(version)
164	}
165
166	async fn rollback(&mut self) -> Result<(), Error> {
167		CommandTransaction::rollback(self)?;
168		Ok(())
169	}
170}