bolt_client/
client.rs

1// Much of the documentation comments in this module are copied from the descriptions on
2// https://neo4j.com/docs/bolt/current, with minor modifications.
3//
4// Original copyright and license information for these descriptions:
5// Copyright © Neo4j Sweden AB [http://neo4j.com]
6// CC BY-NC-SA 4.0 (https://creativecommons.org/licenses/by-nc-sa/4.0/)
7//
8// The aforementioned documentation comments are thus licensed under CC BY-NC-SA 4.0.
9
10use std::{collections::VecDeque, io};
11
12use bytes::*;
13use futures_util::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
14
15use bolt_client_macros::*;
16use bolt_proto::{
17    error::Error as ProtocolError, message::*, version::*, Message, ServerState, ServerState::*,
18    Value,
19};
20
21use crate::{
22    error::{CommunicationError, CommunicationResult, ConnectionError, ConnectionResult},
23    Metadata, Params, RoutingContext,
24};
25
26mod v1;
27mod v2;
28mod v3;
29mod v4;
30mod v4_1;
31mod v4_2;
32mod v4_3;
33mod v4_4;
34
35const PREAMBLE: [u8; 4] = [0x60, 0x60, 0xB0, 0x17];
36
/// Return whether a version is compatible with version specifier.
///
/// Both words are decoded byte-wise starting from the least significant byte: byte 0 is the major
/// version and byte 1 is the minor version. Byte 2 of `specifier` is a range, i.e. how many minor
/// versions below the specified minor are accepted as well (the lower bound saturates at zero).
/// Any remaining high bytes are ignored.
fn is_compatible(version: u32, specifier: u32) -> bool {
    let [major, minor, ..] = version.to_le_bytes();
    let [wanted_major, newest_minor, range, _] = specifier.to_le_bytes();

    // A different major version can never be compatible, whatever the minor range says.
    if major != wanted_major {
        return false;
    }

    let oldest_minor = newest_minor.saturating_sub(range);
    (oldest_minor..=newest_minor).contains(&minor)
}
49
50/// An asynchronous client for Bolt servers.
51#[derive(Debug)]
52pub struct Client<S: AsyncRead + AsyncWrite + Unpin> {
53    stream: S,
54    version: u32,
55    server_state: ServerState,
56    sent_queue: VecDeque<Message>,
57    open_tx_streams: usize,
58}
59
60impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
61    /// Attempt to create a new client from an asynchronous stream. A handshake will be performed
62    /// with the provided protocol version specifiers, and, if this succeeds, a Client will be
63    /// returned.
64    pub async fn new(mut stream: S, version_specifiers: &[u32; 4]) -> ConnectionResult<Self> {
65        let mut version_specifiers_bytes = BytesMut::with_capacity(16);
66        version_specifiers
67            .iter()
68            .for_each(|&v| version_specifiers_bytes.put_u32(v));
69        stream.write_all(&PREAMBLE).await?;
70        stream.write_all(&version_specifiers_bytes).await?;
71        stream.flush().await?;
72
73        let mut u32_bytes = [0, 0, 0, 0];
74        stream.read_exact(&mut u32_bytes).await?;
75        let version = u32::from_be_bytes(u32_bytes);
76
77        if version > 0 {
78            for &specifier in version_specifiers {
79                if is_compatible(version, specifier) {
80                    return Ok(Self {
81                        stream,
82                        version,
83                        server_state: Connected,
84                        sent_queue: VecDeque::default(),
85                        open_tx_streams: 0,
86                    });
87                }
88            }
89        }
90        Err(ConnectionError::HandshakeFailed(*version_specifiers))
91    }
92
    /// Get the current version of this client.
    ///
    /// This returns the raw `u32` stored in the client's `version` field, without decoding it
    /// into major/minor components.
    pub fn version(&self) -> u32 {
        self.version
    }
97
    /// Get the current server state for this client.
    ///
    /// This is the state tracked locally by the client (returned by value); no communication
    /// with the server takes place.
    pub fn server_state(&self) -> ServerState {
        self.server_state
    }
