cdbc_sqlite/connection/
executor.rs1use cdbc::describe::Describe;
2use cdbc::error::Error;
3use cdbc::executor::{Execute, Executor};
4use crate::connection::describe::describe;
5use crate::statement::{StatementHandle, VirtualStatement};
6use crate::{
7 Sqlite, SqliteArguments, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement,
8 SqliteTypeInfo,
9};
10use either::Either;
11use libsqlite3_sys::sqlite3_last_insert_rowid;
12use std::borrow::Cow;
13use std::sync::Arc;
14use std::sync::mpsc::RecvError;
15use mco::std::io::TryStream;
16use mco::std::sync::channel;
17use mco::std::sync::channel::Receiver;
18use cdbc::database::{Database, HasStatement};
19use cdbc::io::chan_stream::ChanStream;
20use cdbc::utils::statement_cache::StatementCache;
21use crate::connection::executor_mut;
22
23
24impl Executor for &mut SqliteConnection {
25 type Database = Sqlite;
26
27 fn fetch_many<'q, E: 'q>(&mut self,
28 mut query: E,
29 ) -> ChanStream<Either<SqliteQueryResult, SqliteRow>>
30 where
31 E: Execute<'q, Self::Database>,
32 {
33 let sql = query.sql().to_owned();
34 let arguments = query.take_arguments();
35 let persistent = query.persistent() && arguments.is_some();
36 let s = self.worker
37 .execute(&sql, arguments, self.row_channel_size, persistent);
38 if s.is_err() {
39 let c = ChanStream::new(|sender|
40 Err(s.err().unwrap())
41 );
42 return c;
43 }
44 let s = s.unwrap();
45 executor_mut::sender_to_stream(s)
46 }
47
48 fn fetch_optional<'q, E: 'q>(
49 &mut self,
50 mut query: E,
51 ) -> Result<Option<SqliteRow>, Error>
52 where
53 E: Execute<'q, Self::Database>,
54 {
55 let arguments = query.take_arguments();
56 let persistent = query.persistent() && arguments.is_some();
57 let mut stream = self
58 .worker
59 .execute(query.sql(), arguments, self.row_channel_size, persistent)?;
60 let mut stream = executor_mut::sender_to_stream(stream);
61 use crate::cdbc::io::chan_stream::TryStream;
62 while let Some(res) = stream.try_next()? {
63 if let Either::Right(row) = res {
64 return Ok(Some(row));
65 }
66 }
67 Ok(None)
68 }
69
70 fn prepare_with<'q>(
71 &mut self,
72 sql: &'q str,
73 _parameters: &[SqliteTypeInfo],
74 ) -> Result<SqliteStatement, Error>
75 where
76 {
77 let statement = self.worker.prepare(sql)?;
78 Ok(SqliteStatement {
79 sql: sql.into(),
80 ..statement
81 })
82 }
83
84 #[doc(hidden)]
85 fn describe<'q>(&mut self, sql: &'q str) -> Result<Describe<Sqlite>, Error>
86 {
87 self.worker.describe(sql)
88 }
89}