tentacle 0.7.5

Minimal implementation for a multiplexed p2p network framework.
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use crate::{
    ProtocolId, SessionId,
    context::SessionContext,
    error::{DialerErrorKind, ListenErrorKind, ProtocolHandleErrorKind},
    multiaddr::Multiaddr,
    service::{TargetProtocol, TargetSession, future_task::BoxedFutureTask},
};
use bytes::Bytes;

/// Crate-internal envelope that carries a service event, a service error,
/// or a listen-address update.
#[derive(Debug)]
pub(crate) enum ServiceEventAndError {
    /// A service event, optionally paired with a response channel
    Event {
        /// The event itself
        event: ServiceEvent,
        /// Optional oneshot sender; `None` when built via `From<ServiceEvent>`,
        /// `Some` after `wait_response` has been called.
        /// NOTE(review): presumably signalled once the event has been handled — confirm with the consumer
        wait_response: Option<futures::channel::oneshot::Sender<()>>,
    },
    /// A service error
    Error(ServiceError),
    /// Listen address update
    Update {
        /// Listen addresses carried by this update
        /// NOTE(review): presumably the full current set rather than a delta — confirm
        listen_addrs: Vec<multiaddr::MultiAddr>,
    },
}

impl ServiceEventAndError {
    /// Attaches `tx` as the response channel of an `Event`, replacing any
    /// sender that was previously stored.
    ///
    /// Any other variant is returned unchanged and `tx` is dropped unused.
    pub fn wait_response(self, tx: futures::channel::oneshot::Sender<()>) -> Self {
        match self {
            Self::Event { event, .. } => Self::Event {
                event,
                wait_response: Some(tx),
            },
            untouched => untouched,
        }
    }
}

impl From<ServiceEvent> for ServiceEventAndError {
    fn from(event: ServiceEvent) -> Self {
        ServiceEventAndError::Event {
            event,
            wait_response: None,
        }
    }
}

impl From<ServiceError> for ServiceEventAndError {
    fn from(event: ServiceError) -> Self {
        ServiceEventAndError::Error(event)
    }
}

/// Errors generated by the service.
#[derive(Debug)]
pub enum ServiceError {
    /// Dialing a remote address failed
    DialerError {
        /// Remote address that was dialed
        address: Multiaddr,
        /// The dial error
        error: DialerErrorKind,
    },
    /// Listening on an address failed
    ListenError {
        /// Listen address
        address: Multiaddr,
        /// The listen error
        error: ListenErrorKind,
    },
    /// Protocol selection failed
    ProtocolSelectError {
        /// Protocol name: `None` means a timeout or another network problem,
        /// `Some` means the named protocol is not supported
        proto_name: Option<String>,
        /// Session context
        session_context: Arc<SessionContext>,
    },
    /// A protocol error occurred during interaction
    ProtocolError {
        /// Session id
        id: SessionId,
        /// Protocol id
        proto_id: ProtocolId,
        /// Codec error
        error: std::io::Error,
    },
    /// The session did not open any protocol after the connection was
    /// initialized; this is treated as a suspected fd attack
    SessionTimeout {
        /// Session context
        session_context: Arc<SessionContext>,
    },
    /// Multiplex protocol error
    MuxerError {
        /// Session context
        session_context: Arc<SessionContext>,
        /// The underlying error, such as `InvalidData`
        error: std::io::Error,
    },
    /// A protocol handle failed. This can cause memory leaks or abnormal CPU
    /// usage, so tentacle will close after reporting this error
    ProtocolHandleError {
        /// Protocol id
        proto_id: ProtocolId,
        /// The handle error
        error: ProtocolHandleErrorKind,
    },
    /// The session is blocked and cannot send messages because too much work
    /// arrived in a short time, e.g. too much data is cached on this session
    /// and cannot be sent to the remote. Since this may cause an OOM, the
    /// session will be killed by tentacle.
    ///
    /// Judging condition: unsent message size > send buffer size set by the user, default 24m
    SessionBlocked {
        /// Session context
        session_context: Arc<SessionContext>,
    },
}

/// Events generated by the service.
#[derive(Debug)]
pub enum ServiceEvent {
    /// A session was closed
    SessionClose {
        /// Context of the closed session
        session_context: Arc<SessionContext>,
    },
    /// A session was opened
    SessionOpen {
        /// Context of the opened session
        session_context: Arc<SessionContext>,
    },
    /// A listener was closed
    ListenClose {
        /// Listen address
        address: Multiaddr,
    },
    /// A listener was started
    ListenStarted {
        /// Listen address
        address: Multiaddr,
    },
}