102
103    pub(crate) async fn read_message(&mut self) -> CommunicationResult<Message> {
104        let message = Message::from_stream(&mut self.stream)
105            .await
106            .map_err(ProtocolError::from)?;
107
108        #[cfg(test)]
109        println!("<<< {:?}\n", message);
110
111        match (self.server_state, self.sent_queue.pop_front(), message) {
112            // CONNECTED
113            (Connected, Some(Message::Init(_)), Message::Success(success)) => {
114                self.server_state = Ready;
115                Ok(Message::Success(success))
116            }
117            (Connected, Some(Message::Init(_)), Message::Failure(failure)) => {
118                self.server_state = Defunct;
119                Ok(Message::Failure(failure))
120            }
121            (Connected, Some(Message::Hello(_)), Message::Success(success)) => {
122                self.server_state = Ready;
123                Ok(Message::Success(success))
124            }
125            (Connected, Some(Message::Hello(_)), Message::Failure(failure)) => {
126                self.server_state = Defunct;
127                Ok(Message::Failure(failure))
128            }
129
130            // READY
131            (Ready, Some(Message::Run(_)), Message::Success(success)) => {
132                self.server_state = Streaming;
133                Ok(Message::Success(success))
134            }
135            (Ready, Some(Message::Run(_)), Message::Failure(failure)) => {
136                self.server_state = Failed;
137                Ok(Message::Failure(failure))
138            }
139            (Ready, Some(Message::RunWithMetadata(_)), Message::Success(success)) => {
140                self.server_state = Streaming;
141                Ok(Message::Success(success))
142            }
143            (Ready, Some(Message::RunWithMetadata(_)), Message::Failure(failure)) => {
144                self.server_state = Failed;
145                Ok(Message::Failure(failure))
146            }
147            (Ready, Some(Message::Begin(_)), Message::Success(success)) => {
148                self.server_state = TxReady;
149                Ok(Message::Success(success))
150            }
151            (Ready, Some(Message::Begin(_)), Message::Failure(failure)) => {
152                self.server_state = Failed;
153                Ok(Message::Failure(failure))
154            }
155            (Ready, Some(Message::Route(_)), Message::Success(success)) => {
156                self.server_state = Ready;
157                Ok(Message::Success(success))
158            }
159            (Ready, Some(Message::Route(_)), Message::Failure(failure)) => {
160                self.server_state = Failed;
161                Ok(Message::Failure(failure))
162            }
163            (Ready, Some(Message::RouteWithMetadata(_)), Message::Success(success)) => {
164                self.server_state = Ready;
165                Ok(Message::Success(success))
166            }
167            (Ready, Some(Message::RouteWithMetadata(_)), Message::Failure(failure)) => {
168                self.server_state = Failed;
169                Ok(Message::Failure(failure))
170            }
171
172            // STREAMING
173            (Streaming, Some(Message::PullAll), Message::Success(success)) => {
174                self.server_state = Ready;
175                Ok(Message::Success(success))
176            }
177            (Streaming, Some(Message::PullAll), Message::Record(record)) => {
178                self.server_state = Streaming;
179                // Put the PULL_ALL message back so we can keep consuming records
180                self.sent_queue.push_front(Message::PullAll);
181                Ok(Message::Record(record))
182            }
183            (Streaming, Some(Message::PullAll), Message::Failure(failure)) => {
184                self.server_state = Failed;
185                Ok(Message::Failure(failure))
186            }
187            (Streaming, Some(Message::Pull(_)), Message::Success(success)) => {
188                self.server_state = match success.metadata().get("has_more") {
189                    Some(&Value::Boolean(true)) => Streaming,
190                    _ => Ready,
191                };
192                Ok(Message::Success(success))
193            }
194            (Streaming, Some(Message::Pull(pull)), Message::Record(record)) => {
195                self.server_state = Streaming;
196                // Put the PULL message back so we can keep consuming records
197                self.sent_queue.push_front(Message::Pull(pull));
198                Ok(Message::Record(record))
199            }
200            (Streaming, Some(Message::Pull(_)), Message::Failure(failure)) => {
201                self.server_state = Failed;
202                Ok(Message::Failure(failure))
203            }
204            (Streaming, Some(Message::DiscardAll), Message::Success(success)) => {
205                self.server_state = Ready;
206                Ok(Message::Success(success))
207            }
208            (Streaming, Some(Message::DiscardAll), Message::Failure(failure)) => {
209                self.server_state = Failed;
210                Ok(Message::Failure(failure))
211            }
212            (Streaming, Some(Message::Discard(_)), Message::Success(success)) => {
213                self.server_state = match success.metadata().get("has_more") {
214                    Some(&Value::Boolean(true)) => Streaming,
215                    _ => Ready,
216                };
217                Ok(Message::Success(success))
218            }
219            (Streaming, Some(Message::Discard(_)), Message::Failure(failure)) => {
220                self.server_state = Failed;
221                Ok(Message::Failure(failure))
222            }
223
224            // TX_READY
225            (TxReady, Some(Message::RunWithMetadata(_)), Message::Success(success)) => {
226                self.open_tx_streams += 1;
227                self.server_state = TxStreaming;
228                Ok(Message::Success(success))
229            }
230            (TxReady, Some(Message::RunWithMetadata(_)), Message::Failure(failure)) => {
231                self.server_state = Failed;
232                Ok(Message::Failure(failure))
233            }
234            (TxReady, Some(Message::Commit), Message::Success(success)) => {
235                self.server_state = Ready;
236                Ok(Message::Success(success))
237            }
238            (TxReady, Some(Message::Commit), Message::Failure(failure)) => {
239                self.server_state = Failed;
240                Ok(Message::Failure(failure))
241            }
242            (TxReady, Some(Message::Rollback), Message::Success(success)) => {
243                self.server_state = Ready;
244                Ok(Message::Success(success))
245            }
246            (TxReady, Some(Message::Rollback), Message::Failure(failure)) => {
247                self.server_state = Failed;
248                Ok(Message::Failure(failure))
249            }
250
251            // TX_STREAMING
252            (TxStreaming, Some(Message::RunWithMetadata(_)), Message::Success(success)) => {
253                self.open_tx_streams += 1;
254                self.server_state = TxStreaming;
255                Ok(Message::Success(success))
256            }
257            (TxStreaming, Some(Message::RunWithMetadata(_)), Message::Failure(failure)) => {
258                self.server_state = Failed;
259                Ok(Message::Failure(failure))
260            }
261            (TxStreaming, Some(Message::PullAll), Message::Success(success)) => {
262                self.open_tx_streams -= 1;
263                self.server_state = TxReady;
264                Ok(Message::Success(success))
265            }
266            (TxStreaming, Some(Message::PullAll), Message::Record(record)) => {
267                self.server_state = TxStreaming;
268                // Put the PULL_ALL message back so we can keep consuming records
269                self.sent_queue.push_front(Message::PullAll);
270                Ok(Message::Record(record))
271            }
272            (TxStreaming, Some(Message::PullAll), Message::Failure(failure)) => {
273                self.server_state = Failed;
274                Ok(Message::Failure(failure))
275            }
276            (TxStreaming, Some(Message::Pull(_)), Message::Success(success)) => {
277                self.server_state = match success.metadata().get("has_more") {
278                    Some(&Value::Boolean(true)) => TxStreaming,
279                    _ => {
280                        self.open_tx_streams -= 1;
281                        if self.open_tx_streams > 0 {
282                            TxStreaming
283                        } else {
284                            TxReady
285                        }
286                    }
287                };
288                Ok(Message::Success(success))
289            }
290            (TxStreaming, Some(Message::Pull(pull)), Message::Record(record)) => {
291                self.server_state = TxStreaming;
292                // Put the PULL message back so we can keep consuming records
293                self.sent_queue.push_front(Message::Pull(pull));
294                Ok(Message::Record(record))
295            }
296            (TxStreaming, Some(Message::Pull(_)), Message::Failure(failure)) => {
297                self.server_state = Failed;
298                Ok(Message::Failure(failure))
299            }
300            (TxStreaming, Some(Message::DiscardAll), Message::Success(success)) => {
301                self.open_tx_streams -= 1;
302                self.server_state = TxReady;
303                Ok(Message::Success(success))
304            }
305            (TxStreaming, Some(Message::DiscardAll), Message::Failure(failure)) => {
306                self.server_state = Failed;
307                Ok(Message::Failure(failure))
308            }
309            (TxStreaming, Some(Message::Discard(_)), Message::Success(success)) => {
310                self.server_state = match success.metadata().get("has_more") {
311                    Some(&Value::Boolean(true)) => TxStreaming,
312                    _ => {
313                        self.open_tx_streams -= 1;
314                        if self.open_tx_streams > 0 {
315                            TxStreaming
316                        } else {
317                            TxReady
318                        }
319                    }
320                };
321                Ok(Message::Success(success))
322            }
323            (TxStreaming, Some(Message::Discard(_)), Message::Failure(failure)) => {
324                self.server_state = Failed;
325                Ok(Message::Failure(failure))
326            }
327
328            // FAILED
329            (Failed, Some(Message::Run(_)), Message::Ignored) => {
330                self.server_state = Failed;
331                Ok(Message::Ignored)
332            }
333            (Failed, Some(Message::RunWithMetadata(_)), Message::Ignored) => {
334                self.server_state = Failed;
335                Ok(Message::Ignored)
336            }
337            (Failed, Some(Message::PullAll), Message::Ignored) => {
338                self.server_state = Failed;
339                Ok(Message::Ignored)
340            }
341            (Failed, Some(Message::Pull(_)), Message::Ignored) => {
342                self.server_state = Failed;
343                Ok(Message::Ignored)
344            }
345            (Failed, Some(Message::DiscardAll), Message::Ignored) => {
346                self.server_state = Failed;
347                Ok(Message::Ignored)
348            }
349            (Failed, Some(Message::Discard(_)), Message::Ignored) => {
350                self.server_state = Failed;
351                Ok(Message::Ignored)
352            }
353            (Failed, Some(Message::Route(_)), Message::Ignored) => {
354                self.server_state = Failed;
355                Ok(Message::Ignored)
356            }
357            (Failed, Some(Message::RouteWithMetadata(_)), Message::Ignored) => {
358                self.server_state = Failed;
359                Ok(Message::Ignored)
360            }
361            (Failed, Some(Message::AckFailure), Message::Success(success)) => {
362                self.server_state = Ready;
363                Ok(Message::Success(success))
364            }
365            (Failed, Some(Message::AckFailure), Message::Failure(failure)) => {
366                self.server_state = Defunct;
367                Ok(Message::Failure(failure))
368            }
369
370            // INTERRUPTED
371            (Interrupted, Some(Message::Run(_)), _) => {
372                self.server_state = Interrupted;
373                Ok(Message::Ignored)
374            }
375            (Interrupted, Some(Message::RunWithMetadata(_)), _) => {
376                self.server_state = Interrupted;
377                Ok(Message::Ignored)
378            }
379            (Interrupted, Some(Message::PullAll), Message::Record(_)) => {
380                self.server_state = Interrupted;
381                // Put the PULL_ALL message back so we can keep consuming records
382                self.sent_queue.push_front(Message::PullAll);
383                Ok(Message::Ignored)
384            }
385            (Interrupted, Some(Message::PullAll), _) => {
386                self.server_state = Interrupted;
387                Ok(Message::Ignored)
388            }
389            (Interrupted, Some(Message::Pull(pull)), Message::Record(_)) => {
390                self.server_state = Interrupted;
391                // Put the PULL message back so we can keep consuming records
392                self.sent_queue.push_front(Message::Pull(pull));
393                Ok(Message::Ignored)
394            }
395            (Interrupted, Some(Message::Pull(_)), _) => {
396                self.server_state = Interrupted;
397                Ok(Message::Ignored)
398            }
399            (Interrupted, Some(Message::DiscardAll), _) => {
400                self.server_state = Interrupted;
401                Ok(Message::Ignored)
402            }
403            (Interrupted, Some(Message::Discard(_)), _) => {
404                self.server_state = Interrupted;
405                Ok(Message::Ignored)
406            }
407            (Interrupted, Some(Message::Begin(_)), _) => {
408                self.server_state = Interrupted;
409                Ok(Message::Ignored)
410            }
411            (Interrupted, Some(Message::Commit), _) => {
412                self.server_state = Interrupted;
413                Ok(Message::Ignored)
414            }
415            (Interrupted, Some(Message::Rollback), _) => {
416                self.server_state = Interrupted;
417                Ok(Message::Ignored)
418            }
419            (Interrupted, Some(Message::AckFailure), _) => {
420                self.server_state = Interrupted;
421                Ok(Message::Ignored)
422            }
423            (Interrupted, Some(Message::Route(_)), _) => {
424                self.server_state = Interrupted;
425                Ok(Message::Ignored)
426            }
427            (Interrupted, Some(Message::RouteWithMetadata(_)), _) => {
428                self.server_state = Interrupted;
429                Ok(Message::Ignored)
430            }
431            (Interrupted, Some(Message::Reset), Message::Success(success)) => {
432                self.open_tx_streams = 0;
433                self.server_state = Ready;
434                Ok(Message::Success(success))
435            }
436            (Interrupted, Some(Message::Reset), Message::Failure(failure)) => {
437                self.server_state = Defunct;
438                Ok(Message::Failure(failure))
439            }
440            (state, request, response) => {
441                self.server_state = Defunct;
442                Err(CommunicationError::InvalidResponse {
443                    state,
444                    request,
445                    response,
446                })
447            }
448        }
449    }
450
451    pub(crate) async fn send_message(&mut self, message: Message) -> CommunicationResult<()> {
452        match (self.server_state, &message) {
453            (Connected, Message::Init(_)) => {}
454            (Connected, Message::Hello(_)) => {}
455            (Ready, Message::Run(_)) => {}
456            (Ready, Message::RunWithMetadata(_)) => {}
457            (Ready, Message::Begin(_)) => {}
458            (Ready, Message::Route(_)) => {}
459            (Ready, Message::RouteWithMetadata(_)) => {}
460            (Ready, Message::Reset) => {}
461            (Ready, Message::Goodbye) => {}
462            (Streaming, Message::PullAll) => {}
463            (Streaming, Message::Pull(_)) => {}
464            (Streaming, Message::DiscardAll) => {}
465            (Streaming, Message::Discard(_)) => {}
466            (Streaming, Message::Reset) => {}
467            (Streaming, Message::Goodbye) => {}
468            (TxReady, Message::RunWithMetadata(_)) => {}
469            (TxReady, Message::Commit) => {}
470            (TxReady, Message::Rollback) => {}
471            (TxReady, Message::Reset) => {}
472            (TxReady, Message::Goodbye) => {}
473            (TxStreaming, Message::RunWithMetadata(_)) => {}
474            (TxStreaming, Message::PullAll) => {}
475            (TxStreaming, Message::Pull(_)) => {}
476            (TxStreaming, Message::DiscardAll) => {}
477            (TxStreaming, Message::Discard(_)) => {}
478            (TxStreaming, Message::Reset) => {}
479            (TxStreaming, Message::Goodbye) => {}
480            (Failed, Message::Run(_)) => {}
481            (Failed, Message::RunWithMetadata(_)) => {}
482            (Failed, Message::PullAll) => {}
483            (Failed, Message::Pull(_)) => {}
484            (Failed, Message::DiscardAll) => {}
485            (Failed, Message::Discard(_)) => {}
486            (Failed, Message::AckFailure) => {}
487            (Failed, Message::Reset) => {}
488            (Failed, Message::Goodbye) => {}
489            (Interrupted, Message::Run(_)) => {}
490            (Interrupted, Message::RunWithMetadata(_)) => {}
491            (Interrupted, Message::PullAll) => {}
492            (Interrupted, Message::Pull(_)) => {}
493            (Interrupted, Message::DiscardAll) => {}
494            (Interrupted, Message::Discard(_)) => {}
495            (Interrupted, Message::AckFailure) => {}
496            (Interrupted, Message::Begin(_)) => {}
497            (Interrupted, Message::Commit) => {}
498            (Interrupted, Message::Rollback) => {}
499            (Interrupted, Message::Reset) => {}
500            (Interrupted, Message::Goodbye) => {}
501            (state, message) => {
502                self.server_state = Defunct;
503                return Err(CommunicationError::InvalidState {
504                    state,
505                    message: message.clone(),
506                });
507            }
508        }
509
510        #[cfg(test)]
511        println!(">>> {:?}", message);
512
513        let chunks = message.clone().into_chunks().map_err(ProtocolError::from)?;
514
515        for chunk in chunks {
516            self.stream.write_all(&chunk).await?;
517        }
518        self.stream.flush().await?;
519
520        // Immediate state changes
521        match message {
522            Message::Reset => self.server_state = Interrupted,
523            Message::Goodbye => self.server_state = Disconnected,
524            _ => {}
525        }
526
527        self.sent_queue.push_back(message);
528        Ok(())
529    }
530
531    /// Send a [`HELLO`](Message::Hello) (or [`INIT`](Message::Init)) message to the server.
532    /// _(Sends `INIT` for Bolt v1 - v2, and `HELLO` for Bolt v3+.)_
533    ///
534    /// # Description
535    /// The `HELLO` message requests the connection to be authorized for use with the remote
536    /// database. Clients should send a `HELLO` message to the server immediately after connection
537    /// and process the response before using that connection in any other way.
538    ///
539    /// The server must be in the [`Connected`](ServerState::Connected) state to be able to process
540    /// a `HELLO` message. For any other states, receipt of a `HELLO` message is considered a
541    /// protocol violation and leads to connection closure.
542    ///
543    /// If authentication fails, the server will respond with a [`FAILURE`](Message::Failure)
544    /// message and immediately close the connection. Clients wishing to retry initialization
545    /// should establish a new connection.
546    ///
547    /// # Fields
548    /// `metadata` should contain at least two entries:
549    /// - `user_agent`, which should conform to the format `"Name/Version"`, for example
550    ///   `"Example/1.0.0"` (see
551    ///   [here](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent)).
552    /// - `scheme` is the authentication scheme. Predefined schemes are `"none"`, `"basic"`, or
553    ///   `"kerberos"`.
554    ///
555    /// If using Bolt v4.1 or later, the following additional `metadata` entries can be specified:
556    /// - `routing`, a map which should contain routing context information as well as an `address`
557    ///   field indicating to which address the client should initially connect. Leaving this
558    ///   unspecified indicates that the server should not carry out any routing.
559    ///   _(Bolt v4.1+ only.)_
560    ///
561    /// Further entries in `metadata` are passed to the implementation of the chosen authentication
562    /// scheme. Their names, types, and defaults depend on that choice. For example, the scheme
563    /// `"basic"` requires `metadata` to contain the username and password in the form
564    /// `{"principal": "<username>", "credentials": "<password>"}`.
565    ///
566    /// # Response
567    /// - [`Message::Success`] - initialization has completed successfully and the server has
568    ///   entered the [`Ready`](ServerState::Ready) state. The server may include metadata that
569    ///   describes details of the server environment and/or the connection. The following fields
570    ///   are defined for inclusion in the `SUCCESS` metadata:
571    ///   - `server`, the server agent string (e.g. `"Neo4j/4.3.0"`)
572    ///   - `connection_id`, a unique identifier for the connection (e.g. `"bolt-61"`)
573    ///     _(Bolt v3+ only.)_
574    ///   - `hints`, a map of configuration hints (e.g. `{"connection.recv_timeout_seconds": 120}`)
575    ///     These hints may be interpreted or ignored by drivers at their own discretion in order
576    ///     to augment operations where applicable. Hints remain valid throughout the lifetime of a
577    ///     given connection and cannot be changed. As such, newly established connections may
578    ///     observe different hints as the server configuration is adjusted.
579    ///     _(Bolt v4.3+ only.)_
580    /// - [`Message::Failure`] - initialization has failed and the server has entered the
581    ///   [`Defunct`](ServerState::Defunct) state. The server may choose to include metadata
582    ///   describing the nature of the failure but will immediately close the connection after the
583    ///   failure has been sent.
584    #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
585    pub async fn hello(&mut self, mut metadata: Metadata) -> CommunicationResult<Message> {
586        let message = match self.version() {
587            V1_0 | V2_0 => {
588                let user_agent: String = metadata
589                    .value
590                    .remove("user_agent")
591                    .ok_or_else(|| {
592                        io::Error::new(io::ErrorKind::InvalidInput, "missing user_agent")
593                    })?
594                    .try_into()
595                    .map_err(|_| {
596                        io::Error::new(io::ErrorKind::InvalidInput, "user_agent must be a string")
597                    })?;
598                let auth_token = metadata.value;
599
600                Message::Init(Init::new(user_agent, auth_token))
601            }
602            _ => Message::Hello(Hello::new(metadata.value)),
603        };
604
605        self.send_message(message).await?;
606        self.read_message().await
607    }
608
609    /// Send a [`ROUTE`](Message::RouteWithMetadata) message to the server.
610    /// _(Bolt v4.3+ only. For Bolt v4.3, an [alternate version](Message::Route) of the message is
611    /// sent.)_
612    ///
613    /// # Description
614    /// The `ROUTE` message instructs the server to return the current routing table.
615    ///
616    /// The server must be in the [`Ready`](ServerState::Ready) state to be able to successfully
617    /// process a `ROUTE` request. If the server is in the [`Failed`](ServerState::Failed) or
618    /// [`Interrupted`](ServerState::Interrupted) state, the response will be
619    /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `ROUTE` request will be
620    /// considered a protocol violation and will lead to connection closure.
621    ///
622    /// # Fields
623    /// - `context`, which should contain routing context information as well as an `address` field
624    ///   indicating to which address the client should initially connect.
625    /// - `bookmarks`, a list of strings containing some kind of bookmark identification, e.g
626    ///   `["bkmk-transaction:1", "bkmk-transaction:2"]`. Default is `[]`.
627    /// - `metadata`, a map which can contain the following optional entries:
628    ///   - `db`, a string containing the name of the database for which this command should be
629    ///     run. [`null`](Value::Null) denotes the server-side configured default database.
630    ///   - `imp_user`, a string specifying the impersonated user for the purposes of resolving
631    ///     their home database. [`null`](Value::Null) denotes no impersonation (i.e., execution
632    ///     takes place as the current user). _(Bolt v4.4+ only.)_
633    ///
634    /// # Response
635    /// - [`Message::Success`] - the routing table has been successfully retrieved and the server
636    ///   has entered the [`Ready`](ServerState::Ready) state. The server sends the following
637    ///   metadata fields in the response:
638    ///   - `rt`, a map with the following fields:
639    ///     - `ttl`, an integer denoting the number of seconds this routing table should be
640    ///       considered valid
641    ///     - `servers`, a list of maps representing roles for one or more addresses. Each element
642    ///       will have the following fields:
643    ///       - `role`, a server role. Possible values are `"READ"`, `"WRITE"`, and `"ROUTE"`.
644    ///       - `addresses`, a list of strings representing the servers with the specified role
645    /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
646    ///   [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
647    ///   being processed. No server state change has occurred.
648    /// - [`Message::Failure`] - the request could not be processed successfully and the server has
649    ///   entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
650    ///   message to provide more detail on the nature of the failure.
651    #[bolt_version(4.3, 4.4)]
652    pub async fn route(
653        &mut self,
654        context: RoutingContext,
655        bookmarks: impl Into<Vec<String>>,
656        metadata: Option<Metadata>,
657    ) -> CommunicationResult<Message> {
658        let mut metadata = metadata.unwrap_or_default().value;
659        let message = match self.version() {
660            V4_3 => {
661                let database = match metadata.remove("db") {
662                    Some(value) => match value {
663                        Value::String(string) => Some(string),
664                        Value::Null => None,
665                        _ => {
666                            return Err(io::Error::new(
667                                io::ErrorKind::InvalidInput,
668                                "db must be either a string or null",
669                            )
670                            .into())
671                        }
672                    },
673                    None => None,
674                };
675
676                Message::Route(Route::new(context.value, bookmarks.into(), database))
677            }
678            _ => Message::RouteWithMetadata(RouteWithMetadata::new(
679                context.value,
680                bookmarks.into(),
681                metadata,
682            )),
683        };
684
685        self.send_message(message).await?;
686        self.read_message().await
687    }
688
689    /// Send a [`RUN`](Message::Run) message to the server.
690    /// _(Bolt v1+. For Bolt v1 - v2, the `metadata` parameter is ignored.)_
691    ///
692    /// # Description
693    /// A `RUN` message submits a new query for execution, the result of which will be consumed by
694    /// a subsequent message, such as [`PULL`](Message::Pull).
695    ///
696    /// The server must be in either the [`Ready`](ServerState::Ready) state, the
697    /// [`TxReady`](ServerState::TxReady) state (Bolt v3+), or the
698    /// [`TxStreaming`](ServerState::TxStreaming) state (Bolt v4+) to be able to successfully
699    /// process a `RUN` request. If the server is in the [`Failed`](ServerState::Failed) or
700    /// [`Interrupted`](ServerState::Interrupted) state, the response will be
701    /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `RUN` request will be
702    /// considered a protocol violation and will lead to connection closure.
703    ///
704    /// # Fields
705    /// - `query` contains a database query or remote procedure call.
706    /// - `parameters` contains variable fields for `query`.
707    ///
708    /// If using Bolt v3 or later, the following `metadata` entries can be specified:
709    /// - `bookmarks`, a list of strings containing some kind of bookmark identification, e.g
710    ///   `["bkmk-transaction:1", "bkmk-transaction:2"]`. Default is `[]`.
711    /// - `tx_timeout`, an integer specifying a transaction timeout in milliseconds. Default is the
712    ///   server-side configured timeout.
713    /// - `tx_metadata`, a map containing some metadata information, mainly used for logging.
714    /// - `mode`, a string which specifies what kind of server should be used for this transaction.
715    ///   For write access, use `"w"` and for read access use `"r"`. Default is `"w"`.
716    /// - `db`, a string containing the name of the database where the transaction should take
717    ///   place. [`null`](Value::Null) and `""` denote the server-side configured default database.
718    ///   _(Bolt v4+ only.)_
719    /// - `imp_user`, a string specifying the impersonated user which executes this transaction.
720    ///   [`null`](Value::Null) denotes no impersonation (i.e., execution takes place as the
721    ///   current user). _(Bolt v4.4+ only.)_
722    ///
723    /// # Response
724    /// - [`Message::Success`] - the request has been successfully received and the server has
725    ///   entered the [`Streaming`](ServerState::Streaming) state. Clients should not consider a
726    ///   `SUCCESS` response to indicate completion of the execution of the query, merely
727    ///   acceptance of it. The server may attach metadata to the message to provide header detail
728    ///   for the results that follow. The following fields are defined for inclusion in the
729    ///   metadata:
730    ///   - `fields`, the fields included in the result (e.g. `["name", "age"]`)
731    ///   - `result_available_after`, the time in milliseconds after which the first record in the
732    ///     result stream is available. _(Bolt v1 - v2 only.)_
    ///   - `t_first`, supersedes `result_available_after`. _(Bolt v3+ only.)_
