use std::cell::Cell;
use std::collections::HashMap;
use std::sync::Arc;
use actix::prelude::*;
use tokio_stream::StreamExt;
use super::api_model::ConnectResetRequest;
use super::bistream_manage::{BiStreamManage, BiStreamManageCmd};
use super::PayloadUtils;
use super::{api_model::ClientDetectionRequest, nacos_proto::Payload};
type SenderType = tokio::sync::mpsc::Sender<Result<Payload, tonic::Status>>;
type ReceiverStreamType = tonic::Streaming<Payload>;
#[derive(Debug)]
pub enum NamespaceType {
Empty,
Public,
Other(Arc<String>),
Unknown,
}
impl NamespaceType {
pub fn from_option(namespace: Option<String>) -> Self {
if let Some(namespace) = namespace {
NamespaceType::get_namespace_type(&namespace)
} else {
NamespaceType::Unknown
}
}
pub fn get_namespace_type(namespace: &str) -> Self {
if namespace.is_empty() {
NamespaceType::Empty
} else if namespace == "public" {
NamespaceType::Public
} else {
NamespaceType::Other(Arc::new(namespace.to_owned()))
}
}
pub fn is_default(&self) -> bool {
match self {
NamespaceType::Empty => true,
NamespaceType::Public => true,
_ => false,
}
}
pub fn to_str(&self) -> &str {
match self {
NamespaceType::Empty => "",
NamespaceType::Public => "public",
NamespaceType::Other(namespace) => namespace.as_str(),
NamespaceType::Unknown => "",
}
}
}
pub struct BiStreamConn {
sender: SenderType,
client_id: Arc<String>,
receiver_stream: Cell<Option<ReceiverStreamType>>,
manage: Addr<BiStreamManage>,
}
impl BiStreamConn {
pub fn new(
sender: SenderType,
client_id: Arc<String>,
receiver_stream: ReceiverStreamType,
manage: Addr<BiStreamManage>,
) -> Self {
Self {
sender,
client_id,
receiver_stream: Cell::new(Some(receiver_stream)),
manage,
}
}
pub fn receive(&mut self, ctx: &mut Context<Self>) {
if let Some(mut receiver_stream) = self.receiver_stream.replace(None) {
let manage = self.manage.clone();
let client_id = self.client_id.clone();
async move {
while let Some(Ok(payload)) = receiver_stream.next().await {
manage.do_send(BiStreamManageCmd::Response(client_id.clone(), payload));
}
manage.do_send(BiStreamManageCmd::ConnClose(client_id));
}
.into_actor(self)
.map(|_, _, ctx| {
ctx.stop();
})
.spawn(ctx);
}
}
fn send_payload(&mut self, ctx: &mut Context<Self>, payload: Payload) {
let sender = self.sender.clone();
async move { sender.send(Ok(payload)).await }
.into_actor(self)
.map(|_, _, _| {})
.spawn(ctx);
}
fn close_stream_and_stop(&mut self, ctx: &mut Context<Self>) {
let sender = self.sender.clone();
async move {
sender.send(Err(tonic::Status::cancelled("close"))).await
}
.into_actor(self)
.map(|_, _, ctx| {
ctx.stop();
})
.wait(ctx);
}
}
impl Actor for BiStreamConn {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.receive(ctx);
}
}
#[derive(Debug, Message)]
#[rtype(result = "Result<BiStreamSenderResult,std::io::Error>")]
pub enum BiStreamSenderCmd {
Detection(String),
Reset(String, Option<String>, Option<String>),
Send(Arc<Payload>),
Close,
}
pub enum BiStreamSenderResult {
None,
}
impl Handler<BiStreamSenderCmd> for BiStreamConn {
type Result = Result<BiStreamSenderResult, std::io::Error>;
fn handle(&mut self, msg: BiStreamSenderCmd, ctx: &mut Context<Self>) -> Self::Result {
match msg {
BiStreamSenderCmd::Detection(request_id) => {
let request = ClientDetectionRequest {
module: Some("internal".to_owned()),
request_id: Some(request_id),
headers: Some(HashMap::new()),
};
let payload = PayloadUtils::build_payload(
"ClientDetectionRequest",
serde_json::to_string(&request).unwrap(),
);
self.send_payload(ctx, payload);
}
BiStreamSenderCmd::Reset(request_id, ip, port) => {
let request = ConnectResetRequest {
module: Some("internal".to_owned()),
request_id: Some(request_id),
server_ip: ip,
server_port: port,
headers: Some(HashMap::new()),
};
let payload = PayloadUtils::build_payload(
"ConnectResetRequest",
serde_json::to_string(&request).unwrap(),
);
self.send_payload(ctx, payload);
}
BiStreamSenderCmd::Send(payload) => {
self.send_payload(ctx, payload.as_ref().to_owned());
}
BiStreamSenderCmd::Close => {
self.close_stream_and_stop(ctx);
}
}
Ok(BiStreamSenderResult::None)
}
}