mssf_util/monitoring/
producer.rs

// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------
5
6use crate::monitoring::{HealthEntity, NodeHealthEntity, entities::ClusterHealthEntity};
7use ::tokio::sync::mpsc;
8use mssf_core::{
9    client::FabricClient,
10    runtime::executor::BoxedCancelToken,
11    types::{
12        ApplicationHealthStatesFilter, ApplicationQueryDescription, ClusterHealthQueryDescription,
13        HealthEventsFilter, HealthStateFilterFlags, NodeHealthQueryDescription,
14        NodeHealthStatesFilter, NodeQueryResultItem, Uri,
15    },
16};
17use std::time::Duration;
18
19/// Queries SF and produces health data.
20/// User is responsible to implement a consumer to receive the data.
21pub struct HealthDataProducer {
22    fc: FabricClient,
23    interval: Duration,
24    sender: mpsc::UnboundedSender<HealthEntity>,
25    iteration: std::sync::atomic::AtomicU64,
26}
27
/// Timeout applied to each individual FabricClient call (30 seconds).
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(30_000);
30
/// Control signal returned by the producer to tell its loop what to do next.
///
/// `Stop` is produced when the consumer has dropped the receiving half of the
/// channel, so no further health data can be delivered.
///
/// Derives `Debug` so that a `Result<_, Action>` can be `unwrap`ped or logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Stop producing: the consumer is gone.
    Stop,
}
34
35impl HealthDataProducer {
36    pub fn new(
37        fc: FabricClient,
38        interval: Duration,
39        sender: mpsc::UnboundedSender<HealthEntity>,
40    ) -> Self {
41        HealthDataProducer {
42            fc,
43            interval,
44            sender,
45            iteration: std::sync::atomic::AtomicU64::new(0),
46        }
47    }
48
49    fn send_entity(&self, entity: HealthEntity) -> Result<(), Action> {
50        self.sender.send(entity).map_err(|_| {
51            tracing::error!("Receiver dropped, cannot send more data.");
52            Action::Stop
53        })
54    }
55
56    /// Run once to produce health data.
57    pub(crate) async fn run_once(&self, token: BoxedCancelToken) -> Result<(), Action> {
58        // Get cluster health information.
59        if let Some(entity) = self.produce_cluster_health_entity(token.clone()).await {
60            self.send_entity(entity)?;
61        }
62        // Get node information.
63        if let Ok(nodes) = self.get_all_nodes(token.clone()).await {
64            for node in nodes {
65                if let Some(entity) = self.produce_node_health_entity(token.clone(), node).await {
66                    self.send_entity(entity)?;
67                }
68            }
69        }
70        // Get application information.
71        if let Ok(apps) = self.get_all_applications(token.clone()).await {
72            for app in apps {
73                let app_name = app.application_name.clone();
74                if let Some(entity) = self
75                    .produce_application_health_entity(token.clone(), app)
76                    .await
77                {
78                    self.send_entity(entity)?;
79                }
80
81                // Get service information for the application.
82                if let Ok(services) = self.get_all_services_for_app(token.clone(), app_name).await {
83                    for svc in services {
84                        let svc_name = svc.get_service_name().clone();
85                        // produce service health entity
86                        if let Some(entity) =
87                            self.produce_service_health_entity(token.clone(), svc).await
88                        {
89                            self.send_entity(entity)?;
90                        }
91
92                        // Get partition information for the service.
93                        if let Ok(partitions) = self
94                            .get_all_partitions_for_svc(token.clone(), svc_name)
95                            .await
96                        {
97                            for partition in partitions {
98                                let partition_id = partition.get_partition_id();
99                                // produce partition health entity
100                                if let Some(entity) = self
101                                    .produce_partition_health_entity(token.clone(), partition)
102                                    .await
103                                {
104                                    self.send_entity(entity)?;
105                                }
106                                // Get replica information for the partition.
107                                if let Ok(replicas) = self
108                                    .get_all_replicas_for_partition(token.clone(), partition_id)
109                                    .await
110                                {
111                                    for replica in replicas {
112                                        // produce replica health entity
113                                        if let Some(entity) = self
114                                            .produce_replica_health_entity(
115                                                token.clone(),
116                                                partition_id,
117                                                replica,
118                                            )
119                                            .await
120                                        {
121                                            self.send_entity(entity)?;
122                                        }
123                                    }
124                                }
125                            }
126                        }
127                    }
128                }
129            }
130        }
131        self.iteration
132            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
133        Ok(())
134    }
135
136    /// Run a loop to produce health data.
137    pub async fn run_loop(&self, token: BoxedCancelToken) {
138        loop {
139            let start_time = ::tokio::time::Instant::now();
140            match self.run_once(token.clone()).await {
141                Err(Action::Stop) => {
142                    tracing::info!("Health data producer stopped.");
143                    break;
144                }
145                _ => {
146                    // continue the loop
147                }
148            }
149
150            // remaining time
151            let elapsed = start_time.elapsed();
152            // wait for more time if necessary.
153            if elapsed < self.interval {
154                let wait_duration = self.interval - elapsed;
155
156                tokio::select! {
157                    _ = token.wait() => {
158                        tracing::info!("Cancellation requested, exiting health data producer loop.");
159                        break;
160                    }
161                    _ = tokio::time::sleep(wait_duration) => {}
162                }
163            }
164
165            if token.is_cancelled() {
166                tracing::info!("Cancellation requested, exiting health data producer loop.");
167                break;
168            }
169        }
170        tracing::info!("Health data producer loop exited.");
171    }
172
173    pub fn get_iteration(&self) -> u64 {
174        self.iteration.load(std::sync::atomic::Ordering::Relaxed)
175    }
176}
177
178impl HealthDataProducer {
179    async fn produce_cluster_health_entity(&self, token: BoxedCancelToken) -> Option<HealthEntity> {
180        // Ignore nodes and app health because we retrieve them separately.
181        // Technically we can get everything in one call, but the payload might be too large,
182        // and we want to get other entities not present in the cluster health.
183        // For example, each system service health is not present in this result.
184        let desc = ClusterHealthQueryDescription {
185            nodes_filter: Some(NodeHealthStatesFilter {
186                health_state_filter: HealthStateFilterFlags::NONE,
187            }),
188            applications_filter: Some(ApplicationHealthStatesFilter {
189                health_state_filter: HealthStateFilterFlags::NONE,
190            }),
191            ..Default::default()
192        };
193        let cluster_healths = self
194            .fc
195            .get_health_manager()
196            .get_cluster_health(&desc, DEFAULT_TIMEOUT, Some(token))
197            .await
198            .inspect_err(|err| {
199                tracing::error!("Failed to get cluster health: {}", err);
200            })
201            .ok()?;
202        Some(HealthEntity::Cluster(ClusterHealthEntity {
203            health: cluster_healths,
204        }))
205    }
206
207    /// Produce the health entity for a node.
208    async fn produce_node_health_entity(
209        &self,
210        token: BoxedCancelToken,
211        node: NodeQueryResultItem,
212    ) -> Option<HealthEntity> {
213        // Logic to get node health goes here.
214
215        let desc = NodeHealthQueryDescription {
216            node_name: node.name.clone(),
217            // We only care about the aggregated health state.
218            events_filter: Some(HealthEventsFilter {
219                health_state_filter: HealthStateFilterFlags::NONE,
220            }),
221            ..Default::default()
222        };
223        let node_healths = self
224            .fc
225            .get_health_manager()
226            .get_node_health(&desc, DEFAULT_TIMEOUT, Some(token))
227            .await
228            .inspect_err(|err| {
229                tracing::error!("Failed to get node health: {}", err);
230            })
231            .ok()?;
232        Some(HealthEntity::Node(NodeHealthEntity {
233            node,
234            health: node_healths,
235        }))
236    }
237
238    async fn produce_application_health_entity(
239        &self,
240        token: BoxedCancelToken,
241        app: mssf_core::types::ApplicationQueryResultItem,
242    ) -> Option<HealthEntity> {
243        let desc = mssf_core::types::ApplicationHealthQueryDescription {
244            application_name: app.application_name.clone(),
245            ..Default::default()
246        };
247        let app_health = self
248            .fc
249            .get_health_manager()
250            .get_application_health(&desc, DEFAULT_TIMEOUT, Some(token))
251            .await
252            .inspect_err(|err| {
253                tracing::error!("Failed to get application health: {}", err);
254            })
255            .ok()?;
256        Some(HealthEntity::Application(
257            crate::monitoring::entities::ApplicationHealthEntity {
258                application: app,
259                health: app_health,
260            },
261        ))
262    }
263
264    async fn produce_service_health_entity(
265        &self,
266        token: BoxedCancelToken,
267        svc: mssf_core::types::ServiceQueryResultItem,
268    ) -> Option<HealthEntity> {
269        let svc_name = svc.get_service_name().clone();
270        let desc = mssf_core::types::ServiceHealthQueryDescription {
271            service_name: svc_name,
272            ..Default::default()
273        };
274        let svc_health = self
275            .fc
276            .get_health_manager()
277            .get_service_health(&desc, DEFAULT_TIMEOUT, Some(token))
278            .await
279            .inspect_err(|err| {
280                tracing::error!("Failed to get service health: {}", err);
281            })
282            .ok()?;
283        Some(HealthEntity::Service(
284            crate::monitoring::entities::ServiceHealthEntity {
285                health: svc_health,
286                service: svc,
287            },
288        ))
289    }
290    async fn produce_partition_health_entity(
291        &self,
292        token: BoxedCancelToken,
293        part: mssf_core::types::ServicePartitionQueryResultItem,
294    ) -> Option<HealthEntity> {
295        let partition_id = part.get_partition_id();
296        let desc = mssf_core::types::PartitionHealthQueryDescription {
297            partition_id,
298            ..Default::default()
299        };
300        let part_health = self
301            .fc
302            .get_health_manager()
303            .get_partition_health(&desc, DEFAULT_TIMEOUT, Some(token))
304            .await
305            .inspect_err(|err| {
306                tracing::error!("Failed to get partition health: {}", err);
307            })
308            .ok()?;
309        Some(HealthEntity::Partition(
310            crate::monitoring::entities::PartitionHealthEntity {
311                health: part_health,
312                partition: part,
313            },
314        ))
315    }
316    async fn produce_replica_health_entity(
317        &self,
318        token: BoxedCancelToken,
319        partition_id: mssf_core::GUID,
320        replica: mssf_core::types::ServiceReplicaQueryResultItem,
321    ) -> Option<HealthEntity> {
322        let desc = mssf_core::types::ReplicaHealthQueryDescription {
323            partition_id,
324            replica_id_or_instance_id: replica.get_replica_or_instance_id(),
325            ..Default::default()
326        };
327        let replica_health = self
328            .fc
329            .get_health_manager()
330            .get_replica_health(&desc, DEFAULT_TIMEOUT, Some(token))
331            .await
332            .inspect_err(|err| {
333                tracing::error!("Failed to get replica health: {}", err);
334            })
335            .ok()?;
336        Some(HealthEntity::Replica(
337            crate::monitoring::entities::ReplicaHealthEntity {
338                health: replica_health,
339                replica,
340            },
341        ))
342    }
343}
344
345// Get lists of entities
346impl HealthDataProducer {
347    async fn get_all_nodes(
348        &self,
349        token: BoxedCancelToken,
350    ) -> mssf_core::Result<Vec<NodeQueryResultItem>> {
351        // Logic to get node information goes here.
352        let desc = &Default::default();
353        let nodes = self
354            .fc
355            .get_query_manager()
356            .get_node_list(desc, DEFAULT_TIMEOUT, Some(token.clone()))
357            .await
358            .inspect_err(|err| {
359                tracing::error!("Failed to get node list: {}", err);
360            })?
361            .nodes;
362        Ok(nodes)
363    }
364
365    /// This does not include system application.
366    /// We will report system service health separately.
367    async fn get_all_applications(
368        &self,
369        token: BoxedCancelToken,
370    ) -> mssf_core::Result<Vec<mssf_core::types::ApplicationQueryResultItem>> {
371        let desc = ApplicationQueryDescription::default();
372        let apps = self
373            .fc
374            .get_query_manager()
375            .get_application_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
376            .await
377            .inspect_err(|err| {
378                tracing::error!("Failed to get application list: {}", err);
379            })?
380            .items;
381        Ok(apps)
382    }
383    async fn get_all_services_for_app(
384        &self,
385        token: BoxedCancelToken,
386        app_name: Uri,
387    ) -> mssf_core::Result<Vec<mssf_core::types::ServiceQueryResultItem>> {
388        let app_name_cp = app_name.clone();
389        // Logic to get service information goes here.
390        let desc = mssf_core::types::ServiceQueryDescription {
391            application_name: app_name_cp,
392            ..Default::default()
393        };
394        let services = self
395            .fc
396            .get_query_manager()
397            .get_service_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
398            .await
399            .inspect_err(|err| {
400                tracing::error!("Failed to get service list for app {app_name}: {err}");
401            })?;
402        Ok(services.items)
403    }
404
405    async fn get_all_partitions_for_svc(
406        &self,
407        token: BoxedCancelToken,
408        service_name: Uri,
409    ) -> mssf_core::Result<Vec<mssf_core::types::ServicePartitionQueryResultItem>> {
410        // Logic to get partition information goes here.
411        let desc = mssf_core::types::ServicePartitionQueryDescription {
412            service_name,
413            partition_id_filter: None,
414        };
415        let partitions = self
416            .fc
417            .get_query_manager()
418            .get_partition_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
419            .await
420            .inspect_err(|err| {
421                tracing::error!("Failed to get partition list: {}", err);
422            })?
423            .service_partitions;
424        Ok(partitions)
425    }
426
427    async fn get_all_replicas_for_partition(
428        &self,
429        token: BoxedCancelToken,
430        partition_id: mssf_core::GUID,
431    ) -> mssf_core::Result<Vec<mssf_core::types::ServiceReplicaQueryResultItem>> {
432        // Logic to get replica information goes here.
433        let desc = mssf_core::types::ServiceReplicaQueryDescription {
434            partition_id,
435            ..Default::default()
436        };
437        let replicas = self
438            .fc
439            .get_query_manager()
440            .get_replica_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
441            .await
442            .inspect_err(|err| {
443                tracing::error!("Failed to get replica list: {}", err);
444            })?
445            .service_replicas;
446        Ok(replicas)
447    }
448}