734    ///   - `qid`, an integer that specifies the server-assigned query ID. This is sent for queries
735    ///     submitted within an explicit transaction. _(Bolt v4+ only.)_
736    /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
737    ///   [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
738    ///   being processed. No server state change has occurred.
739    /// - [`Message::Failure`] - the request could not be processed successfully or is invalid, and
740    ///   the server has entered the [`Failed`](ServerState::Failed) state. The server may attach
741    ///   metadata to the message to provide more detail on the nature of the failure.
742    #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
743    pub async fn run(
744        &mut self,
745        query: impl Into<String>,
746        parameters: Option<Params>,
747        metadata: Option<Metadata>,
748    ) -> CommunicationResult<Message> {
749        let message = match self.version() {
750            V1_0 | V2_0 => {
751                Message::Run(Run::new(query.into(), parameters.unwrap_or_default().value))
752            }
753            _ => Message::RunWithMetadata(RunWithMetadata::new(
754                query.into(),
755                parameters.unwrap_or_default().value,
756                metadata.unwrap_or_default().value,
757            )),
758        };
759
760        self.send_message(message).await?;
761        self.read_message().await
762    }
763
764    /// Send a [`PULL`](Message::Pull) (or [`PULL_ALL`](Message::PullAll)) message to the server.
765    /// _(Sends `PULL_ALL` for Bolt v1 - v3, and `PULL` for Bolt v4+. For Bolt v1 - v3, the
766    /// `metadata` parameter is ignored.)_
767    ///
768    /// # Description
769    /// The `PULL` message issues a request to stream outstanding results back to the client,
770    /// before returning to the [`Ready`](ServerState::Ready) state.
771    ///
772    /// Result details consist of zero or more [`RECORD`](Message::Record) messages and a summary
773    /// message. Each record carries with it a list of values which form the data content of the
774    /// record. The order of the values within that list should be meaningful to the client,
775    /// perhaps based on a requested ordering for that result, but no guarantees are made around
776    /// the order of records within the result. A record should only be considered valid if
777    /// accompanied by a [`SUCCESS`](Message::Success) summary message.
778    ///
779    /// The server must be in the [`Streaming`](ServerState::Streaming) or
780    /// [`TxStreaming`](ServerState::TxStreaming) state to be able to successfully process a `PULL`
781    /// request. If the server is in the [`Failed`](ServerState::Failed) state or
782    /// [`Interrupted`](ServerState::Interrupted) state, the response will be
783    /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `PULL` request will be
784    /// considered a protocol violation and will lead to connection closure.
785    ///
786    /// # Fields
787    /// For Bolt v4+, additional metadata is passed along with this message:
788    /// - `n` is an integer specifying how many records to fetch. `-1` will fetch all records. `n`
789    ///   has no default and must be present.
790    /// - `qid` is an integer that specifies for which statement the `PULL` operation should be
791    ///   carried out within an explicit transaction. `-1` is the default, which denotes the last
792    ///   executed statement.
793    ///
794    /// # Response
795    /// - `(_, `[`Message::Success`]`)` - results have been successfully pulled and the server has
796    ///   entered the [`Ready`](ServerState::Ready) state. The server may attach metadata to the
797    ///   `SUCCESS` message to provide footer detail for the results. The following fields are
798    ///   defined for inclusion in the metadata:
799    ///   - `type`, the type of query: read-only (`"r"`), write-only (`"w"`), read-write (`"rw"`),
800    ///     or schema (`"s"`)
801    ///   - `result_consumed_after`, the time in milliseconds after which the last record in the
802    ///     result stream is consumed. _(Bolt v1 - v2 only.)_
    ///   - `t_last`, supersedes `result_consumed_after`. _(Bolt v3+ only.)_
