1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
use super::state::ArtilleryEpidemic; use crate::epidemic::cluster_config::ClusterConfig; use crate::epidemic::state::{ArtilleryClusterEvent, ArtilleryClusterRequest}; use crate::errors::*; use bastion_executor::prelude::*; use lightproc::{proc_stack::ProcStack, recoverable_handle::RecoverableHandle}; use std::convert::AsRef; use std::net::SocketAddr; use std::{ future::Future, pin::Pin, sync::mpsc::{channel, Receiver, Sender}, task::{Context, Poll}, }; use uuid::Uuid; #[derive(Debug)] pub struct Cluster { pub events: Receiver<ArtilleryClusterEvent>, comm: Sender<ArtilleryClusterRequest>, } impl Cluster { pub fn new_cluster( host_key: Uuid, config: ClusterConfig, ) -> Result<(Self, RecoverableHandle<()>)> { let (event_tx, event_rx) = channel::<ArtilleryClusterEvent>(); let (internal_tx, mut internal_rx) = channel::<ArtilleryClusterRequest>(); let (poll, state) = ArtilleryEpidemic::new(host_key, config, event_tx, internal_tx.clone())?; debug!("Starting Artillery Cluster"); let cluster_handle = spawn_blocking( async move { ArtilleryEpidemic::event_loop(&mut internal_rx, poll, state) .expect("Failed to create event loop"); }, ProcStack::default(), ); Ok(( Self { events: event_rx, comm: internal_tx, }, cluster_handle, )) } pub fn add_seed_node(&self, addr: SocketAddr) { let _ = self.comm.send(ArtilleryClusterRequest::AddSeed(addr)); } pub fn send_payload<T: AsRef<str>>(&self, id: Uuid, msg: T) { self.comm .send(ArtilleryClusterRequest::Payload( id, msg.as_ref().to_string(), )) .unwrap(); } pub fn leave_cluster(&self) { let _ = self.comm.send(ArtilleryClusterRequest::LeaveCluster); } } impl Future for Cluster { type Output = ArtilleryClusterEvent; fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { match self.events.recv() { Ok(kv) => Poll::Ready(kv), Err(_) => Poll::Pending, } } } unsafe impl Send for Cluster {} unsafe impl Sync for Cluster {} impl Drop for Cluster { fn drop(&mut self) { let (tx, rx) = channel(); let _ = self.comm.send(ArtilleryClusterRequest::Exit(tx)); rx.recv().unwrap(); } }