mssf_util/monitoring/
mod.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6mod producer;
7pub use producer::HealthDataProducer;
8mod entities;
9pub use entities::{HealthEntity, NodeHealthEntity};
10
11#[cfg(test)]
12mod tests {
13
14    use std::{sync::Arc, time::Duration};
15
16    use mssf_core::{WString, client::FabricClient};
17    use tokio::sync::mpsc;
18
19    use crate::monitoring::{
20        HealthDataProducer, HealthEntity, NodeHealthEntity, entities::ClusterHealthEntity,
21    };
22
23    pub struct MockHealthDataConsumer {
24        receiver: mpsc::UnboundedReceiver<HealthEntity>,
25    }
26
27    pub struct HealthDataCollection {
28        pub cluster_health_entity: Vec<ClusterHealthEntity>,
29        pub node_health_entities: Vec<NodeHealthEntity>,
30        pub application_health_entities: Vec<crate::monitoring::entities::ApplicationHealthEntity>,
31        pub partition_health_entities: Vec<crate::monitoring::entities::PartitionHealthEntity>,
32        pub service_health_entities: Vec<crate::monitoring::entities::ServiceHealthEntity>,
33        pub replica_health_entities: Vec<crate::monitoring::entities::ReplicaHealthEntity>,
34    }
35
36    impl MockHealthDataConsumer {
37        pub fn new(receiver: mpsc::UnboundedReceiver<HealthEntity>) -> Self {
38            MockHealthDataConsumer { receiver }
39        }
40
41        /// Producer must close.
42        pub async fn get_all_data(&mut self) -> HealthDataCollection {
43            let mut data = HealthDataCollection {
44                node_health_entities: Vec::new(),
45                cluster_health_entity: Vec::new(),
46                application_health_entities: Vec::new(),
47                partition_health_entities: Vec::new(),
48                service_health_entities: Vec::new(),
49                replica_health_entities: Vec::new(),
50            };
51            while let Some(entity) = self.receiver.recv().await {
52                match entity {
53                    HealthEntity::Node(node_entity) => {
54                        data.node_health_entities.push(node_entity);
55                    }
56                    HealthEntity::Cluster(cluster_entity) => {
57                        data.cluster_health_entity.push(cluster_entity);
58                    }
59                    HealthEntity::Application(application_entity) => {
60                        data.application_health_entities.push(application_entity);
61                    }
62                    HealthEntity::Partition(partition_entity) => {
63                        data.partition_health_entities.push(partition_entity);
64                    }
65                    HealthEntity::Service(service_entity) => {
66                        data.service_health_entities.push(service_entity);
67                    }
68                    HealthEntity::Replica(replica_entity) => {
69                        data.replica_health_entities.push(replica_entity);
70                    }
71                }
72            }
73            data
74        }
75    }
76
77    pub fn new_health_data_producer_consumer(
78        fc: FabricClient,
79    ) -> (HealthDataProducer, MockHealthDataConsumer) {
80        let (sender, receiver) = mpsc::unbounded_channel();
81        let producer = HealthDataProducer::new(fc, Duration::from_secs(30), sender);
82        let consumer = MockHealthDataConsumer::new(receiver);
83        (producer, consumer)
84    }
85
86    #[tokio::test]
87    async fn test_health_data() {
88        // set up tracing
89        let _ = tracing_subscriber::fmt().try_init();
90
91        let fc = FabricClient::builder()
92            .with_connection_strings(vec![WString::from("localhost:19000")])
93            .build()
94            .unwrap();
95        let (producer, mut consumer) = new_health_data_producer_consumer(fc);
96
97        let token = mssf_core::sync::SimpleCancelToken::new_boxed();
98        // Simulate producing health data
99        let token_clone = token.clone();
100        let producer = Arc::new(producer);
101        let producer_clone = producer.clone();
102        let ph = tokio::spawn(async move {
103            producer_clone.run_loop(token_clone).await;
104        });
105
106        // Wait at least 1 iteration bit and then stop the producer
107        let max_iteration = 10;
108        for _ in 0..max_iteration {
109            if producer.get_iteration() > 0 {
110                break;
111            }
112            tokio::time::sleep(Duration::from_secs(1)).await;
113        }
114        assert_ne!(
115            producer.get_iteration(),
116            0,
117            "Producer did not run any iteration"
118        );
119        drop(producer); // this is required for consumer to finish.
120        token.cancel();
121        ph.await.unwrap();
122
123        // Consume the health data
124        let data = consumer.get_all_data().await;
125        // check cluster health entity
126        assert_eq!(
127            data.cluster_health_entity.len(),
128            1,
129            "Should have one cluster health entity"
130        );
131        let cluster_health = &data.cluster_health_entity[0];
132        assert!(
133            cluster_health.health.aggregated_health_state == mssf_core::types::HealthState::Ok
134                || cluster_health.health.aggregated_health_state
135                    == mssf_core::types::HealthState::Warning
136        );
137        assert!(
138            cluster_health.health.node_health_states.is_empty(),
139            "Cluster health should not have nodes, we retrieve them separately."
140        );
141        assert!(
142            cluster_health.health.application_health_states.is_empty(),
143            "Cluster health should not have application health states, we retrieve them separately."
144        );
145
146        // We have 5 nodes in local SF windows cluster
147        // and 3 nodes for linux cluster.
148        assert!(
149            data.node_health_entities.len() >= 3,
150            "Not enough nodes {:?}",
151            data.node_health_entities
152        );
153        let node1 = &data.node_health_entities[0];
154        assert!(!node1.node.name.is_empty());
155        assert!(
156            node1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
157                || node1.health.aggregated_health_state == mssf_core::types::HealthState::Warning
158        );
159
160        // Get applications
161        // For empty cluster applications is 0
162        if data.application_health_entities.is_empty() {
163            tracing::warn!("No applications found in the cluster");
164        } else {
165            let app1 = &data.application_health_entities[0];
166            assert_eq!(
167                app1.application.health_state,
168                app1.health.aggregated_health_state
169            );
170            assert!(
171                app1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
172                    || app1.health.aggregated_health_state
173                        == mssf_core::types::HealthState::Warning
174            );
175        }
176        if data.partition_health_entities.is_empty() {
177            tracing::warn!("No partitions found in the cluster");
178        } else {
179            let partition1 = &data.partition_health_entities[0];
180            assert_eq!(
181                partition1.partition.get_health_state(),
182                partition1.health.aggregated_health_state
183            );
184            assert!(
185                partition1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
186                    || partition1.health.aggregated_health_state
187                        == mssf_core::types::HealthState::Warning
188            );
189        }
190        if data.service_health_entities.is_empty() {
191            tracing::warn!("No services found in the cluster");
192        } else {
193            let service1 = &data.service_health_entities[0];
194            assert_eq!(
195                service1.service.get_health_state(),
196                service1.health.aggregated_health_state
197            );
198            assert!(
199                service1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
200                    || service1.health.aggregated_health_state
201                        == mssf_core::types::HealthState::Warning
202            );
203        }
204        if data.replica_health_entities.is_empty() {
205            tracing::warn!("No replicas found in the cluster");
206        } else {
207            let replica1 = &data.replica_health_entities[0];
208            assert_eq!(
209                replica1.replica.get_aggregated_health_state(),
210                replica1.health.replica_health.get_aggregated_health_state()
211            );
212            assert!(
213                replica1.health.replica_health.get_aggregated_health_state()
214                    == mssf_core::types::HealthState::Ok
215                    || replica1.health.replica_health.get_aggregated_health_state()
216                        == mssf_core::types::HealthState::Warning
217            );
218        }
219    }
220}