// netxclient 1.7.4
//
// netx client assembly.
// Documentation
use crate::client::{INextClientInner, NetXClient, SessionSave};
use anyhow::{anyhow, Result};
use aqueue::Actor;
use std::collections::VecDeque;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::time::sleep;

/// Tracks outstanding requests and reports the ones that exceed the
/// configured timeout to the owning client.
pub struct RequestManager<T> {
    /// Pending requests as `(serial, registration instant)`.
    /// New entries are pushed at the front, so the oldest entry is at the back.
    queue: VecDeque<(i64, Instant)>,
    /// Timeout threshold; compared against the elapsed time in milliseconds.
    request_out_time: u32,
    /// Weak handle to the client whose `set_error` is called for timed-out requests.
    netx_client: Weak<Actor<NetXClient<T>>>,
}

// These impls assert `Send` and `Sync` for every `T`, bypassing the auto-trait
// analysis of the `netx_client` field.
// NOTE(review): soundness presumably relies on `Actor<NetXClient<T>>` being safe
// to share across threads for all `T` used in practice — confirm, or add bounds.
unsafe impl<T> Send for RequestManager<T> {}
unsafe impl<T> Sync for RequestManager<T> {}

/// Emits a debug log line when the manager is dropped; no other cleanup is done here.
impl<T> Drop for RequestManager<T> {
    fn drop(&mut self) {
        log::debug!("request manager is drop");
    }
}

impl<T: SessionSave + 'static> RequestManager<T> {
    /// Creates a request manager wrapped in `Arc<Actor<_>>` and spawns the
    /// periodic timeout check for it.
    ///
    /// * `request_out_time` - timeout threshold, compared against elapsed milliseconds.
    /// * `netx_client` - weak handle to the client notified about timed-out requests.
    ///
    /// Must be called from within a Tokio runtime because it spawns a task.
    pub fn new(
        request_out_time: u32,
        netx_client: Weak<Actor<NetXClient<T>>>,
    ) -> Arc<Actor<RequestManager<T>>> {
        let ptr = Arc::new(Actor::new(RequestManager {
            queue: VecDeque::new(),
            request_out_time,
            netx_client,
        }));

        // The background task only receives a weak handle, so it stops once
        // every strong reference to the manager is gone.
        Self::start_check(Arc::downgrade(&ptr));
        ptr
    }

    /// Spawns a task that runs the timeout check every 500 ms until the
    /// manager can no longer be upgraded from its weak handle.
    fn start_check(request_manager: Weak<Actor<RequestManager<T>>>) {
        tokio::spawn(async move {
            loop {
                {
                    let manager = match request_manager.upgrade() {
                        Some(manager) => manager,
                        None => break,
                    };
                    if let Err(er) = manager.check().await {
                        log::error!("check request error:{}", er);
                    }
                    // `manager` goes out of scope here so the strong reference
                    // is not held while the task sleeps.
                }
                sleep(Duration::from_millis(500)).await;
            }
        });
    }

    /// Reports every request at the back of the queue whose age has reached
    /// `request_out_time` milliseconds, removing it from the queue.
    ///
    /// Scanning stops at the first entry that has not timed out yet. If the
    /// client has already been dropped, expired entries are discarded silently.
    #[inline]
    pub async fn check(&mut self) {
        // Compare in `u128` (the type returned by `as_millis`) so that very old
        // entries cannot wrap around a narrowing cast and appear fresh.
        let limit_ms = u128::from(self.request_out_time);

        // Peek first; the entry is only removed once it is known to be expired.
        while let Some(&(serial, created)) = self.queue.back() {
            if created.elapsed().as_millis() < limit_ms {
                break;
            }
            self.queue.pop_back();

            if let Some(client) = self.netx_client.upgrade() {
                if let Err(er) = client
                    .set_error(serial, anyhow!("serial:{} time out", serial))
                    .await
                {
                    log::error!("check err:{}", er);
                }
            }
        }
    }

    /// Registers `session_id` as pending, timestamped with the current instant.
    #[inline]
    pub fn set(&mut self, session_id: i64) {
        self.queue.push_front((session_id, Instant::now()));
    }
}

/// Async interface to the request manager, implemented in this file for
/// `Actor<RequestManager<T>>`.
#[async_trait::async_trait]
pub trait IRequestManager {
    /// Runs one timeout check over the pending requests.
    async fn check(&self) -> Result<()>;
    /// Registers `session_id` as a pending request.
    async fn set(&self, session_id: i64) -> Result<()>;
}

/// Forwards the `IRequestManager` calls to the wrapped `RequestManager`
/// through `Actor::inner_call`.
#[async_trait::async_trait]
impl<T: SessionSave + 'static> IRequestManager for Actor<RequestManager<T>> {
    /// Runs `RequestManager::check` on the wrapped manager.
    #[inline]
    async fn check(&self) -> Result<()> {
        let run = |state| async move {
            let manager = state.get_mut();
            manager.check().await;
            Ok(())
        };
        self.inner_call(run).await
    }

    /// Runs `RequestManager::set` on the wrapped manager with `session_id`.
    #[inline]
    async fn set(&self, session_id: i64) -> Result<()> {
        let run = |state| async move {
            let manager = state.get_mut();
            manager.set(session_id);
            Ok(())
        };
        self.inner_call(run).await
    }
}