1use crate::monitoring::{HealthEntity, NodeHealthEntity, entities::ClusterHealthEntity};
7use ::tokio::sync::mpsc;
8use mssf_core::{
9 client::FabricClient,
10 runtime::executor::BoxedCancelToken,
11 types::{
12 ApplicationHealthStatesFilter, ApplicationQueryDescription, ClusterHealthQueryDescription,
13 HealthEventsFilter, HealthStateFilterFlags, NodeHealthQueryDescription,
14 NodeHealthStatesFilter, NodeQueryResultItem, Uri,
15 },
16};
17use std::time::Duration;
18
/// Collects health entities through a [`FabricClient`] and forwards them over
/// an unbounded channel.
///
/// One collection pass is performed by `run_once`; `run_loop` repeats passes
/// until cancellation is requested or the receiving side of the channel is
/// dropped.
pub struct HealthDataProducer {
    /// Client used to reach the health manager and the query manager.
    fc: FabricClient,
    /// Target time between the starts of two consecutive passes in `run_loop`;
    /// a pass that takes longer than this is followed immediately by the next one.
    interval: Duration,
    /// Channel on which every produced [`HealthEntity`] is sent.
    sender: mpsc::UnboundedSender<HealthEntity>,
    /// Number of passes that ran to completion (incremented at the end of `run_once`).
    iteration: std::sync::atomic::AtomicU64,
}
27
/// Timeout passed to every health and query call issued by the producer in this file.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
30
/// Control signal returned by the producer's internals to tell the driving
/// loop what to do next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Stop producing: returned when an entity can no longer be delivered
    /// because the receiving side of the channel is gone.
    Stop,
}
34
35impl HealthDataProducer {
36 pub fn new(
37 fc: FabricClient,
38 interval: Duration,
39 sender: mpsc::UnboundedSender<HealthEntity>,
40 ) -> Self {
41 HealthDataProducer {
42 fc,
43 interval,
44 sender,
45 iteration: std::sync::atomic::AtomicU64::new(0),
46 }
47 }
48
49 fn send_entity(&self, entity: HealthEntity) -> Result<(), Action> {
50 self.sender.send(entity).map_err(|_| {
51 tracing::error!("Receiver dropped, cannot send more data.");
52 Action::Stop
53 })
54 }
55
56 pub(crate) async fn run_once(&self, token: BoxedCancelToken) -> Result<(), Action> {
58 if let Some(entity) = self.produce_cluster_health_entity(token.clone()).await {
60 self.send_entity(entity)?;
61 }
62 if let Ok(nodes) = self.get_all_nodes(token.clone()).await {
64 for node in nodes {
65 if let Some(entity) = self.produce_node_health_entity(token.clone(), node).await {
66 self.send_entity(entity)?;
67 }
68 }
69 }
70 if let Ok(apps) = self.get_all_applications(token.clone()).await {
72 for app in apps {
73 let app_name = app.application_name.clone();
74 if let Some(entity) = self
75 .produce_application_health_entity(token.clone(), app)
76 .await
77 {
78 self.send_entity(entity)?;
79 }
80
81 if let Ok(services) = self
83 .get_all_services_for_app(token.clone(), app_name.clone())
84 .await
85 {
86 for svc in services {
87 let svc_name = svc.get_service_name().clone();
88 if let Some(entity) =
90 self.produce_service_health_entity(token.clone(), svc).await
91 {
92 self.send_entity(entity)?;
93 }
94
95 if let Ok(partitions) = self
97 .get_all_partitions_for_svc(token.clone(), svc_name.clone())
98 .await
99 {
100 for partition in partitions {
101 let partition_id = partition.get_partition_id();
102 if let Some(entity) = self
104 .produce_partition_health_entity(
105 token.clone(),
106 partition,
107 svc_name.clone(),
108 app_name.clone(),
109 )
110 .await
111 {
112 self.send_entity(entity)?;
113 }
114 if let Ok(replicas) = self
116 .get_all_replicas_for_partition(token.clone(), partition_id)
117 .await
118 {
119 for replica in replicas {
120 if let Some(entity) = self
122 .produce_replica_health_entity(
123 token.clone(),
124 partition_id,
125 replica,
126 svc_name.clone(),
127 app_name.clone(),
128 )
129 .await
130 {
131 self.send_entity(entity)?;
132 }
133 }
134 }
135 }
136 }
137 }
138 }
139 }
140 }
141 self.iteration
142 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
143 Ok(())
144 }
145
146 pub async fn run_loop(&self, token: BoxedCancelToken) {
148 loop {
149 let start_time = ::tokio::time::Instant::now();
150 match self.run_once(token.clone()).await {
151 Err(Action::Stop) => {
152 tracing::info!("Health data producer stopped.");
153 break;
154 }
155 _ => {
156 }
158 }
159
160 let elapsed = start_time.elapsed();
162 if elapsed < self.interval {
164 let wait_duration = self.interval - elapsed;
165
166 tokio::select! {
167 _ = token.wait() => {
168 tracing::info!("Cancellation requested, exiting health data producer loop.");
169 break;
170 }
171 _ = tokio::time::sleep(wait_duration) => {}
172 }
173 }
174
175 if token.is_cancelled() {
176 tracing::info!("Cancellation requested, exiting health data producer loop.");
177 break;
178 }
179 }
180 tracing::info!("Health data producer loop exited.");
181 }
182
183 pub fn get_iteration(&self) -> u64 {
184 self.iteration.load(std::sync::atomic::Ordering::Relaxed)
185 }
186}
187
188impl HealthDataProducer {
189 async fn produce_cluster_health_entity(&self, token: BoxedCancelToken) -> Option<HealthEntity> {
190 let desc = ClusterHealthQueryDescription {
195 nodes_filter: Some(NodeHealthStatesFilter {
196 health_state_filter: HealthStateFilterFlags::NONE,
197 }),
198 applications_filter: Some(ApplicationHealthStatesFilter {
199 health_state_filter: HealthStateFilterFlags::NONE,
200 }),
201 ..Default::default()
202 };
203 let cluster_healths = self
204 .fc
205 .get_health_manager()
206 .get_cluster_health(&desc, DEFAULT_TIMEOUT, Some(token))
207 .await
208 .inspect_err(|err| {
209 tracing::error!("Failed to get cluster health: {}", err);
210 })
211 .ok()?;
212 Some(HealthEntity::Cluster(ClusterHealthEntity {
213 health: cluster_healths,
214 }))
215 }
216
217 async fn produce_node_health_entity(
219 &self,
220 token: BoxedCancelToken,
221 node: NodeQueryResultItem,
222 ) -> Option<HealthEntity> {
223 let desc = NodeHealthQueryDescription {
226 node_name: node.name.clone(),
227 events_filter: Some(HealthEventsFilter {
229 health_state_filter: HealthStateFilterFlags::NONE,
230 }),
231 ..Default::default()
232 };
233 let node_healths = self
234 .fc
235 .get_health_manager()
236 .get_node_health(&desc, DEFAULT_TIMEOUT, Some(token))
237 .await
238 .inspect_err(|err| {
239 tracing::error!("Failed to get node health: {}", err);
240 })
241 .ok()?;
242 Some(HealthEntity::Node(NodeHealthEntity {
243 node,
244 health: node_healths,
245 }))
246 }
247
248 async fn produce_application_health_entity(
249 &self,
250 token: BoxedCancelToken,
251 app: mssf_core::types::ApplicationQueryResultItem,
252 ) -> Option<HealthEntity> {
253 let desc = mssf_core::types::ApplicationHealthQueryDescription {
254 application_name: app.application_name.clone(),
255 ..Default::default()
256 };
257 let app_health = self
258 .fc
259 .get_health_manager()
260 .get_application_health(&desc, DEFAULT_TIMEOUT, Some(token))
261 .await
262 .inspect_err(|err| {
263 tracing::error!("Failed to get application health: {}", err);
264 })
265 .ok()?;
266 Some(HealthEntity::Application(
267 crate::monitoring::entities::ApplicationHealthEntity {
268 application: app,
269 health: app_health,
270 },
271 ))
272 }
273
274 async fn produce_service_health_entity(
275 &self,
276 token: BoxedCancelToken,
277 svc: mssf_core::types::ServiceQueryResultItem,
278 ) -> Option<HealthEntity> {
279 let svc_name = svc.get_service_name().clone();
280 let desc = mssf_core::types::ServiceHealthQueryDescription {
281 service_name: svc_name,
282 ..Default::default()
283 };
284 let svc_health = self
285 .fc
286 .get_health_manager()
287 .get_service_health(&desc, DEFAULT_TIMEOUT, Some(token))
288 .await
289 .inspect_err(|err| {
290 tracing::error!("Failed to get service health: {}", err);
291 })
292 .ok()?;
293 Some(HealthEntity::Service(
294 crate::monitoring::entities::ServiceHealthEntity {
295 health: svc_health,
296 service: svc,
297 },
298 ))
299 }
300
301 async fn produce_partition_health_entity(
302 &self,
303 token: BoxedCancelToken,
304 part: mssf_core::types::ServicePartitionQueryResultItem,
305 service_name: Uri,
306 application_name: Uri,
307 ) -> Option<HealthEntity> {
308 let partition_id = part.get_partition_id();
309 let desc = mssf_core::types::PartitionHealthQueryDescription {
310 partition_id,
311 ..Default::default()
312 };
313 let part_health = self
314 .fc
315 .get_health_manager()
316 .get_partition_health(&desc, DEFAULT_TIMEOUT, Some(token))
317 .await
318 .inspect_err(|err| {
319 tracing::error!("Failed to get partition health: {}", err);
320 })
321 .ok()?;
322 Some(HealthEntity::Partition(
323 crate::monitoring::entities::PartitionHealthEntity {
324 health: part_health,
325 partition: part,
326 service_name,
327 application_name,
328 },
329 ))
330 }
331
332 async fn produce_replica_health_entity(
333 &self,
334 token: BoxedCancelToken,
335 partition_id: mssf_core::GUID,
336 replica: mssf_core::types::ServiceReplicaQueryResultItem,
337 service_name: Uri,
338 application_name: Uri,
339 ) -> Option<HealthEntity> {
340 let desc = mssf_core::types::ReplicaHealthQueryDescription {
341 partition_id,
342 replica_id_or_instance_id: replica.get_replica_or_instance_id(),
343 ..Default::default()
344 };
345 let replica_health = self
346 .fc
347 .get_health_manager()
348 .get_replica_health(&desc, DEFAULT_TIMEOUT, Some(token))
349 .await
350 .inspect_err(|err| {
351 tracing::error!("Failed to get replica health: {}", err);
352 })
353 .ok()?;
354 Some(HealthEntity::Replica(
355 crate::monitoring::entities::ReplicaHealthEntity {
356 health: replica_health,
357 replica,
358 service_name,
359 application_name,
360 },
361 ))
362 }
363}
364
365impl HealthDataProducer {
367 async fn get_all_nodes(
368 &self,
369 token: BoxedCancelToken,
370 ) -> mssf_core::Result<Vec<NodeQueryResultItem>> {
371 let desc = &Default::default();
373 let nodes = self
374 .fc
375 .get_query_manager()
376 .get_node_list(desc, DEFAULT_TIMEOUT, Some(token.clone()))
377 .await
378 .inspect_err(|err| {
379 tracing::error!("Failed to get node list: {}", err);
380 })?
381 .nodes;
382 Ok(nodes)
383 }
384
385 async fn get_all_applications(
388 &self,
389 token: BoxedCancelToken,
390 ) -> mssf_core::Result<Vec<mssf_core::types::ApplicationQueryResultItem>> {
391 let desc = ApplicationQueryDescription::default();
392 let apps = self
393 .fc
394 .get_query_manager()
395 .get_application_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
396 .await
397 .inspect_err(|err| {
398 tracing::error!("Failed to get application list: {}", err);
399 })?
400 .items;
401 Ok(apps)
402 }
403 async fn get_all_services_for_app(
404 &self,
405 token: BoxedCancelToken,
406 app_name: Uri,
407 ) -> mssf_core::Result<Vec<mssf_core::types::ServiceQueryResultItem>> {
408 let app_name_cp = app_name.clone();
409 let desc = mssf_core::types::ServiceQueryDescription {
411 application_name: app_name_cp,
412 ..Default::default()
413 };
414 let services = self
415 .fc
416 .get_query_manager()
417 .get_service_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
418 .await
419 .inspect_err(|err| {
420 tracing::error!("Failed to get service list for app {app_name}: {err}");
421 })?;
422 Ok(services.items)
423 }
424
425 async fn get_all_partitions_for_svc(
426 &self,
427 token: BoxedCancelToken,
428 service_name: Uri,
429 ) -> mssf_core::Result<Vec<mssf_core::types::ServicePartitionQueryResultItem>> {
430 let desc = mssf_core::types::ServicePartitionQueryDescription {
432 service_name,
433 partition_id_filter: None,
434 };
435 let partitions = self
436 .fc
437 .get_query_manager()
438 .get_partition_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
439 .await
440 .inspect_err(|err| {
441 tracing::error!("Failed to get partition list: {}", err);
442 })?
443 .service_partitions;
444 Ok(partitions)
445 }
446
447 async fn get_all_replicas_for_partition(
448 &self,
449 token: BoxedCancelToken,
450 partition_id: mssf_core::GUID,
451 ) -> mssf_core::Result<Vec<mssf_core::types::ServiceReplicaQueryResultItem>> {
452 let desc = mssf_core::types::ServiceReplicaQueryDescription {
454 partition_id,
455 ..Default::default()
456 };
457 let replicas = self
458 .fc
459 .get_query_manager()
460 .get_replica_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
461 .await
462 .inspect_err(|err| {
463 tracing::error!("Failed to get replica list: {}", err);
464 })?
465 .service_replicas;
466 Ok(replicas)
467 }
468}