1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use mco::std::sync::channel::Receiver;
use either::Either;
use cdbc::database::{Database, HasStatement};
use cdbc::{Error, Execute, Executor};
use cdbc::describe::Describe;
use cdbc::io::chan_stream::ChanStream;
use crate::{Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo};
pub(crate) fn sender_to_stream(arg: Receiver<Result<Either<SqliteQueryResult, SqliteRow>, Error>>) -> ChanStream<Either<SqliteQueryResult, SqliteRow>> {
ChanStream::new(|s|{
loop{
match arg.recv(){
Ok(v) => {
s.send(Some(v));
}
Err(e) => {
return Ok(());
}
}
}
})
}
impl Executor for SqliteConnection {
type Database = Sqlite;
fn fetch_many<'q, E: 'q>(&mut self,
mut query: E,
) -> ChanStream<Either<SqliteQueryResult, SqliteRow>>
where
E: Execute<'q, Self::Database>,
{
let sql = query.sql();
let arguments = query.take_arguments();
let persistent = query.persistent() && arguments.is_some();
let s = self.worker
.execute(sql, arguments, self.row_channel_size, persistent);
if s.is_err() {
let c = ChanStream::new(|sender|
Err(s.err().unwrap())
);
return c;
}
let s = s.unwrap();
sender_to_stream(s)
}
fn fetch_optional<'q, E: 'q>(
&mut self,
mut query: E,
) -> Result<Option<SqliteRow>, Error>
where
E: Execute<'q, Self::Database>,
{
let sql = query.sql();
let arguments = query.take_arguments();
let persistent = query.persistent() && arguments.is_some();
let mut stream = self
.worker
.execute(sql, arguments, self.row_channel_size, persistent)?;
let mut stream = sender_to_stream(stream);
use crate::cdbc::io::chan_stream::TryStream;
while let Some(res) = stream.try_next()? {
if let Either::Right(row) = res {
return Ok(Some(row));
}
}
Ok(None)
}
fn prepare_with<'q>(
&mut self,
sql: &'q str,
_parameters: &[SqliteTypeInfo],
) -> Result<SqliteStatement<'q>, Error>
where
{
let statement = self.worker.prepare(sql)?;
Ok(SqliteStatement {
sql: sql.into(),
..statement
})
}
#[doc(hidden)]
fn describe<'q>(&mut self, sql: &'q str) -> Result<Describe<Sqlite>, Error>
{
self.worker.describe(sql)
}
}