mssf_util/monitoring/
mod.rs1mod producer;
7pub use producer::HealthDataProducer;
8mod entities;
9pub use entities::{HealthEntity, NodeHealthEntity};
10
11#[cfg(test)]
12mod tests {
13
14 use std::{sync::Arc, time::Duration};
15
16 use mssf_core::{WString, client::FabricClient};
17 use tokio::sync::mpsc;
18
19 use crate::monitoring::{
20 HealthDataProducer, HealthEntity, NodeHealthEntity, entities::ClusterHealthEntity,
21 };
22
23 pub struct MockHealthDataConsumer {
24 receiver: mpsc::UnboundedReceiver<HealthEntity>,
25 }
26
27 pub struct HealthDataCollection {
28 pub cluster_health_entity: Vec<ClusterHealthEntity>,
29 pub node_health_entities: Vec<NodeHealthEntity>,
30 pub application_health_entities: Vec<crate::monitoring::entities::ApplicationHealthEntity>,
31 pub partition_health_entities: Vec<crate::monitoring::entities::PartitionHealthEntity>,
32 pub service_health_entities: Vec<crate::monitoring::entities::ServiceHealthEntity>,
33 pub replica_health_entities: Vec<crate::monitoring::entities::ReplicaHealthEntity>,
34 }
35
36 impl MockHealthDataConsumer {
37 pub fn new(receiver: mpsc::UnboundedReceiver<HealthEntity>) -> Self {
38 MockHealthDataConsumer { receiver }
39 }
40
41 pub async fn get_all_data(&mut self) -> HealthDataCollection {
43 let mut data = HealthDataCollection {
44 node_health_entities: Vec::new(),
45 cluster_health_entity: Vec::new(),
46 application_health_entities: Vec::new(),
47 partition_health_entities: Vec::new(),
48 service_health_entities: Vec::new(),
49 replica_health_entities: Vec::new(),
50 };
51 while let Some(entity) = self.receiver.recv().await {
52 match entity {
53 HealthEntity::Node(node_entity) => {
54 data.node_health_entities.push(node_entity);
55 }
56 HealthEntity::Cluster(cluster_entity) => {
57 data.cluster_health_entity.push(cluster_entity);
58 }
59 HealthEntity::Application(application_entity) => {
60 data.application_health_entities.push(application_entity);
61 }
62 HealthEntity::Partition(partition_entity) => {
63 data.partition_health_entities.push(partition_entity);
64 }
65 HealthEntity::Service(service_entity) => {
66 data.service_health_entities.push(service_entity);
67 }
68 HealthEntity::Replica(replica_entity) => {
69 data.replica_health_entities.push(replica_entity);
70 }
71 }
72 }
73 data
74 }
75 }
76
77 pub fn new_health_data_producer_consumer(
78 fc: FabricClient,
79 ) -> (HealthDataProducer, MockHealthDataConsumer) {
80 let (sender, receiver) = mpsc::unbounded_channel();
81 let producer = HealthDataProducer::new(fc, Duration::from_secs(30), sender);
82 let consumer = MockHealthDataConsumer::new(receiver);
83 (producer, consumer)
84 }
85
86 #[tokio::test]
87 async fn test_health_data() {
88 let _ = tracing_subscriber::fmt().try_init();
90
91 let fc = FabricClient::builder()
92 .with_connection_strings(vec![WString::from("localhost:19000")])
93 .build()
94 .unwrap();
95 let (producer, mut consumer) = new_health_data_producer_consumer(fc);
96
97 let token = mssf_core::sync::SimpleCancelToken::new_boxed();
98 let token_clone = token.clone();
100 let producer = Arc::new(producer);
101 let producer_clone = producer.clone();
102 let ph = tokio::spawn(async move {
103 producer_clone.run_loop(token_clone).await;
104 });
105
106 let max_iteration = 10;
108 for _ in 0..max_iteration {
109 if producer.get_iteration() > 0 {
110 break;
111 }
112 tokio::time::sleep(Duration::from_secs(1)).await;
113 }
114 assert_ne!(
115 producer.get_iteration(),
116 0,
117 "Producer did not run any iteration"
118 );
119 drop(producer); token.cancel();
121 ph.await.unwrap();
122
123 let data = consumer.get_all_data().await;
125 assert_eq!(
127 data.cluster_health_entity.len(),
128 1,
129 "Should have one cluster health entity"
130 );
131 let cluster_health = &data.cluster_health_entity[0];
132 assert!(
133 cluster_health.health.aggregated_health_state == mssf_core::types::HealthState::Ok
134 || cluster_health.health.aggregated_health_state
135 == mssf_core::types::HealthState::Warning
136 );
137 assert!(
138 cluster_health.health.node_health_states.is_empty(),
139 "Cluster health should not have nodes, we retrieve them separately."
140 );
141 assert!(
142 cluster_health.health.application_health_states.is_empty(),
143 "Cluster health should not have application health states, we retrieve them separately."
144 );
145
146 assert!(
149 data.node_health_entities.len() >= 3,
150 "Not enough nodes {:?}",
151 data.node_health_entities
152 );
153 let node1 = &data.node_health_entities[0];
154 assert!(!node1.node.name.is_empty());
155 assert!(
156 node1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
157 || node1.health.aggregated_health_state == mssf_core::types::HealthState::Warning
158 );
159
160 if data.application_health_entities.is_empty() {
163 tracing::warn!("No applications found in the cluster");
164 } else {
165 let app1 = &data.application_health_entities[0];
166 assert_eq!(
167 app1.application.health_state,
168 app1.health.aggregated_health_state
169 );
170 assert!(
171 app1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
172 || app1.health.aggregated_health_state
173 == mssf_core::types::HealthState::Warning
174 );
175 }
176 if data.partition_health_entities.is_empty() {
177 tracing::warn!("No partitions found in the cluster");
178 } else {
179 let partition1 = &data.partition_health_entities[0];
180 assert_eq!(
181 partition1.partition.get_health_state(),
182 partition1.health.aggregated_health_state
183 );
184 assert!(
185 partition1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
186 || partition1.health.aggregated_health_state
187 == mssf_core::types::HealthState::Warning
188 );
189 }
190 if data.service_health_entities.is_empty() {
191 tracing::warn!("No services found in the cluster");
192 } else {
193 let service1 = &data.service_health_entities[0];
194 assert_eq!(
195 service1.service.get_health_state(),
196 service1.health.aggregated_health_state
197 );
198 assert!(
199 service1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
200 || service1.health.aggregated_health_state
201 == mssf_core::types::HealthState::Warning
202 );
203 }
204 if data.replica_health_entities.is_empty() {
205 tracing::warn!("No replicas found in the cluster");
206 } else {
207 let replica1 = &data.replica_health_entities[0];
208 assert_eq!(
209 replica1.replica.get_aggregated_health_state(),
210 replica1.health.replica_health.get_aggregated_health_state()
211 );
212 assert!(
213 replica1.health.replica_health.get_aggregated_health_state()
214 == mssf_core::types::HealthState::Ok
215 || replica1.health.replica_health.get_aggregated_health_state()
216 == mssf_core::types::HealthState::Warning
217 );
218 }
219 }
220}