use crate::{
monotonic::Monotonic,
pipe::PipeBehavior,
protocols::pipe::{internal::InternalCmd, PipeMessage},
Context,
};
use ockam_core::compat::sync::Arc;
use ockam_core::compat::{boxed::Box, collections::VecDeque};
use ockam_core::{Address, AllowAll, Any, Mailbox, Mailboxes, Result, Route, Routed, Worker};
use ockam_node::WorkerBuilder;
enum PeerRoute {
Peer(Route),
Listener(Route),
}
impl PeerRoute {
fn peer(&self) -> &Route {
match self {
Self::Peer(ref p) => p,
Self::Listener(ref l) => l,
}
}
}
pub struct PipeSender {
index: Monotonic,
out_buf: VecDeque<PipeMessage>,
peer: Option<PeerRoute>,
int_addr: Address,
hooks: PipeBehavior,
}
#[ockam_core::worker]
impl Worker for PipeSender {
type Context = Context;
type Message = Any;
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
ctx.set_cluster(crate::pipe::CLUSTER_NAME).await?;
if let Some(PeerRoute::Listener(ref route)) = self.peer {
ctx.send_from_address(
route.clone(),
InternalCmd::InitHandshake,
self.int_addr.clone(),
)
.await?
}
Ok(())
}
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
match msg.msg_addr() {
addr if addr == self.int_addr => self.handle_internal(ctx, msg).await?,
_ => self.handle_external(ctx, msg).await?,
};
Ok(())
}
}
impl PipeSender {
pub async fn create(
ctx: &mut Context,
peer: Route,
addr: Address,
int_addr: Address,
hooks: PipeBehavior,
) -> Result<()> {
let worker = Self {
index: Monotonic::from(1),
out_buf: VecDeque::new(),
peer: Some(PeerRoute::Peer(peer)),
int_addr: int_addr.clone(),
hooks,
};
let mailboxes = Mailboxes::new(
Mailbox::new(addr, Arc::new(AllowAll), Arc::new(AllowAll)),
vec![Mailbox::new(
int_addr,
Arc::new(AllowAll),
Arc::new(AllowAll),
)],
);
WorkerBuilder::new(worker)
.with_mailboxes(mailboxes)
.start(ctx)
.await?;
Ok(())
}
pub async fn uninitialized(
ctx: &mut Context,
addr: Address,
int_addr: Address,
listener: Option<Route>,
hooks: PipeBehavior,
) -> Result<()> {
let worker = Self {
index: Monotonic::from(1),
out_buf: VecDeque::new(),
peer: listener.map(PeerRoute::Listener),
int_addr: int_addr.clone(),
hooks,
};
let mailboxes = Mailboxes::new(
Mailbox::new(addr, Arc::new(AllowAll), Arc::new(AllowAll)),
vec![Mailbox::new(
int_addr,
Arc::new(AllowAll),
Arc::new(AllowAll),
)],
);
WorkerBuilder::new(worker)
.with_mailboxes(mailboxes)
.start(ctx)
.await?;
Ok(())
}
async fn handle_internal(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
trace!("PipeSender receiving internal command");
let return_route = msg.return_route();
let trans = msg.into_transport_message();
let internal_cmd = InternalCmd::from_transport(&trans)?;
match self.peer {
Some(PeerRoute::Peer(ref peer)) => {
self.hooks
.internal_all(self.int_addr.clone(), peer.clone(), ctx, &internal_cmd)
.await?;
}
_ => match internal_cmd {
InternalCmd::InitSender => {
debug!("Initialise pipe sender for route {:?}", return_route);
self.peer = Some(PeerRoute::Peer(return_route.clone()));
for msg in core::mem::take(&mut self.out_buf) {
send_pipe_msg(
&mut self.hooks,
ctx,
self.int_addr.clone(),
return_route.clone(),
msg,
)
.await?;
}
}
cmd => warn!(
"Received internal command '{:?}' for invalid state sender",
cmd
),
},
}
Ok(())
}
async fn handle_external(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
let mut msg = msg.into_transport_message();
msg.onward_route.modify().pop_front();
debug!(
"Pipe sender '{:?}' dispatch {:?} -> {:?}",
ctx.address(),
self.peer.as_ref().map(|p| p.peer()),
msg.onward_route
);
let index = self.index.next() as u64;
let pipe_msg = PipeMessage::from_transport(index, msg)?;
match self.peer {
Some(PeerRoute::Peer(ref peer)) => {
send_pipe_msg(
&mut self.hooks,
ctx,
self.int_addr.clone(),
peer.clone(),
pipe_msg,
)
.await
}
_ => {
debug!("Queue message into output buffer...");
self.out_buf.push_back(pipe_msg);
Ok(())
}
}
}
}
async fn send_pipe_msg(
hooks: &mut PipeBehavior,
ctx: &mut Context,
int_addr: Address,
peer: Route,
msg: PipeMessage,
) -> Result<()> {
if let crate::pipe::PipeModifier::Drop = hooks
.external_all(int_addr.clone(), peer.clone(), ctx, &msg)
.await?
{
return Ok(());
}
ctx.send_from_address(peer, msg, int_addr).await
}