804    ///   - `bookmark` (e.g. `"bookmark:1234"`). _(Bolt v3+ only.)_
805    ///   - `stats`, a map containing counter information, such as DB hits, etc. _(Bolt v3+ only.)_
806    ///   - `plan`, a map containing the query plan result. _(Bolt v3+ only.)_
807    ///   - `profile`, a map containing the query profile result. _(Bolt v3+ only.)_
808    ///   - `notifications`, a map containing any notifications generated during execution of the
809    ///     query. _(Bolt v3+ only.)_
810    ///   - `db`, a string containing the name of the database where the query was executed.
811    ///     _(Bolt v4+ only.)_
812    ///   - `has_more`, a boolean indicating whether there are still records left in the result
813    ///     stream. Default is `false`. _(Bolt v4+ only.)_
814    /// - `(_, `[`Message::Ignored`]`)` - the server is in the [`Failed`](ServerState::Failed) or
815    ///   [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
816    ///   being processed. No server state change has occurred.
817    /// - `(_, `[`Message::Failure`]`)` - the request could not be processed successfully and the
818    ///   server has entered the [`Failed`](ServerState::Failed) state. The server may attach
819    ///   metadata to the message to provide more detail on the nature of the failure. Failure may
820    ///   occur at any time during result streaming, so any records returned in the response should
821    ///   be considered invalid.
822    #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
823    pub async fn pull(
824        &mut self,
825        metadata: Option<Metadata>,
826    ) -> CommunicationResult<(Vec<Record>, Message)> {
827        match self.version() {
828            V1_0 | V2_0 | V3_0 => self.send_message(Message::PullAll).await?,
829            _ => {
830                self.send_message(Message::Pull(Pull::new(metadata.unwrap_or_default().value)))
831                    .await?
832            }
833        }
834        let mut records = vec![];
835        loop {
836            match self.read_message().await? {
837                Message::Record(record) => records.push(record),
838                Message::Success(success) => return Ok((records, Message::Success(success))),
839                Message::Failure(failure) => return Ok((records, Message::Failure(failure))),
840                Message::Ignored => return Ok((vec![], Message::Ignored)),
841                _ => unreachable!(),
842            }
843        }
844    }
845
846    /// Send a [`DISCARD`](Message::Discard) (or [`DISCARD_ALL`](Message::DiscardAll)) message to
847    /// the server.
    /// _(Sends a `DISCARD_ALL` for Bolt v1 - v3, and `DISCARD` for Bolt v4+. For Bolt v1 - v3, the
