Skip to main content

rio_rs/
server.rs

1//! Rio server
2
3use std::convert::TryFrom;
4use std::marker::PhantomData;
5use std::net::SocketAddr;
6use std::sync::Arc;
7
8use bon::Builder;
9use log::{error, info, warn};
10use netwatch::ip::LocalAddresses;
11use tokio::sync::mpsc;
12use tokio::task::JoinSet;
13use tokio::{net::TcpListener, sync::RwLock};
14use tower::{Service as TowerService, ServiceExt};
15
16use crate::app_data::AppData;
17use crate::cluster::membership_protocol::ClusterProvider;
18use crate::cluster::storage::MembershipStorage;
19use crate::errors::ServerError;
20use crate::object_placement::ObjectPlacement;
21use crate::protocol::pubsub::SubscriptionRequest;
22use crate::protocol::ResponseError;
23use crate::protocol::{RequestEnvelope, ResponseEnvelope};
24use crate::registry::Registry;
25use crate::service::Service;
26use crate::ObjectId;
27
/// Internal commands, e.g., shutdown a service object
///
/// They are sent through an [AdminSender] and consumed by the server from the
/// matching [AdminReceiver].
#[derive(Debug)]
pub enum AdminCommands {
    /// Ends the server's admin-command loop, which makes [Server::run] stop
    ServerExit,
    /// `Shutdown(handler_type, handler_id)`: removes that object from this
    /// server's registry and from the object placement provider
    Shutdown(String, String),
}
35
/// Receiving half of the unbounded channel that carries [AdminCommands]
pub type AdminReceiver = mpsc::UnboundedReceiver<AdminCommands>;

/// Sending half of the unbounded channel that carries [AdminCommands]
pub type AdminSender = mpsc::UnboundedSender<AdminCommands>;

/// Result for the internal client interface on the server: the raw response
/// body on success, a [ResponseError] otherwise
pub type SendCommandResult = Result<Vec<u8>, ResponseError>;
44
45/// Request struct for the internal client interface on the server
46/// TODO pub?
47#[derive(Debug)]
48pub struct SendCommand {
49    pub request: RequestEnvelope,
50    pub response_channel: tokio::sync::oneshot::Sender<SendCommandResult>,
51}
52
53impl SendCommand {
54    pub fn build(
55        request: RequestEnvelope,
56    ) -> (
57        SendCommand,
58        tokio::sync::oneshot::Receiver<SendCommandResult>,
59    ) {
60        let (sender, receiver) = tokio::sync::oneshot::channel();
61        let command = SendCommand {
62            request,
63            response_channel: sender,
64        };
65        (command, receiver)
66    }
67}
68
/// Receiving half of the unbounded channel that carries [SendCommand]s for the
/// internal client
pub type InternalClientReceiver = mpsc::UnboundedReceiver<SendCommand>;

