use actix::{prelude::*, WrapFuture};
use futures::{channel::oneshot, future::Either, prelude::*, FutureExt, SinkExt};
use std::ops::Not;
use std::{collections::HashSet, time::Duration};
use crate::connection::ClientInfo;
use crate::{
connection::{self, ConnectionRef, LocalRouterHandler, Transport},
error::ConnectionTimeout,
Error, RpcRawCall, RpcRawStreamCall,
};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
const RECONNECT_DELAY: Duration = Duration::from_millis(1000);
type RemoteConnection = ConnectionRef<Transport, LocalRouterHandler>;
pub struct RemoteRouter {
client_info: ClientInfo,
local_bindings: HashSet<String>,
pending_calls: Vec<oneshot::Sender<Result<RemoteConnection, ConnectionTimeout>>>,
connection: Option<RemoteConnection>,
shutdown_rx: Option<oneshot::Receiver<()>>,
}
impl Actor for RemoteRouter {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
if self.shutdown_rx.is_none() {
let (tx, rx) = oneshot::channel();
tokio::task::spawn_local(async move {
let _ = tokio::signal::ctrl_c().await;
let _ = tx.send(());
});
self.shutdown_rx.replace(rx);
}
if let Some(ref mut rx) = self.shutdown_rx {
match rx.try_recv() {
Ok(None) => self.try_connect(ctx),
_ => log::debug!("(re)connection interrupted"),
}
}
}
}
impl RemoteRouter {
fn try_connect(&mut self, ctx: &mut <Self as Actor>::Context) {
let addr = ya_sb_proto::GsbAddr::default();
let client_info = self.client_info.clone();
log::info!("trying to connect to: {}", addr);
let timeout_h = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| {
if act.connection.is_none() {
act.clean_pending_calls(
Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())),
ctx,
);
log::warn!("connection timed out after {:?}", CONNECT_TIMEOUT);
ctx.stop();
}
});
let connect_fut = connection::transport(addr.clone())
.map_err(move |e| Error::ConnectionFail(addr, e))
.into_actor(self)
.then(|transport, act, ctx| {
let transport = match transport {
Ok(v) => v,
Err(e) => return Either::Left(fut::err(e)),
};
let connection =
connection::connect_with_handler(client_info, transport, act.handler(ctx));
act.connection = Some(connection.clone());
act.clean_pending_calls(Ok(connection.clone()), ctx);
Either::Right(
future::try_join_all(
act.local_bindings
.clone()
.into_iter()
.map(move |service_id| connection.bind(service_id)),
)
.and_then(|_| async { Ok(log::debug!("registered all services")) })
.into_actor(act),
)
})
.then(move |result: Result<(), Error>, _, ctx| {
ctx.cancel_future(timeout_h);
if let Err(e) = result {
log::warn!("routing error: {}", e);
ctx.run_later(RECONNECT_DELAY, |_, ctx| ctx.stop());
}
fut::ready(())
});
ctx.spawn(connect_fut);
}
fn clean_pending_calls(
&mut self,
connection: Result<ConnectionRef<Transport, LocalRouterHandler>, ConnectionTimeout>,
ctx: &mut <Self as Actor>::Context,
) {
log::debug!(
"got connection activating {} calls",
self.pending_calls.len()
);
for tx in std::mem::take(&mut self.pending_calls) {
let connection = connection.clone();
let send_fut = async move {
let _v = tx.send(connection);
}
.into_actor(self);
let _ = ctx.spawn(send_fut);
}
}
fn connection(&mut self) -> impl Future<Output = Result<RemoteConnection, Error>> + 'static {
if let Some(c) = &self.connection {
return future::ok((*c).clone()).left_future();
}
log::debug!("wait for connection");
let (tx, rx) = oneshot::channel();
self.pending_calls.push(tx);
rx.map(|r| match r {
Err(_) => Err(Error::Cancelled),
Ok(c) => c.map_err(From::from),
})
.right_future()
}
fn handler(&mut self, ctx: &mut <Self as Actor>::Context) -> LocalRouterHandler {
let (tx, rx) = oneshot::channel();
rx.into_actor(self)
.map(|_, this, ctx| {
if let Some(c) = this.connection.as_ref() {
c.connected().not().then(|| log::warn!("connection lost"));
}
ctx.run_later(RECONNECT_DELAY, |_, ctx| ctx.stop());
})
.spawn(ctx);
LocalRouterHandler::new(|| {
let _ = tx.send(());
})
}
}
impl Default for RemoteRouter {
fn default() -> Self {
Self {
connection: Default::default(),
local_bindings: Default::default(),
pending_calls: Default::default(),
client_info: ClientInfo::new("sb-client"),
shutdown_rx: Default::default(),
}
}
}
impl Supervised for RemoteRouter {
fn restarting(&mut self, _ctx: &mut Self::Context) {
let _ = self.connection.take();
}
}
impl SystemService for RemoteRouter {}
pub enum UpdateService {
Add(String),
Remove(String),
}
impl Message for UpdateService {
type Result = ();
}
impl Handler<UpdateService> for RemoteRouter {
type Result = MessageResult<UpdateService>;
fn handle(&mut self, msg: UpdateService, _ctx: &mut Self::Context) -> Self::Result {
match msg {
UpdateService::Add(service_id) => {
if let Some(c) = &mut self.connection {
Arbiter::current().spawn(c.bind(service_id.clone()).then(|v| async {
v.unwrap_or_else(|err| match err {
Error::GsbAlreadyRegistered(m) => {
log::warn!("already registered: {}", m)
}
e => log::error!("bind error: {}", e),
})
}));
}
log::trace!("Binding local service '{}'", service_id);
self.local_bindings.insert(service_id);
}
UpdateService::Remove(service_id) => {
if let Some(c) = &mut self.connection {
Arbiter::current().spawn(c.unbind(service_id.clone()).then(|v| async {
v.unwrap_or_else(|e| log::error!("unbind error: {}", e))
}));
}
log::trace!("Unbinding local service '{}'", service_id);
self.local_bindings.remove(&service_id);
}
}
MessageResult(())
}
}
impl Handler<RpcRawCall> for RemoteRouter {
type Result = ActorResponse<Self, Result<Vec<u8>, Error>>;
fn handle(&mut self, msg: RpcRawCall, _ctx: &mut Self::Context) -> Self::Result {
ya_packet_trace::packet_trace_maybe!("RemoteRouter::Handler<RpcRawCall>", {
&ya_packet_trace::try_extract_from_ip_frame(&msg.body)
});
ActorResponse::r#async(
self.connection()
.and_then(move |connection| {
connection.call(msg.caller, msg.addr, msg.body, msg.no_reply)
})
.into_actor(self),
)
}
}
impl Handler<RpcRawStreamCall> for RemoteRouter {
type Result = Result<(), Error>;
fn handle(&mut self, msg: RpcRawStreamCall, ctx: &mut Self::Context) -> Self::Result {
let conn = self.connection();
let fut = async move {
let connection = match conn.await {
Ok(c) => c,
Err(e) => return log::error!("Remote router connection error: {}", e),
};
let reply = msg.reply.sink_map_err(|e| Error::GsbFailure(e.to_string()));
futures::pin_mut!(reply);
ya_packet_trace::packet_trace_maybe!("RemoteRouter::Handler<RpcRawStreamCall>", {
&{ &ya_packet_trace::try_extract_from_ip_frame(&msg.body) }
});
let result = SinkExt::send_all(
&mut reply,
&mut connection
.call_streaming(msg.caller, msg.addr, msg.body)
.map(|v| Ok(v)),
)
.await;
if let Err(e) = result {
log::error!("Remote router RpcRawStreamCall handler error: {}", e);
}
};
ctx.spawn(fut.into_actor(self));
Ok(())
}
}