cdbc_sqlite/connection/
executor.rs

1use cdbc::describe::Describe;
2use cdbc::error::Error;
3use cdbc::executor::{Execute, Executor};
4use crate::connection::describe::describe;
5use crate::statement::{StatementHandle, VirtualStatement};
6use crate::{
7    Sqlite, SqliteArguments, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement,
8    SqliteTypeInfo,
9};
10use either::Either;
11use libsqlite3_sys::sqlite3_last_insert_rowid;
12use std::borrow::Cow;
13use std::sync::Arc;
14use std::sync::mpsc::RecvError;
15use mco::std::io::TryStream;
16use mco::std::sync::channel;
17use mco::std::sync::channel::Receiver;
18use cdbc::database::{Database, HasStatement};
19use cdbc::io::chan_stream::ChanStream;
20use cdbc::utils::statement_cache::StatementCache;
21use crate::connection::executor_mut;
22
23
24impl Executor for &mut SqliteConnection {
25    type Database = Sqlite;
26
27    fn fetch_many<'q, E: 'q>(&mut self,
28                             mut query: E,
29    ) -> ChanStream<Either<SqliteQueryResult, SqliteRow>>
30        where
31            E: Execute<'q, Self::Database>,
32    {
33        let sql = query.sql().to_owned();
34        let arguments = query.take_arguments();
35        let persistent = query.persistent() && arguments.is_some();
36        let s = self.worker
37            .execute(&sql, arguments, self.row_channel_size, persistent);
38        if s.is_err() {
39            let c = ChanStream::new(|sender|
40                Err(s.err().unwrap())
41            );
42            return c;
43        }
44        let s = s.unwrap();
45        executor_mut::sender_to_stream(s)
46    }
47
48    fn fetch_optional<'q, E: 'q>(
49        &mut self,
50        mut query: E,
51    ) -> Result<Option<SqliteRow>, Error>
52        where
53            E: Execute<'q, Self::Database>,
54    {
55        let arguments = query.take_arguments();
56        let persistent = query.persistent() && arguments.is_some();
57        let mut stream = self
58            .worker
59            .execute(query.sql(), arguments, self.row_channel_size, persistent)?;
60        let mut stream = executor_mut::sender_to_stream(stream);
61        use crate::cdbc::io::chan_stream::TryStream;
62        while let Some(res) = stream.try_next()? {
63            if let Either::Right(row) = res {
64                return Ok(Some(row));
65            }
66        }
67        Ok(None)
68    }
69
70    fn prepare_with<'q>(
71        &mut self,
72        sql: &'q str,
73        _parameters: &[SqliteTypeInfo],
74    ) -> Result<SqliteStatement, Error>
75        where
76    {
77        let statement = self.worker.prepare(sql)?;
78        Ok(SqliteStatement {
79            sql: sql.into(),
80            ..statement
81        })
82    }
83
84    #[doc(hidden)]
85    fn describe<'q>(&mut self, sql: &'q str) -> Result<Describe<Sqlite>, Error>
86    {
87        self.worker.describe(sql)
88    }
89}