aex 0.1.6

A web server for rust.
Documentation
use crate::connection::context::{BoxReader, BoxWriter, Context};
use crate::connection::global::GlobalContext;
use crate::connection::heartbeat::{HeartbeatConfig, HeartbeatManager};
use crate::connection::node::Node;
use crate::tcp::types::{TCPCommand, TCPFrame};
use std::fmt;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::{BufReader, BufWriter};
use tokio::net::TcpStream;
use tokio::sync::{Mutex, RwLock};
use tokio::task::AbortHandle;

#[derive(Clone)]
pub struct ConnectionEntry {
    pub node: Arc<RwLock<Option<Node>>>,
    pub addr: SocketAddr,
    pub abort_handle: AbortHandle,
    pub context: Option<Arc<Mutex<Context>>>,
    pub cancel_token: tokio_util::sync::CancellationToken,
    pub connected_at: u64,
    pub last_seen: Arc<AtomicU64>,
}

impl fmt::Debug for ConnectionEntry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConnectionEntry")
            .field("addr", &self.addr)
            .field("connected_at", &self.connected_at)
            .field("last_seen", &self.last_seen)
            .finish()
    }
}

impl ConnectionEntry {
    pub fn new_empty_node(
        addr: SocketAddr,
        context: Option<Arc<Mutex<Context>>>,
        handle: tokio::task::AbortHandle,
        cancel_token: tokio_util::sync::CancellationToken,
    ) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        Self {
            node: Arc::new(RwLock::new(None)),
            addr,
            abort_handle: handle,
            cancel_token,
            context,
            connected_at: now,
            last_seen: Arc::new(AtomicU64::new(now)),
        }
    }

    pub fn uptime_secs(&self) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        now.saturating_sub(self.connected_at)
    }

    pub fn is_deactivated(&self, current: u64, timeout_secs: u64, max_lifetime_secs: u64) -> bool {
        self.uptime_secs() >= max_lifetime_secs
            || self.last_seen.load(Ordering::Relaxed) + timeout_secs < current
    }

    pub async fn update_node(&self, new_node: Node) {
        let mut lock = self.node.write().await;
        *lock = Some(new_node);
    }

    pub async fn get_peer_id(&self) -> Option<Vec<u8>> {
        let lock = self.node.read().await;
        lock.as_ref().map(|n| n.id.clone())
    }

    pub fn start_heartbeat(&self, local_node: Node, config: HeartbeatConfig) {
        let ctx = match &self.context {
            Some(c) => c.clone(),
            None => return,
        };

        let heartbeat = HeartbeatManager::new(local_node).with_config(config);
        let _ = heartbeat.start_server_heartbeat(ctx, self.addr, self.cancel_token.clone());
    }

    pub async fn start_heartbeat_with_global(&self, global: &Arc<GlobalContext>) {
        let config = global.heartbeat_config.clone();
        let local_node = global.local_node.clone();
        let guard = local_node.read().await;
        let node = (*guard).clone();
        drop(guard);
        self.start_heartbeat(node, config);
    }

    pub fn start<FF, Fut>(
        parent_token: tokio_util::sync::CancellationToken,
        socket: TcpStream,
        addr: std::net::SocketAddr,
        global: Arc<GlobalContext>,
        f: FF,
    ) -> (
        tokio_util::sync::CancellationToken,
        tokio::task::AbortHandle,
        Arc<Mutex<Context>>,
    )
    where
        FF: FnOnce(Arc<Mutex<Context>>) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        let child_token = parent_token.child_token();
        let task_token = child_token.clone();

        let (reader, writer) = socket.into_split();

        let r_opt: Option<BoxReader> = Some(Box::new(BufReader::new(reader)));
        let w_opt: Option<BoxWriter> = Some(Box::new(BufWriter::new(writer)));

        let mut raw_ctx = Context::new(r_opt, w_opt, global, addr);
        raw_ctx.set(task_token.clone());
        let ctx = Arc::new(Mutex::new(raw_ctx));

        let ctx_cloned = ctx.clone();
        let join_handle = tokio::spawn(async move {
            tokio::select! {
                _ = task_token.cancelled() => {
                    Ok::<(), anyhow::Error>(())
                }

                res = async {
                    f(ctx_cloned).await?;
                    Ok(())
                } => {
                    if let Err(e) = &res {
                         tracing::warn!("Pipeline error for {}: {}", addr, e);
                    }
                    res
                }
            }
        });

        (child_token, join_handle.abort_handle(), ctx)
    }

    pub fn default_pipeline<F, C>(
        _peer_addr: SocketAddr,
        _is_server: bool,
    ) -> impl FnOnce(Arc<Mutex<Context>>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
    + Send
    where
        F: TCPFrame + Send + 'static,
        C: TCPCommand + Send + 'static,
    {
        move |_ctx: Arc<Mutex<Context>>| {
            Box::pin(async move { Ok(()) })
                as Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
        }
    }
}

impl Drop for ConnectionEntry {
    fn drop(&mut self) {
        self.abort_handle.abort();
    }
}