use bytes::Bytes;
use futures::prelude::*;
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use crate::channel::QuickSinkExt;
use crate::{
channel::{mpsc, mpsc::Priority},
error::SendErrorKind,
multiaddr::Multiaddr,
protocol_select::ProtocolInfo,
secio::{PublicKey, SecioKeyPair},
service::{
event::ServiceTask, ServiceAsyncControl, ServiceControl, SessionType, TargetProtocol,
TargetSession,
},
session::SessionEvent,
ProtocolId, SessionId,
};
/// Crate-internal handle that bundles a session event sender with a shared
/// `SessionContext`.
pub(crate) struct SessionController {
/// Sending half of the channel carrying `SessionEvent`s; used by `send`.
pub(crate) sender: mpsc::Sender<SessionEvent>,
/// Reference-counted `SessionContext` stored alongside the sender.
pub(crate) inner: Arc<SessionContext>,
}
impl SessionController {
pub(crate) fn new(
event_sender: mpsc::Sender<SessionEvent>,
inner: Arc<SessionContext>,
) -> Self {
Self {
sender: event_sender,
inner,
}
}
pub(crate) async fn send(&mut self, priority: Priority, event: SessionEvent) -> Result {
if priority.is_high() {
self.sender.quick_send(event).await.map_err(|_err| {
SendErrorKind::BrokenPipe
})
} else {
self.sender.send(event).await.map_err(|_err| {
SendErrorKind::BrokenPipe
})
}
}
}
/// Per-session information plus two shared atomics (a closed flag and a
/// pending-data counter).
///
/// Cloning copies the plain fields and bumps the reference counts of the
/// `Arc`s, so clones observe the same flag and counter.
#[derive(Clone, Debug)]
pub struct SessionContext {
/// Identifier of the session.
pub id: SessionId,
/// Address associated with the session (presumably the remote peer's
/// address — TODO confirm against the code that constructs this).
pub address: Multiaddr,
/// Session type, as described by `SessionType`.
pub ty: SessionType,
/// Public key of the remote side, when one is available.
pub remote_pubkey: Option<PublicKey>,
/// Shared flag read by `closed()`.
pub(crate) closed: Arc<AtomicBool>,
/// Shared counter adjusted by `incr_pending_data_size` /
/// `decr_pending_data_size` and read by `pending_data_size()`.
pending_data_size: Arc<AtomicUsize>,
}
impl SessionContext {
pub(crate) fn new(
id: SessionId,
address: Multiaddr,
ty: SessionType,
remote_pubkey: Option<PublicKey>,
closed: Arc<AtomicBool>,
pending_data_size: Arc<AtomicUsize>,
) -> SessionContext {
SessionContext {
id,
address,
ty,
remote_pubkey,
closed,
pending_data_size,
}
}
pub(crate) fn incr_pending_data_size(&self, data_size: usize) {
self.pending_data_size
.fetch_add(data_size, Ordering::AcqRel);
}
pub(crate) fn decr_pending_data_size(&self, data_size: usize) {
self.pending_data_size
.fetch_sub(data_size, Ordering::AcqRel);
}
pub fn closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}
pub fn pending_data_size(&self) -> usize {
self.pending_data_size.load(Ordering::Acquire)
}
}
/// File-local shorthand for the outcome of the send-style operations below:
/// `Ok(())` on success, otherwise a `SendErrorKind`.
type Result = std::result::Result<(), SendErrorKind>;
/// Service-level context: a control handle plus the listen addresses and
/// optional key pair exposed through the accessors on this type.
pub struct ServiceContext {
/// Listen addresses; replaced wholesale by `update_listens` and exposed by
/// `listens()`.
listens: Vec<Multiaddr>,
/// Key pair handed in at construction, if any; exposed by `key_pair()`.
key_pair: Option<SecioKeyPair>,
/// Async control handle that every forwarding method delegates to.
inner: ServiceAsyncControl,
}
impl ServiceContext {
    /// Creates a context whose control handle is built from `task_sender`,
    /// `proto_infos` and `closed`, starting with an empty listen-address list.
    pub(crate) fn new(
        task_sender: mpsc::Sender<ServiceTask>,
        proto_infos: HashMap<ProtocolId, ProtocolInfo>,
        key_pair: Option<SecioKeyPair>,
        closed: Arc<AtomicBool>,
    ) -> Self {
        // Build the synchronous control first, then convert it to the async flavour.
        let inner: ServiceAsyncControl =
            ServiceControl::new(task_sender, proto_infos, closed).into();
        Self {
            inner,
            key_pair,
            listens: Vec::new(),
        }
    }