849    /// `metadata` parameter is ignored.)_
850    ///
851    /// # Description
852    /// The `DISCARD` message issues a request to discard the outstanding result and return to the
853    /// [`Ready`](ServerState::Ready) state. A receiving server will not abort the request but
854    /// continue to process it without streaming any detail messages to the client.
855    ///
856    /// The server must be in the [`Streaming`](ServerState::Streaming) or
857    /// [`TxStreaming`](ServerState::TxStreaming) state to be able to successfully process a
858    /// `DISCARD` request. If the server is in the [`Failed`](ServerState::Failed) state or
859    /// [`Interrupted`](ServerState::Interrupted) state, the response will be
860    /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `DISCARD` request will be
861    /// considered a protocol violation and will lead to connection closure.
862    ///
863    /// # Fields
864    /// For Bolt v4+, additional metadata is passed along with this message:
865    /// - `n` is an integer specifying how many records to discard. `-1` will discard all records.
866    ///   `n` has no default and must be present.
867    /// - `qid` is an integer that specifies for which statement the `DISCARD` operation should be
868    ///   carried out within an explicit transaction. `-1` is the default, which denotes the last
869    ///   executed statement.
870    ///
871    /// # Response
872    /// - [`Message::Success`] - results have been successfully discarded and the server has
873    ///   entered the [`Ready`](ServerState::Ready) state. The server may attach metadata to the
874    ///   message to provide footer detail for the discarded results. The following fields are
875    ///   defined for inclusion in the metadata:
876    ///   - `type`, the type of query: read-only (`"r"`), write-only (`"w"`), read-write (`"rw"`),
877    ///     or schema (`"s"`)
878    ///   - `result_consumed_after`, the time in milliseconds after which the last record in the
879    ///     result stream is consumed. _(Bolt v1 - v2 only.)_
    ///   - `t_last`, supersedes `result_consumed_after`. _(Bolt v3+ only.)_
