1pub(crate) mod listener;
2mod select_all;
3pub mod service;
4
5use futures_util::{FutureExt, StreamExt};
6use mayheap::Vec;
7use select_all::SelectAll;
8use service::MethodReply;
9
10use crate::{
11 connection::{ReadConnection, Socket, WriteConnection},
12 Call, Connection, Reply,
13};
14
15#[derive(Debug)]
19pub struct Server<Listener, Service> {
20 listener: Option<Listener>,
21 service: Service,
22}
23
24impl<Listener, Service> Server<Listener, Service>
25where
26 Listener: listener::Listener,
27 Service: service::Service,
28{
29 pub fn new(listener: Listener, service: Service) -> Self {
31 Self {
32 listener: Some(listener),
33 service,
34 }
35 }
36
37 pub async fn run(mut self) -> crate::Result<()> {
61 let mut listener = self.listener.take().unwrap();
62 let mut readers = Vec::<_, MAX_CONNECTIONS>::new();
63 let mut writers = Vec::<_, MAX_CONNECTIONS>::new();
64 let mut reply_streams =
65 Vec::<ReplyStream<Service::ReplyStream, Listener::Socket>, MAX_CONNECTIONS>::new();
66 let mut last_reply_stream_winner = None;
67 let mut last_method_call_winner = None;
68
69 loop {
70 let mut reply_stream_futures: Vec<_, MAX_CONNECTIONS> =
71 reply_streams.iter_mut().map(|s| s.stream.next()).collect();
72 let start_index = last_reply_stream_winner.map(|idx| idx + 1);
73 let mut reply_stream_select_all = SelectAll::new(start_index);
74 for future in reply_stream_futures.iter_mut() {
75 reply_stream_select_all
76 .push(future)
77 .map_err(|_| crate::Error::BufferOverflow)?;
78 }
79
80 futures_util::select_biased! {
81 conn = listener.accept().fuse() => {
83 let conn = conn?;
84 let (read, write) = conn.split();
85 readers
86 .push(read)
87 .map_err(|_| crate::Error::BufferOverflow)?;
88 writers
89 .push(write)
90 .map_err(|_| crate::Error::BufferOverflow)?;
91 }
92 res = self.get_next_call(
94 unsafe { &mut *(&mut readers as *mut _) },
97 last_method_call_winner.map(|idx| idx + 1),
98 ).fuse() => {
99 let (idx, call) = res?;
100 last_method_call_winner = Some(idx);
101
102 let mut stream = None;
103 let mut remove = true;
104 match call {
105 Ok(call) => match self.handle_call(call, &mut writers[idx]).await {
106 Ok(None) => remove = false,
107 Ok(Some(s)) => stream = Some(s),
108 Err(e) => warn!("Error writing to connection: {:?}", e),
109 },
110 Err(e) => warn!("Error reading from socket: {:?}", e),
111 }
112
113 if stream.is_some() || remove {
114 let reader = readers.remove(idx);
115 let writer = writers.remove(idx);
116
117 #[cfg(not(feature = "std"))]
118 drop(reply_stream_futures);
119 if let Some(stream) = stream.map(|s| ReplyStream::new(s, reader, writer)) {
120 reply_streams
121 .push(stream)
122 .map_err(|_| crate::Error::BufferOverflow)?;
123 }
124 }
125 }
126 reply = reply_stream_select_all.fuse() => {
128 #[cfg(not(feature = "std"))]
129 drop(reply_stream_futures);
130 let (idx, reply) = reply;
131 last_reply_stream_winner = Some(idx);
132 let id = reply_streams.get(idx).unwrap().conn.id();
133
134 match reply {
135 Some(reply) => {
136 if let Err(e) = reply_streams
137 .get_mut(idx)
138 .unwrap()
139 .conn
140 .write_mut()
141 .send_reply(&reply)
142 .await
143 {
144 warn!("Error writing to client {}: {:?}", id, e);
145 reply_streams.remove(idx);
146 }
147 }
148 None => {
149 trace!("Stream closed for client {}", id);
150 let stream = reply_streams.remove(idx);
151
152 let (read, write) = stream.conn.split();
153 readers
154 .push(read)
155 .map_err(|_| crate::Error::BufferOverflow)?;
156 writers
157 .push(write)
158 .map_err(|_| crate::Error::BufferOverflow)?;
159 }
160 }
161 }
162 }
163 }
164 }
165
166 async fn get_next_call<'r>(
175 &mut self,
176 readers: &'r mut Vec<
177 ReadConnection<<<Listener as crate::Listener>::Socket as Socket>::ReadHalf>,
178 16,
179 >,
180 start_index: Option<usize>,
181 ) -> crate::Result<(usize, crate::Result<Call<Service::MethodCall<'r>>>)> {
182 let mut read_futures: Vec<_, 16> = readers.iter_mut().map(|r| r.receive_call()).collect();
183 let mut select_all = SelectAll::new(start_index);
184 for future in &mut read_futures {
185 unsafe {
187 select_all
188 .push_unchecked(future)
189 .map_err(|_| crate::Error::BufferOverflow)?;
190 }
191 }
192
193 Ok(select_all.await)
194 }
195
196 async fn handle_call(
197 &mut self,
198 call: Call<Service::MethodCall<'_>>,
199 writer: &mut WriteConnection<<Listener::Socket as Socket>::WriteHalf>,
200 ) -> crate::Result<Option<Service::ReplyStream>> {
201 let mut stream = None;
202 match self.service.handle(call).await {
203 MethodReply::Single(params) => {
204 let reply = Reply::new(params).set_continues(Some(false));
205 writer.send_reply(&reply).await?
206 }
207 MethodReply::Error(err) => writer.send_error(&err).await?,
208 MethodReply::Multi(s) => {
209 trace!("Client {} now turning into a reply stream", writer.id());
210 stream = Some(s)
211 }
212 }
213
214 Ok(stream)
215 }
216}
217
218const MAX_CONNECTIONS: usize = 16;
219
220#[derive(Debug)]
222struct ReplyStream<St, Sock: Socket> {
223 stream: St,
224 conn: Connection<Sock>,
225}
226
227impl<St, Sock> ReplyStream<St, Sock>
228where
229 Sock: Socket,
230{
231 fn new(
232 stream: St,
233 read_conn: ReadConnection<Sock::ReadHalf>,
234 write_conn: WriteConnection<Sock::WriteHalf>,
235 ) -> Self {
236 Self {
237 stream,
238 conn: Connection::join(read_conn, write_conn),
239 }
240 }
241}