scylla_rs/app/node/event_loop.rs

// Copyright 2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use super::*;

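/// Event loop for a `Node`, supervised by the cluster: handles inbound
/// `NodeEvent`s until the inbox closes, and aborts if no supervisor
/// handle is available.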
#[async_trait::async_trait]
impl EventLoop<ClusterHandle> for Node {
    async fn event_loop(
        &mut self,
        _status: Result<(), Need>,
        supervisor: &mut Option<ClusterHandle>,
    ) -> Result<(), Need> {
        if let Some(supervisor) = supervisor {
            while let Some(event) = self.inbox.rx.recv().await {
                match event {
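                    // a stage spawned its reporters; its microservice name encodes the shard id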
                    NodeEvent::RegisterReporters(microservice, reporters_handles) => {
                        if let Ok(shard_id) = microservice.get_name().parse::<u16>() {
                            let mut socket_addr = self.address.clone();
                            self.service.update_microservice(microservice.get_name(), microservice);
                            // assign shard_id to socket_addr, as it's going to be used later as the key in the registry
                            socket_addr.set_port(shard_id);
                            if let Some(reporters_handles_ref) = self.reporters_handles.as_mut() {
                                reporters_handles_ref.insert(socket_addr, reporters_handles);
                                // check whether all of the node's reporters have been registered
                                if reporters_handles_ref.len() == self.shard_count as usize {
                                    // the complete reporters_handles registry is passed to the cluster supervisor
                                    if let Some(reporters_handles) = self.reporters_handles.take() {
                                        let event =
                                            ClusterEvent::RegisterReporters(self.service.clone(), reporters_handles);
                                        supervisor.send(event).ok();
                                    }
                                } else {
                                    let event = ClusterEvent::Service(self.service.clone());
                                    supervisor.send(event).ok();
                                }
                            } else {
                                error!("Tried to register reporters more than once!")
                            }
                        } else {
                            error!("Failed to parse shard ID!")
                        }
                    }
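                    // a status update from one of this node's stages; fold it into the
                    // node's aggregate status before reporting to the cluster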
                    NodeEvent::Service(microservice) => {
                        self.service.update_microservice(microservice.get_name(), microservice);
                        if !self.service.is_stopping() {
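                            // the node is Maintenance/Running only when every shard agrees
                            // and all shards have reported; anything else is Degraded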
                            let microservices_len = self.service.microservices.len();
                            if self.service.microservices.values().all(|ms| ms.is_maintenance())
                                && microservices_len == self.shard_count as usize
                            {
                                self.service.update_status(ServiceStatus::Maintenance);
                            } else if self.service.microservices.values().all(|ms| ms.is_running())
                                && microservices_len == self.shard_count as usize
                            {
                                // all shards are connected/running as expected
                                self.service.update_status(ServiceStatus::Running);
                            } else {
                                // degraded service; most likely one of the shards is in Maintenance or Degraded
                                self.service.update_status(ServiceStatus::Degraded);
                            }
                        }
                        let event = ClusterEvent::Service(self.service.clone());
                        supervisor.send(event).ok();
                    }
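                    // graceful shutdown: drop our own handle (sender) so the inbox can
                    // eventually close, then cascade the shutdown to every child stage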
                    NodeEvent::Shutdown => {
                        self.handle = None;
                        self.service.update_status(ServiceStatus::Stopping);
                        // shut down children (stages)
                        for (_, stage) in self.stages.drain() {
                            let event = StageEvent::Shutdown;
                            let _ = stage.send(event);
                        }
                        let event = ClusterEvent::Service(self.service.clone());
                        supervisor.send(event).ok();
                        // contract design:
                        // the node supervisor will only shut down once the stages drop their node_txs (supervisor handles),
                        // which in turn only happens once the reporters have dropped their stage_txs;
                        // the reporters_txs still have to go out of scope (from the ring and stages).
                    }
                }
            }
            Ok(())
        } else {
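            // without a supervisor handle the node cannot report its service state; abort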
            Err(Need::Abort)
        }
    }
}