maviola 0.3.0

High-level MAVLink communication library with support for essential micro-services.
Documentation
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use crate::asnc::node::api::EventSender;
use tokio::sync::RwLock;

use crate::asnc::node::Event;
use crate::core::io::ConnectionInfo;
use crate::core::utils::Closable;
use crate::protocol::Peer;

use crate::prelude::*;

pub(in crate::asnc::node) struct InactivePeersHandler<V: MaybeVersioned> {
    pub(in crate::asnc::node) info: ConnectionInfo,
    pub(in crate::asnc::node) peers: Arc<RwLock<HashMap<MavLinkId, Peer>>>,
    pub(in crate::asnc::node) timeout: Duration,
    pub(in crate::asnc::node) event_sender: EventSender<V>,
}

impl<V: MaybeVersioned> InactivePeersHandler<V> {
    pub(in crate::asnc::node) fn spawn(self, state: Closable) {
        tokio::spawn(async move {
            while !state.is_closed() {
                tokio::time::sleep(self.timeout).await;

                let inactive_peers = self.collect_inactive_peers().await;

                if self.handle_inactive_peers(inactive_peers).await.is_err() {
                    break;
                }
            }

            self.shutdown().await;
        });
    }

    async fn collect_inactive_peers(&self) -> HashSet<MavLinkId> {
        let now = SystemTime::now();

        let peers = self.peers.read().await;
        let mut inactive_peers = HashSet::new();
        for peer in peers.values() {
            if let Ok(since) = now.duration_since(peer.last_active) {
                if since > self.timeout {
                    inactive_peers.insert(peer.id);
                }
            }
        }
        inactive_peers
    }

    pub async fn handle_inactive_peers(&self, inactive_peers: HashSet<MavLinkId>) -> Result<()> {
        let mut peers = self.peers.write().await;

        for id in inactive_peers {
            if let Some(peer) = peers.remove(&id) {
                if let Err(err) = self.event_sender.send(Event::PeerLost(peer)) {
                    log::trace!(
                        "[{:?}] failed to report lost peer event: {err:?}",
                        &self.info
                    );
                    return Err(Error::from(err));
                }
            }
        }

        Ok(())
    }

    async fn shutdown(&self) {
        let mut peers = self.peers.write().await;

        for peer in peers.values() {
            let _ = self.event_sender.send(Event::PeerLost(peer.clone()));
        }
        peers.clear();

        log::trace!("[{:?}] inactive peers handler stopped", self.info);
    }
}