1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use super::state::ArtilleryEpidemic;
use crate::epidemic::cluster_config::ClusterConfig;
use crate::epidemic::state::{ArtilleryClusterEvent, ArtilleryClusterRequest};
use crate::errors::*;
use bastion_executor::prelude::*;
use lightproc::{proc_stack::ProcStack, recoverable_handle::RecoverableHandle};
use std::convert::AsRef;
use std::net::SocketAddr;
use std::{
    future::Future,
    pin::Pin,
    sync::mpsc::{channel, Receiver, Sender},
    task::{Context, Poll},
};
use uuid::Uuid;

#[derive(Debug)]
pub struct Cluster {
    pub events: Receiver<ArtilleryClusterEvent>,
    comm: Sender<ArtilleryClusterRequest>,
}

impl Cluster {
    pub fn new_cluster(
        host_key: Uuid,
        config: ClusterConfig,
    ) -> Result<(Self, RecoverableHandle<()>)> {
        let (event_tx, event_rx) = channel::<ArtilleryClusterEvent>();
        let (internal_tx, mut internal_rx) = channel::<ArtilleryClusterRequest>();

        let (poll, state) =
            ArtilleryEpidemic::new(host_key, config, event_tx, internal_tx.clone())?;

        debug!("Starting Artillery Cluster");
        let cluster_handle = spawn_blocking(
            async move {
                ArtilleryEpidemic::event_loop(&mut internal_rx, poll, state)
                    .expect("Failed to create event loop");
            },
            ProcStack::default(),
        );

        Ok((
            Self {
                events: event_rx,
                comm: internal_tx,
            },
            cluster_handle,
        ))
    }

    pub fn add_seed_node(&self, addr: SocketAddr) {
        let _ = self.comm.send(ArtilleryClusterRequest::AddSeed(addr));
    }

    pub fn send_payload<T: AsRef<str>>(&self, id: Uuid, msg: T) {
        self.comm
            .send(ArtilleryClusterRequest::Payload(
                id,
                msg.as_ref().to_string(),
            ))
            .unwrap();
    }

    pub fn leave_cluster(&self) {
        let _ = self.comm.send(ArtilleryClusterRequest::LeaveCluster);
    }
}

impl Future for Cluster {
    type Output = ArtilleryClusterEvent;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        match self.events.recv() {
            Ok(kv) => Poll::Ready(kv),
            Err(_) => Poll::Pending,
        }
    }
}

unsafe impl Send for Cluster {}
unsafe impl Sync for Cluster {}

impl Drop for Cluster {
    fn drop(&mut self) {
        let (tx, rx) = channel();

        let _ = self.comm.send(ArtilleryClusterRequest::Exit(tx));

        rx.recv().unwrap();
    }
}