1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use crate::config::Config;
use crate::errors::*;
use crate::messages::*;
use crate::pool::*;
use crate::stream::*;
use crate::types::*;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone)]
pub struct Query {
    query: String,
    params: BoltMap,
}

/// Abstracts a cypher query that is sent to neo4j server.
impl Query {
    pub fn new(query: String) -> Self {
        Query {
            query,
            params: BoltMap::new(),
        }
    }

    pub fn param<T: std::convert::Into<BoltType>>(mut self, key: &str, value: T) -> Self {
        self.params.put(key.into(), value.into());
        self
    }

    pub(crate) async fn run(
        self,
        config: &Config,
        connection: Arc<Mutex<ManagedConnection>>,
    ) -> Result<()> {
        let run = BoltRequest::run(&config.db, &self.query, self.params.clone());
        let mut connection = connection.lock().await;
        match connection.send_recv(run).await? {
            BoltResponse::SuccessMessage(_) => {
                match connection.send_recv(BoltRequest::discard()).await? {
                    BoltResponse::SuccessMessage(_) => Ok(()),
                    msg => Err(Error::UnexpectedMessage(format!(
                        "unexpected response for DISCARD: {:?}",
                        msg
                    ))),
                }
            }
            msg => Err(Error::UnexpectedMessage(format!(
                "unexpected response for RUN: {:?}",
                msg
            ))),
        }
    }

    pub(crate) async fn execute(
        self,
        config: &Config,
        connection: Arc<Mutex<ManagedConnection>>,
    ) -> Result<RowStream> {
        let run = BoltRequest::run(&config.db, &self.query, self.params);
        match connection.lock().await.send_recv(run).await {
            Ok(BoltResponse::SuccessMessage(success)) => {
                let fields: BoltList = success.get("fields").unwrap_or(BoltList::new());
                let qid: i64 = success.get("qid").unwrap_or(-1);
                Ok(RowStream::new(
                    qid,
                    fields,
                    config.fetch_size,
                    connection.clone(),
                ))
            }
            msg => Err(Error::UnexpectedMessage(format!(
                "unexpected response for RUN: {:?}",
                msg
            ))),
        }
    }
}