    /// Forwards a listen request for `address` to the control handle.
    #[inline]
    pub async fn listen(&self, address: Multiaddr) -> Result {
        self.control().listen(address).await
    }

    /// Forwards a dial request for `address` with the given `target` protocols.
    #[inline]
    pub async fn dial(&self, address: Multiaddr, target: TargetProtocol) -> Result {
        self.control().dial(address, target).await
    }

    /// Forwards a disconnect request for `session_id`.
    #[inline]
    pub async fn disconnect(&self, session_id: SessionId) -> Result {
        self.control().disconnect(session_id).await
    }

    /// Forwards `data` for protocol `proto_id` on session `session_id`.
    #[inline]
    pub async fn send_message_to(
        &self,
        session_id: SessionId,
        proto_id: ProtocolId,
        data: Bytes,
    ) -> Result {
        self.control()
            .send_message_to(session_id, proto_id, data)
            .await
    }

    /// Same arguments as [`Self::send_message_to`], but delegates to the
    /// control handle's `quick_send_message_to`.
    #[inline]
    pub async fn quick_send_message_to(
        &self,
        session_id: SessionId,
        proto_id: ProtocolId,
        data: Bytes,
    ) -> Result {
        self.control()
            .quick_send_message_to(session_id, proto_id, data)
            .await
    }

    /// Forwards `data` for protocol `proto_id` to the sessions selected by
    /// `session_ids`.
    #[inline]
    pub async fn filter_broadcast(
        &self,
        session_ids: TargetSession,
        proto_id: ProtocolId,
        data: Bytes,
    ) -> Result {
        self.control()
            .filter_broadcast(session_ids, proto_id, data)
            .await
    }

    /// Same arguments as [`Self::filter_broadcast`], but delegates to the
    /// control handle's `quick_filter_broadcast`.
    #[inline]
    pub async fn quick_filter_broadcast(
        &self,
        session_ids: TargetSession,
        proto_id: ProtocolId,
        data: Bytes,
    ) -> Result {
        self.control()
            .quick_filter_broadcast(session_ids, proto_id, data)
            .await
    }

    /// Hands `task` to the control handle's `future_task`.
    #[inline]
    pub async fn future_task<T>(&self, task: T) -> Result
    where
        T: Future<Output = ()> + 'static + Send,
    {
        self.control().future_task(task).await
    }

    /// Forwards a request to open protocol `proto_id` on `session_id`.
    #[inline]
    pub async fn open_protocol(&self, session_id: SessionId, proto_id: ProtocolId) -> Result {
        self.control().open_protocol(session_id, proto_id).await
    }

    /// Forwards a request to open the `target` protocols on `session_id`.
    #[inline]
    pub async fn open_protocols(&self, session_id: SessionId, target: TargetProtocol) -> Result {
        self.control().open_protocols(session_id, target).await
    }

    /// Forwards a request to close protocol `proto_id` on `session_id`.
    #[inline]
    pub async fn close_protocol(&self, session_id: SessionId, proto_id: ProtocolId) -> Result {
        self.control().close_protocol(session_id, proto_id).await
    }

    /// Borrows the underlying async control handle.
    #[inline]
    pub fn control(&self) -> &ServiceAsyncControl {
        &self.inner
    }

    /// Returns the protocol table held by the control handle.
    #[inline]
    pub fn protocols(&self) -> &Arc<HashMap<ProtocolId, ProtocolInfo>> {
        self.control().protocols()
    }

    /// Borrows the key pair supplied at construction, if there was one.
    #[inline]
    pub fn key_pair(&self) -> Option<&SecioKeyPair> {
        self.key_pair.as_ref()
    }

    /// Returns the currently stored listen addresses.
    #[inline]
    pub fn listens(&self) -> &[Multiaddr] {
        self.listens.as_slice()
    }

    /// Replaces the stored listen addresses with `address_list`.
    #[inline]
    pub(crate) fn update_listens(&mut self, address_list: Vec<Multiaddr>) {
        self.listens = address_list;
    }

    /// Forwards a service-notify registration for `proto_id` with the given
    /// `interval` and `token`.
    pub async fn set_service_notify(
        &self,
        proto_id: ProtocolId,
        interval: Duration,
        token: u64,
    ) -> Result {
        self.control()
            .set_service_notify(proto_id, interval, token)
            .await
    }

