1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use crate::config::Config;
use crate::errors::*;
use crate::messages::*;
use crate::pool::*;
use crate::stream::*;
use crate::types::*;
use std::sync::Arc;
use tokio::sync::Mutex;
/// A query statement together with the parameters collected for it.
///
/// Values are built with [`Query::new`] and extended with [`Query::param`].
#[derive(Clone)]
pub struct Query {
    /// Statement text handed to `BoltRequest::run`.
    query: String,
    /// Parameters sent alongside the statement, keyed by parameter name.
    params: BoltMap,
}
impl Query {
    /// Creates a query from the given statement text with an empty parameter map.
    pub fn new(query: String) -> Self {
        Query {
            query,
            params: BoltMap::new(),
        }
    }

    /// Stores `value` under `key` in the parameter map and returns the query,
    /// so calls can be chained.
    ///
    /// Both `key` and `value` are converted with `into()` before being handed
    /// to `BoltMap::put`.
    pub fn param<T: std::convert::Into<BoltType>>(mut self, key: &str, value: T) -> Self {
        self.params.put(key.into(), value.into());
        self
    }

    /// Sends this query as a RUN request followed by a DISCARD request,
    /// ignoring any result data.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_recv`. Returns
    /// `Error::UnexpectedMessage` if either the RUN or the DISCARD request is
    /// answered with anything other than a success message.
    pub(crate) async fn run(
        self,
        config: &Config,
        connection: Arc<Mutex<ManagedConnection>>,
    ) -> Result<()> {
        // `self` is consumed, so the parameters are moved rather than cloned.
        let run = BoltRequest::run(&config.db, &self.query, self.params);

        // The guard is kept for both round-trips so RUN and DISCARD are sent
        // under a single lock acquisition.
        let mut connection = connection.lock().await;
        match connection.send_recv(run).await? {
            BoltResponse::SuccessMessage(_) => {
                match connection.send_recv(BoltRequest::discard()).await? {
                    BoltResponse::SuccessMessage(_) => Ok(()),
                    msg => Err(Error::UnexpectedMessage(format!(
                        "unexpected response for DISCARD: {:?}",
                        msg
                    ))),
                }
            }
            msg => Err(Error::UnexpectedMessage(format!(
                "unexpected response for RUN: {:?}",
                msg
            ))),
        }
    }

    /// Sends this query as a RUN request and returns a [`RowStream`] built
    /// from the success response.
    ///
    /// The stream is created from the `fields` and `qid` entries of the
    /// success message (falling back to an empty list and `-1` respectively
    /// when `get` yields nothing), `config.fetch_size`, and the connection.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_recv` unchanged. Returns
    /// `Error::UnexpectedMessage` if the RUN request is answered with anything
    /// other than a success message.
    pub(crate) async fn execute(
        self,
        config: &Config,
        connection: Arc<Mutex<ManagedConnection>>,
    ) -> Result<RowStream> {
        let run = BoltRequest::run(&config.db, &self.query, self.params);

        // Bind the response in its own statement: the temporary lock guard is
        // dropped at the end of it, and `?` forwards connection errors as-is
        // instead of folding them into `UnexpectedMessage`.
        let response = connection.lock().await.send_recv(run).await?;

        match response {
            BoltResponse::SuccessMessage(success) => {
                let fields: BoltList = success.get("fields").unwrap_or(BoltList::new());
                let qid: i64 = success.get("qid").unwrap_or(-1);
                // The lock is no longer held, so the `Arc` can be moved into
                // the stream without an extra clone.
                Ok(RowStream::new(qid, fields, config.fetch_size, connection))
            }
            msg => Err(Error::UnexpectedMessage(format!(
                "unexpected response for RUN: {:?}",
                msg
            ))),
        }
    }
}