cdbc_sqlite/connection/
executor_mut.rs1use mco::std::sync::channel::Receiver;
2use either::Either;
3use cdbc::database::{Database, HasStatement};
4use cdbc::{Error, Execute, Executor};
5use cdbc::describe::Describe;
6use cdbc::io::chan_stream::ChanStream;
7use crate::{Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo};
8
9pub(crate) fn sender_to_stream(arg: Receiver<Result<Either<SqliteQueryResult, SqliteRow>, Error>>) -> ChanStream<Either<SqliteQueryResult, SqliteRow>> {
10 ChanStream::new(|s| {
11 loop {
12 match arg.recv() {
13 Ok(v) => {
14 if v.is_err() {
15 return Err(v.err().unwrap());
16 } else {
17 s.send(Some(v));
18 }
19 }
20 Err(_) => {
21 return Ok(());
24 }
25 }
26 }
27 })
28}
29
30
31impl Executor for SqliteConnection {
32 type Database = Sqlite;
33
34 fn fetch_many<'q, E: 'q>(&mut self,
35 mut query: E,
36 ) -> ChanStream<Either<SqliteQueryResult, SqliteRow>>
37 where
38 E: Execute<'q, Self::Database>,
39 {
40 let arguments = query.take_arguments();
41 let persistent = query.persistent() && arguments.is_some();
42 let s = self.worker
43 .execute(query.sql(), arguments, self.row_channel_size, persistent);
44 if s.is_err() {
45 let c = ChanStream::new(|sender|
46 Err(s.err().unwrap())
47 );
48 return c;
49 }
50 let s = s.unwrap();
51 sender_to_stream(s)
52 }
53
54 fn fetch_optional<'q, E: 'q>(
55 &mut self,
56 mut query: E,
57 ) -> Result<Option<SqliteRow>, Error>
58 where
59 E: Execute<'q, Self::Database>,
60 {
61 let arguments = query.take_arguments();
62 let persistent = query.persistent() && arguments.is_some();
63 let mut stream = self
64 .worker
65 .execute(query.sql(), arguments, self.row_channel_size, persistent)?;
66 let mut stream = sender_to_stream(stream);
67 use crate::cdbc::io::chan_stream::TryStream;
68 while let Some(res) = stream.try_next()? {
69 if let Either::Right(row) = res {
70 return Ok(Some(row));
71 }
72 }
73 Ok(None)
74 }
75
76 fn prepare_with<'q>(
77 &mut self,
78 sql: &'q str,
79 _parameters: &[SqliteTypeInfo],
80 ) -> Result<SqliteStatement, Error>
81 where
82 {
83 let statement = self.worker.prepare(sql)?;
84 Ok(SqliteStatement {
85 sql: sql.into(),
86 ..statement
87 })
88 }
89
90 #[doc(hidden)]
91 fn describe<'q>(&mut self, sql: &'q str) -> Result<Describe<Sqlite>, Error>
92 {
93 self.worker.describe(sql)
94 }
95}