risq 0.4.1

Re-implementation of Bisq (https://github.com/bisq-network/bisq) in rust
use super::event::ConnectionAdded;
use crate::{
    bisq::payload::{gen_nonce, Ping, Pong},
    p2p::{
        connection::{Connection, ConnectionId, Payload, Request},
        dispatch::Receive,
    },
    prelude::*,
};
use lazy_static::lazy_static;
use rand::{thread_rng, Rng};
use std::{
    collections::HashMap,
    time::{Duration, SystemTime},
};

lazy_static! {
    static ref LOOP_INTERVAL_SEC: u64 = thread_rng().gen::<u64>() % 5 + 30;
    static ref LOOP_INTERVAL: Duration = Duration::from_secs(*LOOP_INTERVAL_SEC);
    static ref LAST_ACTIVITY_AGE: Duration = Duration::from_secs(*LOOP_INTERVAL_SEC / 2);
}

struct Info {
    last_active: SystemTime,
    last_round_trip_time: Duration,
}
pub struct KeepAlive {
    infos: HashMap<ConnectionId, Info>,
    connections: HashMap<ConnectionId, WeakAddr<Connection>>,
}
impl KeepAlive {
    pub fn start() -> Addr<KeepAlive> {
        KeepAlive {
            infos: HashMap::new(),
            connections: HashMap::new(),
        }
        .start()
    }
}
impl Actor for KeepAlive {
    type Context = Context<KeepAlive>;

    fn started(&mut self, ctx: &mut Self::Context) {
        ctx.run_interval(*LOOP_INTERVAL, |keep_alive, ctx| {
            let infos = &mut keep_alive.infos;
            keep_alive.connections.retain(|id, conn| {
                if ping_peer(id.to_owned(), conn, infos.get(id), ctx) {
                    true
                } else {
                    infos.remove(id);
                    false
                }
            })
        });
    }
}
impl Handler<ConnectionAdded> for KeepAlive {
    type Result = ();
    fn handle(
        &mut self,
        ConnectionAdded(id, conn): ConnectionAdded,
        _: &mut Self::Context,
    ) -> Self::Result {
        self.connections.insert(id, conn);
    }
}
pub struct ReportLastActive;
impl Message for ReportLastActive {
    type Result = HashMap<ConnectionId, SystemTime>;
}
impl Handler<ReportLastActive> for KeepAlive {
    type Result = MessageResult<ReportLastActive>;

    fn handle(&mut self, _: ReportLastActive, _: &mut Self::Context) -> Self::Result {
        MessageResult(
            self.infos
                .iter()
                .map(|(id, info)| (*id, info.last_active))
                .collect(),
        )
    }
}

impl Handler<Receive<Ping>> for KeepAlive {
    type Result = ();
    fn handle(&mut self, Receive(id, ping): Receive<Ping>, _: &mut Self::Context) -> Self::Result {
        let now = SystemTime::now();
        self.infos.insert(
            id,
            Info {
                last_active: now,
                last_round_trip_time: Duration::from_millis(ping.last_round_trip_time as u64),
            },
        );
        if let Some(conn) = self.connections.get(&id) {
            if let Some(conn) = conn.upgrade() {
                arbiter_spawn!(conn.send(Payload(Pong {
                    request_nonce: ping.nonce,
                })));
            }
        }
    }
}

fn ping_peer(
    id: ConnectionId,
    conn: &WeakAddr<Connection>,
    info: Option<&Info>,
    ctx: &mut Context<KeepAlive>,
) -> bool {
    let send_time = SystemTime::now();
    let should_ping = match info {
        Some(info)
            if send_time
                .duration_since(info.last_active)
                .map(|t| t > *LAST_ACTIVITY_AGE)
                .unwrap_or(false) =>
        {
            true
        }
        None => true,
        _ => false,
    };
    if should_ping {
        if let Some(conn) = conn.upgrade() {
            let ping = Ping {
                nonce: gen_nonce(),
                last_round_trip_time: info.map_or(0, |i| i.last_round_trip_time.as_millis() as i32),
            };
            ctx.spawn(
                fut::wrap_future(conn.send(Request(ping)).flatten().map(move |_pong| {
                    let ret = SystemTime::now();
                    Info {
                        last_active: ret,
                        last_round_trip_time: ret
                            .duration_since(send_time)
                            .expect("Pong before Ping"),
                    }
                }))
                .map(move |info, keep_alive: &mut KeepAlive, _ctx| {
                    keep_alive.infos.insert(id, info)
                })
                .then(|_, _, _| fut::ok(())),
            );
            true
        } else {
            false
        }
    } else {
        true
    }
}