881    ///   - `bookmark` (e.g. `"bookmark:1234"`). _(Bolt v3+ only.)_
882    ///   - `db`, a string containing the name of the database where the query was executed.
883    ///     _(Bolt v4+ only.)_
884    ///   - `has_more`, a boolean indicating whether there are still records left in the result
885    ///     stream. Default is `false`. _(Bolt v4+ only.)_
886    /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
887    ///   [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
888    ///   being processed. No server state change has occurred.
889    /// - [`Message::Failure`] - the request could not be processed successfully and the server has
890    ///   entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
891    ///   message to provide more detail on the nature of the failure.
892    #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
893    pub async fn discard(&mut self, metadata: Option<Metadata>) -> CommunicationResult<Message> {
894        let message = match self.version() {
895            V1_0 | V2_0 | V3_0 => Message::DiscardAll,
896            _ => Message::Discard(Discard::new(metadata.unwrap_or_default().value)),
897        };
898        self.send_message(message).await?;
899        self.read_message().await
900    }
901
902    /// Send a [`BEGIN`](Message::Begin) message to the server.
903    /// _(Bolt v3+ only.)_
904    ///
905    /// # Description
906    /// The `BEGIN` message starts a new explicit transaction and transitions the server to the
907    /// [`TxReady`](ServerState::TxReady) state. The explicit transaction is closed with either the
908    /// [`COMMIT`](Message::Commit) message or [`ROLLBACK`](Message::Rollback) message.
909    ///
910    /// The server must be in the [`Ready`](ServerState::Ready) state to be able to successfully
911    /// process a `BEGIN` request. If the server is in the [`Failed`](ServerState::Failed) or
912    /// [`Interrupted`](ServerState::Interrupted) state, the response will be
913    /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `BEGIN` request will be
914    /// considered a protocol violation and will lead to connection closure.
915    ///
916    /// # Fields
917    /// `metadata` may contain the following optional fields:
918    /// - `bookmarks`, a list of strings containing some kind of bookmark identification, e.g
919    ///   `["bkmk-transaction:1", "bkmk-transaction:2"]`. Default is `[]`.
920    /// - `tx_timeout`, an integer specifying a transaction timeout in milliseconds. Default is the
921    ///   server-side configured timeout.
922    /// - `tx_metadata`, a map containing some metadata information, mainly used for logging.
923    /// - `mode`, a string which specifies what kind of server should be used for this transaction.
924    ///   For write access, use `"w"` and for read access use `"r"`. Default is `"w"`.
925    /// - `db`, a string containing the name of the database where the transaction should take
926    ///   place. [`null`](Value::Null) and `""` denote the server-side configured default database.
927    ///   _(Bolt v4+ only.)_
928    /// - `imp_user`, a string specifying the impersonated user which executes this transaction.
929    ///   [`null`](Value::Null) denotes no impersonation (i.e., execution takes place as the
930    ///   current user). _(Bolt v4.4+ only.)_
931    ///
932    /// # Response
933    /// - [`Message::Success`] - the transaction has been successfully started and the server has
    ///   entered the [`TxReady`](ServerState::TxReady) state.
