// artillery_core/cluster/ap.rs
use crate::epidemic::prelude::*;
use crate::errors::*;
use crate::service_discovery::mdns::prelude::*;

use lightproc::prelude::*;

use futures::{select, FutureExt};
use pin_utils::pin_mut;
use std::{cell::Cell, sync::Arc};
use uuid::Uuid;
11
12#[derive(Default, Debug, Clone)]
13pub struct ArtilleryAPClusterConfig {
14 pub app_name: String,
15 pub node_id: Uuid,
16 pub cluster_config: ClusterConfig,
17 pub sd_config: MDNSServiceDiscoveryConfig,
18}
19
20pub struct ArtilleryAPCluster {
21 config: ArtilleryAPClusterConfig,
22 cluster: Arc<Cluster>,
23 sd: Arc<MDNSServiceDiscovery>,
24 cluster_ev_loop_handle: Cell<RecoverableHandle<()>>,
25}
26
27unsafe impl Send for ArtilleryAPCluster {}
28unsafe impl Sync for ArtilleryAPCluster {}
29
30pub type DiscoveryLaunch = RecoverableHandle<()>;
31
32impl ArtilleryAPCluster {
33 pub fn new(config: ArtilleryAPClusterConfig) -> Result<Self> {
34 let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;
35
36 let (cluster, cluster_listener) =
37 Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
38
39 Ok(Self {
40 config,
41 cluster: Arc::new(cluster),
42 sd: Arc::new(sd),
43 cluster_ev_loop_handle: Cell::new(cluster_listener),
44 })
45 }
46
47 pub fn cluster(&self) -> Arc<Cluster> {
48 self.cluster.clone()
49 }
50
51 pub fn service_discovery(&self) -> Arc<MDNSServiceDiscovery> {
52 self.sd.clone()
53 }
54
55 pub fn shutdown(&self) {
56 self.cluster().leave_cluster();
57 }
58
59 pub async fn launch(&self) {
60 let (_, eh) = LightProc::recoverable(async {}, |_| (), ProcStack::default());
61 let ev_loop_handle = self.cluster_ev_loop_handle.replace(eh);
62
63 let ev_loop_handle = ev_loop_handle.fuse();
65 let discover_nodes_handle = self.discover_nodes().fuse();
66
67 pin_mut!(ev_loop_handle);
68 pin_mut!(discover_nodes_handle);
69
70 select! {
71 ev_loop_res = ev_loop_handle => { dbg!(ev_loop_res); ev_loop_res.unwrap() },
72 _ = discover_nodes_handle => panic!("Node discovery unexpectedly shutdown.")
73 };
74 }
75
76 async fn discover_nodes(&self) {
77 self.service_discovery()
78 .events()
79 .iter()
80 .filter(|discovery| {
81 discovery.get().port() != self.config.sd_config.local_service_addr.port()
82 })
83 .for_each(|discovery| self.cluster.add_seed_node(discovery.get()))
84 }
85}