/// Sending half of the unbounded channel that carries [SendCommand]s for the
/// internal client
pub type InternalClientSender = mpsc::UnboundedSender<SendCommand>;
74
/// Application Server. It handles object registration ([Registry]),
/// clustering (through [ClusterProvider]s), server state (via [AppData]),
/// and more.
///
/// It handles various types of request: [AdminCommands], [RequestEnvelope], and
/// [SubscriptionRequest].
///
/// More of it can be seen in [Server::run].
///
/// TODO example builder
#[derive(Builder)]
pub struct Server<S, C, P>
where
    S: MembershipStorage,
    C: ClusterProvider<S>,
    P: ObjectPlacement,
{
    /// Address given by the user; this is what [Server::bind] binds to.
    /// Defaults to `0.0.0.0:0`
    #[builder(default = "0.0.0.0:0".to_string())]
    address: String,

    /// Address on which the cluster's members storage is served over HTTP.
    /// When `None`, that HTTP server is not started. Only present with the
    /// `http` feature
    #[cfg(feature = "http")]
    http_members_storage_address: Option<String>,

    /// Registry of service objects; the builder wraps it in `Arc<RwLock<_>>`
    #[builder(with = |registry: Registry| Arc::new(RwLock::new(registry)))]
    registry: Arc<RwLock<Registry>>,
    /// Cluster membership provider; it also gives access to the members storage
    cluster_provider: C,
    /// Object placement provider; the builder wraps it in `Arc<RwLock<_>>`
    #[builder(with = |provider: P| Arc::new(RwLock::new(provider)))]
    object_placement_provider: Arc<RwLock<P>>,
    /// Shared application state; defaults to an empty [AppData]
    #[builder(with = |app_data: AppData| Arc::new(app_data), default = Arc::new(AppData::new()))]
    app_data: Arc<AppData>,

    /// Ties the otherwise unused storage type `S` to the struct
    #[builder(skip = PhantomData {})]
    _marker: PhantomData<S>,
}
111
/// Shorthand for results whose error type is [ServerError]
type ServerResult<T> = Result<T, ServerError>;
113
114impl<S, C, P> Server<S, C, P>
115where
116    S: MembershipStorage + 'static,
117    C: ClusterProvider<S> + Send + Sync + 'static,
118    P: ObjectPlacement + 'static,
119{
120    pub async fn prepare(&self) {
121        self.cluster_provider.members_storage().prepare().await;
122        let object_placement_provider_guard = self.object_placement_provider.read().await;
123        object_placement_provider_guard.prepare().await;
124    }
125
126    pub fn app_data<Data>(&mut self, data: Data)
127    where
128        Data: Send + Sync + 'static,
129    {
130        self.app_data.set(data);
131    }
132
133    /// Setup the server for running it
134    pub async fn bind(&mut self) -> ServerResult<TcpListener> {
135        let listener = TcpListener::bind(&self.address)
136            .await
137            .map_err(|err| ServerError::Bind(err.to_string()))?;
138        Ok(listener)
139    }
140
141    /// Tries to get a local address
142    ///
143    /// It will get the first ip address for the machine where it is running,
144    /// and fallback to the address given by tokio's listener
145    ///
146    /// <div class="warning">
147    /// **TODO**
148    ///
149    /// It potentially won't work on machine with multiple interfaces. So we need to
150    /// add support to address mapping per node before merging this.
151    ///
152    /// For now, if that is your case, you need to specify the IP for binding
153    /// </div>
154    pub fn try_local_addr(listener: &TcpListener) -> ServerResult<SocketAddr> {
155        let addr_result = listener.local_addr();
156        let mut addr = addr_result.map_err(|x| {
157            let err = x.to_string();
158            ServerError::Bind(err)
159        })?;
160
161        // Try to update the local address using netwatch's LocalAddress
162        let nw_local_addr = LocalAddresses::new();
163        if let Some(first_local_address) = nw_local_addr.regular.first() {
164            addr.set_ip(*first_local_address);
165        }
166        Ok(addr)
167    }
168
169    /// Run the server forever
170    ///
171    /// This is the main loop for a Rio server. It will handle a few types of future concurrently:
172    /// - New TCP connections from clients
173    /// - [AdminCommands] messages from running objects
174    /// - [ClusterProvider] server loop
175    ///
176    /// If any of these fails, the server stops running with a [ServerError]
177    pub async fn run(&mut self, listener: TcpListener) -> ServerResult<()> {
178        let (admin_sender, admin_receiver) = mpsc::unbounded_channel::<AdminCommands>();
179        self.app_data(admin_sender);
180
181        let (internal_client_sender, internal_client_receiver) =
182            mpsc::unbounded_channel::<SendCommand>();
183        self.app_data(internal_client_sender);
184
185        let local_addr = Self::try_local_addr(&listener)?.to_string();
186
187        let mut service = Service::<S, P>::try_from(&*self)?;
188        service.address = local_addr.clone();
189        let mut accept_task = tokio::spawn(Self::accept(listener, service));
190
191        let cluster_provider = self.cluster_provider.clone();
192        let inner_local_addr = local_addr.clone();
193        let mut cluster_provider_task =
194            tokio::spawn(async move { cluster_provider.serve(&inner_local_addr).await });
195
196        let mut service = Service::<S, P>::try_from(&*self)?;
197        service.address = local_addr.clone();
198        let mut internal_client_task = tokio::spawn(async move {
199            Self::consume_internal_client_commands(internal_client_receiver, service).await
200        });
201
202        let admin_commands_fut = self.consume_admin_commands(admin_receiver);
203
204        #[cfg(feature = "http")]
205        let mut cluster_storage_http_server_task =
206            if let Some(addr) = self.http_members_storage_address.clone() {
207                let inner_members_storage = self.cluster_provider.members_storage().clone();
208                tokio::spawn(async move {
209                    crate::cluster::storage::http::serve(addr, inner_members_storage)
210                        .await
211                        .ok();
212                })
213            } else {
214                tokio::spawn(async move {
215                    warn!("HTTP Members Storage not enabled");
216                    loop {
217                        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
218                    }
219                })
220            };
221
222        #[cfg(not(feature = "http"))]
223        let mut cluster_storage_http_server_task = tokio::spawn(async move {
224            warn!("HTTP Members Storage not enabled");
225            loop {
226                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
227            }
228        });
229
230        tokio::select! {
231            accept_result = &mut accept_task => {
232                accept_result
233                    .map_err(|err| {
234                        error!(
235                            "accept: JoinHandle error: {}",
236                            err
237                        );
238                        ServerError::Run
239                    })??;
240            }
241            cluster_provider_serve_result = &mut cluster_provider_task => {
242                cluster_provider_serve_result
243                    .map_err(|err| {
244                        error!(
245                            "cluster provider server: JoinHandle error: {}",
246                            err
247                        );
248                        ServerError::Run
249                    })?
250                .map_err(ServerError::ClusterProviderServe)?;
251                warn!("Cluster provider has finished");
252            }
253            internal_client_result = &mut internal_client_task => {
254                internal_client_result
255                    .map_err(|err| {
256                        error!(
257                            "internal client: JoinHandle error: {}",
258                            err
259                        );
260                        ServerError::Run
261                    })??;
262                warn!("Internal client consumer finished first");
263            }
264            _ = admin_commands_fut => {
265                warn!("Admin command serve finished first");
266            }
267            _ = &mut cluster_storage_http_server_task => {
268                warn!("Http Server for Cluster Storage finished earlier");
269            }
270
271        }
272
273        info!("Stoping server");
274        accept_task.abort();
275        cluster_provider_task.abort();
276        internal_client_task.abort();
277        cluster_storage_http_server_task.abort();
278        info!("Server stopped");
279
280        Ok(())
281    }
282
283    async fn accept(listener: TcpListener, service: Service<S, P>) -> ServerResult<()> {
284        let local_addr = listener.local_addr().map_err(|_| {
285            ServerError::Bind("Cannot get the local address for the listener".to_string())
286        })?;
287        info!("Listening on `{}`", local_addr);
288        let mut joinset = JoinSet::new();
289
290        loop {
291            let (stream, _) = listener.accept().await.map_err(|_| ServerError::Run)?;
292            let mut service: Service<S, P> = service.clone();
293
294            ServiceExt::<RequestEnvelope>::ready(&mut service)
295                .await
296                .map_err(|_| ServerError::Run)?;
297            ServiceExt::<SubscriptionRequest>::ready(&mut service)
298                .await
299                .map_err(|_| ServerError::Run)?;
300
301            joinset.spawn(async move { service.run(stream).await });
302        }
303    }
304
305    /// Consumes messages coming from a mpsc channel and make the bridge to
306    /// the server service (need better naming here) to call another object service
307    async fn consume_internal_client_commands(
308        mut receiver: InternalClientReceiver,
309        service: Service<S, P>,
310    ) -> ServerResult<()> {
311        let mut joinset = JoinSet::new();
312        while let Some(message) = receiver.recv().await {
313            let mut inner_service = service.clone();
314            joinset.spawn(async move {
315                let resp = inner_service
316                    .call(message.request)
317                    .await
318                    .unwrap_or_else(ResponseEnvelope::err);
319                message
320                    .response_channel
321                    .send(resp.body)
322                    .inspect_err(|_| {
323                        error!("The caller dropped");
324                    })
325                    .ok();
326            });
327        }
328        let _ = joinset.join_all().await;
329        Ok(())
330    }
331
332    /// Receives the messages from the AdminReceiver channel
333    ///
334    /// These are operations that need to be protected from external access, and only
335    /// ServiceObjects have access to this channel - the channel is in the AppData
336    async fn consume_admin_commands(&self, mut admin_receiver: AdminReceiver) {
337        while let Some(message) = admin_receiver.recv().await {
338            match message {
339                // TODO I think this only works for the current server,
340                //      meaning, if the service object is running on another
341                //      node, it won't shutdown
342                AdminCommands::Shutdown(object_kind, object_id) => {
343                    let registry = self.registry.write().await;
344                    registry
345                        .remove(object_kind.clone(), object_id.clone())
346                        .await;
347                    self.object_placement_provider
348                        .write()
349                        .await
350                        .remove(&ObjectId(object_kind, object_id))
351                        .await;
352                }
353                AdminCommands::ServerExit => {
354                    // Exists `while` to terminate the server
355                    println!("I got a message to terminate this thing here. So Ill try");
356                    return;
357                }
358            }
359        }
360    }
361}
362
363/// Transforms a [Server] into a [Service]
364///
365/// It can't be infalible, because it needs to be bind
366/// so it can generate a Service
367impl<S, C, P> TryFrom<&Server<S, C, P>> for Service<S, P>
368where
369    S: MembershipStorage + 'static,
370    C: ClusterProvider<S> + 'static + Send + Sync,
371    P: ObjectPlacement + 'static,
372{
373    type Error = ServerError;
374    fn try_from(server: &Server<S, C, P>) -> Result<Self, Self::Error> {
375        let address = "".to_string();
376        let registry = server.registry.clone();
377        let object_placement_provider = server.object_placement_provider.clone();
378        let app_data = server.app_data.clone();
379        let members_storage = server.cluster_provider.members_storage().clone();
380
381        Ok(Service {
382            address,
383            registry,
384            members_storage,
385            object_placement_provider,
386            app_data,
387        })
388    }
389}
390
#[cfg(test)]
mod test {
    use super::*;
    use crate::cluster::membership_protocol::local::LocalClusterProvider;
    use crate::cluster::storage::local::LocalStorage;
    use crate::object_placement::local::LocalObjectPlacement;
    use crate::registry::Registry;

    /// Smoke test: a server can be assembled through the builder using the
    /// local (in-memory) providers
    #[tokio::test]
    async fn client_builder_sanity_check() {
        let cluster_provider = LocalClusterProvider {
            members_storage: LocalStorage::default(),
        };

        let _server = Server::builder()
            .address("0.0.0.0:80".to_string())
            .registry(Registry::default())
            .app_data(AppData::new())
            .cluster_provider(cluster_provider)
            .object_placement_provider(LocalObjectPlacement::default())
            .build();
    }
}