1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use sqlx::{
sqlite::{SqliteArguments, SqliteQueryResult, SqliteRow},
Sqlite, SqlitePool,
};
sea_query::sea_query_driver_sqlite!();
use sea_query_driver_sqlite::bind_query;
use crate::{debug_print, error::*, executor::*, DatabaseConnection, DbBackend, Statement};
use super::sqlx_common::*;
/// Entry point for creating SQLite-backed [`DatabaseConnection`]s via `sqlx`.
///
/// This is a stateless unit struct; all functionality is exposed through its
/// associated functions.
#[derive(Debug)]
pub struct SqlxSqliteConnector;
/// A database connection backed by a `sqlx` [`SqlitePool`].
///
/// Each statement-running method on this type acquires a connection from the
/// wrapped pool for the duration of that call.
#[derive(Debug, Clone)]
pub struct SqlxSqlitePoolConnection {
// Pool that connections are acquired from when running statements.
pool: SqlitePool,
}
impl SqlxSqliteConnector {
    /// Reports whether this connector should handle the given connection string.
    ///
    /// Delegates to `DbBackend::Sqlite.is_prefix_of(string)`.
    // NOTE(review): presumably this checks the URI scheme prefix — confirm
    // against `DbBackend::is_prefix_of`.
    pub fn accepts(string: &str) -> bool {
        DbBackend::Sqlite.is_prefix_of(string)
    }

    /// Opens a SQLite connection pool for `string` and wraps it in a
    /// [`DatabaseConnection`].
    ///
    /// # Errors
    ///
    /// Returns [`DbErr::Conn`] when the pool cannot be created. The message
    /// starts with `"Failed to connect."` and is followed by the underlying
    /// `sqlx` error so that the cause is not lost.
    pub async fn connect(string: &str) -> Result<DatabaseConnection, DbErr> {
        match SqlitePool::connect(string).await {
            Ok(pool) => Ok(DatabaseConnection::SqlxSqlitePoolConnection(
                SqlxSqlitePoolConnection { pool },
            )),
            // Keep the original message as a prefix, but surface the driver
            // error instead of silently discarding it.
            Err(err) => Err(DbErr::Conn(format!("Failed to connect. {}", err))),
        }
    }
}
impl SqlxSqliteConnector {
    /// Wraps an already-constructed `sqlx` [`SqlitePool`] in a
    /// [`DatabaseConnection`] without opening a new pool.
    pub fn from_sqlx_sqlite_pool(pool: SqlitePool) -> DatabaseConnection {
        let connection = SqlxSqlitePoolConnection { pool };
        DatabaseConnection::SqlxSqlitePoolConnection(connection)
    }
}
impl SqlxSqlitePoolConnection {
pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
debug_print!("{}", stmt);
let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
match query.execute(conn).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
} else {
Err(DbErr::Exec(
"Failed to acquire connection from pool.".to_owned(),
))
}
}
pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
debug_print!("{}", stmt);
let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
match query.fetch_one(conn).await {
Ok(row) => Ok(Some(row.into())),
Err(err) => match err {
sqlx::Error::RowNotFound => Ok(None),
_ => Err(DbErr::Query(err.to_string())),
},
}
} else {
Err(DbErr::Query(
"Failed to acquire connection from pool.".to_owned(),
))
}
}
pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
debug_print!("{}", stmt);
let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
match query.fetch_all(conn).await {
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
Err(err) => Err(sqlx_error_to_query_err(err)),
}
} else {
Err(DbErr::Query(
"Failed to acquire connection from pool.".to_owned(),
))
}
}
}
/// Wraps a raw `sqlx` SQLite row in the backend-agnostic [`QueryResult`].
impl From<SqliteRow> for QueryResult {
    fn from(row: SqliteRow) -> QueryResult {
        let row = QueryResultRow::SqlxSqlite(row);
        QueryResult { row }
    }
}
/// Wraps a raw `sqlx` SQLite execution result in the backend-agnostic
/// [`ExecResult`].
impl From<SqliteQueryResult> for ExecResult {
    fn from(result: SqliteQueryResult) -> ExecResult {
        let result = ExecResultHolder::SqlxSqlite(result);
        ExecResult { result }
    }
}
/// Builds a `sqlx` query from the statement's SQL text, binding the
/// statement's values through `bind_query` when it carries any.
fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Sqlite, SqliteArguments> {
    let unbound = sqlx::query(&stmt.sql);
    match &stmt.values {
        Some(values) => bind_query(unbound, values),
        None => unbound,
    }
}