935    /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
936    ///   [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
937    ///   being processed. No server state change has occurred.
938    /// - [`Message::Failure`] - the request could not be processed successfully and the server has
939    ///   entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
940    ///   message to provide more detail on the nature of the failure.
941    #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
942    pub async fn begin(&mut self, metadata: Option<Metadata>) -> CommunicationResult<Message> {
943        let begin_msg = Begin::new(metadata.unwrap_or_default().value);
944        self.send_message(Message::Begin(begin_msg)).await?;
945        self.read_message().await
946    }
947
948    /// Send a [`COMMIT`](Message::Commit) message to the server.
949    /// _(Bolt v3+ only.)_
950    ///
951    /// # Description
952    /// The `COMMIT` message requests to commit the results of an explicit transaction and
953    /// transition the server back to the [`Ready`](ServerState::Ready) state.
954    ///
955    /// The server must be in the [`TxReady`](ServerState::TxReady) state to be able to
956    /// successfully process a `COMMIT` request, which means that any outstanding results in the
957    /// result stream must be consumed via [`Client::pull`]. If the server is in the
958    /// [`Failed`](ServerState::Failed) or [`Interrupted`](ServerState::Interrupted) state, the
959    /// response will be [`IGNORED`](Message::Ignored). For any other states, receipt of a `COMMIT`
960    /// request will be considered a protocol violation and will lead to connection closure.
961    ///
962    /// To instead cancel pending changes, send a [`ROLLBACK`](Message::Rollback) message.
963    ///
964    /// # Response
965    /// - [`Message::Success`] - the transaction has been successfully committed and the server has
966    ///   entered the [`Ready`](ServerState::Ready) state. The server sends the following metadata
967    ///   fields in the response:
968    ///   - `bookmark` (e.g. `"bookmark:1234"`)
969    /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
970    ///   [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
971    ///   being processed. No server state change has occurred.
972    /// - [`Message::Failure`] - the request could not be processed successfully and the server has
973    ///   entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
974    ///   message to provide more detail on the nature of the failure.
975    #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
976    pub async fn commit(&mut self) -> CommunicationResult<Message> {
977        self.send_message(Message::Commit).await?;
978        self.read_message().await
979    }
980
981    /// Send a [`ROLLBACK`](Message::Rollback) message to the server.
982    /// _(Bolt v3+ only.)_
983    ///
984    /// # Description
985    /// The `ROLLBACK` message requests to cancel a transaction and transition the server back to
986    /// the [`Ready`](ServerState::Ready) state. Any changes made since the transaction was started
987    /// will be undone.
988    ///
989    /// The server must be in the [`TxReady`](ServerState::TxReady) state to be able to
990    /// successfully process a `ROLLBACK` request, which means that any outstanding results in the
991    /// result stream must be consumed via [`Client::pull`]. If the server is in the
992    /// [`Failed`](ServerState::Failed) or [`Interrupted`](ServerState::Interrupted) state, the
993    /// response will be [`IGNORED`](Message::Ignored). For any other states, receipt of a
994    /// `ROLLBACK` request will be considered a protocol violation and will lead to connection
995    /// closure.
996    ///
997    /// To instead persist pending changes, send a [`COMMIT`](Message::Commit) message.
998    ///
999    /// # Response
1000    /// - [`Message::Success`] - the transaction has been successfully reverted and the server has
1001    ///   entered the [`Ready`](ServerState::Ready) state.
1002    /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
1003    ///   [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
1004    ///   being processed. No server state change has occurred.
1005    /// - [`Message::Failure`] - the request could not be processed successfully and the server has
1006    ///   entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
1007    ///   message to provide more detail on the nature of the failure.
1008    #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
1009    pub async fn rollback(&mut self) -> CommunicationResult<Message> {
1010        self.send_message(Message::Rollback).await?;
1011        self.read_message().await
1012    }
1013
1014    /// Send an [`ACK_FAILURE`](Message::AckFailure) message to the server.
1015    /// _(Bolt v1 - v2 only. For Bolt v3+, see [`Client::reset`].)_
1016    ///
1017    /// # Description
1018    /// `ACK_FAILURE` signals to the server that the client has acknowledged a previous failure and
1019    /// should return to the [`Ready`](ServerState::Ready) state.
1020    ///
1021    /// The server must be in the [`Failed`](ServerState::Failed) state to be able to successfully
1022    /// process an `ACK_FAILURE` request. For any other states, receipt of an `ACK_FAILURE` request
1023    /// will be considered a protocol violation and will lead to connection closure.
1024    ///
1025    /// # Response
1026    /// - [`Message::Success`] - failure has been successfully acknowledged and the server has
1027    ///   entered the [`Ready`](ServerState::Ready) state. The server may attach metadata to the
1028    ///   `SUCCESS` message.
1029    /// - [`Message::Failure`] - the request could not be processed successfully and the server has
1030    ///   entered the [`Defunct`](ServerState::Defunct) state. The server may choose to include
1031    ///   metadata describing the nature of the failure but will immediately close the connection
1032    ///   after the failure has been sent.
1033    #[bolt_version(1, 2)]
1034    pub async fn ack_failure(&mut self) -> CommunicationResult<Message> {
1035        self.send_message(Message::AckFailure).await?;
1036        self.read_message().await
1037    }
1038
1039    /// Send a [`RESET`](Message::Reset) message to the server.
1040    /// _(Bolt v1+. For Bolt v1 - v2, see [`Client::ack_failure`] for just clearing the
1041    /// [`Failed`](ServerState::Failed) state.)_
1042    ///
1043    /// # Description
1044    /// The `RESET` message requests that the connection be set back to its initial state, as if
1045    /// initialization had just been successfully completed. The `RESET` message is unique in that
    /// on arrival at the server, it jumps ahead in the message queue, stopping any unit of work
