1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use crate::{
pipe::{PipeBehavior, PipeModifier},
protocols::pipe::{internal::InternalCmd, PipeMessage},
Context,
};
use ockam_core::compat::boxed::Box;
use ockam_core::{Address, Any, Decodable, Result, Routed, Worker};
pub struct PipeReceiver {
hooks: PipeBehavior,
int_addr: Address,
}
#[crate::worker]
impl Worker for PipeReceiver {
type Context = Context;
type Message = Any;
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
ctx.set_cluster(super::CLUSTER_NAME).await
}
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
match msg.msg_addr() {
addr if addr == self.int_addr => self.handle_internal(ctx, msg).await?,
_ => self.handle_external(ctx, msg).await?,
};
Ok(())
}
}
impl PipeReceiver {
pub async fn create(
ctx: &mut Context,
addr: Address,
int_addr: Address,
hooks: PipeBehavior,
) -> Result<()> {
ctx.start_worker(
vec![addr, int_addr.clone()],
PipeReceiver { hooks, int_addr },
)
.await
}
async fn handle_external(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
let return_route = msg.return_route();
let pipe_msg = PipeMessage::decode(&msg.into_transport_message().payload)?;
debug!(
"Received pipe message with index '{}'",
pipe_msg.index.u64()
);
match self
.hooks
.external_all(ctx.address(), return_route, ctx, &pipe_msg)
.await
{
Ok(PipeModifier::Drop) => return Ok(()),
Err(e) => {
warn!("Received message was invalid: {}!", e);
return Ok(());
}
Ok(PipeModifier::None) => {}
}
let msg = super::unpack_pipe_message(&pipe_msg)?;
debug!("Forwarding message to {:?}", msg.transport().onward_route);
ctx.forward(msg).await
}
async fn handle_internal(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
trace!("PipeReceiver receiving internal command");
let return_route = msg.return_route();
let trans = msg.into_transport_message();
let internal_cmd = InternalCmd::from_transport(&trans)?;
self.hooks
.internal_all(self.int_addr.clone(), return_route, ctx, &internal_cmd)
.await
}
}