1use crate::monitoring::{HealthEntity, NodeHealthEntity, entities::ClusterHealthEntity};
7use ::tokio::sync::mpsc;
8use mssf_core::{
9 client::FabricClient,
10 runtime::executor::BoxedCancelToken,
11 types::{
12 ApplicationHealthStatesFilter, ApplicationQueryDescription, ClusterHealthQueryDescription,
13 HealthEventsFilter, HealthStateFilterFlags, NodeHealthQueryDescription,
14 NodeHealthStatesFilter, NodeQueryResultItem, Uri,
15 },
16};
17use std::time::Duration;
18
19pub struct HealthDataProducer {
22 fc: FabricClient,
23 interval: Duration,
24 sender: mpsc::UnboundedSender<HealthEntity>,
25 iteration: std::sync::atomic::AtomicU64,
26}
27
/// Timeout passed to every individual health/query call made by the producer.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
30
/// Control signal returned as the error of a collection pass.
///
/// The standard derives let callers `unwrap`/`expect` a
/// `Result<(), Action>`, compare values in assertions, and log them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// The producer must stop: nothing is listening for its data any more.
    Stop,
}
34
35impl HealthDataProducer {
36 pub fn new(
37 fc: FabricClient,
38 interval: Duration,
39 sender: mpsc::UnboundedSender<HealthEntity>,
40 ) -> Self {
41 HealthDataProducer {
42 fc,
43 interval,
44 sender,
45 iteration: std::sync::atomic::AtomicU64::new(0),
46 }
47 }
48
49 fn send_entity(&self, entity: HealthEntity) -> Result<(), Action> {
50 self.sender.send(entity).map_err(|_| {
51 tracing::error!("Receiver dropped, cannot send more data.");
52 Action::Stop
53 })
54 }
55
56 pub(crate) async fn run_once(&self, token: BoxedCancelToken) -> Result<(), Action> {
58 if let Some(entity) = self.produce_cluster_health_entity(token.clone()).await {
60 self.send_entity(entity)?;
61 }
62 if let Ok(nodes) = self.get_all_nodes(token.clone()).await {
64 for node in nodes {
65 if let Some(entity) = self.produce_node_health_entity(token.clone(), node).await {
66 self.send_entity(entity)?;
67 }
68 }
69 }
70 if let Ok(apps) = self.get_all_applications(token.clone()).await {
72 for app in apps {
73 let app_name = app.application_name.clone();
74 if let Some(entity) = self
75 .produce_application_health_entity(token.clone(), app)
76 .await
77 {
78 self.send_entity(entity)?;
79 }
80
81 if let Ok(services) = self.get_all_services_for_app(token.clone(), app_name).await {
83 for svc in services {
84 let svc_name = svc.get_service_name().clone();
85 if let Some(entity) =
87 self.produce_service_health_entity(token.clone(), svc).await
88 {
89 self.send_entity(entity)?;
90 }
91
92 if let Ok(partitions) = self
94 .get_all_partitions_for_svc(token.clone(), svc_name)
95 .await
96 {
97 for partition in partitions {
98 let partition_id = partition.get_partition_id();
99 if let Some(entity) = self
101 .produce_partition_health_entity(token.clone(), partition)
102 .await
103 {
104 self.send_entity(entity)?;
105 }
106 if let Ok(replicas) = self
108 .get_all_replicas_for_partition(token.clone(), partition_id)
109 .await
110 {
111 for replica in replicas {
112 if let Some(entity) = self
114 .produce_replica_health_entity(
115 token.clone(),
116 partition_id,
117 replica,
118 )
119 .await
120 {
121 self.send_entity(entity)?;
122 }
123 }
124 }
125 }
126 }
127 }
128 }
129 }
130 }
131 self.iteration
132 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
133 Ok(())
134 }
135
136 pub async fn run_loop(&self, token: BoxedCancelToken) {
138 loop {
139 let start_time = ::tokio::time::Instant::now();
140 match self.run_once(token.clone()).await {
141 Err(Action::Stop) => {
142 tracing::info!("Health data producer stopped.");
143 break;
144 }
145 _ => {
146 }
148 }
149
150 let elapsed = start_time.elapsed();
152 if elapsed < self.interval {
154 let wait_duration = self.interval - elapsed;
155
156 tokio::select! {
157 _ = token.wait() => {
158 tracing::info!("Cancellation requested, exiting health data producer loop.");
159 break;
160 }
161 _ = tokio::time::sleep(wait_duration) => {}
162 }
163 }
164
165 if token.is_cancelled() {
166 tracing::info!("Cancellation requested, exiting health data producer loop.");
167 break;
168 }
169 }
170 tracing::info!("Health data producer loop exited.");
171 }
172
173 pub fn get_iteration(&self) -> u64 {
174 self.iteration.load(std::sync::atomic::Ordering::Relaxed)
175 }
176}
177
178impl HealthDataProducer {
179 async fn produce_cluster_health_entity(&self, token: BoxedCancelToken) -> Option<HealthEntity> {
180 let desc = ClusterHealthQueryDescription {
185 nodes_filter: Some(NodeHealthStatesFilter {
186 health_state_filter: HealthStateFilterFlags::NONE,
187 }),
188 applications_filter: Some(ApplicationHealthStatesFilter {
189 health_state_filter: HealthStateFilterFlags::NONE,
190 }),
191 ..Default::default()
192 };
193 let cluster_healths = self
194 .fc
195 .get_health_manager()
196 .get_cluster_health(&desc, DEFAULT_TIMEOUT, Some(token))
197 .await
198 .inspect_err(|err| {
199 tracing::error!("Failed to get cluster health: {}", err);
200 })
201 .ok()?;
202 Some(HealthEntity::Cluster(ClusterHealthEntity {
203 health: cluster_healths,
204 }))
205 }
206
207 async fn produce_node_health_entity(
209 &self,
210 token: BoxedCancelToken,
211 node: NodeQueryResultItem,
212 ) -> Option<HealthEntity> {
213 let desc = NodeHealthQueryDescription {
216 node_name: node.name.clone(),
217 events_filter: Some(HealthEventsFilter {
219 health_state_filter: HealthStateFilterFlags::NONE,
220 }),
221 ..Default::default()
222 };
223 let node_healths = self
224 .fc
225 .get_health_manager()
226 .get_node_health(&desc, DEFAULT_TIMEOUT, Some(token))
227 .await
228 .inspect_err(|err| {
229 tracing::error!("Failed to get node health: {}", err);
230 })
231 .ok()?;
232 Some(HealthEntity::Node(NodeHealthEntity {
233 node,
234 health: node_healths,
235 }))
236 }
237
238 async fn produce_application_health_entity(
239 &self,
240 token: BoxedCancelToken,
241 app: mssf_core::types::ApplicationQueryResultItem,
242 ) -> Option<HealthEntity> {
243 let desc = mssf_core::types::ApplicationHealthQueryDescription {
244 application_name: app.application_name.clone(),
245 ..Default::default()
246 };
247 let app_health = self
248 .fc
249 .get_health_manager()
250 .get_application_health(&desc, DEFAULT_TIMEOUT, Some(token))
251 .await
252 .inspect_err(|err| {
253 tracing::error!("Failed to get application health: {}", err);
254 })
255 .ok()?;
256 Some(HealthEntity::Application(
257 crate::monitoring::entities::ApplicationHealthEntity {
258 application: app,
259 health: app_health,
260 },
261 ))
262 }
263
264 async fn produce_service_health_entity(
265 &self,
266 token: BoxedCancelToken,
267 svc: mssf_core::types::ServiceQueryResultItem,
268 ) -> Option<HealthEntity> {
269 let svc_name = svc.get_service_name().clone();
270 let desc = mssf_core::types::ServiceHealthQueryDescription {
271 service_name: svc_name,
272 ..Default::default()
273 };
274 let svc_health = self
275 .fc
276 .get_health_manager()
277 .get_service_health(&desc, DEFAULT_TIMEOUT, Some(token))
278 .await
279 .inspect_err(|err| {
280 tracing::error!("Failed to get service health: {}", err);
281 })
282 .ok()?;
283 Some(HealthEntity::Service(
284 crate::monitoring::entities::ServiceHealthEntity {
285 health: svc_health,
286 service: svc,
287 },
288 ))
289 }
290 async fn produce_partition_health_entity(
291 &self,
292 token: BoxedCancelToken,
293 part: mssf_core::types::ServicePartitionQueryResultItem,
294 ) -> Option<HealthEntity> {
295 let partition_id = part.get_partition_id();
296 let desc = mssf_core::types::PartitionHealthQueryDescription {
297 partition_id,
298 ..Default::default()
299 };
300 let part_health = self
301 .fc
302 .get_health_manager()
303 .get_partition_health(&desc, DEFAULT_TIMEOUT, Some(token))
304 .await
305 .inspect_err(|err| {
306 tracing::error!("Failed to get partition health: {}", err);
307 })
308 .ok()?;
309 Some(HealthEntity::Partition(
310 crate::monitoring::entities::PartitionHealthEntity {
311 health: part_health,
312 partition: part,
313 },
314 ))
315 }
316 async fn produce_replica_health_entity(
317 &self,
318 token: BoxedCancelToken,
319 partition_id: mssf_core::GUID,
320 replica: mssf_core::types::ServiceReplicaQueryResultItem,
321 ) -> Option<HealthEntity> {
322 let desc = mssf_core::types::ReplicaHealthQueryDescription {
323 partition_id,
324 replica_id_or_instance_id: replica.get_replica_or_instance_id(),
325 ..Default::default()
326 };
327 let replica_health = self
328 .fc
329 .get_health_manager()
330 .get_replica_health(&desc, DEFAULT_TIMEOUT, Some(token))
331 .await
332 .inspect_err(|err| {
333 tracing::error!("Failed to get replica health: {}", err);
334 })
335 .ok()?;
336 Some(HealthEntity::Replica(
337 crate::monitoring::entities::ReplicaHealthEntity {
338 health: replica_health,
339 replica,
340 },
341 ))
342 }
343}
344
345impl HealthDataProducer {
347 async fn get_all_nodes(
348 &self,
349 token: BoxedCancelToken,
350 ) -> mssf_core::Result<Vec<NodeQueryResultItem>> {
351 let desc = &Default::default();
353 let nodes = self
354 .fc
355 .get_query_manager()
356 .get_node_list(desc, DEFAULT_TIMEOUT, Some(token.clone()))
357 .await
358 .inspect_err(|err| {
359 tracing::error!("Failed to get node list: {}", err);
360 })?
361 .nodes;
362 Ok(nodes)
363 }
364
365 async fn get_all_applications(
368 &self,
369 token: BoxedCancelToken,
370 ) -> mssf_core::Result<Vec<mssf_core::types::ApplicationQueryResultItem>> {
371 let desc = ApplicationQueryDescription::default();
372 let apps = self
373 .fc
374 .get_query_manager()
375 .get_application_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
376 .await
377 .inspect_err(|err| {
378 tracing::error!("Failed to get application list: {}", err);
379 })?
380 .items;
381 Ok(apps)
382 }
383 async fn get_all_services_for_app(
384 &self,
385 token: BoxedCancelToken,
386 app_name: Uri,
387 ) -> mssf_core::Result<Vec<mssf_core::types::ServiceQueryResultItem>> {
388 let app_name_cp = app_name.clone();
389 let desc = mssf_core::types::ServiceQueryDescription {
391 application_name: app_name_cp,
392 ..Default::default()
393 };
394 let services = self
395 .fc
396 .get_query_manager()
397 .get_service_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
398 .await
399 .inspect_err(|err| {
400 tracing::error!("Failed to get service list for app {app_name}: {err}");
401 })?;
402 Ok(services.items)
403 }
404
405 async fn get_all_partitions_for_svc(
406 &self,
407 token: BoxedCancelToken,
408 service_name: Uri,
409 ) -> mssf_core::Result<Vec<mssf_core::types::ServicePartitionQueryResultItem>> {
410 let desc = mssf_core::types::ServicePartitionQueryDescription {
412 service_name,
413 partition_id_filter: None,
414 };
415 let partitions = self
416 .fc
417 .get_query_manager()
418 .get_partition_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
419 .await
420 .inspect_err(|err| {
421 tracing::error!("Failed to get partition list: {}", err);
422 })?
423 .service_partitions;
424 Ok(partitions)
425 }
426
427 async fn get_all_replicas_for_partition(
428 &self,
429 token: BoxedCancelToken,
430 partition_id: mssf_core::GUID,
431 ) -> mssf_core::Result<Vec<mssf_core::types::ServiceReplicaQueryResultItem>> {
432 let desc = mssf_core::types::ServiceReplicaQueryDescription {
434 partition_id,
435 ..Default::default()
436 };
437 let replicas = self
438 .fc
439 .get_query_manager()
440 .get_replica_list(&desc, DEFAULT_TIMEOUT, Some(token.clone()))
441 .await
442 .inspect_err(|err| {
443 tracing::error!("Failed to get replica list: {}", err);
444 })?
445 .service_replicas;
446 Ok(replicas)
447 }
448}