bolt_client/client/
v1.rs

1#[cfg(test)]
2pub(crate) mod tests {
3    use std::env;
4
5    use bolt_proto::{message::*, value::*, version::*, ServerState::*};
6    use tokio::io::BufStream;
7    use tokio_util::compat::*;
8
9    use crate::{
10        error::{CommunicationError, CommunicationResult, ConnectionResult, Result},
11        skip_if_handshake_failed, stream, Client, Metadata, Params,
12    };
13
14    type Stream = Compat<BufStream<stream::Stream>>;
15
16    pub(crate) async fn new_client(version: u32) -> ConnectionResult<Client<Stream>> {
17        Client::new(
18            BufStream::new(
19                stream::Stream::connect(
20                    env::var("BOLT_TEST_ADDR").unwrap(),
21                    env::var("BOLT_TEST_DOMAIN").ok(),
22                )
23                .await?,
24            )
25            .compat(),
26            &[version, 0, 0, 0],
27        )
28        .await
29    }
30
31    pub(crate) async fn initialize_client(
32        client: &mut Client<Stream>,
33        succeed: bool,
34    ) -> CommunicationResult<Message> {
35        let username = env::var("BOLT_TEST_USERNAME").unwrap();
36        let password = if succeed {
37            env::var("BOLT_TEST_PASSWORD").unwrap()
38        } else {
39            String::from("invalid")
40        };
41
42        client
43            .hello(Metadata::from_iter(vec![
44                ("user_agent", "bolt-client/X.Y.Z"),
45                ("scheme", "basic"),
46                ("principal", &username),
47                ("credentials", &password),
48            ]))
49            .await
50    }
51
52    pub(crate) async fn get_initialized_client(version: u32) -> Result<Client<Stream>> {
53        let mut client = new_client(version).await?;
54        initialize_client(&mut client, true).await?;
55        Ok(client)
56    }
57
58    pub(crate) async fn run_invalid_query(
59        client: &mut Client<Stream>,
60    ) -> CommunicationResult<Message> {
61        client
62            .run(
63                "RETURN invalid query oof as n;",
64                Some(Params::from_iter(vec![("some_val", 25.5432)])),
65                Some(Metadata::from_iter(vec![("some_key", true)])),
66            )
67            .await
68    }
69
70    pub(crate) async fn run_valid_query(
71        client: &mut Client<Stream>,
72    ) -> CommunicationResult<Message> {
73        client
74            .run(
75                "RETURN $some_val as n;",
76                Some(Params::from_iter(vec![("some_val", 25.5432)])),
77                Some(Metadata::from_iter(vec![("some_key", true)])),
78            )
79            .await
80    }
81
82    #[tokio::test]
83    async fn init() {
84        let client = new_client(V1_0).await;
85        skip_if_handshake_failed!(client);
86        let mut client = client.unwrap();
87        assert_eq!(client.server_state(), Connected);
88        let response = initialize_client(&mut client, true).await.unwrap();
89        assert!(Success::try_from(response).is_ok());
90        assert_eq!(client.server_state(), Ready);
91    }
92
93    #[tokio::test]
94    async fn init_fail() {
95        let client = new_client(V1_0).await;
96        skip_if_handshake_failed!(client);
97        let mut client = client.unwrap();
98        assert_eq!(client.server_state(), Connected);
99        let response = initialize_client(&mut client, false).await.unwrap();
100        assert!(Failure::try_from(response).is_ok());
101        assert_eq!(client.server_state(), Defunct);
102
103        // Messages now fail to send since connection was closed
104        let response = initialize_client(&mut client, true).await;
105        assert!(matches!(
106            response,
107            Err(CommunicationError::InvalidState { state: Defunct, .. })
108        ));
109    }
110
111    #[tokio::test]
112    async fn ack_failure() {
113        let client = get_initialized_client(V1_0).await;
114        skip_if_handshake_failed!(client);
115        let mut client = client.unwrap();
116        assert_eq!(client.server_state(), Ready);
117        let response = run_invalid_query(&mut client).await.unwrap();
118        assert!(Failure::try_from(response).is_ok());
119        assert_eq!(client.server_state(), Failed);
120        let response = client.ack_failure().await.unwrap();
121        assert!(Success::try_from(response).is_ok());
122        assert_eq!(client.server_state(), Ready);
123        let response = run_valid_query(&mut client).await.unwrap();
124        assert!(Success::try_from(response).is_ok());
125        assert_eq!(client.server_state(), Streaming);
126    }
127
128    #[tokio::test]
129    async fn ack_failure_after_ignored() {
130        let client = get_initialized_client(V1_0).await;
131        skip_if_handshake_failed!(client);
132        let mut client = client.unwrap();
133        assert_eq!(client.server_state(), Ready);
134        let response = run_invalid_query(&mut client).await.unwrap();
135        assert!(Failure::try_from(response).is_ok());
136        assert_eq!(client.server_state(), Failed);
137        let response = run_valid_query(&mut client).await.unwrap();
138        assert!(matches!(response, Message::Ignored));
139        assert_eq!(client.server_state(), Failed);
140        let response = client.ack_failure().await.unwrap();
141        assert!(Success::try_from(response).is_ok());
142        assert_eq!(client.server_state(), Ready);
143        let response = run_valid_query(&mut client).await.unwrap();
144        assert!(Success::try_from(response).is_ok());
145        assert_eq!(client.server_state(), Streaming);
146    }
147
148    #[tokio::test]
149    async fn run() {
150        let client = get_initialized_client(V1_0).await;
151        skip_if_handshake_failed!(client);
152        let mut client = client.unwrap();
153        assert_eq!(client.server_state(), Ready);
154        let response = run_valid_query(&mut client).await.unwrap();
155        assert!(Success::try_from(response).is_ok());
156        assert_eq!(client.server_state(), Streaming);
157    }
158
159    #[tokio::test]
160    async fn run_pipelined() {
161        let client = get_initialized_client(V1_0).await;
162        skip_if_handshake_failed!(client);
163        let mut client = client.unwrap();
164        let messages = vec![
165            Message::Run(Run::new("MATCH (n {test: 'v1-pipelined'}) DETACH DELETE n;".to_string(), Default::default())),
166            Message::PullAll,
167            Message::Run(Run::new("CREATE (:Database {name: 'neo4j', born: 2007, test: 'v1-pipelined'});".to_string(), Default::default())),
168            Message::PullAll,
169            Message::Run(Run::new(
170                "MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}) CREATE (:Library {name: 'bolt-client', born: 2019, test: 'v1-pipelined'})-[:CLIENT_FOR]->(neo4j);".to_string(),
171                Default::default())),
172            Message::PullAll,
173            Message::Run(Run::new(
174                "MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}), (bolt_client:Library {name: 'bolt-client', test: 'v1-pipelined'}) RETURN bolt_client.born - neo4j.born;".to_string(),
175                Default::default())),
176            Message::PullAll,
177        ];
178        for response in client.pipeline(messages).await.unwrap() {
179            assert!(match response {
180                Message::Success(_) => true,
181                Message::Record(record) => {
182                    assert_eq!(record.fields()[0], Value::from(12_i8));
183                    true
184                }
185                _ => false,
186            });
187        }
188    }
189
190    #[tokio::test]
191    async fn run_and_pull() {
192        let client = get_initialized_client(V1_0).await;
193        skip_if_handshake_failed!(client);
194        let mut client = client.unwrap();
195        assert_eq!(client.server_state(), Ready);
196        let response = client
197            .run("RETURN 3458376 as n;", None, None)
198            .await
199            .unwrap();
200        assert!(Success::try_from(response).is_ok());
201        assert_eq!(client.server_state(), Streaming);
202
203        let (records, response) = client.pull(None).await.unwrap();
204        assert!(Success::try_from(response).is_ok());
205        assert_eq!(client.server_state(), Ready);
206        assert_eq!(records.len(), 1);
207        assert_eq!(records[0].fields(), &[Value::from(3_458_376)]);
208    }
209
210    #[tokio::test]
211    async fn node_and_rel_creation() {
212        let client = get_initialized_client(V1_0).await;
213        skip_if_handshake_failed!(client);
214        let mut client = client.unwrap();
215        client
216            .run(
217                "MATCH (n {test: 'v1-node-rel'}) DETACH DELETE n;",
218                None,
219                None,
220            )
221            .await
222            .unwrap();
223        client.pull(None).await.unwrap();
224
225        client.run("CREATE (:Client {name: 'bolt-client', test: 'v1-node-rel'})-[:WRITTEN_IN]->(:Language {name: 'Rust', test: 'v1-node-rel'});", None, None).await.unwrap();
226        client.pull(None).await.unwrap();
227        client
228            .run(
229                "MATCH (c {test: 'v1-node-rel'})-[r:WRITTEN_IN]->(l) RETURN c, r, l;",
230                None,
231                None,
232            )
233            .await
234            .unwrap();
235        let (records, _response) = client.pull(None).await.unwrap();
236
237        let c = Node::try_from(records[0].fields()[0].clone()).unwrap();
238        let r = Relationship::try_from(records[0].fields()[1].clone()).unwrap();
239        let l = Node::try_from(records[0].fields()[2].clone()).unwrap();
240
241        assert_eq!(c.labels(), &[String::from("Client")]);
242        assert_eq!(
243            c.properties().get("name"),
244            Some(&Value::from("bolt-client"))
245        );
246        assert_eq!(l.labels(), &[String::from("Language")]);
247        assert_eq!(l.properties().get("name"), Some(&Value::from("Rust")));
248        assert_eq!(r.rel_type(), "WRITTEN_IN");
249        assert!(r.properties().is_empty());
250        assert_eq!(
251            (r.start_node_identity(), r.end_node_identity()),
252            (c.node_identity(), l.node_identity())
253        );
254    }
255
256    #[tokio::test]
257    async fn discard_fail() {
258        let client = get_initialized_client(V1_0).await;
259        skip_if_handshake_failed!(client);
260        let mut client = client.unwrap();
261        assert_eq!(client.server_state(), Ready);
262        assert!(matches!(
263            client.discard(None).await,
264            Err(CommunicationError::InvalidState { state: Ready, .. })
265        ));
266    }
267
268    #[tokio::test]
269    async fn discard() {
270        let client = get_initialized_client(V1_0).await;
271        skip_if_handshake_failed!(client);
272        let mut client = client.unwrap();
273        assert_eq!(client.server_state(), Ready);
274        let response = run_valid_query(&mut client).await.unwrap();
275        assert!(Success::try_from(response).is_ok());
276        assert_eq!(client.server_state(), Streaming);
277        let response = client.discard(None).await.unwrap();
278        assert!(Success::try_from(response).is_ok());
279        assert_eq!(client.server_state(), Ready);
280    }
281
282    #[tokio::test]
283    async fn discard_and_pull() {
284        let client = get_initialized_client(V1_0).await;
285        skip_if_handshake_failed!(client);
286        let mut client = client.unwrap();
287        assert_eq!(client.server_state(), Ready);
288        let response = run_valid_query(&mut client).await.unwrap();
289        assert!(Success::try_from(response).is_ok());
290        assert_eq!(client.server_state(), Streaming);
291        let response = client.discard(None).await.unwrap();
292        assert!(Success::try_from(response).is_ok());
293        assert_eq!(client.server_state(), Ready);
294        assert!(matches!(
295            client.pull(None).await,
296            Err(CommunicationError::InvalidState { state: Ready, .. })
297        ));
298    }
299
300    #[tokio::test]
301    async fn reset() {
302        let client = get_initialized_client(V1_0).await;
303        skip_if_handshake_failed!(client);
304        let mut client = client.unwrap();
305        assert_eq!(client.server_state(), Ready);
306        let response = run_invalid_query(&mut client).await.unwrap();
307        assert!(Failure::try_from(response).is_ok());
308        assert_eq!(client.server_state(), Failed);
309        let response = run_valid_query(&mut client).await.unwrap();
310        assert!(matches!(response, Message::Ignored));
311        assert_eq!(client.server_state(), Failed);
312        let response = client.reset().await.unwrap();
313        assert!(Success::try_from(response).is_ok());
314        assert_eq!(client.server_state(), Ready);
315        let response = run_valid_query(&mut client).await.unwrap();
316        assert!(Success::try_from(response).is_ok());
317        assert_eq!(client.server_state(), Streaming);
318    }
319
320    #[tokio::test]
321    async fn reset_internals_pipelined() {
322        let client = get_initialized_client(V1_0).await;
323        skip_if_handshake_failed!(client);
324        let mut client = client.unwrap();
325
326        let mut messages = client
327            .pipeline(vec![
328                Message::Run(Run::new(String::from("RETURN 1;"), Default::default())),
329                Message::PullAll,
330                Message::Run(Run::new(String::from("RETURN 1;"), Default::default())),
331                Message::PullAll,
332                Message::Reset,
333            ])
334            .await
335            .unwrap();
336
337        // Last message should be a SUCCESS...
338        assert_eq!(
339            messages.pop(),
340            Some(Message::Success(Success::new(Default::default())))
341        );
342
343        // ... preceded by 4 or more IGNORED
344        assert!(messages.len() >= 4);
345        for message in messages {
346            assert_eq!(message, Message::Ignored);
347        }
348    }
349
350    #[tokio::test]
351    async fn reset_internals() {
352        let client = get_initialized_client(V1_0).await;
353        skip_if_handshake_failed!(client);
354        let mut client = client.unwrap();
355
356        client.run("RETURN 1;", None, None).await.unwrap();
357        client.send_message(Message::PullAll).await.unwrap();
358        client.send_message(Message::Reset).await.unwrap();
359        assert_eq!(client.server_state(), Interrupted);
360
361        // Two situations can happen here - either the PULL_ALL is ignored, or the records of the
362        // PULL_ALL are ignored. The latter situation results in additional IGNORED messages in
363        // the result stream.
364
365        // RECORD or PULL_ALL summary, it's not consistent
366        assert_eq!(client.read_message().await.unwrap(), Message::Ignored);
367
368        match client.read_message().await.unwrap() {
369            // PULL_ALL summary
370            Message::Ignored => {
371                // RESET result
372                Success::try_from(client.read_message().await.unwrap()).unwrap();
373            }
374            // RESET result
375            Message::Success(_) => {}
376            other => panic!("unexpected response {:?}", other),
377        }
378    }
379
380    #[tokio::test]
381    async fn ignored() {
382        let client = get_initialized_client(V1_0).await;
383        skip_if_handshake_failed!(client);
384        let mut client = client.unwrap();
385        assert_eq!(client.server_state(), Ready);
386        let response = run_invalid_query(&mut client).await.unwrap();
387        assert!(Failure::try_from(response).is_ok());
388        assert_eq!(client.server_state(), Failed);
389        let response = run_valid_query(&mut client).await.unwrap();
390        assert!(matches!(response, Message::Ignored));
391        assert_eq!(client.server_state(), Failed);
392    }
393
394    #[tokio::test]
395    async fn v3_method_with_v1_client_fails() {
396        let client = get_initialized_client(V1_0).await;
397        skip_if_handshake_failed!(client);
398        let mut client = client.unwrap();
399        assert!(matches!(
400            client.commit().await,
401            Err(CommunicationError::UnsupportedOperation(V1_0))
402        ));
403    }
404
405    #[tokio::test]
406    async fn v3_message_with_v1_client_fails() {
407        let client = get_initialized_client(V1_0).await;
408        skip_if_handshake_failed!(client);
409        let mut client = client.unwrap();
410        let begin = Begin::new(Default::default());
411        client.send_message(Message::Begin(begin)).await.unwrap();
412        assert!(matches!(
413            client.read_message().await,
414            Err(CommunicationError::ProtocolError(
415                bolt_proto::error::Error::DeserializationError(
416                    bolt_proto::error::DeserializationError::IoError(_)
417                )
418            ))
419        ));
420    }
421}