bolt_client/client/
v4.rs

1#[cfg(test)]
2mod tests {
3    use std::collections::HashMap;
4
5    use bolt_proto::{message::*, value::*, version::*, ServerState::*};
6
7    use crate::{
8        client::v1::tests::*, error::CommunicationError, skip_if_handshake_failed, Metadata,
9    };
10
11    #[tokio::test]
12    async fn hello() {
13        let client = new_client(V4_0).await;
14        skip_if_handshake_failed!(client);
15        let mut client = client.unwrap();
16        assert_eq!(client.server_state(), Connected);
17        let response = initialize_client(&mut client, true).await.unwrap();
18        assert!(Success::try_from(response).is_ok());
19        assert_eq!(client.server_state(), Ready);
20    }
21
22    #[tokio::test]
23    async fn hello_fail() {
24        let client = new_client(V4_0).await;
25        skip_if_handshake_failed!(client);
26        let mut client = client.unwrap();
27        assert_eq!(client.server_state(), Connected);
28        let response = initialize_client(&mut client, false).await.unwrap();
29        assert!(Failure::try_from(response).is_ok());
30        assert_eq!(client.server_state(), Defunct);
31    }
32
33    #[tokio::test]
34    async fn goodbye() {
35        let client = get_initialized_client(V4_0).await;
36        skip_if_handshake_failed!(client);
37        let mut client = client.unwrap();
38        assert_eq!(client.server_state(), Ready);
39        assert!(client.goodbye().await.is_ok());
40        assert_eq!(client.server_state(), Defunct);
41    }
42
43    #[tokio::test]
44    async fn run() {
45        let client = get_initialized_client(V4_0).await;
46        skip_if_handshake_failed!(client);
47        let mut client = client.unwrap();
48        assert_eq!(client.server_state(), Ready);
49        let response = run_valid_query(&mut client).await.unwrap();
50        assert!(Success::try_from(response).is_ok());
51        assert_eq!(client.server_state(), Streaming);
52    }
53
54    #[tokio::test]
55    async fn run_pipelined() {
56        let client = get_initialized_client(V4_0).await;
57        skip_if_handshake_failed!(client);
58        let mut client = client.unwrap();
59        let messages = vec![
60            Message::RunWithMetadata(RunWithMetadata::new(
61                "MATCH (n {test: 'v4-pipelined'}) DETACH DELETE n;".to_string(),
62                Default::default(), Default::default())),
63            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
64            Message::RunWithMetadata(RunWithMetadata::new(
65                "CREATE (:Database {name: 'neo4j', v1_release: date('2010-02-16'), test: 'v4-pipelined'});".to_string(),
66                Default::default(), Default::default())),
67            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
68            Message::RunWithMetadata(RunWithMetadata::new(
69                "MATCH (neo4j:Database {name: 'neo4j', test: 'v4-pipelined'}) CREATE (:Library {name: 'bolt-client', v1_release: date('2019-12-23'), test: 'v4-pipelined'})-[:CLIENT_FOR]->(neo4j);".to_string(),
70                Default::default(), Default::default())),
71            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
72            Message::RunWithMetadata(RunWithMetadata::new(
73                "MATCH (neo4j:Database {name: 'neo4j', test: 'v4-pipelined'}), (bolt_client:Library {name: 'bolt-client', test: 'v4-pipelined'}) RETURN duration.between(neo4j.v1_release, bolt_client.v1_release);".to_string(),
74                Default::default(), Default::default())),
75            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
76        ];
77        for response in client.pipeline(messages).await.unwrap() {
78            assert!(match response {
79                Message::Success(_) => true,
80                Message::Record(record) => {
81                    assert_eq!(record.fields()[0], Value::from(Duration::new(118, 7, 0, 0)));
82                    true
83                }
84                _ => false,
85            });
86        }
87    }
88
89    // Current Neo4j behavior:
90    //   - Sending DISCARD without 'n' metadata parameter results in a
91    //     Neo.ClientError.Request.Invalid, saying "Expecting DISCARD size n to be a Long
92    //     value, but got: NO_VALUE"
93    //   - Sending DISCARD with 'n' equal to some number results in a
94    //     Neo.DatabaseError.General.UnknownError, saying "Currently it is only supported
95    //     to discard ALL records, but it was requested to discard " + n
96    //   - Sending DISCARD with 'n' equal to -1 indicates discard of all records in the
97    //     result stream.
98    #[tokio::test]
99    async fn discard() {
100        let client = get_initialized_client(V4_0).await;
101        skip_if_handshake_failed!(client);
102        let mut client = client.unwrap();
103        assert_eq!(client.server_state(), Ready);
104
105        let response = run_valid_query(&mut client).await.unwrap();
106        assert!(Success::try_from(response).is_ok());
107        assert_eq!(client.server_state(), Streaming);
108        let response = client.discard(None).await.unwrap();
109        assert!(Failure::try_from(response).is_ok());
110        assert_eq!(client.server_state(), Failed);
111
112        let response = client.reset().await.unwrap();
113        assert!(Success::try_from(response).is_ok());
114        assert_eq!(client.server_state(), Ready);
115
116        let response = run_valid_query(&mut client).await.unwrap();
117        assert!(Success::try_from(response).is_ok());
118        assert_eq!(client.server_state(), Streaming);
119        let response = client
120            .discard(Some(Metadata::from_iter(vec![("n", 1)])))
121            .await
122            .unwrap();
123        assert!(Failure::try_from(response).is_ok());
124        assert_eq!(client.server_state(), Failed);
125
126        let response = client.reset().await.unwrap();
127        assert!(Success::try_from(response).is_ok());
128        assert_eq!(client.server_state(), Ready);
129
130        let response = run_valid_query(&mut client).await.unwrap();
131        assert!(Success::try_from(response).is_ok());
132        assert_eq!(client.server_state(), Streaming);
133        let response = client
134            .discard(Some(Metadata::from_iter(vec![("n", -1)])))
135            .await
136            .unwrap();
137        assert!(Success::try_from(response).is_ok());
138        assert_eq!(client.server_state(), Ready);
139    }
140
141    // Current Neo4j behavior:
142    //   - Need to send an 'n' metadata parameter here too, but finite values of n will
143    //     work here.
144    #[tokio::test]
145    async fn run_and_pull() {
146        let client = get_initialized_client(V4_0).await;
147        skip_if_handshake_failed!(client);
148        let mut client = client.unwrap();
149        assert_eq!(client.server_state(), Ready);
150
151        // Try pulling 1 result
152        let response = client
153            .run("RETURN 3458376 as n;", None, None)
154            .await
155            .unwrap();
156        assert!(Success::try_from(response).is_ok());
157        assert_eq!(client.server_state(), Streaming);
158
159        let (records, response) = client
160            .pull(Some(Metadata::from_iter(vec![("n", 1)])))
161            .await
162            .unwrap();
163        assert!(Success::try_from(response).is_ok());
164        assert_eq!(client.server_state(), Ready);
165        assert_eq!(records.len(), 1);
166        assert_eq!(records[0].fields(), &[Value::from(3_458_376)]);
167
168        // Try pulling all results
169        let response = client
170            .run("RETURN 3458376 as n;", None, None)
171            .await
172            .unwrap();
173        assert!(Success::try_from(response).is_ok());
174        assert_eq!(client.server_state(), Streaming);
175
176        let (records, response) = client
177            .pull(Some(Metadata::from_iter(vec![("n", -1)])))
178            .await
179            .unwrap();
180        assert!(Success::try_from(response).is_ok());
181        assert_eq!(client.server_state(), Ready);
182        assert_eq!(records.len(), 1);
183        assert_eq!(records[0].fields(), &[Value::from(3_458_376)]);
184    }
185
186    #[tokio::test]
187    async fn begin() {
188        let client = get_initialized_client(V4_0).await;
189        skip_if_handshake_failed!(client);
190        let mut client = client.unwrap();
191        assert_eq!(client.server_state(), Ready);
192        let response = client.begin(None).await.unwrap();
193        assert!(Success::try_from(response).is_ok());
194        assert_eq!(client.server_state(), TxReady);
195    }
196
197    #[tokio::test]
198    async fn commit_empty_transaction() {
199        let client = get_initialized_client(V4_0).await;
200        skip_if_handshake_failed!(client);
201        let mut client = client.unwrap();
202        assert_eq!(client.server_state(), Ready);
203        client.begin(None).await.unwrap();
204        assert_eq!(client.server_state(), TxReady);
205        let response = client.commit().await.unwrap();
206        assert!(Success::try_from(response).is_ok());
207        assert_eq!(client.server_state(), Ready);
208    }
209
210    #[tokio::test]
211    async fn commit() {
212        let client = get_initialized_client(V4_0).await;
213        skip_if_handshake_failed!(client);
214        let mut client = client.unwrap();
215        assert_eq!(client.server_state(), Ready);
216        client.begin(None).await.unwrap();
217        assert_eq!(client.server_state(), TxReady);
218
219        let messages = vec![
220            Message::RunWithMetadata(RunWithMetadata::new(
221                "MATCH (n {test: 'v4-commit'}) DETACH DELETE n;".to_string(),
222                Default::default(), Default::default())),
223            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
224            Message::RunWithMetadata(RunWithMetadata::new(
225                "CREATE (:Database {name: 'neo4j', v1_release: date('2010-02-16'), test: 'v4-commit'});".to_string(),
226                Default::default(), Default::default())),
227            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
228        ];
229        client.pipeline(messages).await.unwrap();
230        assert_eq!(client.server_state(), TxReady);
231        let response = client.commit().await.unwrap();
232        assert!(Success::try_from(response).is_ok());
233        assert_eq!(client.server_state(), Ready);
234
235        let messages = vec![
236            Message::RunWithMetadata(RunWithMetadata::new(
237                "MATCH (n {test: 'v4-commit'}) RETURN n;".to_string(),
238                Default::default(),
239                Default::default(),
240            )),
241            Message::Pull(Pull::new(HashMap::from_iter(vec![(
242                "n".to_string(),
243                Value::from(1),
244            )]))),
245        ];
246        let mut node_exists = false;
247        for response in client.pipeline(messages).await.unwrap() {
248            if let Message::Record(record) = response {
249                let node = Node::try_from(record.fields()[0].clone()).unwrap();
250                assert_eq!(node.labels(), &["Database"]);
251                node_exists = true;
252                break;
253            }
254        }
255        assert!(node_exists);
256    }
257
258    #[tokio::test]
259    async fn commit_with_no_begin_fails() {
260        let client = get_initialized_client(V4_0).await;
261        skip_if_handshake_failed!(client);
262        let mut client = client.unwrap();
263        assert!(matches!(
264            client.commit().await,
265            Err(CommunicationError::InvalidState { state: Ready, .. })
266        ));
267    }
268
269    #[tokio::test]
270    async fn rollback_empty_transaction() {
271        let client = get_initialized_client(V4_0).await;
272        skip_if_handshake_failed!(client);
273        let mut client = client.unwrap();
274        assert_eq!(client.server_state(), Ready);
275        client.begin(None).await.unwrap();
276        assert_eq!(client.server_state(), TxReady);
277        let response = client.rollback().await.unwrap();
278        assert!(Success::try_from(response).is_ok());
279        assert_eq!(client.server_state(), Ready);
280    }
281
282    #[tokio::test]
283    async fn rollback() {
284        let client = get_initialized_client(V4_0).await;
285        skip_if_handshake_failed!(client);
286        let mut client = client.unwrap();
287        assert_eq!(client.server_state(), Ready);
288        client.begin(None).await.unwrap();
289        assert_eq!(client.server_state(), TxReady);
290        let messages = vec![
291            Message::RunWithMetadata(RunWithMetadata::new(
292                "MATCH (n {test: 'v4-rollback'}) DETACH DELETE n;".to_string(),
293                Default::default(), Default::default())),
294            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
295            Message::RunWithMetadata(RunWithMetadata::new(
296                "CREATE (:Database {name: 'neo4j', v1_release: date('2010-02-16'), test: 'v4-rollback'});".to_string(),
297                Default::default(), Default::default())),
298            Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
299        ];
300        client.pipeline(messages).await.unwrap();
301        assert_eq!(client.server_state(), TxReady);
302        let response = client.rollback().await.unwrap();
303        assert!(Success::try_from(response).is_ok());
304        assert_eq!(client.server_state(), Ready);
305
306        let messages = vec![
307            Message::RunWithMetadata(RunWithMetadata::new(
308                "MATCH (n {test: 'v4-rollback'}) RETURN n;".to_string(),
309                Default::default(),
310                Default::default(),
311            )),
312            Message::Pull(Pull::new(HashMap::from_iter(vec![(
313                "n".to_string(),
314                Value::from(1),
315            )]))),
316        ];
317        for response in client.pipeline(messages).await.unwrap() {
318            // There should be no RECORD messages
319            assert!(matches!(response, Message::Success(_)));
320        }
321    }
322
323    #[tokio::test]
324    async fn rollback_with_no_begin_fails() {
325        let client = get_initialized_client(V4_0).await;
326        skip_if_handshake_failed!(client);
327        let mut client = client.unwrap();
328        assert!(matches!(
329            client.rollback().await,
330            Err(CommunicationError::InvalidState { state: Ready, .. })
331        ));
332    }
333
334    #[tokio::test]
335    async fn multiple_open_streams() {
336        let client = get_initialized_client(V4_0).await;
337        skip_if_handshake_failed!(client);
338        let mut client = client.unwrap();
339        assert_eq!(client.server_state(), Ready);
340        client.begin(None).await.unwrap();
341        assert_eq!(client.server_state(), TxReady);
342
343        client
344            .run(
345                "MATCH (n {test: 'v4-multi-stream'}) DETACH DELETE n;",
346                None,
347                None,
348            )
349            .await
350            .unwrap();
351        client
352            .pull(Some(Metadata::from_iter(vec![("n", -1)])))
353            .await
354            .unwrap();
355
356        const NUM_STREAMS: usize = 5;
357        let mut qids: HashMap<i32, i64> = HashMap::with_capacity(NUM_STREAMS);
358        for n in 1..=NUM_STREAMS {
359            let response = client
360                .run(
361                    format!(
362                        "CREATE (s:Stream {{number: {}, test: 'v4-multi-stream'}}) RETURN s",
363                        n
364                    ),
365                    None,
366                    None,
367                )
368                .await
369                .unwrap();
370            let success = Success::try_from(response).unwrap();
371            match success.metadata().get("qid").unwrap() {
372                Value::Integer(qid) => {
373                    qids.insert(n as i32, *qid);
374                }
375                _ => panic!("qid not returned"),
376            }
377        }
378
379        assert_eq!(client.open_tx_streams, NUM_STREAMS);
380
381        for (n, qid) in qids {
382            assert_eq!(client.server_state(), TxStreaming);
383
384            let (records, response) = client
385                .pull(Some(Metadata::from_iter(vec![("n", -1), ("qid", qid)])))
386                .await
387                .unwrap();
388
389            assert!(Success::try_from(response).is_ok());
390            let node = Node::try_from(records[0].fields()[0].clone()).unwrap();
391            assert_eq!(node.properties().get("number").unwrap(), &Value::from(n));
392        }
393
394        assert_eq!(client.server_state(), TxReady);
395        assert_eq!(client.open_tx_streams, 0);
396    }
397
398    #[tokio::test]
399    async fn reset_internals_pipelined() {
400        let client = get_initialized_client(V4_0).await;
401        skip_if_handshake_failed!(client);
402        let mut client = client.unwrap();
403
404        let mut messages = client
405            .pipeline(vec![
406                Message::RunWithMetadata(RunWithMetadata::new(
407                    String::from("RETURN 1;"),
408                    Default::default(),
409                    Default::default(),
410                )),
411                Message::Pull(Pull::new(HashMap::from_iter(vec![(
412                    String::from("n"),
413                    Value::from(1),
414                )]))),
415                Message::RunWithMetadata(RunWithMetadata::new(
416                    String::from("RETURN 1;"),
417                    Default::default(),
418                    Default::default(),
419                )),
420                Message::Pull(Pull::new(HashMap::from_iter(vec![(
421                    String::from("n"),
422                    Value::from(1),
423                )]))),
424                Message::Reset,
425            ])
426            .await
427            .unwrap();
428
429        // Last message should be a SUCCESS...
430        assert_eq!(
431            messages.pop(),
432            Some(Message::Success(Success::new(Default::default())))
433        );
434
435        // ... preceded by 4 or more IGNORED
436        assert!(messages.len() >= 4);
437        for message in messages {
438            assert_eq!(message, Message::Ignored);
439        }
440    }
441
442    #[tokio::test]
443    async fn reset_internals() {
444        let client = get_initialized_client(V4_0).await;
445        skip_if_handshake_failed!(client);
446        let mut client = client.unwrap();
447
448        client.run("RETURN 1;", None, None).await.unwrap();
449        client
450            .send_message(Message::Pull(Pull::new(HashMap::from_iter(vec![(
451                String::from("n"),
452                Value::from(1),
453            )]))))
454            .await
455            .unwrap();
456        client.send_message(Message::Reset).await.unwrap();
457        assert_eq!(client.server_state(), Interrupted);
458
459        // Two situations can happen here - either the PULL is ignored, or the records of the
460        // PULL are ignored. The latter situation results in additional IGNORED messages in
461        // the result stream.
462
463        // RECORD or PULL summary, it's not consistent
464        assert_eq!(client.read_message().await.unwrap(), Message::Ignored);
465
466        match client.read_message().await.unwrap() {
467            // PULL summary
468            Message::Ignored => {
469                // RESET result
470                Success::try_from(client.read_message().await.unwrap()).unwrap();
471            }
472            // RESET result
473            Message::Success(_) => {}
474            other => panic!("unexpected response {:?}", other),
475        }
476    }
477}