1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::convert::{TryFrom, TryInto};
use std::panic::catch_unwind;
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use failure::Error;
use tokio::io::BufStream;
use tokio::prelude::*;
pub(crate) use chunk::Chunk;
pub use init::Init;
pub(crate) use message_bytes::MessageBytes;
pub use success::Success;
use crate::bolt::structure::get_signature_from_bytes;
use crate::error::DeserializeError;
use crate::native;
mod chunk;
mod init;
mod message_bytes;
mod success;
const CHUNK_SIZE: usize = 16;
#[derive(Debug)]
pub enum Message {
Init(Init),
Success(Success),
}
impl Message {
pub async fn from_stream<T: Unpin + AsyncRead + AsyncWrite>(
buf_stream: &mut BufStream<T>,
) -> Result<Message, Error> {
Message::try_from(MessageBytes::from_stream(buf_stream).await?)
}
}
impl From<native::message::Init> for Message {
fn from(message: native::message::Init) -> Self {
Message::Init(Init::from(message))
}
}
impl From<native::message::Success> for Message {
fn from(message: native::message::Success) -> Self {
Message::Success(Success::from(message))
}
}
impl TryFrom<MessageBytes> for Message {
type Error = Error;
fn try_from(mut message_bytes: MessageBytes) -> Result<Self, Self::Error> {
let result: Result<Message, Error> = catch_unwind(move || {
let signature = get_signature_from_bytes(&mut message_bytes)?;
let remaining_bytes_arc =
Arc::new(Mutex::new(message_bytes.split_to(message_bytes.len())));
match signature {
init::SIGNATURE => Ok(Message::Init(Init::try_from(remaining_bytes_arc)?)),
success::SIGNATURE => Ok(Message::Success(Success::try_from(remaining_bytes_arc)?)),
_ => {
Err(DeserializeError(format!("Invalid signature byte: {:x}", signature)).into())
}
}
})
.map_err(|_| DeserializeError("Panicked during deserialization".to_string()))?;
Ok(result.map_err(|err: Error| {
DeserializeError(format!("Error creating Message from Bytes: {}", err))
})?)
}
}
impl TryInto<Vec<Bytes>> for Message {
type Error = Error;
fn try_into(self) -> Result<Vec<Bytes>, Self::Error> {
let bytes: Bytes = match self {
Message::Init(init) => init.try_into()?,
Message::Success(success) => success.try_into()?,
};
let mut result: Vec<Bytes> = Vec::with_capacity(bytes.len() / CHUNK_SIZE + 2);
for slice in bytes.chunks(CHUNK_SIZE) {
let chunk_bytes: Bytes = Chunk::try_from(Bytes::copy_from_slice(slice))?.into();
result.push(chunk_bytes);
}
result.push(Bytes::from_static(&[0, 0]));
Ok(result)
}
}