cdbc_sqlite/connection/
executor_mut.rs

1use mco::std::sync::channel::Receiver;
2use either::Either;
3use cdbc::database::{Database, HasStatement};
4use cdbc::{Error, Execute, Executor};
5use cdbc::describe::Describe;
6use cdbc::io::chan_stream::ChanStream;
7use crate::{Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo};
8
9pub(crate) fn sender_to_stream(arg: Receiver<Result<Either<SqliteQueryResult, SqliteRow>, Error>>) -> ChanStream<Either<SqliteQueryResult, SqliteRow>> {
10    ChanStream::new(|s| {
11        loop {
12            match arg.recv() {
13                Ok(v) => {
14                    if v.is_err() {
15                        return Err(v.err().unwrap());
16                    } else {
17                        s.send(Some(v));
18                    }
19                }
20                Err(_) => {
21                    //log::error!("{}",e);
22                    // s.send(None);
23                    return Ok(());
24                }
25            }
26        }
27    })
28}
29
30
31impl Executor for SqliteConnection {
32    type Database = Sqlite;
33
34    fn fetch_many<'q, E: 'q>(&mut self,
35                             mut query: E,
36    ) -> ChanStream<Either<SqliteQueryResult, SqliteRow>>
37        where
38            E: Execute<'q, Self::Database>,
39    {
40        let arguments = query.take_arguments();
41        let persistent = query.persistent() && arguments.is_some();
42        let s = self.worker
43            .execute(query.sql(), arguments, self.row_channel_size, persistent);
44        if s.is_err() {
45            let c = ChanStream::new(|sender|
46                Err(s.err().unwrap())
47            );
48            return c;
49        }
50        let s = s.unwrap();
51        sender_to_stream(s)
52    }
53
54    fn fetch_optional<'q, E: 'q>(
55        &mut self,
56        mut query: E,
57    ) -> Result<Option<SqliteRow>, Error>
58        where
59            E: Execute<'q, Self::Database>,
60    {
61        let arguments = query.take_arguments();
62        let persistent = query.persistent() && arguments.is_some();
63        let mut stream = self
64            .worker
65            .execute(query.sql(), arguments, self.row_channel_size, persistent)?;
66        let mut stream = sender_to_stream(stream);
67        use crate::cdbc::io::chan_stream::TryStream;
68        while let Some(res) = stream.try_next()? {
69            if let Either::Right(row) = res {
70                return Ok(Some(row));
71            }
72        }
73        Ok(None)
74    }
75
76    fn prepare_with<'q>(
77        &mut self,
78        sql: &'q str,
79        _parameters: &[SqliteTypeInfo],
80    ) -> Result<SqliteStatement, Error>
81        where
82    {
83        let statement = self.worker.prepare(sql)?;
84        Ok(SqliteStatement {
85            sql: sql.into(),
86            ..statement
87        })
88    }
89
90    #[doc(hidden)]
91    fn describe<'q>(&mut self, sql: &'q str) -> Result<Describe<Sqlite>, Error>
92    {
93        self.worker.describe(sql)
94    }
95}