1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
use crate::{
DbBackend, DbErr, ExecResult, QueryResult, Statement, StatementBuilder, TransactionError,
};
use futures_util::Stream;
use std::{future::Future, pin::Pin};
/// The generic API for a database connection that can perform query or execute statements.
/// It abstracts database connection and transaction
#[async_trait::async_trait]
pub trait ConnectionTrait: Sync {
/// Get the database backend for the connection. This depends on feature flags enabled.
fn get_database_backend(&self) -> DbBackend;
/// Execute a [Statement]
async fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr>;
/// Execute a [QueryStatement]
async fn execute<S: StatementBuilder>(&self, stmt: &S) -> Result<ExecResult, DbErr> {
let db_backend = self.get_database_backend();
let stmt = db_backend.build(stmt);
self.execute_raw(stmt).await
}
/// Execute a unprepared [Statement]
async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr>;
/// Execute a [Statement] and return a single row of `QueryResult`
async fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr>;
/// Execute a [QueryStatement] and return a single row of `QueryResult`
async fn query_one<S: StatementBuilder>(&self, stmt: &S) -> Result<Option<QueryResult>, DbErr> {
let db_backend = self.get_database_backend();
let stmt = db_backend.build(stmt);
self.query_one_raw(stmt).await
}
/// Execute a [Statement] and return a vector of `QueryResult`
async fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr>;
/// Execute a [QueryStatement] and return a vector of `QueryResult`
async fn query_all<S: StatementBuilder>(&self, stmt: &S) -> Result<Vec<QueryResult>, DbErr> {
let db_backend = self.get_database_backend();
let stmt = db_backend.build(stmt);
self.query_all_raw(stmt).await
}
/// Check if the connection supports `RETURNING` syntax on insert and update
fn support_returning(&self) -> bool {
let db_backend = self.get_database_backend();
db_backend.support_returning()
}
/// Check if the connection is a test connection for the Mock database
fn is_mock_connection(&self) -> bool {
false
}
}
/// Stream query results
pub trait StreamTrait: Send + Sync {
/// Create a stream for the [QueryResult]
type Stream<'a>: Stream<Item = Result<QueryResult, DbErr>> + Send
where
Self: 'a;
/// Get the database backend for the connection. This depends on feature flags enabled.
fn get_database_backend(&self) -> DbBackend;
/// Execute a [Statement] and return a stream of results
fn stream_raw<'a>(
&'a self,
stmt: Statement,
) -> Pin<Box<dyn Future<Output = Result<Self::Stream<'a>, DbErr>> + 'a + Send>>;
/// Execute a [QueryStatement] and return a stream of results
fn stream<'a, S: StatementBuilder + Sync>(
&'a self,
stmt: &S,
) -> Pin<Box<dyn Future<Output = Result<Self::Stream<'a>, DbErr>> + 'a + Send>> {
let db_backend = self.get_database_backend();
let stmt = db_backend.build(stmt);
self.stream_raw(stmt)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Isolation level
pub enum IsolationLevel {
/// Consistent reads within the same transaction read the snapshot established by the first read.
RepeatableRead,
/// Each consistent read, even within the same transaction, sets and reads its own fresh snapshot.
ReadCommitted,
/// SELECT statements are performed in a nonlocking fashion, but a possible earlier version of a row might be used.
ReadUncommitted,
/// All statements of the current transaction can only see rows committed before the first query or data-modification statement was executed in this transaction.
Serializable,
}
impl std::fmt::Display for IsolationLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IsolationLevel::RepeatableRead => write!(f, "REPEATABLE READ"),
IsolationLevel::ReadCommitted => write!(f, "READ COMMITTED"),
IsolationLevel::ReadUncommitted => write!(f, "READ UNCOMMITTED"),
IsolationLevel::Serializable => write!(f, "SERIALIZABLE"),
}
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Access mode
pub enum AccessMode {
/// Data can't be modified in this transaction
ReadOnly,
/// Data can be modified in this transaction (default)
ReadWrite,
}
impl std::fmt::Display for AccessMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AccessMode::ReadOnly => write!(f, "READ ONLY"),
AccessMode::ReadWrite => write!(f, "READ WRITE"),
}
}
}
/// Spawn database transaction
#[async_trait::async_trait]
pub trait TransactionTrait {
/// The concrete type for the transaction
type Transaction: ConnectionTrait + TransactionTrait + TransactionSession;
/// Execute SQL `BEGIN` transaction.
/// Returns a Transaction that can be committed or rolled back
async fn begin(&self) -> Result<Self::Transaction, DbErr>;
/// Execute SQL `BEGIN` transaction with isolation level and/or access mode.
/// Returns a Transaction that can be committed or rolled back
async fn begin_with_config(
&self,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<Self::Transaction, DbErr>;
/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c Self::Transaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::fmt::Display + std::fmt::Debug + Send;
/// Execute the function inside a transaction with isolation level and/or access mode.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction_with_config<F, T, E>(
&self,
callback: F,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c Self::Transaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::fmt::Display + std::fmt::Debug + Send;
}
/// Represents an open transaction
#[async_trait::async_trait]
pub trait TransactionSession {
/// Commit a transaction
async fn commit(self) -> Result<(), DbErr>;
/// Rolls back a transaction explicitly
async fn rollback(self) -> Result<(), DbErr>;
}