1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use ockam_core::async_trait;
use ockam_core::{Address, Decodable, LocalMessage, Processor, Result, TransportMessage};
use ockam_node::Context;
use ockam_transport_core::TransportError;
use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf};
use tracing::{error, info, trace};
/// Receiving side of a single TCP connection.
///
/// Reads length-prefixed frames from the socket's read half, decodes each
/// frame into a `TransportMessage`, and forwards it through the node context.
pub(crate) struct TcpRecvProcessor {
/// Owned read half of the TCP stream that frames are read from.
rx: OwnedReadHalf,
/// Address associated with the remote peer; it is prepended to the return
/// route of every received message and used in log output.
peer_addr: Address,
}
impl TcpRecvProcessor {
pub fn new(rx: OwnedReadHalf, peer_addr: Address) -> Self {
Self { rx, peer_addr }
}
}
#[async_trait]
impl Processor for TcpRecvProcessor {
    type Context = Context;

    /// Registers this processor under the crate-wide cluster name.
    async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
        ctx.set_cluster(crate::CLUSTER_NAME).await
    }

    /// Reads one frame from the peer and forwards the decoded message.
    ///
    /// A frame is a `u16` length header followed by that many payload bytes.
    ///
    /// # Returns
    ///
    /// * `Ok(false)` when the length header cannot be read; the connection is
    ///   treated as closed (presumably this tells the runtime to stop calling
    ///   `process` -- confirm against the `Processor` trait contract).
    /// * `Ok(true)` when a message was forwarded, when a heartbeat was
    ///   received, or when the payload could not be read in full.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::RecvBadMessage` if the payload cannot be
    /// decoded, and propagates any error from `Context::forward`.
    async fn process(&mut self, ctx: &mut Context) -> Result<bool> {
        // Frame header: the payload length.
        let len = match self.rx.read_u16().await {
            Ok(len) => len,
            Err(_e) => {
                info!(
                    "Connection to peer '{}' was closed; dropping stream",
                    self.peer_addr
                );
                return Ok(false);
            }
        };

        trace!("Received message header for {} bytes", len);

        // Frame body: exactly `len` bytes (u16 -> usize is lossless).
        let mut buf = vec![0; usize::from(len)];
        if self.rx.read_exact(&mut buf).await.is_err() {
            error!("Failed to receive message of length: {}", len);
            return Ok(true);
        }

        let mut msg =
            TransportMessage::decode(&buf).map_err(|_| TransportError::RecvBadMessage)?;

        // A message with an empty onward route is a heartbeat. It has no next
        // hop, so there is nothing to forward it to: acknowledge it and wait
        // for the next frame instead of falling through to `ctx.forward`.
        if msg.onward_route.next().is_err() {
            trace!("Got heartbeat message from: {}", self.peer_addr);
            return Ok(true);
        }

        // Record the peer as the first hop of the return route so that
        // replies are routed back over this connection.
        msg.return_route.modify().prepend(self.peer_addr.clone());

        trace!("Message onward route: {}", msg.onward_route);
        trace!("Message return route: {}", msg.return_route);

        ctx.forward(LocalMessage::new(msg, Vec::new())).await?;
        Ok(true)
    }
}