Skip to main content

mssf_util/monitoring/
mod.rs

// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------
5
6mod producer;
7pub use producer::HealthDataProducer;
8mod entities;
9pub use entities::{LoopKind, NodeHealthEntity, ProducerEvent};
10
11#[cfg(test)]
12mod tests {
13
14    use std::time::Duration;
15
16    use mssf_core::{WString, client::FabricClient};
17    use tokio::sync::mpsc;
18
19    use crate::monitoring::{
20        HealthDataProducer, LoopKind, NodeHealthEntity, ProducerEvent,
21        entities::ClusterHealthEntity,
22    };
23
24    pub struct MockHealthDataConsumer {
25        receiver: mpsc::UnboundedReceiver<ProducerEvent>,
26    }
27
28    pub struct HealthDataCollection {
29        pub cluster_health_entity: Vec<ClusterHealthEntity>,
30        pub node_health_entities: Vec<NodeHealthEntity>,
31        pub application_health_entities: Vec<crate::monitoring::entities::ApplicationHealthEntity>,
32        pub partition_health_entities: Vec<crate::monitoring::entities::PartitionHealthEntity>,
33        pub service_health_entities: Vec<crate::monitoring::entities::ServiceHealthEntity>,
34        pub replica_health_entities: Vec<crate::monitoring::entities::ReplicaHealthEntity>,
35    }
36
37    impl MockHealthDataConsumer {
38        pub fn new(receiver: mpsc::UnboundedReceiver<ProducerEvent>) -> Self {
39            MockHealthDataConsumer { receiver }
40        }
41
42        /// Collect exactly one iteration worth of health data.
43        ///
44        /// Each producer loop emits an [`HealthEntity::IterationComplete`]
45        /// marker once it has produced a full set of data for the current
46        /// iteration. We wait until *both* loops (cluster/node and application)
47        /// have signalled completion, then cancel the producer (via `token`)
48        /// and return the collected data.
49        pub async fn get_all_data(
50            &mut self,
51            token: &mssf_core::runtime::executor::BoxedCancelToken,
52        ) -> HealthDataCollection {
53            let mut data = HealthDataCollection {
54                node_health_entities: Vec::new(),
55                cluster_health_entity: Vec::new(),
56                application_health_entities: Vec::new(),
57                partition_health_entities: Vec::new(),
58                service_health_entities: Vec::new(),
59                replica_health_entities: Vec::new(),
60            };
61            let mut cluster_node_done = false;
62            let mut application_done = false;
63            while let Some(entity) = self.receiver.recv().await {
64                match entity {
65                    ProducerEvent::Node(node_entity) => {
66                        data.node_health_entities.push(node_entity);
67                    }
68                    ProducerEvent::Cluster(cluster_entity) => {
69                        data.cluster_health_entity.push(cluster_entity);
70                    }
71                    ProducerEvent::Application(application_entity) => {
72                        data.application_health_entities.push(application_entity);
73                    }
74                    ProducerEvent::Partition(partition_entity) => {
75                        data.partition_health_entities.push(partition_entity);
76                    }
77                    ProducerEvent::Service(service_entity) => {
78                        data.service_health_entities.push(service_entity);
79                    }
80                    ProducerEvent::Replica(replica_entity) => {
81                        data.replica_health_entities.push(replica_entity);
82                    }
83                    ProducerEvent::IterationComplete(kind) => match kind {
84                        LoopKind::ClusterNode => cluster_node_done = true,
85                        LoopKind::Application => application_done = true,
86                    },
87                }
88                if cluster_node_done && application_done {
89                    // Both loops have produced a full iteration. Stop the
90                    // producer and finish.
91                    token.cancel();
92                    break;
93                }
94            }
95            data
96        }
97    }
98
99    pub fn new_health_data_producer_consumer(
100        fc: FabricClient,
101    ) -> (HealthDataProducer, MockHealthDataConsumer) {
102        let (sender, receiver) = mpsc::unbounded_channel();
103        let producer = HealthDataProducer::new(fc, Duration::from_secs(3), sender);
104        let consumer = MockHealthDataConsumer::new(receiver);
105        (producer, consumer)
106    }
107
108    #[tokio::test]
109    async fn test_health_data() {
110        // set up tracing
111        let _ = tracing_subscriber::fmt().try_init();
112
113        let fc = FabricClient::builder()
114            .with_connection_strings(vec![WString::from("localhost:19000")])
115            .build()
116            .unwrap();
117        let (producer, mut consumer) = new_health_data_producer_consumer(fc);
118
119        let token = mssf_core::sync::SimpleCancelToken::new_boxed();
120        // Simulate producing health data
121        let token_clone = token.clone();
122        let ph = tokio::spawn(async move {
123            producer.run_loop(token_clone).await;
124        });
125
126        // Consume the health data. The consumer stops the producer once it has
127        // observed one full iteration (signalled by the second cluster entity).
128        let data = consumer.get_all_data(&token).await;
129        ph.await.unwrap();
130
131        // check cluster health entity
132        assert_eq!(
133            data.cluster_health_entity.len(),
134            1,
135            "Should have one cluster health entity"
136        );
137        let cluster_health = &data.cluster_health_entity[0];
138        // Due to load, onebox could be in error state.
139        // It is not this tests job to verify cluster health, just check state is returned.
140        assert_ne!(
141            cluster_health.health.aggregated_health_state,
142            mssf_core::types::HealthState::Unknown,
143        );
144        assert!(
145            cluster_health.health.node_health_states.is_empty(),
146            "Cluster health should not have nodes, we retrieve them separately."
147        );
148        assert!(
149            cluster_health.health.application_health_states.is_empty(),
150            "Cluster health should not have application health states, we retrieve them separately."
151        );
152
153        // We have 5 nodes in local SF windows cluster
154        // and 3 nodes for linux cluster.
155        assert!(
156            data.node_health_entities.len() >= 3,
157            "Not enough nodes {:?}",
158            data.node_health_entities
159        );
160        let node1 = &data.node_health_entities[0];
161        assert!(!node1.node.name.is_empty());
162        assert!(
163            node1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
164                || node1.health.aggregated_health_state == mssf_core::types::HealthState::Warning
165        );
166
167        // Get applications
168        // For empty cluster applications is 0
169        if data.application_health_entities.is_empty() {
170            tracing::warn!("No applications found in the cluster");
171        } else {
172            let app1 = &data.application_health_entities[0];
173            assert_eq!(
174                app1.application.health_state,
175                app1.health.aggregated_health_state
176            );
177            assert!(
178                app1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
179                    || app1.health.aggregated_health_state
180                        == mssf_core::types::HealthState::Warning
181            );
182        }
183        if data.partition_health_entities.is_empty() {
184            tracing::warn!("No partitions found in the cluster");
185        } else {
186            let partition1 = &data.partition_health_entities[0];
187            assert_eq!(
188                partition1.partition.get_health_state(),
189                partition1.health.aggregated_health_state
190            );
191            assert!(
192                partition1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
193                    || partition1.health.aggregated_health_state
194                        == mssf_core::types::HealthState::Warning
195            );
196        }
197        if data.service_health_entities.is_empty() {
198            tracing::warn!("No services found in the cluster");
199        } else {
200            let service1 = &data.service_health_entities[0];
201            assert_eq!(
202                service1.service.get_health_state(),
203                service1.health.aggregated_health_state
204            );
205            assert!(
206                service1.health.aggregated_health_state == mssf_core::types::HealthState::Ok
207                    || service1.health.aggregated_health_state
208                        == mssf_core::types::HealthState::Warning
209            );
210        }
211        if data.replica_health_entities.is_empty() {
212            tracing::warn!("No replicas found in the cluster");
213        } else {
214            let replica1 = &data.replica_health_entities[0];
215            assert_eq!(
216                replica1.replica.get_aggregated_health_state(),
217                replica1.health.replica_health.get_aggregated_health_state()
218            );
219            assert!(
220                replica1.health.replica_health.get_aggregated_health_state()
221                    == mssf_core::types::HealthState::Ok
222                    || replica1.health.replica_health.get_aggregated_health_state()
223                        == mssf_core::types::HealthState::Warning
224            );
225        }
226    }
227}