scylla_rs/app/node/
init.rs

1// Copyright 2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5
6#[async_trait::async_trait]
7impl Init<ClusterHandle> for Node {
8    async fn init(&mut self, status: Result<(), Need>, supervisor: &mut Option<ClusterHandle>) -> Result<(), Need> {
9        self.service.update_status(ServiceStatus::Initializing);
10        let event = ClusterEvent::Service(self.service.clone());
11        if let Some(supervisor) = supervisor.as_mut() {
12            supervisor.send(event).ok();
13            // spawn stages
14            for shard_id in 0..self.shard_count {
15                let stage = StageBuilder::new()
16                    .address(self.address.clone())
17                    .shard_id(shard_id)
18                    .reporter_count(self.reporter_count)
19                    .buffer_size(self.buffer_size)
20                    .recv_buffer_size(self.recv_buffer_size)
21                    .send_buffer_size(self.send_buffer_size)
22                    .authenticator(self.authenticator.clone())
23                    .build();
24                if let Some(stage_handle) = stage.clone_handle() {
25                    self.stages.insert(shard_id, stage_handle);
26                    tokio::spawn(stage.start(self.handle.clone()));
27                } else {
28                    error!("No stage handle found!");
29                    return Err(Need::Abort);
30                }
31            }
32
33            status
34        } else {
35            Err(Need::Abort)
36        }
37    }
38}