1use crate::monitoring::{
7 NodeHealthEntity, ProducerEvent,
8 entities::{ClusterHealthEntity, LoopKind},
9};
10use ::tokio::sync::mpsc;
11use mssf_core::{
12 client::FabricClient,
13 runtime::executor::BoxedCancelToken,
14 types::{
15 ApplicationHealthStatesFilter, ApplicationQueryDescription, ClusterHealthQueryDescription,
16 HealthEventsFilter, HealthStateFilterFlags, NodeHealthQueryDescription,
17 NodeHealthStatesFilter, NodeQueryResultItem, Uri,
18 },
19};
20use std::time::Duration;
21
22pub struct HealthDataProducer {
25 fc: FabricClient,
26 interval: Duration,
27 sender: mpsc::UnboundedSender<ProducerEvent>,
28}
29
/// Timeout applied to every individual Service Fabric health or query call (30 s).
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(30_000);
32
/// Control signal returned by producer steps to tell the driving loop what to do.
///
/// Used as the error type of the `run_once_*` passes. It derives `Debug` so
/// callers can `unwrap`/`expect` or log a `Result<_, Action>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Stop producing. Currently returned when the consumer's receiver has
    /// been dropped and no further events can be delivered.
    Stop,
}
36
37impl HealthDataProducer {
38 pub fn new(
39 fc: FabricClient,
40 interval: Duration,
41 sender: mpsc::UnboundedSender<ProducerEvent>,
42 ) -> Self {
43 HealthDataProducer {
44 fc,
45 interval,
46 sender,
47 }
48 }
49
50 fn send_event(&self, event: ProducerEvent) -> Result<(), Action> {
51 self.sender.send(event).map_err(|_| {
52 tracing::error!("Receiver dropped, cannot send more data.");
53 Action::Stop
54 })
55 }
56
57 pub(crate) async fn run_once_cluster_and_nodes(
62 &self,
63 token: BoxedCancelToken,
64 ) -> Result<(), Action> {
65 if let Some(entity) = self.produce_cluster_health_entity(token.clone()).await {
67 self.send_event(entity)?;
68 }
69 if let Ok(nodes) = self.get_all_nodes(token.clone()).await {
71 for node in nodes {
72 if let Some(entity) = self.produce_node_health_entity(token.clone(), node).await {
73 self.send_event(entity)?;
74 }
75 }
76 }
77 self.send_event(ProducerEvent::IterationComplete(LoopKind::ClusterNode))?;
78 Ok(())
79 }
80
81 pub(crate) async fn run_once_applications(
87 &self,
88 token: BoxedCancelToken,
89 ) -> Result<(), Action> {
90 if let Ok(apps) = self.get_all_applications(token.clone()).await {
92 for app in apps {
93 let app_name = app.application_name.clone();
94 if let Some(entity) = self
95 .produce_application_health_entity(token.clone(), app)
96 .await
97 {
98 self.send_event(entity)?;
99 }
100
101 if let Ok(services) = self
103 .get_all_services_for_app(token.clone(), app_name.clone())
104 .await
105 {
106 for svc in services {
107 let svc_name = svc.get_service_name().clone();
108 if let Some(entity) =
110 self.produce_service_health_entity(token.clone(), svc).await
111 {
112 self.send_event(entity)?;
113 }
114
115 if let Ok(partitions) = self
117 .get_all_partitions_for_svc(token.clone(), svc_name.clone())
118 .await
119 {
120 for partition in partitions {
121 let partition_id = partition.get_partition_id();
122 if let Some(entity) = self
124 .produce_partition_health_entity(
125 token.clone(),
126 partition,
127 svc_name.clone(),
128 app_name.clone(),
129 )
130 .await
131 {
132 self.send_event(entity)?;
133 }
134 if let Ok(replicas) = self
136 .get_all_replicas_for_partition(token.clone(), partition_id)
137 .await
138 {
139 for replica in replicas {
140 if let Some(entity) = self
142 .produce_replica_health_entity(
143 token.clone(),
144 partition_id,
145 replica,
146 svc_name.clone(),
147 app_name.clone(),
148 )
149 .await
150 {
151 self.send_event(entity)?;
152 }
153 }
154 }
155 }
156 }
157 }
158 }
159 }
160 }
161 self.send_event(ProducerEvent::IterationComplete(LoopKind::Application))?;
162 Ok(())
163 }
164
165 pub async fn run_loop(&self, token: BoxedCancelToken) {
170 tokio::join!(
171 self.run_cluster_node_loop(token.clone()),
172 self.run_application_loop(token.clone()),
173 );
174 tracing::info!("Health data producer loops exited.");
175 }
176
177 pub async fn run_cluster_node_loop(&self, token: BoxedCancelToken) {
179 self.run_interval_loop(token, "cluster/node", |token| {
180 self.run_once_cluster_and_nodes(token)
181 })
182 .await;
183 }
184
185 pub async fn run_application_loop(&self, token: BoxedCancelToken) {
187 self.run_interval_loop(token, "application", |token| {
188 self.run_once_applications(token)
189 })
190 .await;
191 }
192
193 async fn run_interval_loop<F, Fut>(&self, token: BoxedCancelToken, name: &str, mut run_once: F)
195 where
196 F: FnMut(BoxedCancelToken) -> Fut,
197 Fut: std::future::Future<Output = Result<(), Action>>,
198 {
199 loop {
200 let start_time = ::tokio::time::Instant::now();
201 if let Err(Action::Stop) = run_once(token.clone()).await {
202 tracing::info!("Health data {name} producer stopped.");
203 break;
204 }
205
206 let elapsed = start_time.elapsed();
208 if elapsed < self.interval {
210 let wait_duration = self.interval - elapsed;
211
212 tokio::select! {
213 _ = token.wait() => {
214 tracing::info!("Cancellation requested, exiting health data {name} producer loop.");
215 break;
216 }
217 _ = tokio::time::sleep(wait_duration) => {}
218 }
219 }
220
221 if token.is_cancelled() {
222 tracing::info!("Cancellation requested, exiting health data {name} producer loop.");
223 break;
224 }
225 }
226 tracing::info!("Health data {name} producer loop exited.");
227 }
228}
229
230impl HealthDataProducer {
231 async fn produce_cluster_health_entity(
232 &self,
233 token: BoxedCancelToken,
234 ) -> Option<ProducerEvent> {
235 let desc = ClusterHealthQueryDescription {
240 nodes_filter: Some(NodeHealthStatesFilter {
241 health_state_filter: HealthStateFilterFlags::NONE,
242 }),
243 applications_filter: Some(ApplicationHealthStatesFilter {
244 health_state_filter: HealthStateFilterFlags::NONE,
245 }),
246 ..Default::default()
247 };
248 let cluster_healths = self
249 .fc
250 .get_health_manager()
251 .get_cluster_health(&desc, DEFAULT_TIMEOUT, Some(token))
252 .await
253 .inspect_err(|err| {
254 tracing::error!("Failed to get cluster health: {}", err);
255 })
256 .ok()?;
257 Some(ProducerEvent::Cluster(ClusterHealthEntity {
258 health: cluster_healths,
259 }))
260 }
261
262 async fn produce_node_health_entity(
264 &self,
265 token: BoxedCancelToken,
266 node: NodeQueryResultItem,
267 ) -> Option<ProducerEvent> {
268 let desc = NodeHealthQueryDescription {
271 node_name: node.name.clone(),
272 events_filter: Some(HealthEventsFilter {
274 health_state_filter: HealthStateFilterFlags::NONE,
275 }),
276 ..Default::default()
277 };
278 let node_healths = self
279 .fc
280 .get_health_manager()
281 .get_node_health(&desc, DEFAULT_TIMEOUT, Some(token))
282 .await
283 .inspect_err(|err| {
284 tracing::error!("Failed to get node health: {}", err);
285 })
286 .ok()?;
287 Some(ProducerEvent::Node(NodeHealthEntity {
288 node,
289 health: node_healths,
290 }))
291 }
292
293 async fn produce_application_health_entity(
294 &self,
295 token: BoxedCancelToken,
296 app: mssf_core::types::ApplicationQueryResultItem,
297 ) -> Option<ProducerEvent> {
298 let desc = mssf_core::types::ApplicationHealthQueryDescription {
299 application_name: app.application_name.clone(),
300 ..Default::default()
301 };
302 let app_health = self
303 .fc
304 .get_health_manager()
305 .get_application_health(&desc, DEFAULT_TIMEOUT, Some(token))
306 .await
307 .inspect_err(|err| {
308 tracing::error!("Failed to get application health: {}", err);
309 })
310 .ok()?;
311 Some(ProducerEvent::Application(
312 crate::monitoring::entities::ApplicationHealthEntity {
313 application: app,
314 health: app_health,
315 },
316 ))
317 }
318
319 async fn produce_service_health_entity(
320 &self,
321 token: BoxedCancelToken,
322 svc: mssf_core::types::ServiceQueryResultItem,
323 ) -> Option<ProducerEvent> {
324 let svc_name = svc.get_service_name().clone();
325 let desc = mssf_core::types::ServiceHealthQueryDescription {
326 service_name: svc_name,
327 ..Default::default()
328 };
329 let svc_health = self
330 .fc
331 .get_health_manager()
332 .get_service_health(&desc, DEFAULT_TIMEOUT, Some(token))
333 .await
334 .inspect_err(|err| {
335 tracing::error!("Failed to get service health: {}", err);
336 })
337 .ok()?;
338 Some(ProducerEvent::Service(
339 crate::monitoring::entities::ServiceHealthEntity {
340 health: svc_health,
341 service: svc,
342 },
343 ))
344 }
345
346 async fn produce_partition_health_entity(
347 &self,
348 token: BoxedCancelToken,
349 part: mssf_core::types::ServicePartitionQueryResultItem,
350 service_name: Uri,
351 application_name: Uri,
352 ) -> Option<ProducerEvent> {
353 let partition_id = part.get_partition_id();
354 let desc = mssf_core::types::PartitionHealthQueryDescription {
355 partition_id,
356 ..Default::default()
357 };
358 let part_health = self
359 .fc
360 .get_health_manager()
361 .get_partition_health(&desc, DEFAULT_TIMEOUT, Some(token))
362 .await
363 .inspect_err(|err| {
364 tracing::error!("Failed to get partition health: {}", err);
365 })
366 .ok()?;
367 Some(ProducerEvent::Partition(
368 crate::monitoring::entities::PartitionHealthEntity {
369 health: part_health,
370 partition: part,
371 service_name,
372 application_name,
373 },
374 ))
375 }
376
377 async fn produce_replica_health_entity(
378 &self,
379 token: BoxedCancelToken,
380 partition_id: mssf_core::GUID,
381 replica: mssf_core::types::ServiceReplicaQueryResultItem,
382 service_name: Uri,
383 application_name: Uri,
384 ) -> Option<ProducerEvent> {
385 let desc = mssf_core::types::ReplicaHealthQueryDescription {
386 partition_id,
387 replica_id_or_instance_id: replica.get_replica_or_instance_id(),
388 ..Default::default()
389 };
390 let replica_health = self
391 .fc
392 .get_health_manager()
393 .get_replica_health(&desc, DEFAULT_TIMEOUT, Some(token))
394 .await
395 .inspect_err(|err| {
396 tracing::error!("Failed to get replica health: {}", err);
397 })
398 .ok()?;
399 Some(ProducerEvent::Replica(
400 crate::monitoring::entities::ReplicaHealthEntity {
401 health: replica_health,
402 replica,
403 service_name,
404 application_name,
405 },
406 ))
407 }
408}
409
410impl HealthDataProducer {
412 async fn get_all_nodes(
413 &self,
414 token: BoxedCancelToken,
415 ) -> mssf_core::Result<Vec<NodeQueryResultItem>> {
416 let desc = &Default::default();
418 let nodes = self
419 .fc
420 .get_query_manager()
421 .get_node_list(desc, DEFAULT_TIMEOUT, Some(token.clone()))
422 .await
423 .inspect_err(|err| {
424 tracing::error!("Failed to get node list: {}", err);
425 })?
426 .nodes;
427 Ok(nodes)
428 }
429
430 async fn get_all_applications(
433 &self,
434 token: BoxedCancelToken,
435 ) -> mssf_core::Result<Vec<mssf_core::types::ApplicationQueryResultItem>> {
436 let desc = ApplicationQueryDescription::default();
437 let apps = self
438 .fc
439 .get_query_manager()
440 .get_application_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
441 .await
442 .inspect_err(|err| {
443 tracing::error!("Failed to get application list: {}", err);
444 })?
445 .items;
446 Ok(apps)
447 }
448 async fn get_all_services_for_app(
449 &self,
450 token: BoxedCancelToken,
451 app_name: Uri,
452 ) -> mssf_core::Result<Vec<mssf_core::types::ServiceQueryResultItem>> {
453 let app_name_cp = app_name.clone();
454 let desc = mssf_core::types::ServiceQueryDescription {
456 application_name: app_name_cp,
457 ..Default::default()
458 };
459 let services = self
460 .fc
461 .get_query_manager()
462 .get_service_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
463 .await
464 .inspect_err(|err| {
465 tracing::error!("Failed to get service list for app {app_name}: {err}");
466 })?;
467 Ok(services.items)
468 }
469
470 async fn get_all_partitions_for_svc(
471 &self,
472 token: BoxedCancelToken,
473 service_name: Uri,
474 ) -> mssf_core::Result<Vec<mssf_core::types::ServicePartitionQueryResultItem>> {
475 let desc = mssf_core::types::ServicePartitionQueryDescription {
477 service_name,
478 partition_id_filter: None,
479 };
480 let partitions = self
481 .fc
482 .get_query_manager()
483 .get_partition_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
484 .await
485 .inspect_err(|err| {
486 tracing::error!("Failed to get partition list: {}", err);
487 })?
488 .service_partitions;
489 Ok(partitions)
490 }
491
492 async fn get_all_replicas_for_partition(
493 &self,
494 token: BoxedCancelToken,
495 partition_id: mssf_core::GUID,
496 ) -> mssf_core::Result<Vec<mssf_core::types::ServiceReplicaQueryResultItem>> {
497 let desc = mssf_core::types::ServiceReplicaQueryDescription {
499 partition_id,
500 ..Default::default()
501 };
502 let replicas = self
503 .fc
504 .get_query_manager()
505 .get_replica_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
506 .await
507 .inspect_err(|err| {
508 tracing::error!("Failed to get replica list: {}", err);
509 })?
510 .service_replicas;
511 Ok(replicas)
512 }
513}