maviola 0.3.0

High-level MAVLink communication library with support for essential micro-services.
Documentation
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, SystemTime};

use crate::core::io::ConnectionInfo;
use crate::core::utils::Closable;
use crate::protocol::Peer;
use crate::sync::node::api::EventSender;
use crate::sync::node::Event;

use crate::prelude::*;

pub(in crate::sync::node) struct InactivePeersHandler<V: MaybeVersioned> {
    pub(in crate::sync::node) info: ConnectionInfo,
    pub(in crate::sync::node) peers: Arc<RwLock<HashMap<MavLinkId, Peer>>>,
    pub(in crate::sync::node) timeout: Duration,
    pub(in crate::sync::node) event_sender: EventSender<V>,
}

impl<V: MaybeVersioned> InactivePeersHandler<V> {
    pub(in crate::sync::node) fn spawn(self, state: Closable) {
        thread::spawn(move || {
            while !state.is_closed() {
                thread::sleep(self.timeout);

                let inactive_peers = match self.collect_inactive_peers() {
                    Ok(inactive_peers) => inactive_peers,
                    Err(_) => break,
                };

                if self.handle_inactive_peers(inactive_peers).is_err() {
                    break;
                }
            }

            self.shutdown();
        });
    }

    fn collect_inactive_peers(&self) -> Result<HashSet<MavLinkId>> {
        let now = SystemTime::now();

        let inactive_peers = match self.peers.read() {
            Ok(peers) => {
                let mut inactive_peers = HashSet::new();
                for peer in peers.values() {
                    if let Ok(since) = now.duration_since(peer.last_active) {
                        if since > self.timeout {
                            inactive_peers.insert(peer.id);
                        }
                    }
                }
                inactive_peers
            }
            Err(err) => {
                log::error!("[{:?}] can't read peers: {err:?}", self.info);
                return Err(Error::from(err));
            }
        };

        Ok(inactive_peers)
    }

    fn handle_inactive_peers(&self, inactive_peers: HashSet<MavLinkId>) -> Result<()> {
        let info = &self.info;

        match self.peers.write() {
            Ok(mut peers) => {
                for id in inactive_peers {
                    if let Some(peer) = peers.remove(&id) {
                        if let Err(err) = self.event_sender.send(Event::PeerLost(peer)) {
                            log::trace!("[{info:?}] failed to report lost peer event: {err:?}");
                            return Err(Error::from(err));
                        }
                    }
                }
            }
            Err(err) => {
                log::error!("[{info:?}] can't update peers: {err:?}");
                return Err(Error::from(err));
            }
        }

        Ok(())
    }

    fn shutdown(&self) {
        if let Ok(mut peers) = self.peers.write() {
            for peer in peers.values() {
                let _ = self.event_sender.send(Event::PeerLost(peer.clone()));
            }
            peers.clear();
        }
        log::trace!("[{:?}] inactive peers handler stopped", self.info);
    }
}