scylla_rs/app/stage/
init.rs

1// Copyright 2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5
6#[async_trait::async_trait]
7impl Init<NodeHandle> for Stage {
8    async fn init(&mut self, status: Result<(), Need>, supervisor: &mut Option<NodeHandle>) -> Result<(), Need> {
9        if let Some(supervisor) = supervisor.as_mut() {
10            // init Reusable payloads holder to enable reporter/sender/receiver
11            // to reuse the payload whenever is possible.
12            let last_range = self.appends_num * (self.reporter_count as i16);
13            {
14                if let Some(payloads) = Arc::get_mut(&mut self.payloads) {
15                    for _ in 0..last_range {
16                        payloads.push(Reusable::default())
17                    }
18                } else {
19                    error!("Cannot acquire access to reusable payloads!");
20                    return Err(Need::Abort);
21                }
22            }
23            let streams: Vec<i16> = (0..last_range).collect();
24            let mut streams_iter = streams.chunks_exact(self.appends_num as usize);
25            if let Some(reporter_handles) = self.reporters_handles.as_mut() {
26                // Start reporters
27                for reporter_id in 0..self.reporter_count {
28                    if let Some(streams) = streams_iter.next() {
29                        // build reporter
30                        let reporter = ReporterBuilder::new()
31                            .session_id(self.session_id)
32                            .reporter_id(reporter_id)
33                            .shard_id(self.shard_id)
34                            .address(self.address.clone())
35                            .payloads(self.payloads.clone())
36                            .streams(streams.to_owned().into_iter().collect())
37                            .build();
38                        // clone reporter_handle
39                        if let Some(reporter_handle) = reporter.clone_handle() {
40                            // Add reporter to reporters map
41
42                            reporter_handles.insert(reporter_id, reporter_handle);
43
44                            // Start reporter
45                            tokio::spawn(reporter.start(self.handle.clone()));
46                        } else {
47                            error!("No reporter handle found!");
48                            return Err(Need::Abort);
49                        }
50                    } else {
51                        error!("Failed to create streams!");
52                        return Err(Need::Abort);
53                    }
54                }
55                self.service.update_status(ServiceStatus::Initializing);
56                let event = NodeEvent::RegisterReporters(self.service.clone(), reporter_handles.clone());
57                supervisor.send(event).ok();
58                status
59            } else {
60                error!("No reporter handles container available!");
61                return Err(Need::Abort);
62            }
63        } else {
64            Err(Need::Abort)
65        }
66    }
67}