scylla_rs/app/stage/
init.rs1use super::*;
5
6#[async_trait::async_trait]
7impl Init<NodeHandle> for Stage {
8 async fn init(&mut self, status: Result<(), Need>, supervisor: &mut Option<NodeHandle>) -> Result<(), Need> {
9 if let Some(supervisor) = supervisor.as_mut() {
10 let last_range = self.appends_num * (self.reporter_count as i16);
13 {
14 if let Some(payloads) = Arc::get_mut(&mut self.payloads) {
15 for _ in 0..last_range {
16 payloads.push(Reusable::default())
17 }
18 } else {
19 error!("Cannot acquire access to reusable payloads!");
20 return Err(Need::Abort);
21 }
22 }
23 let streams: Vec<i16> = (0..last_range).collect();
24 let mut streams_iter = streams.chunks_exact(self.appends_num as usize);
25 if let Some(reporter_handles) = self.reporters_handles.as_mut() {
26 for reporter_id in 0..self.reporter_count {
28 if let Some(streams) = streams_iter.next() {
29 let reporter = ReporterBuilder::new()
31 .session_id(self.session_id)
32 .reporter_id(reporter_id)
33 .shard_id(self.shard_id)
34 .address(self.address.clone())
35 .payloads(self.payloads.clone())
36 .streams(streams.to_owned().into_iter().collect())
37 .build();
38 if let Some(reporter_handle) = reporter.clone_handle() {
40 reporter_handles.insert(reporter_id, reporter_handle);
43
44 tokio::spawn(reporter.start(self.handle.clone()));
46 } else {
47 error!("No reporter handle found!");
48 return Err(Need::Abort);
49 }
50 } else {
51 error!("Failed to create streams!");
52 return Err(Need::Abort);
53 }
54 }
55 self.service.update_status(ServiceStatus::Initializing);
56 let event = NodeEvent::RegisterReporters(self.service.clone(), reporter_handles.clone());
57 supervisor.send(event).ok();
58 status
59 } else {
60 error!("No reporter handles container available!");
61 return Err(Need::Abort);
62 }
63 } else {
64 Err(Need::Abort)
65 }
66 }
67}