zlink_core/server/
mod.rs

1pub(crate) mod listener;
2mod select_all;
3pub mod service;
4
5use futures_util::{FutureExt, StreamExt};
6use mayheap::Vec;
7use select_all::SelectAll;
8use service::MethodReply;
9
10use crate::{
11    connection::{ReadConnection, Socket, WriteConnection},
12    Call, Connection, Reply,
13};
14
/// A server.
///
/// The server listens for incoming connections and handles method calls using a service.
///
/// Create one with [`Server::new`] and drive it with [`Server::run`].
#[derive(Debug)]
pub struct Server<Listener, Service> {
    /// Source of incoming connections.
    ///
    /// Kept in an `Option` so that `run` can move the listener out with `take()`; `new` always
    /// stores `Some`.
    listener: Option<Listener>,
    /// The service whose `handle` method is invoked for every received method call.
    service: Service,
}
23
24impl<Listener, Service> Server<Listener, Service>
25where
26    Listener: listener::Listener,
27    Service: service::Service,
28{
29    /// Create a new server that serves `service` to incomming connections from `listener`.
30    pub fn new(listener: Listener, service: Service) -> Self {
31        Self {
32            listener: Some(listener),
33            service,
34        }
35    }
36
    /// Run the server.
    ///
    /// Consumes the server and loops until an error occurs. On every iteration exactly one of the
    /// following events is handled:
    ///
    /// 1. A new connection is accepted from the listener.
    /// 2. A method call is read from one of the idle connections and passed to the service.
    /// 3. A reply is produced by one of the active reply streams and sent to its client.
    ///
    /// # Errors
    ///
    /// The loop has no exit other than error propagation, so this method only ever returns `Err`:
    ///
    /// * Any error returned by the listener while accepting a connection.
    /// * [`crate::Error::BufferOverflow`] if a connection, reply stream or future can not be added
    ///   to one of the internal `MAX_CONNECTIONS`-sized vectors.
    ///
    /// Errors while reading from or writing to an individual client are only logged; the affected
    /// connection is dropped and the server keeps running.
    ///
    /// # Panics
    ///
    /// Unwraps the stored listener, which [`Server::new`] always sets. The indices used to look up
    /// connections come from `SelectAll` and are assumed to be in range.
    ///
    /// # Caveats
    ///
    /// Due to [a bug in the rust compiler][abrc], the future returned by this method can not be
    /// treated as `Send`, even if all the specific types involved are `Send`. A major consequence
    /// of this fact unfortunately, is that it can not be spawned in a task of a multi-threaded
    /// runtime. For example, you can not currently do `tokio::spawn(server.run())`.
    ///
    /// Fortunately, there are easy workarounds for this. You can either:
    ///
    /// * Use a thread-local runtime (for example [`tokio::runtime::LocalRuntime`] or
    ///   [`tokio::task::LocalSet`]) to run the server in a local task, perhaps in a separate thread.
    /// * Use some common API to run multiple futures at once, such as [`futures::select!`] or
    ///   [`tokio::select!`].
    ///
    /// Most importantly, this is most likely a temporary issue and will be fixed in the future. 😊
    ///
    /// [abrc]: https://github.com/rust-lang/rust/issues/100013
    /// [`tokio::runtime::LocalRuntime`]: https://docs.rs/tokio/latest/tokio/runtime/struct.LocalRuntime.html
    /// [`tokio::task::LocalSet`]: https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html
    /// [`futures::select!`]: https://docs.rs/futures/latest/futures/macro.select.html
    /// [`tokio::select!`]: https://docs.rs/tokio/latest/tokio/macro.select.html
    pub async fn run(mut self) -> crate::Result<()> {
        // `new` always stores `Some`, and `self` is consumed, so the listener is taken only once.
        let mut listener = self.listener.take().unwrap();
        // `readers[i]` and `writers[i]` are the two halves of the same idle connection. They are
        // always pushed and removed together so that the indices stay aligned.
        let mut readers = Vec::<_, MAX_CONNECTIONS>::new();
        let mut writers = Vec::<_, MAX_CONNECTIONS>::new();
        // Connections that are currently being fed from a reply stream. Such a connection is
        // removed from `readers`/`writers` until its stream ends.
        let mut reply_streams =
            Vec::<ReplyStream<Service::ReplyStream, Listener::Socket>, MAX_CONNECTIONS>::new();
        // Index of the entry that completed last in each `SelectAll`. The next round is started at
        // the following index (presumably for fairness between clients; confirm against
        // `SelectAll`).
        let mut last_reply_stream_winner = None;
        let mut last_method_call_winner = None;

        loop {
            // One `next()` future per active reply stream, raced against each other below.
            let mut reply_stream_futures: Vec<_, MAX_CONNECTIONS> =
                reply_streams.iter_mut().map(|s| s.stream.next()).collect();
            let start_index = last_reply_stream_winner.map(|idx| idx + 1);
            let mut reply_stream_select_all = SelectAll::new(start_index);
            for future in reply_stream_futures.iter_mut() {
                reply_stream_select_all
                    .push(future)
                    .map_err(|_| crate::Error::BufferOverflow)?;
            }

            // `select_biased!` polls the branches in the order they are written.
            futures_util::select_biased! {
                // 1. Accept a new connection.
                conn = listener.accept().fuse() => {
                    let conn = conn?;
                    let (read, write) = conn.split();
                    // NOTE(review): if `push` fails because the vectors are full, the `?` below
                    // returns from `run` and thereby stops serving every connected client.
                    // Confirm this is intended rather than rejecting only the new connection.
                    readers
                        .push(read)
                        .map_err(|_| crate::Error::BufferOverflow)?;
                    writers
                        .push(write)
                        .map_err(|_| crate::Error::BufferOverflow)?;
                }
                // 2. Read method calls from the existing connections and handle them.
                res = self.get_next_call(
                    // SAFETY: `readers` is not invalidated or dropped until the output of this
                    // future is dropped.
                    // NOTE(review): the raw-pointer round trip detaches the borrow of `readers`
                    // from the borrow checker; `readers` is only modified below after
                    // `handle_call` has consumed `call`.
                    unsafe { &mut *(&mut readers as *mut _) },
                    last_method_call_winner.map(|idx| idx + 1),
                ).fuse() => {
                        let (idx, call) = res?;
                        last_method_call_winner = Some(idx);

                        // `stream` is set if the call was answered with a reply stream. `remove`
                        // stays `true` unless the call was fully handled with a single reply or
                        // an error reply.
                        let mut stream = None;
                        let mut remove = true;
                        match call {
                            Ok(call) => match self.handle_call(call, &mut writers[idx]).await {
                                Ok(None) => remove = false,
                                Ok(Some(s)) => stream = Some(s),
                                Err(e) => warn!("Error writing to connection: {:?}", e),
                            },
                            Err(e) => warn!("Error reading from socket: {:?}", e),
                        }

                        if stream.is_some() || remove {
                            // Take the connection out of the idle set, either to turn it into a
                            // reply stream or, after a read/write failure, to drop it.
                            let reader = readers.remove(idx);
                            let writer = writers.remove(idx);

                            // Presumably needed because, without `std`, the vector of futures
                            // would otherwise keep `reply_streams` borrowed until the end of the
                            // iteration. TODO confirm.
                            #[cfg(not(feature = "std"))]
                            drop(reply_stream_futures);
                            if let Some(stream) = stream.map(|s| ReplyStream::new(s, reader, writer)) {
                                reply_streams
                                    .push(stream)
                                    .map_err(|_| crate::Error::BufferOverflow)?;
                            }
                        }
                }
                // 3. Read replies from the reply streams and send them off.
                reply = reply_stream_select_all.fuse() => {
                    #[cfg(not(feature = "std"))]
                    drop(reply_stream_futures);
                    let (idx, reply) = reply;
                    last_reply_stream_winner = Some(idx);
                    let id = reply_streams.get(idx).unwrap().conn.id();

                    match reply {
                        // The stream yielded a reply: forward it to the client. If that fails,
                        // the stream and its connection are dropped.
                        Some(reply) => {
                            if let Err(e) = reply_streams
                                .get_mut(idx)
                                .unwrap()
                                .conn
                                .write_mut()
                                .send_reply(&reply)
                                .await
                            {
                                warn!("Error writing to client {}: {:?}", id, e);
                                reply_streams.remove(idx);
                            }
                        }
                        // The stream ended: return the connection to the idle set so that it can
                        // issue method calls again.
                        None => {
                            trace!("Stream closed for client {}", id);
                            let stream = reply_streams.remove(idx);

                            let (read, write) = stream.conn.split();
                            readers
                                .push(read)
                                .map_err(|_| crate::Error::BufferOverflow)?;
                            writers
                                .push(write)
                                .map_err(|_| crate::Error::BufferOverflow)?;
                        }
                    }
                }
            }
        }
    }
165
166    /// Read the next method call from the connection.
167    ///
168    /// # Return value
169    ///
170    /// On success, this method returns a tuple containing:
171    ///
172    /// * The index of the reader that yielded a call.
173    /// * A Result, containing a method call if reading was successful.
174    async fn get_next_call<'r>(
175        &mut self,
176        readers: &'r mut Vec<
177            ReadConnection<<<Listener as crate::Listener>::Socket as Socket>::ReadHalf>,
178            16,
179        >,
180        start_index: Option<usize>,
181    ) -> crate::Result<(usize, crate::Result<Call<Service::MethodCall<'r>>>)> {
182        let mut read_futures: Vec<_, 16> = readers.iter_mut().map(|r| r.receive_call()).collect();
183        let mut select_all = SelectAll::new(start_index);
184        for future in &mut read_futures {
185            // Safety: `future` is in fact `Unpin` but the compiler doesn't know that.
186            unsafe {
187                select_all
188                    .push_unchecked(future)
189                    .map_err(|_| crate::Error::BufferOverflow)?;
190            }
191        }
192
193        Ok(select_all.await)
194    }
195
196    async fn handle_call(
197        &mut self,
198        call: Call<Service::MethodCall<'_>>,
199        writer: &mut WriteConnection<<Listener::Socket as Socket>::WriteHalf>,
200    ) -> crate::Result<Option<Service::ReplyStream>> {
201        let mut stream = None;
202        match self.service.handle(call).await {
203            MethodReply::Single(params) => {
204                let reply = Reply::new(params).set_continues(Some(false));
205                writer.send_reply(&reply).await?
206            }
207            MethodReply::Error(err) => writer.send_error(&err).await?,
208            MethodReply::Multi(s) => {
209                trace!("Client {} now turning into a reply stream", writer.id());
210                stream = Some(s)
211            }
212        }
213
214        Ok(stream)
215    }
216}
217
/// Capacity parameter of the vectors that `Server::run` uses to keep track of idle connections
/// (read and write halves) and of active reply streams.
const MAX_CONNECTIONS: usize = 16;
219
/// Method reply stream and connection pair.
///
/// `Server::run` creates one of these when a method call is answered with a stream of replies
/// and keeps it until the stream ends or writing to the client fails.
#[derive(Debug)]
struct ReplyStream<St, Sock: Socket> {
    /// The stream that is polled with `next()` for replies to forward to the client.
    stream: St,
    /// The connection the replies are written to. It is split back into its read and write
    /// halves once the stream has ended.
    conn: Connection<Sock>,
}
226
227impl<St, Sock> ReplyStream<St, Sock>
228where
229    Sock: Socket,
230{
231    fn new(
232        stream: St,
233        read_conn: ReadConnection<Sock::ReadHalf>,
234        write_conn: WriteConnection<Sock::WriteHalf>,
235    ) -> Self {
236        Self {
237            stream,
238            conn: Connection::join(read_conn, write_conn),
239        }
240    }
241}