Skip to main content

mssf_util/monitoring/
mod.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6mod producer;
7pub use producer::HealthDataProducer;
8mod entities;
9pub use entities::{HealthEntity, NodeHealthEntity};
10
#[cfg(test)]
mod tests {

    use std::{sync::Arc, time::Duration};

    use mssf_core::{WString, client::FabricClient};
    use tokio::sync::mpsc;

    use crate::monitoring::{
        HealthDataProducer, HealthEntity, NodeHealthEntity, entities::ClusterHealthEntity,
    };

    /// Test consumer that drains every [`HealthEntity`] sent over the producer's channel.
    pub struct MockHealthDataConsumer {
        receiver: mpsc::UnboundedReceiver<HealthEntity>,
    }

    /// Every entity received by a [`MockHealthDataConsumer`], grouped by entity kind.
    #[derive(Default)]
    pub struct HealthDataCollection {
        pub cluster_health_entity: Vec<ClusterHealthEntity>,
        pub node_health_entities: Vec<NodeHealthEntity>,
        pub application_health_entities: Vec<crate::monitoring::entities::ApplicationHealthEntity>,
        pub partition_health_entities: Vec<crate::monitoring::entities::PartitionHealthEntity>,
        pub service_health_entities: Vec<crate::monitoring::entities::ServiceHealthEntity>,
        pub replica_health_entities: Vec<crate::monitoring::entities::ReplicaHealthEntity>,
    }

    impl MockHealthDataConsumer {
        /// Wraps the receiving half of the channel whose sender was given to the producer.
        pub fn new(receiver: mpsc::UnboundedReceiver<HealthEntity>) -> Self {
            MockHealthDataConsumer { receiver }
        }

        /// Receives entities until the channel closes and returns them grouped by kind.
        ///
        /// `recv()` only yields `None` once every sender has been dropped, so the sender
        /// handed to [`HealthDataProducer::new`] (and therefore the producer owning it) must
        /// be dropped before awaiting this; otherwise the call never completes.
        pub async fn get_all_data(&mut self) -> HealthDataCollection {
            let mut data = HealthDataCollection::default();
            while let Some(entity) = self.receiver.recv().await {
                match entity {
                    HealthEntity::Node(node_entity) => data.node_health_entities.push(node_entity),
                    HealthEntity::Cluster(cluster_entity) => {
                        data.cluster_health_entity.push(cluster_entity)
                    }
                    HealthEntity::Application(application_entity) => {
                        data.application_health_entities.push(application_entity)
                    }
                    HealthEntity::Partition(partition_entity) => {
                        data.partition_health_entities.push(partition_entity)
                    }
                    HealthEntity::Service(service_entity) => {
                        data.service_health_entities.push(service_entity)
                    }
                    HealthEntity::Replica(replica_entity) => {
                        data.replica_health_entities.push(replica_entity)
                    }
                }
            }
            data
        }
    }

    /// Builds a [`HealthDataProducer`] and a [`MockHealthDataConsumer`] joined by an
    /// unbounded channel.
    ///
    /// The producer is constructed with `Duration::from_secs(30)` (presumably its polling
    /// interval — confirm against `HealthDataProducer::new`).
    pub fn new_health_data_producer_consumer(
        fc: FabricClient,
    ) -> (HealthDataProducer, MockHealthDataConsumer) {
        let (sender, receiver) = mpsc::unbounded_channel();
        let producer = HealthDataProducer::new(fc, Duration::from_secs(30), sender);
        let consumer = MockHealthDataConsumer::new(receiver);
        (producer, consumer)
    }

    /// Asserts that `state` is `Ok` or `Warning`.
    ///
    /// The failure message names `what` and prints the observed state, so a failing run
    /// says which entity was unhealthy and how.
    fn assert_ok_or_warning(state: &mssf_core::types::HealthState, what: &str) {
        assert!(
            *state == mssf_core::types::HealthState::Ok
                || *state == mssf_core::types::HealthState::Warning,
            "{what} health state should be Ok or Warning, got {state:?}"
        );
    }

