scrappy_server/
config.rs

1use std::collections::HashMap;
2use std::{fmt, io, net};
3
4use scrappy_rt::net::TcpStream;
5use scrappy_service as scrappy;
6use scrappy_utils::counter::CounterGuard;
7use futures::future::{ok, Future, FutureExt, LocalBoxFuture};
8use log::error;
9
10use super::builder::bind_addr;
11use super::service::{
12    BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
13};
14use super::Token;
15
16pub struct ServiceConfig {
17    pub(crate) services: Vec<(String, net::TcpListener)>,
18    pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
19    pub(crate) threads: usize,
20    pub(crate) backlog: i32,
21}
22
23impl ServiceConfig {
24    pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig {
25        ServiceConfig {
26            threads,
27            backlog,
28            services: Vec::new(),
29            apply: None,
30        }
31    }
32
33    /// Set number of workers to start.
34    ///
35    /// By default server uses number of available logical cpu as workers
36    /// count.
37    pub fn workers(&mut self, num: usize) {
38        self.threads = num;
39    }
40
41    /// Add new service to server
42    pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
43    where
44        U: net::ToSocketAddrs,
45    {
46        let sockets = bind_addr(addr, self.backlog)?;
47
48        for lst in sockets {
49            self.listen(name.as_ref(), lst);
50        }
51
52        Ok(self)
53    }
54
55    /// Add new service to server
56    pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
57        if self.apply.is_none() {
58            self.apply = Some(Box::new(not_configured));
59        }
60        self.services.push((name.as_ref().to_string(), lst));
61        self
62    }
63
64    /// Register service configuration function. This function get called
65    /// during worker runtime configuration. It get executed in worker thread.
66    pub fn apply<F>(&mut self, f: F) -> io::Result<()>
67    where
68        F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
69    {
70        self.apply = Some(Box::new(f));
71        Ok(())
72    }
73}
74
75pub(super) struct ConfiguredService {
76    rt: Box<dyn ServiceRuntimeConfiguration>,
77    names: HashMap<Token, (String, net::SocketAddr)>,
78    topics: HashMap<String, Token>,
79    services: Vec<Token>,
80}
81
82impl ConfiguredService {
83    pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
84        ConfiguredService {
85            rt,
86            names: HashMap::new(),
87            topics: HashMap::new(),
88            services: Vec::new(),
89        }
90    }
91
92    pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) {
93        self.names.insert(token, (name.clone(), addr));
94        self.topics.insert(name.clone(), token);
95        self.services.push(token);
96    }
97}
98
99impl InternalServiceFactory for ConfiguredService {
100    fn name(&self, token: Token) -> &str {
101        &self.names[&token].0
102    }
103
104    fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
105        Box::new(Self {
106            rt: self.rt.clone(),
107            names: self.names.clone(),
108            topics: self.topics.clone(),
109            services: self.services.clone(),
110        })
111    }
112
113    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
114        // configure services
115        let mut rt = ServiceRuntime::new(self.topics.clone());
116        self.rt.configure(&mut rt);
117        rt.validate();
118        let mut names = self.names.clone();
119        let tokens = self.services.clone();
120
121        // construct services
122        async move {
123            let mut services = rt.services;
124            // TODO: Proper error handling here
125            for f in rt.onstart.into_iter() {
126                f.await;
127            }
128            let mut res = vec![];
129            for token in tokens {
130                if let Some(srv) = services.remove(&token) {
131                    let newserv = srv.new_service(());
132                    match newserv.await {
133                        Ok(serv) => {
134                            res.push((token, serv));
135                        }
136                        Err(_) => {
137                            error!("Can not construct service");
138                            return Err(());
139                        }
140                    }
141                } else {
142                    let name = names.remove(&token).unwrap().0;
143                    res.push((
144                        token,
145                        Box::new(StreamService::new(scrappy::fn_service(
146                            move |_: TcpStream| {
147                                error!("Service {:?} is not configured", name);
148                                ok::<_, ()>(())
149                            },
150                        ))),
151                    ));
152                };
153            }
154            return Ok(res);
155        }
156        .boxed_local()
157    }
158}
159
160pub(super) trait ServiceRuntimeConfiguration: Send {
161    fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
162
163    fn configure(&self, rt: &mut ServiceRuntime);
164}
165
166impl<F> ServiceRuntimeConfiguration for F
167where
168    F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
169{
170    fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
171        Box::new(self.clone())
172    }
173
174    fn configure(&self, rt: &mut ServiceRuntime) {
175        (self)(rt)
176    }
177}
178
179fn not_configured(_: &mut ServiceRuntime) {
180    error!("Service is not configured");
181}
182
183pub struct ServiceRuntime {
184    names: HashMap<String, Token>,
185    services: HashMap<Token, BoxedNewService>,
186    onstart: Vec<LocalBoxFuture<'static, ()>>,
187}
188
189impl ServiceRuntime {
190    fn new(names: HashMap<String, Token>) -> Self {
191        ServiceRuntime {
192            names,
193            services: HashMap::new(),
194            onstart: Vec::new(),
195        }
196    }
197
198    fn validate(&self) {
199        for (name, token) in &self.names {
200            if !self.services.contains_key(&token) {
201                error!("Service {:?} is not configured", name);
202            }
203        }
204    }
205
206    /// Register service.
207    ///
208    /// Name of the service must be registered during configuration stage with
209    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
210    pub fn service<T, F>(&mut self, name: &str, service: F)
211    where
212        F: scrappy::IntoServiceFactory<T>,
213        T: scrappy::ServiceFactory<Config = (), Request = TcpStream> + 'static,
214        T::Future: 'static,
215        T::Service: 'static,
216        T::InitError: fmt::Debug,
217    {
218        // let name = name.to_owned();
219        if let Some(token) = self.names.get(name) {
220            self.services.insert(
221                token.clone(),
222                Box::new(ServiceFactory {
223                    inner: service.into_factory(),
224                }),
225            );
226        } else {
227            panic!("Unknown service: {:?}", name);
228        }
229    }
230
231    /// Execute future before services initialization.
232    pub fn on_start<F>(&mut self, fut: F)
233    where
234        F: Future<Output = ()> + 'static,
235    {
236        self.onstart.push(fut.boxed_local())
237    }
238}
239
240type BoxedNewService = Box<
241    dyn scrappy::ServiceFactory<
242        Request = (Option<CounterGuard>, ServerMessage),
243        Response = (),
244        Error = (),
245        InitError = (),
246        Config = (),
247        Service = BoxedServerService,
248        Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
249    >,
250>;
251
252struct ServiceFactory<T> {
253    inner: T,
254}
255
256impl<T> scrappy::ServiceFactory for ServiceFactory<T>
257where
258    T: scrappy::ServiceFactory<Config = (), Request = TcpStream>,
259    T::Future: 'static,
260    T::Service: 'static,
261    T::Error: 'static,
262    T::InitError: fmt::Debug + 'static,
263{
264    type Request = (Option<CounterGuard>, ServerMessage);
265    type Response = ();
266    type Error = ();
267    type InitError = ();
268    type Config = ();
269    type Service = BoxedServerService;
270    type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
271
272    fn new_service(&self, _: ()) -> Self::Future {
273        let fut = self.inner.new_service(());
274        async move {
275            return match fut.await {
276                Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
277                Err(e) => {
278                    error!("Can not construct service: {:?}", e);
279                    Err(())
280                }
281            };
282        }
283        .boxed_local()
284    }
285}