1047    /// that happens to be executing. All the queued messages originally in front of the `RESET`
1048    /// message will then be [`IGNORED`](Message::Ignored) until the `RESET` position is reached,
1049    /// at which point the server will be ready for a new session.
1050    ///
1051    /// Specifically, `RESET` will:
1052    /// - force any currently processing message to abort with [`IGNORED`](Message::Ignored)
1053    /// - force any pending messages that have not yet started processing to be
1054    ///   [`IGNORED`](Message::Ignored)
1055    /// - clear any outstanding [`Failed`](ServerState::Failed) state
1056    /// - dispose of any outstanding result records
1057    /// - cancel the current transaction, if any
1058    ///
1059    /// # Response
1060    /// - [`Message::Success`] - the session has been successfully reset and the server has entered
1061    ///   the [`Ready`](ServerState::Ready) state.
1062    /// - [`Message::Failure`] - the request could not be processed successfully and the server has
1063    ///   entered the [`Defunct`](ServerState::Defunct) state. The server may choose to include
1064    ///   metadata describing the nature of the failure but will immediately close the connection
1065    ///   after the failure has been sent.
1066    #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
1067    pub async fn reset(&mut self) -> CommunicationResult<Message> {
1068        self.send_message(Message::Reset).await?;
1069        loop {
1070            match self.read_message().await? {
1071                Message::Success(success) => return Ok(Message::Success(success)),
1072                Message::Failure(failure) => return Ok(Message::Failure(failure)),
1073                Message::Ignored => {}
1074                _ => unreachable!(),
1075            }
1076        }
1077    }
1078
1079    /// Send a [`GOODBYE`](Message::Goodbye) message to the server.
1080    /// _(Bolt v3+ only.)_
1081    ///
1082    /// # Description
1083    /// The `GOODBYE` message notifies the server that the connection is terminating gracefully. On
1084    /// receipt of this message, the server will immediately shut down the socket on its side
1085    /// without sending a response. A client may shut down the socket at any time after sending the
1086    /// `GOODBYE` message. This message interrupts the server's current work, if any.
1087    #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
1088    pub async fn goodbye(&mut self) -> CommunicationResult<()> {
1089        self.send_message(Message::Goodbye).await?;
1090        self.server_state = Defunct;
1091        Ok(self.stream.close().await?)
1092    }
1093
1094    /// Send multiple messages to the server without waiting for a response. Returns a [`Vec`]
1095    /// containing the server's response messages for each of the sent messages, in the order they
1096    /// were provided.
1097    ///
1098    /// # Description
1099    /// The client is not required to wait for a response before sending more messages.  Sending
1100    /// multiple messages together like this is called _pipelining_. For performance reasons, it is
1101    /// recommended that clients use pipelining wherever possible. Using pipelining, multiple
    /// messages can be transmitted together in the same network packet, significantly reducing
1103    /// latency and increasing throughput.
1104    ///
1105    /// A common technique is to buffer outgoing messages on the client until the last possible
1106    /// moment, such as when a commit is issued or a result is read by the application, and then
1107    /// sending all messages in the buffer together.
1108    ///
1109    /// # Failure Handling
1110    /// Because the protocol leverages pipelining, the client and the server need to agree on what
1111    /// happens when a failure occurs, otherwise messages that were sent assuming no failure would
1112    /// occur might have unintended effects.
1113    ///
1114    /// When requests fail on the server, the server will send the client a
1115    /// [`FAILURE`](Message::Failure) message. The client must clear the failure state by sending a
1116    /// [`RESET`](Message::Reset) (Bolt v3+) or [`ACK_FAILURE`](Message::AckFailure) (Bolt v1 - v2)
1117    /// message to the server. Until the server receives the `RESET`/`ACK_FAILURE` message, it will
1118    /// send an [`IGNORED`](Message::Ignored) message in response to any other message from the
1119    /// client, including messages that were sent in a pipeline.
1120    pub async fn pipeline(&mut self, messages: Vec<Message>) -> CommunicationResult<Vec<Message>> {
1121        // This Vec is too small if we're expecting some RECORD messages, so there's no "good" size
1122        let mut responses = Vec::with_capacity(messages.len());
1123
1124        for message in &messages {
1125            #[cfg(test)]
1126            println!(">>> {:?}", message);
1127
1128            let chunks = message.clone().into_chunks().map_err(ProtocolError::from)?;
1129
1130            for chunk in chunks {
1131                self.stream.write_all(&chunk).await?;
1132            }
1133
1134            // Immediate state changes
1135            match message {
1136                Message::Reset => self.server_state = Interrupted,
1137                Message::Goodbye => self.server_state = Disconnected,
1138                _ => {}
1139            }
1140        }
1141        self.stream.flush().await?;
1142        self.sent_queue.extend(messages);
1143
1144        while !self.sent_queue.is_empty() {
1145            responses.push(self.read_message().await?);
1146        }
1147        Ok(responses)
1148    }
1149}