    /// End-to-end check of [`HealthDataProducer`] against the cluster at `localhost:19000`.
    ///
    /// The test runs the producer until it completes one iteration, then stops it. It then
    /// validates the entities that reached the consumer.
    #[tokio::test]
    async fn test_health_data() {
        // set up tracing
        let _ = tracing_subscriber::fmt().try_init();

        let fc = FabricClient::builder()
            .with_connection_strings(vec![WString::from("localhost:19000")])
            .build()
            .unwrap();
        let (producer, mut consumer) = new_health_data_producer_consumer(fc);

        let token = mssf_core::sync::SimpleCancelToken::new_boxed();
        let producer = Arc::new(producer);
        // Run the producer in the background. `run_loop` is expected to return once
        // `token` is cancelled (the task is awaited right after cancelling below).
        let producer_task = {
            let producer = Arc::clone(&producer);
            let token = token.clone();
            tokio::spawn(async move {
                producer.run_loop(token).await;
            })
        };

        // Poll once per second, up to 30 times, until at least one iteration has completed.
        let max_polls = 30;
        for _ in 0..max_polls {
            if producer.get_iteration() > 0 {
                break;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        assert_ne!(
            producer.get_iteration(),
            0,
            "Producer did not run any iteration"
        );
        // The consumer only finishes once every sender is gone. Drop our handle to the
        // producer first, then stop the background task so it releases the last one.
        drop(producer);
        token.cancel();
        producer_task.await.unwrap();

        // Consume the health data
        let data = consumer.get_all_data().await;

        // check cluster health entity
        assert_eq!(
            data.cluster_health_entity.len(),
            1,
            "Should have one cluster health entity"
        );
        let cluster_health = &data.cluster_health_entity[0];
        // Due to load, onebox could be in error state.
        // It is not this tests job to verify cluster health, just check state is returned.
        assert_ne!(
            cluster_health.health.aggregated_health_state,
            mssf_core::types::HealthState::Unknown,
        );
        assert!(
            cluster_health.health.node_health_states.is_empty(),
            "Cluster health should not have nodes, we retrieve them separately."
        );
        assert!(
            cluster_health.health.application_health_states.is_empty(),
            "Cluster health should not have application health states, we retrieve them separately."
        );

        // We have 5 nodes in local SF windows cluster
        // and 3 nodes for linux cluster.
        assert!(
            data.node_health_entities.len() >= 3,
            "Not enough nodes {:?}",
            data.node_health_entities
        );
        let node1 = &data.node_health_entities[0];
        assert!(!node1.node.name.is_empty());
        assert_ok_or_warning(&node1.health.aggregated_health_state, "node");

        // Applications, partitions, services and replicas may legitimately be absent on an
        // empty cluster; only the first entity of each kind is checked when present.
        match data.application_health_entities.first() {
            None => tracing::warn!("No applications found in the cluster"),
            Some(app1) => {
                assert_eq!(
                    app1.application.health_state,
                    app1.health.aggregated_health_state
                );
                assert_ok_or_warning(&app1.health.aggregated_health_state, "application");
            }
        }
        match data.partition_health_entities.first() {
            None => tracing::warn!("No partitions found in the cluster"),
            Some(partition1) => {
                assert_eq!(
                    partition1.partition.get_health_state(),
                    partition1.health.aggregated_health_state
                );
                assert_ok_or_warning(&partition1.health.aggregated_health_state, "partition");
            }
        }
        match data.service_health_entities.first() {
            None => tracing::warn!("No services found in the cluster"),
            Some(service1) => {
                assert_eq!(
                    service1.service.get_health_state(),
                    service1.health.aggregated_health_state
                );
                assert_ok_or_warning(&service1.health.aggregated_health_state, "service");
            }
        }
        match data.replica_health_entities.first() {
            None => tracing::warn!("No replicas found in the cluster"),
            Some(replica1) => {
                let replica_state = replica1.health.replica_health.get_aggregated_health_state();
                assert_eq!(
                    replica1.replica.get_aggregated_health_state(),
                    replica_state
                );
                assert_ok_or_warning(&replica_state, "replica");
            }
        }
    }
}