use super::{Event, *};
use crate::core::*;
use futures::stream::{Stream, StreamExt};
use tokio_tungstenite::tungstenite::Error as WsError;
pub struct WebsocketReceiver<T>
where
T: Send + 'static + Sync + Stream<Item = Result<Message, WsError>>,
{
sender_handle: UnboundedHandle<WebsocketSenderEvent>,
split_stream: Option<T>,
}
impl<T> WebsocketReceiver<T>
where
T: Send + 'static + Sync + Stream<Item = Result<Message, WsError>>,
{
pub fn new(split_stream: T, sender_handle: UnboundedHandle<WebsocketSenderEvent>) -> Self {
Self {
sender_handle,
split_stream: Some(split_stream),
}
}
}
#[async_trait::async_trait]
impl<T> ChannelBuilder<IoChannel<T>> for WebsocketReceiver<T>
where
T: Send + 'static + Sync + Stream<Item = Result<Message, WsError>>,
{
async fn build_channel(&mut self) -> ActorResult<IoChannel<T>> {
if let Some(stream) = self.split_stream.take() {
Ok(IoChannel(stream))
} else {
Err(ActorError::exit_msg("Unable to build websocket receiver channel"))
}
}
}
#[async_trait::async_trait]
impl<S, T> Actor<S> for WebsocketReceiver<T>
where
S: SupHandle<Self>,
T: Send + 'static + Sync + Stream<Item = Result<Message, WsError>> + Unpin,
{
type Data = ();
type Channel = IoChannel<T>;
async fn init(&mut self, _rt: &mut Rt<Self, S>) -> ActorResult<Self::Data> {
Ok(())
}
async fn run(&mut self, rt: &mut Rt<Self, S>, _data: Self::Data) -> ActorResult<()> {
while let Some(Ok(message)) = rt.inbox_mut().next().await {
match message {
Message::Text(text) => {
if let Ok(interface) = serde_json::from_str::<Interface>(&text) {
let mut targeted_scope_id_opt = interface.actor_path.destination().await;
match interface.event {
Event::Shutdown => {
if let Some(scope_id) = targeted_scope_id_opt.take() {
if let Err(err) = rt.shutdown_scope(scope_id).await {
let err_string = err.to_string();
let r = Error::Shutdown(interface.actor_path, err_string);
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
} else {
let r = Response::Shutdown(interface.actor_path);
self.sender_handle.send(WebsocketSenderEvent::Result(Ok(r))).ok();
};
} else {
let err_string = "Unreachable ActorPath".to_string();
let r = Error::Shutdown(interface.actor_path, err_string);
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
}
}
Event::RequestServiceTree => {
if let Some(scope_id) = targeted_scope_id_opt.take() {
if let Some(service) = rt.lookup::<Service>(scope_id).await {
let serivice_json =
serde_json::to_string(&service).expect("Serializable service");
let r = Response::ServiceTree(serivice_json.into());
self.sender_handle.send(WebsocketSenderEvent::Result(Ok(r))).ok();
} else {
let r = Error::ServiceTree("Service not available".into());
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
};
} else {
let r = Error::ServiceTree("Unreachable ActorPath".into());
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
}
}
Event::Subscribe(message) => {
if let Some(scope_id) = targeted_scope_id_opt.take() {
self.sender_handle
.send(WebsocketSenderEvent::Subscribe(interface.actor_path, scope_id, message))
.ok();
} else {
let r = Error::ServiceTree("Unreachable ActorPath".into());
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
}
}
Event::Cast(message_to_route) => {
if let Some(scope_id) = targeted_scope_id_opt.take() {
let route_message = message_to_route.clone();
match rt.send(scope_id, message_to_route).await {
Ok(()) => {
let r = Response::Sent(route_message);
self.sender_handle.send(WebsocketSenderEvent::Result(Ok(r))).ok();
}
Err(e) => {
let err = format!("{}", e);
let r = Error::Cast(interface.actor_path, route_message, err);
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
}
};
} else {
let r = Error::Cast(
interface.actor_path,
message_to_route,
"Unreachable ActorPath".into(),
);
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
}
}
Event::Call(message_to_route) => {
if let Some(scope_id) = targeted_scope_id_opt.take() {
let json_message = message_to_route.clone();
let ws_handle = self.sender_handle.clone();
let responder = JsonResponder::trait_obj(ws_handle);
if let Err(e) = rt.send(scope_id, (json_message, responder)).await {
let err = format!("{}", e);
let r = Error::Call(interface.actor_path, message_to_route, err);
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
};
} else {
let r = Error::Call(
interface.actor_path,
message_to_route,
"Unreachable ActorPath".into(),
);
self.sender_handle.send(WebsocketSenderEvent::Result(Err(r))).ok();
}
}
}
} else {
break;
};
}
Message::Close(_) => {
break;
}
_ => {}
}
}
self.sender_handle.shutdown().await;
Ok(())
}
}