1use crate::{
2 CBox, SQLiteDriver, SQLitePrepared, SQLiteTransaction,
3 extract::{extract_name, extract_value},
4};
5use async_stream::try_stream;
6use flume::Sender;
7use libsqlite3_sys::*;
8use std::{
9 borrow::Cow,
10 ffi::{CStr, CString, c_char, c_int},
11 mem, ptr,
12 str::FromStr,
13 sync::{
14 Arc,
15 atomic::{AtomicPtr, Ordering},
16 },
17};
18use tank_core::{
19 AsQuery, Connection, Error, ErrorContext, Executor, Prepared, Query, QueryResult, RawQuery,
20 Result, Row, RowsAffected, error_message_from_ptr, send_value, stream::Stream, truncate_long,
21};
22use tokio::task::spawn_blocking;
23
24pub struct SQLiteConnection {
28 pub(crate) connection: CBox<*mut sqlite3>,
29}
30
31impl SQLiteConnection {
32 pub fn last_error(&self) -> String {
33 unsafe {
34 let errcode = sqlite3_errcode(*self.connection);
35 format!(
36 "Error ({errcode}): {}",
37 error_message_from_ptr(&sqlite3_errmsg(*self.connection)),
38 )
39 }
40 }
41
42 pub(crate) fn do_run_prepared(
43 connection: *mut sqlite3,
44 statement: *mut sqlite3_stmt,
45 tx: Sender<Result<QueryResult>>,
46 ) {
47 unsafe {
48 let count = sqlite3_column_count(statement);
49 let labels = match (0..count)
50 .map(|i| extract_name(statement, i))
51 .collect::<Result<Arc<[_]>>>()
52 {
53 Ok(labels) => labels,
54 Err(error) => {
55 send_value!(tx, Err(error.into()));
56 return;
57 }
58 };
59 loop {
60 match sqlite3_step(statement) {
61 SQLITE_BUSY => {
62 continue;
63 }
64 SQLITE_DONE => {
65 if sqlite3_stmt_readonly(statement) == 0 {
66 send_value!(
67 tx,
68 Ok(QueryResult::Affected(RowsAffected {
69 rows_affected: Some(sqlite3_changes64(connection) as _),
70 last_affected_id: Some(sqlite3_last_insert_rowid(connection)),
71 }))
72 );
73 }
74 break;
75 }
76 SQLITE_ROW => {
77 let values = match (0..count)
78 .map(|i| extract_value(statement, i))
79 .collect::<Result<_>>()
80 {
81 Ok(value) => value,
82 Err(error) => {
83 send_value!(tx, Err(error));
84 return;
85 }
86 };
87 send_value!(
88 tx,
89 Ok(QueryResult::Row(Row {
90 labels: labels.clone(),
91 values: values,
92 }))
93 )
94 }
95 _ => {
96 send_value!(
97 tx,
98 Err(Error::msg(
99 error_message_from_ptr(&sqlite3_errmsg(sqlite3_db_handle(
100 statement,
101 )))
102 .to_string(),
103 ))
104 );
105 break;
106 }
107 }
108 }
109 }
110 }
111
112 pub(crate) fn do_run_unprepared(
113 connection: *mut sqlite3,
114 sql: &str,
115 tx: Sender<Result<QueryResult>>,
116 ) {
117 unsafe {
118 let sql = sql.trim();
119 let mut it = sql.as_ptr() as *const c_char;
120 let mut len = sql.len();
121 loop {
122 let (statement, tail) = {
123 let mut statement = SQLitePrepared::new(CBox::new(ptr::null_mut(), |p| {
124 let rc = sqlite3_finalize(p);
125 if rc != SQLITE_OK {
126 return;
127 }
128 }));
129 let mut sql_tail = ptr::null();
130 let rc = sqlite3_prepare_v2(
131 connection,
132 it,
133 len as c_int,
134 &mut *statement.statement,
135 &mut sql_tail,
136 );
137 if rc != SQLITE_OK {
138 send_value!(
139 tx,
140 Err(Error::msg(
141 error_message_from_ptr(&sqlite3_errmsg(connection)).to_string(),
142 ))
143 );
144 return;
145 }
146 (statement, sql_tail)
147 };
148 Self::do_run_prepared(connection, statement.statement(), tx.clone());
149 len = if tail != ptr::null() {
150 len - tail.offset_from_unsigned(it)
151 } else {
152 0
153 };
154 if len == 0 {
155 break;
156 }
157 it = tail;
158 }
159 };
160 }
161}
162
163impl Executor for SQLiteConnection {
164 type Driver = SQLiteDriver;
165
166 async fn do_prepare(&mut self, sql: String) -> Result<Query<SQLiteDriver>> {
167 let connection = AtomicPtr::new(*self.connection);
168 let context = format!("While preparing the query:\n{}", truncate_long!(sql));
169 let prepared = spawn_blocking(move || unsafe {
170 let connection = connection.load(Ordering::Relaxed);
171 let len = sql.len();
172 let sql = CString::new(sql.into_bytes())?;
173 let mut statement = CBox::new(ptr::null_mut(), |p| {
174 let db = sqlite3_db_handle(p);
175 let rc = sqlite3_finalize(p);
176 if rc != SQLITE_OK {
177 let error = Error::msg(error_message_from_ptr(&sqlite3_errmsg(db)).to_string())
178 .context("While finalizing a prepared statement");
179 log::error!("{error:#}");
180 }
181 });
182 let mut tail = ptr::null();
183 let rc = sqlite3_prepare_v2(
184 connection,
185 sql.as_ptr(),
186 len as c_int,
187 &mut *statement,
188 &mut tail,
189 );
190 if rc != SQLITE_OK {
191 let error =
192 Error::msg(error_message_from_ptr(&sqlite3_errmsg(connection)).to_string())
193 .context(context);
194 log::error!("{:#}", error);
195 return Err(error);
196 }
197 if tail != ptr::null() && *tail != '\0' as i8 {
198 let error = Error::msg(format!(
199 "Cannot prepare more than one statement at a time (remaining: {})",
200 CStr::from_ptr(tail).to_str().unwrap_or("unprintable")
201 ))
202 .context(context);
203 log::error!("{:#}", error);
204 return Err(error);
205 }
206 Ok(statement)
207 })
208 .await?;
209 Ok(SQLitePrepared::new(prepared?).into())
210 }
211
212 fn run<'s>(
213 &'s mut self,
214 query: impl AsQuery<SQLiteDriver> + 's,
215 ) -> impl Stream<Item = Result<QueryResult>> + Send {
216 let mut query = query.as_query();
217 let context = Arc::new(format!("While running the query:\n{}", query.as_mut()));
218 let (tx, rx) = flume::unbounded::<Result<QueryResult>>();
219 let connection = AtomicPtr::new(*self.connection);
220 let mut owned = mem::take(query.as_mut());
221 let join = spawn_blocking(move || {
222 match &mut owned {
223 Query::Raw(RawQuery(sql)) => {
224 Self::do_run_unprepared(connection.load(Ordering::Relaxed), sql, tx);
225 }
226 Query::Prepared(prepared) => {
227 Self::do_run_prepared(
228 connection.load(Ordering::Relaxed),
229 prepared.statement(),
230 tx,
231 );
232 let _ = prepared.clear_bindings();
233 }
234 }
235 owned
236 });
237 try_stream! {
238 while let Ok(result) = rx.recv_async().await {
239 yield result.map_err(|e| {
240 let error = e.context(context.clone());
241 log::error!("{:#}", error);
242 error
243 })?;
244 }
245 *query.as_mut() = mem::take(&mut join.await?);
246 }
247 }
248}
249
250impl Connection for SQLiteConnection {
251 async fn connect(driver: &SQLiteDriver, url: Cow<'static, str>) -> Result<Self> {
252 let context = "While trying to connect to SQLite";
253 let url = Self::sanitize_url(driver, url)?;
254 let url =
255 CString::from_str(&url.as_str().replacen("sqlite://", "file:", 1)).context(context)?;
256 let mut connection;
257 unsafe {
258 connection = CBox::new(ptr::null_mut(), |p| {
259 if sqlite3_close(p) != SQLITE_OK {
260 let error = Error::msg(error_message_from_ptr(&sqlite3_errmsg(p)).to_string())
261 .context("While closing the sqlite connection");
262 log::error!("{error:#}");
263 }
264 });
265 let rc = sqlite3_open_v2(
266 url.as_ptr(),
267 &mut *connection,
268 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
269 ptr::null(),
270 );
271 if rc != SQLITE_OK {
272 let error =
273 Error::msg(error_message_from_ptr(&sqlite3_errmsg(*connection)).to_string())
274 .context(context);
275 log::error!("{:#}", error);
276 return Err(error);
277 }
278 }
279 Ok(Self { connection })
280 }
281
282 fn begin(&mut self) -> impl Future<Output = Result<SQLiteTransaction<'_>>> {
283 SQLiteTransaction::new(self)
284 }
285}