artillery_core/cluster/
ap.rs

1use crate::epidemic::prelude::*;
2use crate::errors::*;
3use crate::service_discovery::mdns::prelude::*;
4
5use lightproc::prelude::*;
6
7use futures::{select, FutureExt};
8use pin_utils::pin_mut;
9use std::{cell::Cell, sync::Arc};
10use uuid::Uuid;
11
12#[derive(Default, Debug, Clone)]
13pub struct ArtilleryAPClusterConfig {
14    pub app_name: String,
15    pub node_id: Uuid,
16    pub cluster_config: ClusterConfig,
17    pub sd_config: MDNSServiceDiscoveryConfig,
18}
19
20pub struct ArtilleryAPCluster {
21    config: ArtilleryAPClusterConfig,
22    cluster: Arc<Cluster>,
23    sd: Arc<MDNSServiceDiscovery>,
24    cluster_ev_loop_handle: Cell<RecoverableHandle<()>>,
25}
26
27unsafe impl Send for ArtilleryAPCluster {}
28unsafe impl Sync for ArtilleryAPCluster {}
29
30pub type DiscoveryLaunch = RecoverableHandle<()>;
31
32impl ArtilleryAPCluster {
33    pub fn new(config: ArtilleryAPClusterConfig) -> Result<Self> {
34        let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;
35
36        let (cluster, cluster_listener) =
37            Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
38
39        Ok(Self {
40            config,
41            cluster: Arc::new(cluster),
42            sd: Arc::new(sd),
43            cluster_ev_loop_handle: Cell::new(cluster_listener),
44        })
45    }
46
47    pub fn cluster(&self) -> Arc<Cluster> {
48        self.cluster.clone()
49    }
50
51    pub fn service_discovery(&self) -> Arc<MDNSServiceDiscovery> {
52        self.sd.clone()
53    }
54
55    pub fn shutdown(&self) {
56        self.cluster().leave_cluster();
57    }
58
59    pub async fn launch(&self) {
60        let (_, eh) = LightProc::recoverable(async {}, |_| (), ProcStack::default());
61        let ev_loop_handle = self.cluster_ev_loop_handle.replace(eh);
62
63        // do fusing
64        let ev_loop_handle = ev_loop_handle.fuse();
65        let discover_nodes_handle = self.discover_nodes().fuse();
66
67        pin_mut!(ev_loop_handle);
68        pin_mut!(discover_nodes_handle);
69
70        select! {
71            ev_loop_res = ev_loop_handle => { dbg!(ev_loop_res); ev_loop_res.unwrap() },
72            _ = discover_nodes_handle => panic!("Node discovery unexpectedly shutdown.")
73        };
74    }
75
76    async fn discover_nodes(&self) {
77        self.service_discovery()
78            .events()
79            .iter()
80            .filter(|discovery| {
81                discovery.get().port() != self.config.sd_config.local_service_addr.port()
82            })
83            .for_each(|discovery| self.cluster.add_seed_node(discovery.get()))
84    }
85}