use futures::prelude::*;
use std::fmt::Debug;
use std::sync::{Arc, atomic::Ordering};
use std::time::Duration;
use crate::{
ProtocolId, SessionId,
channel::mpsc,
error::SendErrorKind,
multiaddr::Multiaddr,
service::{
TargetProtocol, TargetSession,
event::{RawSessionInfo, ServiceTask},
},
};
use bytes::Bytes;
use std::sync::atomic::AtomicBool;
type Result = std::result::Result<(), SendErrorKind>;
#[derive(Clone)]
pub struct ServiceControl {
pub(crate) task_sender: mpsc::Sender<ServiceTask>,
closed: Arc<AtomicBool>,
}
impl ServiceControl {
pub(crate) fn new(task_sender: mpsc::Sender<ServiceTask>, closed: Arc<AtomicBool>) -> Self {
ServiceControl {
task_sender,
closed,
}
}
pub(crate) fn send(&self, event: ServiceTask) -> Result {
if self.closed.load(Ordering::SeqCst) {
return Err(SendErrorKind::BrokenPipe);
}
self.task_sender.try_send(event).map_err(|err| {
if err.is_full() {
SendErrorKind::WouldBlock
} else {
SendErrorKind::BrokenPipe
}
})
}
#[inline]
fn quick_send(&self, event: ServiceTask) -> Result {
if self.closed.load(Ordering::SeqCst) {
return Err(SendErrorKind::BrokenPipe);
}
self.task_sender.try_quick_send(event).map_err(|err| {
if err.is_full() {
SendErrorKind::WouldBlock
} else {
SendErrorKind::BrokenPipe
}
})
}
#[inline]
pub fn listen(&self, address: Multiaddr) -> Result {
self.quick_send(ServiceTask::Listen { address })
}
#[inline]
pub fn dial(&self, address: Multiaddr, target: TargetProtocol) -> Result {
self.quick_send(ServiceTask::Dial { address, target })
}
#[inline]
pub fn raw_session<T>(
&self,
raw_session: T,
remote_address: Multiaddr,
info: RawSessionInfo,
) -> Result
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
self.quick_send(ServiceTask::RawSession {
raw_session: Box::new(raw_session),
remote_address,
session_info: info,
})
}
#[inline]
pub fn disconnect(&self, session_id: SessionId) -> Result {
self.quick_send(ServiceTask::Disconnect { session_id })
}
#[inline]
pub fn send_message_to(
&self,
session_id: SessionId,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.filter_broadcast(TargetSession::Single(session_id), proto_id, data)
}
#[inline]
pub fn quick_send_message_to(
&self,
session_id: SessionId,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.quick_filter_broadcast(TargetSession::Single(session_id), proto_id, data)
}
#[inline]
pub fn filter_broadcast(
&self,
target: TargetSession,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.send(ServiceTask::ProtocolMessage {
target,
proto_id,
data,
})
}
#[inline]
pub fn quick_filter_broadcast(
&self,
target: TargetSession,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.quick_send(ServiceTask::ProtocolMessage {
target,
proto_id,
data,
})
}
#[inline]
pub fn future_task<T>(&self, task: T) -> Result
where
T: Future<Output = ()> + 'static + Send,
{
self.send(ServiceTask::FutureTask {
task: Box::pin(task),
})
}
#[inline]
pub fn open_protocol(&self, session_id: SessionId, proto_id: ProtocolId) -> Result {
self.quick_send(ServiceTask::ProtocolOpen {
session_id,
target: proto_id.into(),
})
}
#[inline]
pub fn open_protocols(&self, session_id: SessionId, target: TargetProtocol) -> Result {
self.quick_send(ServiceTask::ProtocolOpen { session_id, target })
}
#[inline]
pub fn close_protocol(&self, session_id: SessionId, proto_id: ProtocolId) -> Result {
self.quick_send(ServiceTask::ProtocolClose {
session_id,
proto_id,
})
}
pub fn set_service_notify(
&self,
proto_id: ProtocolId,
interval: Duration,
token: u64,
) -> Result {
self.send(ServiceTask::SetProtocolNotify {
proto_id,
interval,
token,
})
}
pub fn remove_service_notify(&self, proto_id: ProtocolId, token: u64) -> Result {
self.send(ServiceTask::RemoveProtocolNotify { proto_id, token })
}
pub fn set_session_notify(
&self,
session_id: SessionId,
proto_id: ProtocolId,
interval: Duration,
token: u64,
) -> Result {
self.send(ServiceTask::SetProtocolSessionNotify {
session_id,
proto_id,
interval,
token,
})
}
pub fn remove_session_notify(
&self,
session_id: SessionId,
proto_id: ProtocolId,
token: u64,
) -> Result {
self.send(ServiceTask::RemoveProtocolSessionNotify {
session_id,
proto_id,
token,
})
}
pub fn close(&self) -> Result {
self.quick_send(ServiceTask::Shutdown(false))
}
pub fn shutdown(&self) -> Result {
self.quick_send(ServiceTask::Shutdown(true))
}
}
impl From<ServiceControl> for ServiceAsyncControl {
fn from(control: ServiceControl) -> Self {
ServiceAsyncControl {
task_sender: control.task_sender,
closed: control.closed,
}
}
}
impl From<ServiceAsyncControl> for ServiceControl {
fn from(control: ServiceAsyncControl) -> Self {
ServiceControl {
task_sender: control.task_sender,
closed: control.closed,
}
}
}
impl Debug for ServiceControl {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "ServiceControl")
}
}
impl Debug for ServiceAsyncControl {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "ServiceAsyncControl")
}
}
#[derive(Clone)]
pub struct ServiceAsyncControl {
task_sender: mpsc::Sender<ServiceTask>,
closed: Arc<AtomicBool>,
}
impl ServiceAsyncControl {
async fn send(&self, event: ServiceTask) -> Result {
if self.closed.load(Ordering::SeqCst) {
return Err(SendErrorKind::BrokenPipe);
}
self.task_sender.async_send(event).await.map_err(|_err| {
SendErrorKind::BrokenPipe
})
}
#[inline]
async fn quick_send(&self, event: ServiceTask) -> Result {
if self.closed.load(Ordering::SeqCst) {
return Err(SendErrorKind::BrokenPipe);
}
self.task_sender
.async_quick_send(event)
.await
.map_err(|_err| {
SendErrorKind::BrokenPipe
})
}
#[inline]
pub async fn listen(&self, address: Multiaddr) -> Result {
self.quick_send(ServiceTask::Listen { address }).await
}
#[inline]
pub async fn dial(&self, address: Multiaddr, target: TargetProtocol) -> Result {
self.quick_send(ServiceTask::Dial { address, target }).await
}
#[inline]
pub async fn raw_session<T>(
&self,
raw_session: T,
remote_address: Multiaddr,
info: RawSessionInfo,
) -> Result
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
self.quick_send(ServiceTask::RawSession {
raw_session: Box::new(raw_session),
remote_address,
session_info: info,
})
.await
}
#[inline]
pub async fn disconnect(&self, session_id: SessionId) -> Result {
self.quick_send(ServiceTask::Disconnect { session_id })
.await
}
#[inline]
pub async fn send_message_to(
&self,
session_id: SessionId,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.filter_broadcast(TargetSession::Single(session_id), proto_id, data)
.await
}
#[inline]
pub async fn quick_send_message_to(
&self,
session_id: SessionId,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.quick_filter_broadcast(TargetSession::Single(session_id), proto_id, data)
.await
}
#[inline]
pub async fn filter_broadcast(
&self,
target: TargetSession,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.send(ServiceTask::ProtocolMessage {
target,
proto_id,
data,
})
.await
}
#[inline]
pub async fn quick_filter_broadcast(
&self,
target: TargetSession,
proto_id: ProtocolId,
data: Bytes,
) -> Result {
self.quick_send(ServiceTask::ProtocolMessage {
target,
proto_id,
data,
})
.await
}
#[inline]
pub async fn future_task<T>(&self, task: T) -> Result
where
T: Future<Output = ()> + 'static + Send,
{
self.send(ServiceTask::FutureTask {
task: Box::pin(task),
})
.await
}
#[inline]
pub async fn open_protocol(&self, session_id: SessionId, proto_id: ProtocolId) -> Result {
self.quick_send(ServiceTask::ProtocolOpen {
session_id,
target: proto_id.into(),
})
.await
}
#[inline]
pub async fn open_protocols(&self, session_id: SessionId, target: TargetProtocol) -> Result {
self.quick_send(ServiceTask::ProtocolOpen { session_id, target })
.await
}
#[inline]
pub async fn close_protocol(&self, session_id: SessionId, proto_id: ProtocolId) -> Result {
self.quick_send(ServiceTask::ProtocolClose {
session_id,
proto_id,
})
.await
}
pub async fn set_service_notify(
&self,
proto_id: ProtocolId,
interval: Duration,
token: u64,
) -> Result {
self.send(ServiceTask::SetProtocolNotify {
proto_id,
interval,
token,
})
.await
}
pub async fn remove_service_notify(&self, proto_id: ProtocolId, token: u64) -> Result {
self.send(ServiceTask::RemoveProtocolNotify { proto_id, token })
.await
}
pub async fn set_session_notify(
&self,
session_id: SessionId,
proto_id: ProtocolId,
interval: Duration,
token: u64,
) -> Result {
self.send(ServiceTask::SetProtocolSessionNotify {
session_id,
proto_id,
interval,
token,
})
.await
}
pub async fn remove_session_notify(
&self,
session_id: SessionId,
proto_id: ProtocolId,
token: u64,
) -> Result {
self.send(ServiceTask::RemoveProtocolSessionNotify {
session_id,
proto_id,
token,
})
.await
}
pub async fn close(&self) -> Result {
self.quick_send(ServiceTask::Shutdown(false)).await
}
pub async fn shutdown(&self) -> Result {
self.quick_send(ServiceTask::Shutdown(true)).await
}
}