scylla_rs/app/stage/
mod.rs1use super::{
5 node::{NodeEvent, NodeHandle},
6 *,
7};
8use receiver::ReceiverBuilder;
9use reporter::ReporterBuilder;
10pub use reporter::{ReporterEvent, ReporterHandle};
11use sender::SenderBuilder;
12use std::{
13 cell::UnsafeCell,
14 collections::HashMap,
15 net::SocketAddr,
16 ops::{Deref, DerefMut},
17 sync::Arc,
18};
19use tokio::net::TcpStream;
20
21mod event_loop;
22mod init;
23mod receiver;
24mod reporter;
25mod sender;
26mod terminating;
27
28#[derive(Clone)]
30pub struct ReportersHandles(HashMap<u8, ReporterHandle>);
31pub type Payloads = Arc<Vec<Reusable>>;
33
34impl Deref for ReportersHandles {
35 type Target = HashMap<u8, ReporterHandle>;
36 fn deref(&self) -> &Self::Target {
37 &self.0
38 }
39}
40
41impl DerefMut for ReportersHandles {
42 fn deref_mut(&mut self) -> &mut Self::Target {
43 &mut self.0
44 }
45}
46
47impl Shutdown for ReportersHandles {
48 fn shutdown(self) -> Option<Self>
49 where
50 Self: Sized,
51 {
52 for reporter_handle in self.values() {
53 let _ = reporter_handle.send(ReporterEvent::Session(reporter::Session::Shutdown));
54 }
55 None
56 }
57}
58
59builder!(StageBuilder {
61 address: SocketAddr,
62 authenticator: PasswordAuth,
63 reporter_count: u8,
64 shard_id: u16,
65 buffer_size: usize,
66 recv_buffer_size: Option<u32>,
67 send_buffer_size: Option<u32>,
68 handle: StageHandle,
69 inbox: StageInbox
70});
71
72#[derive(Clone)]
74pub struct StageHandle {
75 tx: mpsc::UnboundedSender<StageEvent>,
76}
77pub struct StageInbox {
79 rx: mpsc::UnboundedReceiver<StageEvent>,
80}
81
82impl Deref for StageHandle {
83 type Target = mpsc::UnboundedSender<StageEvent>;
84
85 fn deref(&self) -> &Self::Target {
86 &self.tx
87 }
88}
89
90impl DerefMut for StageHandle {
91 fn deref_mut(&mut self) -> &mut Self::Target {
92 &mut self.tx
93 }
94}
95
96pub enum StageEvent {
98 Reporter(Service),
100 Connect,
102 Shutdown,
104}
105pub struct Stage {
107 service: Service,
108 address: SocketAddr,
109 authenticator: PasswordAuth,
110 appends_num: i16,
111 reporter_count: u8,
112 reporters_handles: Option<ReportersHandles>,
113 session_id: usize,
114 shard_id: u16,
115 payloads: Payloads,
116 buffer_size: usize,
117 recv_buffer_size: Option<u32>,
118 send_buffer_size: Option<u32>,
119 handle: Option<StageHandle>,
120 inbox: StageInbox,
121}
122impl Stage {
123 pub(crate) fn clone_handle(&self) -> Option<StageHandle> {
124 self.handle.clone()
125 }
126}
/// A payload slot with interior mutability: an optional byte buffer that can
/// be read and replaced through a shared reference.
///
/// No locking or borrow tracking is performed. Every accessor relies on the
/// caller to guarantee that a slot is never accessed mutably while any other
/// reference obtained from the same slot is still alive.
#[derive(Default)]
pub struct Reusable {
    /// The slot contents; `None` when the slot holds no payload.
    value: UnsafeCell<Option<Vec<u8>>>,
}

impl Reusable {
    /// Returns a mutable reference to the whole slot, allowing the payload to
    /// be set, replaced or taken.
    ///
    /// The caller must ensure no other reference into this slot is live for
    /// as long as the returned reference is used.
    // Handing out `&mut` from `&self` is the purpose of this type, so the
    // clippy lint is silenced on purpose.
    #[allow(clippy::mut_from_ref)]
    pub fn as_mut(&self) -> &mut Option<Vec<u8>> {
        // SAFETY: `UnsafeCell::get` never returns a null or dangling pointer
        // while `self` is alive, so the dereference itself is valid.
        // Exclusivity is NOT checked here; it is the caller's obligation.
        // NOTE(review): presumably each slot is used by one party at a time;
        // confirm against the call sites.
        unsafe { &mut *self.value.get() }
    }

    /// Returns a shared reference to the payload, or `None` if the slot is empty.
    ///
    /// The caller must ensure the slot is not mutated while the returned
    /// reference is used.
    pub fn as_ref_payload(&self) -> Option<&Vec<u8>> {
        // SAFETY: the pointer from `UnsafeCell::get` is always valid for
        // reads while `self` is alive; absence of concurrent mutation is the
        // caller's obligation.
        let slot: &Option<Vec<u8>> = unsafe { &*self.value.get() };
        slot.as_ref()
    }

    /// Returns a mutable reference to the payload, or `None` if the slot is empty.
    ///
    /// Same aliasing obligations as [`Reusable::as_mut`].
    pub fn as_mut_payload(&self) -> Option<&mut Vec<u8>> {
        self.as_mut().as_mut()
    }
}

// SAFETY: `UnsafeCell` makes `Reusable` `!Sync` by default. This impl asserts
// that sharing slots across threads is sound, which holds only if users never
// access the same slot from two threads at once.
// NOTE(review): that discipline is not enforced by this type; confirm it is
// upheld wherever `Payloads` is shared.
unsafe impl Sync for Reusable {}
148
149impl ActorBuilder<NodeHandle> for StageBuilder {}
150
151impl Builder for StageBuilder {
153 type State = Stage;
154 fn build(self) -> Self::State {
155 let (tx, rx) = mpsc::unbounded_channel::<StageEvent>();
156 let handle = Some(StageHandle { tx });
157 let inbox = StageInbox { rx };
158 let vector: Vec<Reusable> = Vec::new();
160 let payloads: Payloads = Arc::new(vector);
161 let reporter_count = self.reporter_count.unwrap();
162 Self::State {
163 service: Service::new(),
164 address: self.address.unwrap(),
165 authenticator: self.authenticator.unwrap(),
166 appends_num: 32767 / (reporter_count as i16),
167 reporter_count,
168 reporters_handles: Some(ReportersHandles(HashMap::with_capacity(reporter_count as usize))),
169 session_id: 0,
170 shard_id: self.shard_id.unwrap(),
171 payloads,
172 buffer_size: self.buffer_size.unwrap_or(1024000),
173 recv_buffer_size: self.recv_buffer_size.unwrap(),
174 send_buffer_size: self.send_buffer_size.unwrap(),
175 handle,
176 inbox,
177 }
178 .set_name()
179 }
180}
181
182impl Name for Stage {
184 fn set_name(mut self) -> Self {
185 let name = self.shard_id.to_string();
187 self.service.update_name(name);
188 self
189 }
190 fn get_name(&self) -> String {
191 self.service.get_name()
192 }
193}
194
195#[async_trait::async_trait]
196impl AknShutdown<Stage> for NodeHandle {
197 async fn aknowledge_shutdown(self, mut _state: Stage, _status: Result<(), Need>) {
198 _state.service.update_status(ServiceStatus::Stopped);
199 let event = NodeEvent::Service(_state.service.clone());
200 let _ = self.send(event);
201 }
202}