1use std::convert::TryFrom;
4use std::marker::PhantomData;
5use std::net::SocketAddr;
6use std::sync::Arc;
7
8use bon::Builder;
9use log::{error, info, warn};
10use netwatch::ip::LocalAddresses;
11use tokio::sync::mpsc;
12use tokio::task::JoinSet;
13use tokio::{net::TcpListener, sync::RwLock};
14use tower::{Service as TowerService, ServiceExt};
15
16use crate::app_data::AppData;
17use crate::cluster::membership_protocol::ClusterProvider;
18use crate::cluster::storage::MembershipStorage;
19use crate::errors::ServerError;
20use crate::object_placement::ObjectPlacement;
21use crate::protocol::pubsub::SubscriptionRequest;
22use crate::protocol::ResponseError;
23use crate::protocol::{RequestEnvelope, ResponseEnvelope};
24use crate::registry::Registry;
25use crate::service::Service;
26use crate::ObjectId;
27
28#[derive(Debug)]
30pub enum AdminCommands {
31 ServerExit,
32 Shutdown(String, String),
34}
35
36pub type AdminReceiver = mpsc::UnboundedReceiver<AdminCommands>;
38
39pub type AdminSender = mpsc::UnboundedSender<AdminCommands>;
41
42pub type SendCommandResult = Result<Vec<u8>, ResponseError>;
44
45#[derive(Debug)]
48pub struct SendCommand {
49 pub request: RequestEnvelope,
50 pub response_channel: tokio::sync::oneshot::Sender<SendCommandResult>,
51}
52
53impl SendCommand {
54 pub fn build(
55 request: RequestEnvelope,
56 ) -> (
57 SendCommand,
58 tokio::sync::oneshot::Receiver<SendCommandResult>,
59 ) {
60 let (sender, receiver) = tokio::sync::oneshot::channel();
61 let command = SendCommand {
62 request,
63 response_channel: sender,
64 };
65 (command, receiver)
66 }
67}
68
69pub type InternalClientReceiver = mpsc::UnboundedReceiver<SendCommand>;
71
72pub type InternalClientSender = mpsc::UnboundedSender<SendCommand>;
74
75#[derive(Builder)]
86pub struct Server<S, C, P>
87where
88 S: MembershipStorage,
89 C: ClusterProvider<S>,
90 P: ObjectPlacement,
91{
92 #[builder(default = "0.0.0.0:0".to_string())]
94 address: String,
95
96 #[cfg(feature = "http")]
98 http_members_storage_address: Option<String>,
99
100 #[builder(with = |registry: Registry| Arc::new(RwLock::new(registry)))]
101 registry: Arc<RwLock<Registry>>,
102 cluster_provider: C,
103 #[builder(with = |provider: P| Arc::new(RwLock::new(provider)))]
104 object_placement_provider: Arc<RwLock<P>>,
105 #[builder(with = |app_data: AppData| Arc::new(app_data), default = Arc::new(AppData::new()))]
106 app_data: Arc<AppData>,
107
108 #[builder(skip = PhantomData {})]
109 _marker: PhantomData<S>,
110}
111
112type ServerResult<T> = Result<T, ServerError>;
113
114impl<S, C, P> Server<S, C, P>
115where
116 S: MembershipStorage + 'static,
117 C: ClusterProvider<S> + Send + Sync + 'static,
118 P: ObjectPlacement + 'static,
119{
120 pub async fn prepare(&self) {
121 self.cluster_provider.members_storage().prepare().await;
122 let object_placement_provider_guard = self.object_placement_provider.read().await;
123 object_placement_provider_guard.prepare().await;
124 }
125
126 pub fn app_data<Data>(&mut self, data: Data)
127 where
128 Data: Send + Sync + 'static,
129 {
130 self.app_data.set(data);
131 }
132
133 pub async fn bind(&mut self) -> ServerResult<TcpListener> {
135 let listener = TcpListener::bind(&self.address)
136 .await
137 .map_err(|err| ServerError::Bind(err.to_string()))?;
138 Ok(listener)
139 }
140
141 pub fn try_local_addr(listener: &TcpListener) -> ServerResult<SocketAddr> {
155 let addr_result = listener.local_addr();
156 let mut addr = addr_result.map_err(|x| {
157 let err = x.to_string();
158 ServerError::Bind(err)
159 })?;
160
161 let nw_local_addr = LocalAddresses::new();
163 if let Some(first_local_address) = nw_local_addr.regular.first() {
164 addr.set_ip(*first_local_address);
165 }
166 Ok(addr)
167 }
168
169 pub async fn run(&mut self, listener: TcpListener) -> ServerResult<()> {
178 let (admin_sender, admin_receiver) = mpsc::unbounded_channel::<AdminCommands>();
179 self.app_data(admin_sender);
180
181 let (internal_client_sender, internal_client_receiver) =
182 mpsc::unbounded_channel::<SendCommand>();
183 self.app_data(internal_client_sender);
184
185 let local_addr = Self::try_local_addr(&listener)?.to_string();
186
187 let mut service = Service::<S, P>::try_from(&*self)?;
188 service.address = local_addr.clone();
189 let mut accept_task = tokio::spawn(Self::accept(listener, service));
190
191 let cluster_provider = self.cluster_provider.clone();
192 let inner_local_addr = local_addr.clone();
193 let mut cluster_provider_task =
194 tokio::spawn(async move { cluster_provider.serve(&inner_local_addr).await });
195
196 let mut service = Service::<S, P>::try_from(&*self)?;
197 service.address = local_addr.clone();
198 let mut internal_client_task = tokio::spawn(async move {
199 Self::consume_internal_client_commands(internal_client_receiver, service).await
200 });
201
202 let admin_commands_fut = self.consume_admin_commands(admin_receiver);
203
204 #[cfg(feature = "http")]
205 let mut cluster_storage_http_server_task =
206 if let Some(addr) = self.http_members_storage_address.clone() {
207 let inner_members_storage = self.cluster_provider.members_storage().clone();
208 tokio::spawn(async move {
209 crate::cluster::storage::http::serve(addr, inner_members_storage)
210 .await
211 .ok();
212 })
213 } else {
214 tokio::spawn(async move {
215 warn!("HTTP Members Storage not enabled");
216 loop {
217 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
218 }
219 })
220 };
221
222 #[cfg(not(feature = "http"))]
223 let mut cluster_storage_http_server_task = tokio::spawn(async move {
224 warn!("HTTP Members Storage not enabled");
225 loop {
226 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
227 }
228 });
229
230 tokio::select! {
231 accept_result = &mut accept_task => {
232 accept_result
233 .map_err(|err| {
234 error!(
235 "accept: JoinHandle error: {}",
236 err
237 );
238 ServerError::Run
239 })??;
240 }
241 cluster_provider_serve_result = &mut cluster_provider_task => {
242 cluster_provider_serve_result
243 .map_err(|err| {
244 error!(
245 "cluster provider server: JoinHandle error: {}",
246 err
247 );
248 ServerError::Run
249 })?
250 .map_err(ServerError::ClusterProviderServe)?;
251 warn!("Cluster provider has finished");
252 }
253 internal_client_result = &mut internal_client_task => {
254 internal_client_result
255 .map_err(|err| {
256 error!(
257 "internal client: JoinHandle error: {}",
258 err
259 );
260 ServerError::Run
261 })??;
262 warn!("Internal client consumer finished first");
263 }
264 _ = admin_commands_fut => {
265 warn!("Admin command serve finished first");
266 }
267 _ = &mut cluster_storage_http_server_task => {
268 warn!("Http Server for Cluster Storage finished earlier");
269 }
270
271 }
272
273 info!("Stoping server");
274 accept_task.abort();
275 cluster_provider_task.abort();
276 internal_client_task.abort();
277 cluster_storage_http_server_task.abort();
278 info!("Server stopped");
279
280 Ok(())
281 }
282
283 async fn accept(listener: TcpListener, service: Service<S, P>) -> ServerResult<()> {
284 let local_addr = listener.local_addr().map_err(|_| {
285 ServerError::Bind("Cannot get the local address for the listener".to_string())
286 })?;
287 info!("Listening on `{}`", local_addr);
288 let mut joinset = JoinSet::new();
289
290 loop {
291 let (stream, _) = listener.accept().await.map_err(|_| ServerError::Run)?;
292 let mut service: Service<S, P> = service.clone();
293
294 ServiceExt::<RequestEnvelope>::ready(&mut service)
295 .await
296 .map_err(|_| ServerError::Run)?;
297 ServiceExt::<SubscriptionRequest>::ready(&mut service)
298 .await
299 .map_err(|_| ServerError::Run)?;
300
301 joinset.spawn(async move { service.run(stream).await });
302 }
303 }
304
305 async fn consume_internal_client_commands(
308 mut receiver: InternalClientReceiver,
309 service: Service<S, P>,
310 ) -> ServerResult<()> {
311 let mut joinset = JoinSet::new();
312 while let Some(message) = receiver.recv().await {
313 let mut inner_service = service.clone();
314 joinset.spawn(async move {
315 let resp = inner_service
316 .call(message.request)
317 .await
318 .unwrap_or_else(ResponseEnvelope::err);
319 message
320 .response_channel
321 .send(resp.body)
322 .inspect_err(|_| {
323 error!("The caller dropped");
324 })
325 .ok();
326 });
327 }
328 let _ = joinset.join_all().await;
329 Ok(())
330 }
331
332 async fn consume_admin_commands(&self, mut admin_receiver: AdminReceiver) {
337 while let Some(message) = admin_receiver.recv().await {
338 match message {
339 AdminCommands::Shutdown(object_kind, object_id) => {
343 let registry = self.registry.write().await;
344 registry
345 .remove(object_kind.clone(), object_id.clone())
346 .await;
347 self.object_placement_provider
348 .write()
349 .await
350 .remove(&ObjectId(object_kind, object_id))
351 .await;
352 }
353 AdminCommands::ServerExit => {
354 println!("I got a message to terminate this thing here. So Ill try");
356 return;
357 }
358 }
359 }
360 }
361}
362
363impl<S, C, P> TryFrom<&Server<S, C, P>> for Service<S, P>
368where
369 S: MembershipStorage + 'static,
370 C: ClusterProvider<S> + 'static + Send + Sync,
371 P: ObjectPlacement + 'static,
372{
373 type Error = ServerError;
374 fn try_from(server: &Server<S, C, P>) -> Result<Self, Self::Error> {
375 let address = "".to_string();
376 let registry = server.registry.clone();
377 let object_placement_provider = server.object_placement_provider.clone();
378 let app_data = server.app_data.clone();
379 let members_storage = server.cluster_provider.members_storage().clone();
380
381 Ok(Service {
382 address,
383 registry,
384 members_storage,
385 object_placement_provider,
386 app_data,
387 })
388 }
389}
390
391#[cfg(test)]
392mod test {
393 use super::*;
394 use crate::cluster::membership_protocol::local::LocalClusterProvider;
395 use crate::cluster::storage::local::LocalStorage;
396 use crate::object_placement::local::LocalObjectPlacement;
397 use crate::registry::Registry;
398
399 #[tokio::test]
400 async fn client_builder_sanity_check() {
401 let _server = Server::builder()
402 .address("0.0.0.0:80".to_string())
403 .registry(Registry::default())
404 .app_data(AppData::new())
405 .cluster_provider(LocalClusterProvider {
406 members_storage: LocalStorage::default(),
407 })
408 .object_placement_provider(LocalObjectPlacement::default())
409 .build();
410 }
411}