Skip to main content

mssf_util/monitoring/
producer.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use crate::monitoring::{
7    NodeHealthEntity, ProducerEvent,
8    entities::{ClusterHealthEntity, LoopKind},
9};
10use ::tokio::sync::mpsc;
11use mssf_core::{
12    client::FabricClient,
13    runtime::executor::BoxedCancelToken,
14    types::{
15        ApplicationHealthStatesFilter, ApplicationQueryDescription, ClusterHealthQueryDescription,
16        HealthEventsFilter, HealthStateFilterFlags, NodeHealthQueryDescription,
17        NodeHealthStatesFilter, NodeQueryResultItem, Uri,
18    },
19};
20use std::time::Duration;
21
22/// Queries SF and produces health data.
23/// User is responsible to implement a consumer to receive the data.
24pub struct HealthDataProducer {
25    fc: FabricClient,
26    interval: Duration,
27    sender: mpsc::UnboundedSender<ProducerEvent>,
28}
29
/// Timeout (30 seconds) passed to every `FabricClient` health and query
/// operation issued by the producer.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
32
/// Control signal returned by the producer's `run_once_*` steps to the loop
/// that drives them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Stop producing: the event could not be delivered because the
    /// receiving side of the channel is gone.
    Stop,
}
36
37impl HealthDataProducer {
38    pub fn new(
39        fc: FabricClient,
40        interval: Duration,
41        sender: mpsc::UnboundedSender<ProducerEvent>,
42    ) -> Self {
43        HealthDataProducer {
44            fc,
45            interval,
46            sender,
47        }
48    }
49
50    fn send_event(&self, event: ProducerEvent) -> Result<(), Action> {
51        self.sender.send(event).map_err(|_| {
52            tracing::error!("Receiver dropped, cannot send more data.");
53            Action::Stop
54        })
55    }
56
57    /// Run once to produce cluster and node health data.
58    ///
59    /// Kept separate from application health so that the (potentially slow)
60    /// application traversal does not delay cluster/node reporting.
61    pub(crate) async fn run_once_cluster_and_nodes(
62        &self,
63        token: BoxedCancelToken,
64    ) -> Result<(), Action> {
65        // Get cluster health information.
66        if let Some(entity) = self.produce_cluster_health_entity(token.clone()).await {
67            self.send_event(entity)?;
68        }
69        // Get node information.
70        if let Ok(nodes) = self.get_all_nodes(token.clone()).await {
71            for node in nodes {
72                if let Some(entity) = self.produce_node_health_entity(token.clone(), node).await {
73                    self.send_event(entity)?;
74                }
75            }
76        }
77        self.send_event(ProducerEvent::IterationComplete(LoopKind::ClusterNode))?;
78        Ok(())
79    }
80
81    /// Run once to produce application health data, including the services,
82    /// partitions, and replicas underneath each application.
83    ///
84    /// This traversal can be slow on large clusters, so it runs in its own
85    /// loop independent of cluster/node reporting.
86    pub(crate) async fn run_once_applications(
87        &self,
88        token: BoxedCancelToken,
89    ) -> Result<(), Action> {
90        // Get application information.
91        if let Ok(apps) = self.get_all_applications(token.clone()).await {
92            for app in apps {
93                let app_name = app.application_name.clone();
94                if let Some(entity) = self
95                    .produce_application_health_entity(token.clone(), app)
96                    .await
97                {
98                    self.send_event(entity)?;
99                }
100
101                // Get service information for the application.
102                if let Ok(services) = self
103                    .get_all_services_for_app(token.clone(), app_name.clone())
104                    .await
105                {
106                    for svc in services {
107                        let svc_name = svc.get_service_name().clone();
108                        // produce service health entity
109                        if let Some(entity) =
110                            self.produce_service_health_entity(token.clone(), svc).await
111                        {
112                            self.send_event(entity)?;
113                        }
114
115                        // Get partition information for the service.
116                        if let Ok(partitions) = self
117                            .get_all_partitions_for_svc(token.clone(), svc_name.clone())
118                            .await
119                        {
120                            for partition in partitions {
121                                let partition_id = partition.get_partition_id();
122                                // produce partition health entity
123                                if let Some(entity) = self
124                                    .produce_partition_health_entity(
125                                        token.clone(),
126                                        partition,
127                                        svc_name.clone(),
128                                        app_name.clone(),
129                                    )
130                                    .await
131                                {
132                                    self.send_event(entity)?;
133                                }
134                                // Get replica information for the partition.
135                                if let Ok(replicas) = self
136                                    .get_all_replicas_for_partition(token.clone(), partition_id)
137                                    .await
138                                {
139                                    for replica in replicas {
140                                        // produce replica health entity
141                                        if let Some(entity) = self
142                                            .produce_replica_health_entity(
143                                                token.clone(),
144                                                partition_id,
145                                                replica,
146                                                svc_name.clone(),
147                                                app_name.clone(),
148                                            )
149                                            .await
150                                        {
151                                            self.send_event(entity)?;
152                                        }
153                                    }
154                                }
155                            }
156                        }
157                    }
158                }
159            }
160        }
161        self.send_event(ProducerEvent::IterationComplete(LoopKind::Application))?;
162        Ok(())
163    }
164
165    /// Run both the cluster/node loop and the application loop concurrently.
166    ///
167    /// The two loops are independent: a slow application traversal will not
168    /// delay cluster/node reporting, and vice versa.
169    pub async fn run_loop(&self, token: BoxedCancelToken) {
170        tokio::join!(
171            self.run_cluster_node_loop(token.clone()),
172            self.run_application_loop(token.clone()),
173        );
174        tracing::info!("Health data producer loops exited.");
175    }
176
177    /// Run a loop to produce cluster and node health data.
178    pub async fn run_cluster_node_loop(&self, token: BoxedCancelToken) {
179        self.run_interval_loop(token, "cluster/node", |token| {
180            self.run_once_cluster_and_nodes(token)
181        })
182        .await;
183    }
184
185    /// Run a loop to produce application health data.
186    pub async fn run_application_loop(&self, token: BoxedCancelToken) {
187        self.run_interval_loop(token, "application", |token| {
188            self.run_once_applications(token)
189        })
190        .await;
191    }
192
193    /// Drive `run_once` on `self.interval`, honoring cancellation.
194    async fn run_interval_loop<F, Fut>(&self, token: BoxedCancelToken, name: &str, mut run_once: F)
195    where
196        F: FnMut(BoxedCancelToken) -> Fut,
197        Fut: std::future::Future<Output = Result<(), Action>>,
198    {
199        loop {
200            let start_time = ::tokio::time::Instant::now();
201            if let Err(Action::Stop) = run_once(token.clone()).await {
202                tracing::info!("Health data {name} producer stopped.");
203                break;
204            }
205
206            // remaining time
207            let elapsed = start_time.elapsed();
208            // wait for more time if necessary.
209            if elapsed < self.interval {
210                let wait_duration = self.interval - elapsed;
211
212                tokio::select! {
213                    _ = token.wait() => {
214                        tracing::info!("Cancellation requested, exiting health data {name} producer loop.");
215                        break;
216                    }
217                    _ = tokio::time::sleep(wait_duration) => {}
218                }
219            }
220
221            if token.is_cancelled() {
222                tracing::info!("Cancellation requested, exiting health data {name} producer loop.");
223                break;
224            }
225        }
226        tracing::info!("Health data {name} producer loop exited.");
227    }
228}
229
230impl HealthDataProducer {
231    async fn produce_cluster_health_entity(
232        &self,
233        token: BoxedCancelToken,
234    ) -> Option<ProducerEvent> {
235        // Ignore nodes and app health because we retrieve them separately.
236        // Technically we can get everything in one call, but the payload might be too large,
237        // and we want to get other entities not present in the cluster health.
238        // For example, each system service health is not present in this result.
239        let desc = ClusterHealthQueryDescription {
240            nodes_filter: Some(NodeHealthStatesFilter {
241                health_state_filter: HealthStateFilterFlags::NONE,
242            }),
243            applications_filter: Some(ApplicationHealthStatesFilter {
244                health_state_filter: HealthStateFilterFlags::NONE,
245            }),
246            ..Default::default()
247        };
248        let cluster_healths = self
249            .fc
250            .get_health_manager()
251            .get_cluster_health(&desc, DEFAULT_TIMEOUT, Some(token))
252            .await
253            .inspect_err(|err| {
254                tracing::error!("Failed to get cluster health: {}", err);
255            })
256            .ok()?;
257        Some(ProducerEvent::Cluster(ClusterHealthEntity {
258            health: cluster_healths,
259        }))
260    }
261
262    /// Produce the health entity for a node.
263    async fn produce_node_health_entity(
264        &self,
265        token: BoxedCancelToken,
266        node: NodeQueryResultItem,
267    ) -> Option<ProducerEvent> {
268        // Logic to get node health goes here.
269
270        let desc = NodeHealthQueryDescription {
271            node_name: node.name.clone(),
272            // We only care about the aggregated health state.
273            events_filter: Some(HealthEventsFilter {
274                health_state_filter: HealthStateFilterFlags::NONE,
275            }),
276            ..Default::default()
277        };
278        let node_healths = self
279            .fc
280            .get_health_manager()
281            .get_node_health(&desc, DEFAULT_TIMEOUT, Some(token))
282            .await
283            .inspect_err(|err| {
284                tracing::error!("Failed to get node health: {}", err);
285            })
286            .ok()?;
287        Some(ProducerEvent::Node(NodeHealthEntity {
288            node,
289            health: node_healths,
290        }))
291    }
292
293    async fn produce_application_health_entity(
294        &self,
295        token: BoxedCancelToken,
296        app: mssf_core::types::ApplicationQueryResultItem,
297    ) -> Option<ProducerEvent> {
298        let desc = mssf_core::types::ApplicationHealthQueryDescription {
299            application_name: app.application_name.clone(),
300            ..Default::default()
301        };
302        let app_health = self
303            .fc
304            .get_health_manager()
305            .get_application_health(&desc, DEFAULT_TIMEOUT, Some(token))
306            .await
307            .inspect_err(|err| {
308                tracing::error!("Failed to get application health: {}", err);
309            })
310            .ok()?;
311        Some(ProducerEvent::Application(
312            crate::monitoring::entities::ApplicationHealthEntity {
313                application: app,
314                health: app_health,
315            },
316        ))
317    }
318
319    async fn produce_service_health_entity(
320        &self,
321        token: BoxedCancelToken,
322        svc: mssf_core::types::ServiceQueryResultItem,
323    ) -> Option<ProducerEvent> {
324        let svc_name = svc.get_service_name().clone();
325        let desc = mssf_core::types::ServiceHealthQueryDescription {
326            service_name: svc_name,
327            ..Default::default()
328        };
329        let svc_health = self
330            .fc
331            .get_health_manager()
332            .get_service_health(&desc, DEFAULT_TIMEOUT, Some(token))
333            .await
334            .inspect_err(|err| {
335                tracing::error!("Failed to get service health: {}", err);
336            })
337            .ok()?;
338        Some(ProducerEvent::Service(
339            crate::monitoring::entities::ServiceHealthEntity {
340                health: svc_health,
341                service: svc,
342            },
343        ))
344    }
345
346    async fn produce_partition_health_entity(
347        &self,
348        token: BoxedCancelToken,
349        part: mssf_core::types::ServicePartitionQueryResultItem,
350        service_name: Uri,
351        application_name: Uri,
352    ) -> Option<ProducerEvent> {
353        let partition_id = part.get_partition_id();
354        let desc = mssf_core::types::PartitionHealthQueryDescription {
355            partition_id,
356            ..Default::default()
357        };
358        let part_health = self
359            .fc
360            .get_health_manager()
361            .get_partition_health(&desc, DEFAULT_TIMEOUT, Some(token))
362            .await
363            .inspect_err(|err| {
364                tracing::error!("Failed to get partition health: {}", err);
365            })
366            .ok()?;
367        Some(ProducerEvent::Partition(
368            crate::monitoring::entities::PartitionHealthEntity {
369                health: part_health,
370                partition: part,
371                service_name,
372                application_name,
373            },
374        ))
375    }
376
377    async fn produce_replica_health_entity(
378        &self,
379        token: BoxedCancelToken,
380        partition_id: mssf_core::GUID,
381        replica: mssf_core::types::ServiceReplicaQueryResultItem,
382        service_name: Uri,
383        application_name: Uri,
384    ) -> Option<ProducerEvent> {
385        let desc = mssf_core::types::ReplicaHealthQueryDescription {
386            partition_id,
387            replica_id_or_instance_id: replica.get_replica_or_instance_id(),
388            ..Default::default()
389        };
390        let replica_health = self
391            .fc
392            .get_health_manager()
393            .get_replica_health(&desc, DEFAULT_TIMEOUT, Some(token))
394            .await
395            .inspect_err(|err| {
396                tracing::error!("Failed to get replica health: {}", err);
397            })
398            .ok()?;
399        Some(ProducerEvent::Replica(
400            crate::monitoring::entities::ReplicaHealthEntity {
401                health: replica_health,
402                replica,
403                service_name,
404                application_name,
405            },
406        ))
407    }
408}
409
410// Get lists of entities
411impl HealthDataProducer {
412    async fn get_all_nodes(
413        &self,
414        token: BoxedCancelToken,
415    ) -> mssf_core::Result<Vec<NodeQueryResultItem>> {
416        // Logic to get node information goes here.
417        let desc = &Default::default();
418        let nodes = self
419            .fc
420            .get_query_manager()
421            .get_node_list(desc, DEFAULT_TIMEOUT, Some(token.clone()))
422            .await
423            .inspect_err(|err| {
424                tracing::error!("Failed to get node list: {}", err);
425            })?
426            .nodes;
427        Ok(nodes)
428    }
429
430    /// This does not include system application.
431    /// We will report system service health separately.
432    async fn get_all_applications(
433        &self,
434        token: BoxedCancelToken,
435    ) -> mssf_core::Result<Vec<mssf_core::types::ApplicationQueryResultItem>> {
436        let desc = ApplicationQueryDescription::default();
437        let apps = self
438            .fc
439            .get_query_manager()
440            .get_application_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
441            .await
442            .inspect_err(|err| {
443                tracing::error!("Failed to get application list: {}", err);
444            })?
445            .items;
446        Ok(apps)
447    }
448    async fn get_all_services_for_app(
449        &self,
450        token: BoxedCancelToken,
451        app_name: Uri,
452    ) -> mssf_core::Result<Vec<mssf_core::types::ServiceQueryResultItem>> {
453        let app_name_cp = app_name.clone();
454        // Logic to get service information goes here.
455        let desc = mssf_core::types::ServiceQueryDescription {
456            application_name: app_name_cp,
457            ..Default::default()
458        };
459        let services = self
460            .fc
461            .get_query_manager()
462            .get_service_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
463            .await
464            .inspect_err(|err| {
465                tracing::error!("Failed to get service list for app {app_name}: {err}");
466            })?;
467        Ok(services.items)
468    }
469
470    async fn get_all_partitions_for_svc(
471        &self,
472        token: BoxedCancelToken,
473        service_name: Uri,
474    ) -> mssf_core::Result<Vec<mssf_core::types::ServicePartitionQueryResultItem>> {
475        // Logic to get partition information goes here.
476        let desc = mssf_core::types::ServicePartitionQueryDescription {
477            service_name,
478            partition_id_filter: None,
479        };
480        let partitions = self
481            .fc
482            .get_query_manager()
483            .get_partition_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
484            .await
485            .inspect_err(|err| {
486                tracing::error!("Failed to get partition list: {}", err);
487            })?
488            .service_partitions;
489        Ok(partitions)
490    }
491
492    async fn get_all_replicas_for_partition(
493        &self,
494        token: BoxedCancelToken,
495        partition_id: mssf_core::GUID,
496    ) -> mssf_core::Result<Vec<mssf_core::types::ServiceReplicaQueryResultItem>> {
497        // Logic to get replica information goes here.
498        let desc = mssf_core::types::ServiceReplicaQueryDescription {
499            partition_id,
500            ..Default::default()
501        };
502        let replicas = self
503            .fc
504            .get_query_manager()
505            .get_replica_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
506            .await
507            .inspect_err(|err| {
508                tracing::error!("Failed to get replica list: {}", err);
509            })?
510            .service_replicas;
511        Ok(replicas)
512    }
513}