    /// Forwards a session-notify registration for `session_id` / `proto_id`
    /// with the given `interval` and `token`.
    pub async fn set_session_notify(
        &self,
        session_id: SessionId,
        proto_id: ProtocolId,
        interval: Duration,
        token: u64,
    ) -> Result {
        self.control()
            .set_session_notify(session_id, proto_id, interval, token)
            .await
    }

    /// Forwards removal of the service notify identified by `proto_id` and `token`.
    pub async fn remove_service_notify(&self, proto_id: ProtocolId, token: u64) -> Result {
        self.control().remove_service_notify(proto_id, token).await
    }

    /// Forwards removal of the session notify identified by `session_id`,
    /// `proto_id` and `token`.
    pub async fn remove_session_notify(
        &self,
        session_id: SessionId,
        proto_id: ProtocolId,
        token: u64,
    ) -> Result {
        self.control()
            .remove_session_notify(session_id, proto_id, token)
            .await
    }

    /// Delegates to the control handle's `close`.
    pub async fn close(&self) -> Result {
        self.control().close().await
    }

    /// Delegates to the control handle's `shutdown`.
    pub async fn shutdown(&self) -> Result {
        self.control().shutdown().await
    }

    /// Produces a new `ServiceContext` by cloning every field of this one.
    pub(crate) fn clone_self(&self) -> Self {
        let ServiceContext {
            listens,
            key_pair,
            inner,
        } = self;
        Self {
            inner: inner.clone(),
            key_pair: key_pair.clone(),
            listens: listens.clone(),
        }
    }
}
/// A `ServiceContext` tagged with the id of one protocol.
///
/// Dereferences to the wrapped `ServiceContext`, so its methods can be called
/// directly on this type.
pub struct ProtocolContext {
/// Wrapped service context; the `Deref`/`DerefMut` target.
inner: ServiceContext,
/// Id of the protocol this context is tagged with.
pub proto_id: ProtocolId,
}
impl ProtocolContext {
    /// Wraps `service_context`, tagging it with `proto_id`.
    pub(crate) fn new(service_context: ServiceContext, proto_id: ProtocolId) -> Self {
        Self {
            proto_id,
            inner: service_context,
        }
    }

    /// Pairs a mutable borrow of this context with a borrowed `session`.
    ///
    /// The borrow of `self` (`'b`) must outlive the returned view (`'a`).
    #[inline]
    pub(crate) fn as_mut<'a, 'b: 'a>(
        &'b mut self,
        session: &'a SessionContext,
    ) -> ProtocolContextMutRef<'a> {
        let inner = self;
        ProtocolContextMutRef { inner, session }
    }
}
/// Borrowed view that pairs a mutable `ProtocolContext` with one
/// `SessionContext`.
///
/// Dereferences to the borrowed `ProtocolContext`.
pub struct ProtocolContextMutRef<'a> {
/// Mutably borrowed protocol context; the `Deref`/`DerefMut` target.
inner: &'a mut ProtocolContext,
/// Session whose id is used by `send_message` / `quick_send_message`.
pub session: &'a SessionContext,
}
impl<'a> ProtocolContextMutRef<'a> {
#[inline]
pub async fn send_message(&self, data: Bytes) -> Result {
let proto_id = self.proto_id();
self.inner
.send_message_to(self.session.id, proto_id, data)
.await
}
#[inline]
pub async fn quick_send_message(&self, data: Bytes) -> Result {
let proto_id = self.proto_id();
self.inner
.quick_send_message_to(self.session.id, proto_id, data)
.await
}
#[inline]
pub fn proto_id(&self) -> ProtocolId {
self.inner.proto_id
}
}
/// Lets a `ProtocolContext` be used wherever a `&ServiceContext` is expected.
impl Deref for ProtocolContext {
    type Target = ServiceContext;

    /// Borrows the wrapped `ServiceContext`.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
/// Gives mutable access to the wrapped `ServiceContext`.
impl DerefMut for ProtocolContext {
    /// Mutably borrows the wrapped `ServiceContext`.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
/// Lets the borrowed view be used wherever a `&ProtocolContext` is expected.
impl<'a> Deref for ProtocolContextMutRef<'a> {
    type Target = ProtocolContext;

    /// Reborrows the underlying `ProtocolContext` immutably.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
/// Gives mutable access to the underlying `ProtocolContext`.
impl<'a> DerefMut for ProtocolContextMutRef<'a> {
    /// Reborrows the underlying `ProtocolContext` mutably.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}