/// Task received by the service.
///
/// An instruction that the outside world can send to the service.
pub(crate) enum ServiceTask {
    /// Send protocol data
    ProtocolMessage {
        /// Sessions to send to
        /// NOTE(review): the earlier wording said "None means broadcast", but this
        /// field is a `TargetSession`, not an `Option` — confirm how broadcast is expressed
        target: TargetSession,
        /// Protocol id
        proto_id: ProtocolId,
        /// Message payload
        data: Bytes,
    },
    /// Open the specified protocol(s) on a session
    ProtocolOpen {
        /// Session id
        session_id: SessionId,
        /// Protocol(s) to open
        target: TargetProtocol,
    },
    /// Close the specified protocol on a session
    ProtocolClose {
        /// Session id
        session_id: SessionId,
        /// Protocol id
        proto_id: ProtocolId,
    },
    /// Set a notify task for a protocol
    SetProtocolNotify {
        /// Protocol id
        proto_id: ProtocolId,
        /// Timer interval
        interval: Duration,
        /// The timer token
        token: u64,
    },
    /// Remove a notify task from a protocol
    RemoveProtocolNotify {
        /// Protocol id
        proto_id: ProtocolId,
        /// The timer token
        token: u64,
    },
    /// Set a notify task for a protocol on a specific session
    SetProtocolSessionNotify {
        /// Session id
        session_id: SessionId,
        /// Protocol id
        proto_id: ProtocolId,
        /// Timer interval
        interval: Duration,
        /// The timer token
        token: u64,
    },
    /// Remove a notify task from a protocol on a specific session
    RemoveProtocolSessionNotify {
        /// Session id
        session_id: SessionId,
        /// Protocol id
        proto_id: ProtocolId,
        /// The timer token
        token: u64,
    },
    /// Future task
    FutureTask {
        /// The boxed future
        task: BoxedFutureTask,
    },
    /// Disconnect a session
    Disconnect {
        /// Session id
        session_id: SessionId,
    },
    /// Dial a remote address
    Dial {
        /// Remote address
        address: Multiaddr,
        /// Protocols to open after dialing
        target: TargetProtocol,
    },
    /// Listen on an address
    Listen {
        /// Listen address
        address: Multiaddr,
    },
    /// Receive an already established connection
    /// and build the tentacle protocol on top of it.
    RawSession {
        /// Remote address
        remote_address: Multiaddr,
        /// The raw, already connected stream
        raw_session: Box<dyn crate::session::AsyncRw + Send + Unpin + 'static>,
        /// Session info (inbound or outbound)
        session_info: RawSessionInfo,
    },
    /// Shut down the service
    /// NOTE(review): the meaning of the `bool` flag is not visible here — document it
    Shutdown(bool),
}

/// Describes a raw session as either inbound or outbound.
pub enum RawSessionInfo {
    /// Inbound session
    Inbound {
        /// Session provenance: the listen address associated with this session
        listen_addr: Multiaddr,
    },
    /// Outbound session
    Outbound {
        /// Dial protocols
        target: TargetProtocol,
    },
}

impl RawSessionInfo {
    /// Inbound session info
    pub fn inbound(listen_addr: Multiaddr) -> Self {
        RawSessionInfo::Inbound { listen_addr }
    }

    /// Outbound session info
    pub fn outbound(target: TargetProtocol) -> Self {
        RawSessionInfo::Outbound { target }
    }
}

impl fmt::Debug for ServiceTask {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use self::ServiceTask::*;

        match self {
            ProtocolMessage { proto_id, data, .. } => {
                write!(f, "proto_id: {}, message: {:?}", proto_id, data)
            }
            SetProtocolNotify {
                proto_id, token, ..
            } => write!(f, "set protocol({}) notify({})", proto_id, token),
            RemoveProtocolNotify { proto_id, token } => {
                write!(f, "remove protocol({}) notify({})", proto_id, token)
            }
            SetProtocolSessionNotify {
                session_id,
                proto_id,
                token,
                ..
            } => write!(
                f,
                "set protocol({}) session({}) notify({})",
                proto_id, session_id, token
            ),
            RemoveProtocolSessionNotify {
                session_id,
                proto_id,
                token,
            } => write!(
                f,
                "remove protocol({}) session({}) notify({})",
                proto_id, session_id, token
            ),
            FutureTask { .. } => write!(f, "Future task"),
            Disconnect { session_id } => write!(f, "Disconnect session [{}]", session_id),
            Dial { address, .. } => write!(f, "Dial address: {}", address),
            Listen { address } => write!(f, "Listen address: {}", address),
            RawSession { remote_address, .. } => write!(f, "Raw session from: {}", remote_address),
            ProtocolOpen { session_id, .. } => write!(f, "Open session [{}] proto", session_id),
            ProtocolClose {
                session_id,
                proto_id,
            } => write!(f, "Close session [{}] proto [{}]", session_id, proto_id),
            Shutdown(_) => write!(f, "Try close service"),
        }
    }
}