scylla_rs/app/stage/
mod.rs

1// Copyright 2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    node::{NodeEvent, NodeHandle},
6    *,
7};
8use receiver::ReceiverBuilder;
9use reporter::ReporterBuilder;
10pub use reporter::{ReporterEvent, ReporterHandle};
11use sender::SenderBuilder;
12use std::{
13    cell::UnsafeCell,
14    collections::HashMap,
15    net::SocketAddr,
16    ops::{Deref, DerefMut},
17    sync::Arc,
18};
19use tokio::net::TcpStream;
20
21mod event_loop;
22mod init;
23mod receiver;
24mod reporter;
25mod sender;
26mod terminating;
27
28/// The reporters of shard id to its corresponding sender of stage reporter events.
29#[derive(Clone)]
30pub struct ReportersHandles(HashMap<u8, ReporterHandle>);
31/// The thread-safe reusable payloads.
32pub type Payloads = Arc<Vec<Reusable>>;
33
34impl Deref for ReportersHandles {
35    type Target = HashMap<u8, ReporterHandle>;
36    fn deref(&self) -> &Self::Target {
37        &self.0
38    }
39}
40
41impl DerefMut for ReportersHandles {
42    fn deref_mut(&mut self) -> &mut Self::Target {
43        &mut self.0
44    }
45}
46
47impl Shutdown for ReportersHandles {
48    fn shutdown(self) -> Option<Self>
49    where
50        Self: Sized,
51    {
52        for reporter_handle in self.values() {
53            let _ = reporter_handle.send(ReporterEvent::Session(reporter::Session::Shutdown));
54        }
55        None
56    }
57}
58
59// Stage builder
60builder!(StageBuilder {
61    address: SocketAddr,
62    authenticator: PasswordAuth,
63    reporter_count: u8,
64    shard_id: u16,
65    buffer_size: usize,
66    recv_buffer_size: Option<u32>,
67    send_buffer_size: Option<u32>,
68    handle: StageHandle,
69    inbox: StageInbox
70});
71
72/// StageHandle to be passed to the children (reporter/s)
73#[derive(Clone)]
74pub struct StageHandle {
75    tx: mpsc::UnboundedSender<StageEvent>,
76}
77/// StageInbox is used to recv events
78pub struct StageInbox {
79    rx: mpsc::UnboundedReceiver<StageEvent>,
80}
81
82impl Deref for StageHandle {
83    type Target = mpsc::UnboundedSender<StageEvent>;
84
85    fn deref(&self) -> &Self::Target {
86        &self.tx
87    }
88}
89
90impl DerefMut for StageHandle {
91    fn deref_mut(&mut self) -> &mut Self::Target {
92        &mut self.tx
93    }
94}
95
96/// Stage event enum.
97pub enum StageEvent {
98    /// Reporter child status change
99    Reporter(Service),
100    /// Establish connection to scylla shard.
101    Connect,
102    /// Shutdwon a stage.
103    Shutdown,
104}
105/// Stage state
106pub struct Stage {
107    service: Service,
108    address: SocketAddr,
109    authenticator: PasswordAuth,
110    appends_num: i16,
111    reporter_count: u8,
112    reporters_handles: Option<ReportersHandles>,
113    session_id: usize,
114    shard_id: u16,
115    payloads: Payloads,
116    buffer_size: usize,
117    recv_buffer_size: Option<u32>,
118    send_buffer_size: Option<u32>,
119    handle: Option<StageHandle>,
120    inbox: StageInbox,
121}
122impl Stage {
123    pub(crate) fn clone_handle(&self) -> Option<StageHandle> {
124        self.handle.clone()
125    }
126}
127#[derive(Default)]
128/// The reusable sender payload.
129pub struct Reusable {
130    value: UnsafeCell<Option<Vec<u8>>>,
131}
132impl Reusable {
133    #[allow(clippy::mut_from_ref)]
134    /// Return as mutable sender payload value.
135    pub fn as_mut(&self) -> &mut Option<Vec<u8>> {
136        unsafe { self.value.get().as_mut().unwrap() }
137    }
138    /// Return as reference sender payload.
139    pub fn as_ref_payload(&self) -> Option<&Vec<u8>> {
140        unsafe { self.value.get().as_ref().unwrap().as_ref() }
141    }
142    /// Return as mutable sender payload.
143    pub fn as_mut_payload(&self) -> Option<&mut Vec<u8>> {
144        self.as_mut().as_mut()
145    }
146}
147unsafe impl Sync for Reusable {}
148
149impl ActorBuilder<NodeHandle> for StageBuilder {}
150
151/// implementation of builder
152impl Builder for StageBuilder {
153    type State = Stage;
154    fn build(self) -> Self::State {
155        let (tx, rx) = mpsc::unbounded_channel::<StageEvent>();
156        let handle = Some(StageHandle { tx });
157        let inbox = StageInbox { rx };
158        // create reusable payloads as giveload
159        let vector: Vec<Reusable> = Vec::new();
160        let payloads: Payloads = Arc::new(vector);
161        let reporter_count = self.reporter_count.unwrap();
162        Self::State {
163            service: Service::new(),
164            address: self.address.unwrap(),
165            authenticator: self.authenticator.unwrap(),
166            appends_num: 32767 / (reporter_count as i16),
167            reporter_count,
168            reporters_handles: Some(ReportersHandles(HashMap::with_capacity(reporter_count as usize))),
169            session_id: 0,
170            shard_id: self.shard_id.unwrap(),
171            payloads,
172            buffer_size: self.buffer_size.unwrap_or(1024000),
173            recv_buffer_size: self.recv_buffer_size.unwrap(),
174            send_buffer_size: self.send_buffer_size.unwrap(),
175            handle,
176            inbox,
177        }
178        .set_name()
179    }
180}
181
182/// impl name of the Node
183impl Name for Stage {
184    fn set_name(mut self) -> Self {
185        // create name from the shard_id
186        let name = self.shard_id.to_string();
187        self.service.update_name(name);
188        self
189    }
190    fn get_name(&self) -> String {
191        self.service.get_name()
192    }
193}
194
195#[async_trait::async_trait]
196impl AknShutdown<Stage> for NodeHandle {
197    async fn aknowledge_shutdown(self, mut _state: Stage, _status: Result<(), Need>) {
198        _state.service.update_status(ServiceStatus::Stopped);
199        let event = NodeEvent::Service(_state.service.clone());
200        let _ = self.send(event);
201    }
202}