Skip to main content

mssf_util/monitoring/
producer.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use crate::monitoring::{HealthEntity, NodeHealthEntity, entities::ClusterHealthEntity};
7use ::tokio::sync::mpsc;
8use mssf_core::{
9    client::FabricClient,
10    runtime::executor::BoxedCancelToken,
11    types::{
12        ApplicationHealthStatesFilter, ApplicationQueryDescription, ClusterHealthQueryDescription,
13        HealthEventsFilter, HealthStateFilterFlags, NodeHealthQueryDescription,
14        NodeHealthStatesFilter, NodeQueryResultItem, Uri,
15    },
16};
17use std::time::Duration;
18
19/// Queries SF and produces health data.
20/// User is responsible to implement a consumer to receive the data.
21pub struct HealthDataProducer {
22    fc: FabricClient,
23    interval: Duration,
24    sender: mpsc::UnboundedSender<HealthEntity>,
25    iteration: std::sync::atomic::AtomicU64,
26}
27
/// Timeout applied to every `FabricClient` query and health call issued by the producer.
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(30_000);
30
/// Control signal returned by the producer's internal steps.
///
/// `Stop` is produced when sending to the consumer fails because its receiver
/// has been dropped; the producer loop exits when it sees it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Stop producing: the receiving end of the channel is gone.
    Stop,
}
34
35impl HealthDataProducer {
36    pub fn new(
37        fc: FabricClient,
38        interval: Duration,
39        sender: mpsc::UnboundedSender<HealthEntity>,
40    ) -> Self {
41        HealthDataProducer {
42            fc,
43            interval,
44            sender,
45            iteration: std::sync::atomic::AtomicU64::new(0),
46        }
47    }
48
49    fn send_entity(&self, entity: HealthEntity) -> Result<(), Action> {
50        self.sender.send(entity).map_err(|_| {
51            tracing::error!("Receiver dropped, cannot send more data.");
52            Action::Stop
53        })
54    }
55
56    /// Run once to produce health data.
57    pub(crate) async fn run_once(&self, token: BoxedCancelToken) -> Result<(), Action> {
58        // Get cluster health information.
59        if let Some(entity) = self.produce_cluster_health_entity(token.clone()).await {
60            self.send_entity(entity)?;
61        }
62        // Get node information.
63        if let Ok(nodes) = self.get_all_nodes(token.clone()).await {
64            for node in nodes {
65                if let Some(entity) = self.produce_node_health_entity(token.clone(), node).await {
66                    self.send_entity(entity)?;
67                }
68            }
69        }
70        // Get application information.
71        if let Ok(apps) = self.get_all_applications(token.clone()).await {
72            for app in apps {
73                let app_name = app.application_name.clone();
74                if let Some(entity) = self
75                    .produce_application_health_entity(token.clone(), app)
76                    .await
77                {
78                    self.send_entity(entity)?;
79                }
80
81                // Get service information for the application.
82                if let Ok(services) = self
83                    .get_all_services_for_app(token.clone(), app_name.clone())
84                    .await
85                {
86                    for svc in services {
87                        let svc_name = svc.get_service_name().clone();
88                        // produce service health entity
89                        if let Some(entity) =
90                            self.produce_service_health_entity(token.clone(), svc).await
91                        {
92                            self.send_entity(entity)?;
93                        }
94
95                        // Get partition information for the service.
96                        if let Ok(partitions) = self
97                            .get_all_partitions_for_svc(token.clone(), svc_name.clone())
98                            .await
99                        {
100                            for partition in partitions {
101                                let partition_id = partition.get_partition_id();
102                                // produce partition health entity
103                                if let Some(entity) = self
104                                    .produce_partition_health_entity(
105                                        token.clone(),
106                                        partition,
107                                        svc_name.clone(),
108                                        app_name.clone(),
109                                    )
110                                    .await
111                                {
112                                    self.send_entity(entity)?;
113                                }
114                                // Get replica information for the partition.
115                                if let Ok(replicas) = self
116                                    .get_all_replicas_for_partition(token.clone(), partition_id)
117                                    .await
118                                {
119                                    for replica in replicas {
120                                        // produce replica health entity
121                                        if let Some(entity) = self
122                                            .produce_replica_health_entity(
123                                                token.clone(),
124                                                partition_id,
125                                                replica,
126                                                svc_name.clone(),
127                                                app_name.clone(),
128                                            )
129                                            .await
130                                        {
131                                            self.send_entity(entity)?;
132                                        }
133                                    }
134                                }
135                            }
136                        }
137                    }
138                }
139            }
140        }
141        self.iteration
142            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
143        Ok(())
144    }
145
146    /// Run a loop to produce health data.
147    pub async fn run_loop(&self, token: BoxedCancelToken) {
148        loop {
149            let start_time = ::tokio::time::Instant::now();
150            match self.run_once(token.clone()).await {
151                Err(Action::Stop) => {
152                    tracing::info!("Health data producer stopped.");
153                    break;
154                }
155                _ => {
156                    // continue the loop
157                }
158            }
159
160            // remaining time
161            let elapsed = start_time.elapsed();
162            // wait for more time if necessary.
163            if elapsed < self.interval {
164                let wait_duration = self.interval - elapsed;
165
166                tokio::select! {
167                    _ = token.wait() => {
168                        tracing::info!("Cancellation requested, exiting health data producer loop.");
169                        break;
170                    }
171                    _ = tokio::time::sleep(wait_duration) => {}
172                }
173            }
174
175            if token.is_cancelled() {
176                tracing::info!("Cancellation requested, exiting health data producer loop.");
177                break;
178            }
179        }
180        tracing::info!("Health data producer loop exited.");
181    }
182
183    pub fn get_iteration(&self) -> u64 {
184        self.iteration.load(std::sync::atomic::Ordering::Relaxed)
185    }
186}
187
188impl HealthDataProducer {
189    async fn produce_cluster_health_entity(&self, token: BoxedCancelToken) -> Option<HealthEntity> {
190        // Ignore nodes and app health because we retrieve them separately.
191        // Technically we can get everything in one call, but the payload might be too large,
192        // and we want to get other entities not present in the cluster health.
193        // For example, each system service health is not present in this result.
194        let desc = ClusterHealthQueryDescription {
195            nodes_filter: Some(NodeHealthStatesFilter {
196                health_state_filter: HealthStateFilterFlags::NONE,
197            }),
198            applications_filter: Some(ApplicationHealthStatesFilter {
199                health_state_filter: HealthStateFilterFlags::NONE,
200            }),
201            ..Default::default()
202        };
203        let cluster_healths = self
204            .fc
205            .get_health_manager()
206            .get_cluster_health(&desc, DEFAULT_TIMEOUT, Some(token))
207            .await
208            .inspect_err(|err| {
209                tracing::error!("Failed to get cluster health: {}", err);
210            })
211            .ok()?;
212        Some(HealthEntity::Cluster(ClusterHealthEntity {
213            health: cluster_healths,
214        }))
215    }
216
217    /// Produce the health entity for a node.
218    async fn produce_node_health_entity(
219        &self,
220        token: BoxedCancelToken,
221        node: NodeQueryResultItem,
222    ) -> Option<HealthEntity> {
223        // Logic to get node health goes here.
224
225        let desc = NodeHealthQueryDescription {
226            node_name: node.name.clone(),
227            // We only care about the aggregated health state.
228            events_filter: Some(HealthEventsFilter {
229                health_state_filter: HealthStateFilterFlags::NONE,
230            }),
231            ..Default::default()
232        };
233        let node_healths = self
234            .fc
235            .get_health_manager()
236            .get_node_health(&desc, DEFAULT_TIMEOUT, Some(token))
237            .await
238            .inspect_err(|err| {
239                tracing::error!("Failed to get node health: {}", err);
240            })
241            .ok()?;
242        Some(HealthEntity::Node(NodeHealthEntity {
243            node,
244            health: node_healths,
245        }))
246    }
247
248    async fn produce_application_health_entity(
249        &self,
250        token: BoxedCancelToken,
251        app: mssf_core::types::ApplicationQueryResultItem,
252    ) -> Option<HealthEntity> {
253        let desc = mssf_core::types::ApplicationHealthQueryDescription {
254            application_name: app.application_name.clone(),
255            ..Default::default()
256        };
257        let app_health = self
258            .fc
259            .get_health_manager()
260            .get_application_health(&desc, DEFAULT_TIMEOUT, Some(token))
261            .await
262            .inspect_err(|err| {
263                tracing::error!("Failed to get application health: {}", err);
264            })
265            .ok()?;
266        Some(HealthEntity::Application(
267            crate::monitoring::entities::ApplicationHealthEntity {
268                application: app,
269                health: app_health,
270            },
271        ))
272    }
273
274    async fn produce_service_health_entity(
275        &self,
276        token: BoxedCancelToken,
277        svc: mssf_core::types::ServiceQueryResultItem,
278    ) -> Option<HealthEntity> {
279        let svc_name = svc.get_service_name().clone();
280        let desc = mssf_core::types::ServiceHealthQueryDescription {
281            service_name: svc_name,
282            ..Default::default()
283        };
284        let svc_health = self
285            .fc
286            .get_health_manager()
287            .get_service_health(&desc, DEFAULT_TIMEOUT, Some(token))
288            .await
289            .inspect_err(|err| {
290                tracing::error!("Failed to get service health: {}", err);
291            })
292            .ok()?;
293        Some(HealthEntity::Service(
294            crate::monitoring::entities::ServiceHealthEntity {
295                health: svc_health,
296                service: svc,
297            },
298        ))
299    }
300
301    async fn produce_partition_health_entity(
302        &self,
303        token: BoxedCancelToken,
304        part: mssf_core::types::ServicePartitionQueryResultItem,
305        service_name: Uri,
306        application_name: Uri,
307    ) -> Option<HealthEntity> {
308        let partition_id = part.get_partition_id();
309        let desc = mssf_core::types::PartitionHealthQueryDescription {
310            partition_id,
311            ..Default::default()
312        };
313        let part_health = self
314            .fc
315            .get_health_manager()
316            .get_partition_health(&desc, DEFAULT_TIMEOUT, Some(token))
317            .await
318            .inspect_err(|err| {
319                tracing::error!("Failed to get partition health: {}", err);
320            })
321            .ok()?;
322        Some(HealthEntity::Partition(
323            crate::monitoring::entities::PartitionHealthEntity {
324                health: part_health,
325                partition: part,
326                service_name,
327                application_name,
328            },
329        ))
330    }
331
332    async fn produce_replica_health_entity(
333        &self,
334        token: BoxedCancelToken,
335        partition_id: mssf_core::GUID,
336        replica: mssf_core::types::ServiceReplicaQueryResultItem,
337        service_name: Uri,
338        application_name: Uri,
339    ) -> Option<HealthEntity> {
340        let desc = mssf_core::types::ReplicaHealthQueryDescription {
341            partition_id,
342            replica_id_or_instance_id: replica.get_replica_or_instance_id(),
343            ..Default::default()
344        };
345        let replica_health = self
346            .fc
347            .get_health_manager()
348            .get_replica_health(&desc, DEFAULT_TIMEOUT, Some(token))
349            .await
350            .inspect_err(|err| {
351                tracing::error!("Failed to get replica health: {}", err);
352            })
353            .ok()?;
354        Some(HealthEntity::Replica(
355            crate::monitoring::entities::ReplicaHealthEntity {
356                health: replica_health,
357                replica,
358                service_name,
359                application_name,
360            },
361        ))
362    }
363}
364
365// Get lists of entities
366impl HealthDataProducer {
367    async fn get_all_nodes(
368        &self,
369        token: BoxedCancelToken,
370    ) -> mssf_core::Result<Vec<NodeQueryResultItem>> {
371        // Logic to get node information goes here.
372        let desc = &Default::default();
373        let nodes = self
374            .fc
375            .get_query_manager()
376            .get_node_list(desc, DEFAULT_TIMEOUT, Some(token.clone()))
377            .await
378            .inspect_err(|err| {
379                tracing::error!("Failed to get node list: {}", err);
380            })?
381            .nodes;
382        Ok(nodes)
383    }
384
385    /// This does not include system application.
386    /// We will report system service health separately.
387    async fn get_all_applications(
388        &self,
389        token: BoxedCancelToken,
390    ) -> mssf_core::Result<Vec<mssf_core::types::ApplicationQueryResultItem>> {
391        let desc = ApplicationQueryDescription::default();
392        let apps = self
393            .fc
394            .get_query_manager()
395            .get_application_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
396            .await
397            .inspect_err(|err| {
398                tracing::error!("Failed to get application list: {}", err);
399            })?
400            .items;
401        Ok(apps)
402    }
403    async fn get_all_services_for_app(
404        &self,
405        token: BoxedCancelToken,
406        app_name: Uri,
407    ) -> mssf_core::Result<Vec<mssf_core::types::ServiceQueryResultItem>> {
408        let app_name_cp = app_name.clone();
409        // Logic to get service information goes here.
410        let desc = mssf_core::types::ServiceQueryDescription {
411            application_name: app_name_cp,
412            ..Default::default()
413        };
414        let services = self
415            .fc
416            .get_query_manager()
417            .get_service_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
418            .await
419            .inspect_err(|err| {
420                tracing::error!("Failed to get service list for app {app_name}: {err}");
421            })?;
422        Ok(services.items)
423    }
424
425    async fn get_all_partitions_for_svc(
426        &self,
427        token: BoxedCancelToken,
428        service_name: Uri,
429    ) -> mssf_core::Result<Vec<mssf_core::types::ServicePartitionQueryResultItem>> {
430        // Logic to get partition information goes here.
431        let desc = mssf_core::types::ServicePartitionQueryDescription {
432            service_name,
433            partition_id_filter: None,
434        };
435        let partitions = self
436            .fc
437            .get_query_manager()
438            .get_partition_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
439            .await
440            .inspect_err(|err| {
441                tracing::error!("Failed to get partition list: {}", err);
442            })?
443            .service_partitions;
444        Ok(partitions)
445    }
446
447    async fn get_all_replicas_for_partition(
448        &self,
449        token: BoxedCancelToken,
450        partition_id: mssf_core::GUID,
451    ) -> mssf_core::Result<Vec<mssf_core::types::ServiceReplicaQueryResultItem>> {
452        // Logic to get replica information goes here.
453        let desc = mssf_core::types::ServiceReplicaQueryDescription {
454            partition_id,
455            ..Default::default()
456        };
457        let replicas = self
458            .fc
459            .get_query_manager()
460            .get_replica_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
461            .await
462            .inspect_err(|err| {
463                tracing::error!("Failed to get replica list: {}", err);
464            })?
465            .service_replicas;
466        Ok(replicas)
467    }
468}