scylla_rs/app/node/event_loop.rs
1// Copyright 2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5
6#[async_trait::async_trait]
7impl EventLoop<ClusterHandle> for Node {
8 async fn event_loop(
9 &mut self,
10 _status: Result<(), Need>,
11 supervisor: &mut Option<ClusterHandle>,
12 ) -> Result<(), Need> {
13 if let Some(supervisor) = supervisor {
14 while let Some(event) = self.inbox.rx.recv().await {
15 match event {
16 NodeEvent::RegisterReporters(microservice, reporters_handles) => {
17 if let Ok(shard_id) = microservice.get_name().parse::<u16>() {
18 let mut socket_addr = self.address.clone();
19 self.service.update_microservice(microservice.get_name(), microservice);
20 // assign shard_id to socket_addr as it's going be used later as key in registry
21 socket_addr.set_port(shard_id);
22 if let Some(reporters_handles_ref) = self.reporters_handles.as_mut() {
23 reporters_handles_ref.insert(socket_addr, reporters_handles);
24 // check if we pushed all reporters of the node.
25 if reporters_handles_ref.len() == self.shard_count as usize {
26 // reporters_handles should be passed to cluster supervisor
27 if let Some(reporters_handles) = self.reporters_handles.take() {
28 let event =
29 ClusterEvent::RegisterReporters(self.service.clone(), reporters_handles);
30 supervisor.send(event).ok();
31 }
32 } else {
33 let event = ClusterEvent::Service(self.service.clone());
34 supervisor.send(event).ok();
35 }
36 } else {
37 error!("Tried to register reporters more than once!")
38 }
39 } else {
40 error!("Failed to parse shard ID!")
41 }
42 }
43 NodeEvent::Service(microservice) => {
44 self.service.update_microservice(microservice.get_name(), microservice);
45 if !self.service.is_stopping() {
46 let microservices_len = self.service.microservices.len();
47 if self.service.microservices.values().all(|ms| ms.is_maintenance())
48 && microservices_len == self.shard_count as usize
49 {
50 self.service.update_status(ServiceStatus::Maintenance);
51 } else if self.service.microservices.values().all(|ms| ms.is_running())
52 && microservices_len == self.shard_count as usize
53 {
54 // all shards are connected/running as expected
55 self.service.update_status(ServiceStatus::Running);
56 } else {
57 // degraded service, probably one of the shard is in Maintenance or degraded
58 self.service.update_status(ServiceStatus::Degraded);
59 }
60 }
61 let event = ClusterEvent::Service(self.service.clone());
62 supervisor.send(event).ok();
63 }
64 NodeEvent::Shutdown => {
65 self.handle = None;
66 self.service.update_status(ServiceStatus::Stopping);
67 // shutdown children (stages)
68 for (_, stage) in self.stages.drain() {
69 let event = StageEvent::Shutdown;
70 let _ = stage.send(event);
71 }
72 let event = ClusterEvent::Service(self.service.clone());
73 supervisor.send(event).ok();
74 // contract design:
75 // the node supervisor will only shutdown when stages drop node_txs(supervisor)
76 // and this will only happen if reporters dropped stage_txs,
77 // still reporters_txs have to go out of the scope (from ring&stages).
78 }
79 }
80 }
81 Ok(())
82 } else {
83 Err(Need::Abort)
84 }
85 }
86}