mssf_util/monitoring/
mod.rs1mod producer;
7pub use producer::HealthDataProducer;
8mod entities;
9pub use entities::{LoopKind, NodeHealthEntity, ProducerEvent};
10
11#[cfg(test)]
12mod tests {
13
14 use std::time::Duration;
15
16 use mssf_core::{WString, client::FabricClient};
17 use tokio::sync::mpsc;
18
19 use crate::monitoring::{
20 HealthDataProducer, LoopKind, NodeHealthEntity, ProducerEvent,
21 entities::ClusterHealthEntity,
22 };
23
24 pub struct MockHealthDataConsumer {
25 receiver: mpsc::UnboundedReceiver<ProducerEvent>,
26 }
27
28 pub struct HealthDataCollection {
29 pub cluster_health_entity: Vec<ClusterHealthEntity>,
30 pub node_health_entities: Vec<NodeHealthEntity>,
31 pub application_health_entities: Vec<crate::monitoring::entities::ApplicationHealthEntity>,
32 pub partition_health_entities: Vec<crate::monitoring::entities::PartitionHealthEntity>,
33 pub service_health_entities: Vec<crate::monitoring::entities::ServiceHealthEntity>,
34 pub replica_health_entities: Vec<crate::monitoring::entities::ReplicaHealthEntity>,
35 }
36
37 impl MockHealthDataConsumer {
38 pub fn new(receiver: mpsc::UnboundedReceiver<ProducerEvent>) -> Self {
39 MockHealthDataConsumer { receiver }
40 }
41
42 pub async fn get_all_data(
50 &mut self,
51 token: &mssf_core::runtime::executor::BoxedCancelToken,
52 ) -> HealthDataCollection {
53 let mut data = HealthDataCollection {
54 node_health_entities: Vec::new(),
55 cluster_health_entity: Vec::new(),
56 application_health_entities: Vec::new(),
57 partition_health_entities: Vec::new(),
58 service_health_entities: Vec::new(),
59 replica_health_entities: Vec::new(),
60 };
61 let mut cluster_node_done = false;
62 let mut application_done = false;
63 while let Some(entity) = self.receiver.recv().await {
64 match entity {
65 ProducerEvent::Node(node_entity) => {
66 data.node_health_entities.push(node_entity);
67 }
68 ProducerEvent::Cluster(cluster_entity) => {
69 data.cluster_health_entity.push(cluster_entity);
70 }
71 ProducerEvent::Application(application_entity) => {
72 data.application_health_entities.push(application_entity);
73 }
74 ProducerEvent::Partition(partition_entity) => {
75 data.partition_health_entities.push(partition_entity);
76 }
77 ProducerEvent::Service(service_entity) => {
78 data.service_health_entities.push(service_entity);
79 }
80 ProducerEvent::Replica(replica_entity) => {
81 data.replica_health_entities.push(replica_entity);
82 }
83 ProducerEvent::IterationComplete(kind) => match kind {
84 LoopKind::ClusterNode => cluster_node_done = true,
85 LoopKind::Application => application_done = true,
86 },
87 }
88 if cluster_node_done && application_done {
89 token.cancel();
92 break;
93 }
94 }
95 data
96 }
97 }
98
99 pub fn new_health_data_producer_consumer(
100 fc: FabricClient,
101 ) -> (HealthDataProducer, MockHealthDataConsumer) {
102 let (sender, receiver) = mpsc::unbounded_channel();
103 let producer = HealthDataProducer::new(fc, Duration::from_secs(3), sender);
104 let consumer = MockHealthDataConsumer::new(receiver);
105 (producer, consumer)
106 }
107
108 #[tokio::test]
109 async fn test_health_data() {
110 let _ = tracing_subscriber::fmt().try_init();
112
113 let fc = FabricClient::builder()
114 .with_connection_strings(vec![WString::from("localhost:19000")])
115 .build()
116 .unwrap();
117 let (producer, mut consumer) = new_health_data_producer_consumer(fc);
118
119 let token = mssf_core::sync::SimpleCancelToken::new_boxed();
120 let token_clone = token.clone();
122 let ph = tokio::spawn(async move {
123 producer.run_loop(token_clone).await;
124 });
125
126 let data = consumer.get_all_data(&token).await;
129 ph.await.unwrap();
130
131 assert_eq!(
133 data.cluster_health_entity.len(),
134 1,
135 "Should have one cluster health entity"
136 );
137 let cluster_health = &data.cluster_health_entity[0];
138 assert_ne!(
141 cluster_health.health.aggregated_health_state,
142 mssf_core::types::HealthState::Unknown,
143 );
144 assert!(
145 cluster_health.health.node_health_states.is_empty(),
146 "Cluster health should not have nodes, we retrieve them separately."
147 );
148 assert!(
149 cluster_health.health.application_health_states.is_empty(),
150 "Cluster health should not have application health states, we retrieve them separately."
151 );
152
153 assert!(
156 data.node_health_entities.len() >= 3,
157 "Not enough nodes {:?}",
158 data.node_health_entities
159 );
160 let node1 = &data.node_health_entities[0];
161 assert!(!node1.node.name.is_empty());
162 assert!(
163 node1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
164 || node1.health.aggregated_health_state == mssf_core::types::HealthState::Warning
165 );
166
167 if data.application_health_entities.is_empty() {
170 tracing::warn!("No applications found in the cluster");
171 } else {
172 let app1 = &data.application_health_entities[0];
173 assert_eq!(
174 app1.application.health_state,
175 app1.health.aggregated_health_state
176 );
177 assert!(
178 app1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
179 || app1.health.aggregated_health_state
180 == mssf_core::types::HealthState::Warning
181 );
182 }
183 if data.partition_health_entities.is_empty() {
184 tracing::warn!("No partitions found in the cluster");
185 } else {
186 let partition1 = &data.partition_health_entities[0];
187 assert_eq!(
188 partition1.partition.get_health_state(),
189 partition1.health.aggregated_health_state
190 );
191 assert!(
192 partition1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
193 || partition1.health.aggregated_health_state
194 == mssf_core::types::HealthState::Warning
195 );
196 }
197 if data.service_health_entities.is_empty() {
198 tracing::warn!("No services found in the cluster");
199 } else {
200 let service1 = &data.service_health_entities[0];
201 assert_eq!(
202 service1.service.get_health_state(),
203 service1.health.aggregated_health_state
204 );
205 assert!(
206 service1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
207 || service1.health.aggregated_health_state
208 == mssf_core::types::HealthState::Warning
209 );
210 }
211 if data.replica_health_entities.is_empty() {
212 tracing::warn!("No replicas found in the cluster");
213 } else {
214 let replica1 = &data.replica_health_entities[0];
215 assert_eq!(
216 replica1.replica.get_aggregated_health_state(),
217 replica1.health.replica_health.get_aggregated_health_state()
218 );
219 assert!(
220 replica1.health.replica_health.get_aggregated_health_state()
221 == mssf_core::types::HealthState::Ok
222 || replica1.health.replica_health.get_aggregated_health_state()
223 == mssf_core::types::HealthState::Warning
224 );
225 }
226 }
227}