1use std::collections::HashMap;
2use std::{fmt, io, net};
3
4use scrappy_rt::net::TcpStream;
5use scrappy_service as scrappy;
6use scrappy_utils::counter::CounterGuard;
7use futures::future::{ok, Future, FutureExt, LocalBoxFuture};
8use log::error;
9
10use super::builder::bind_addr;
11use super::service::{
12 BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
13};
14use super::Token;
15
16pub struct ServiceConfig {
17 pub(crate) services: Vec<(String, net::TcpListener)>,
18 pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
19 pub(crate) threads: usize,
20 pub(crate) backlog: i32,
21}
22
23impl ServiceConfig {
24 pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig {
25 ServiceConfig {
26 threads,
27 backlog,
28 services: Vec::new(),
29 apply: None,
30 }
31 }
32
33 pub fn workers(&mut self, num: usize) {
38 self.threads = num;
39 }
40
41 pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
43 where
44 U: net::ToSocketAddrs,
45 {
46 let sockets = bind_addr(addr, self.backlog)?;
47
48 for lst in sockets {
49 self.listen(name.as_ref(), lst);
50 }
51
52 Ok(self)
53 }
54
55 pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
57 if self.apply.is_none() {
58 self.apply = Some(Box::new(not_configured));
59 }
60 self.services.push((name.as_ref().to_string(), lst));
61 self
62 }
63
64 pub fn apply<F>(&mut self, f: F) -> io::Result<()>
67 where
68 F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
69 {
70 self.apply = Some(Box::new(f));
71 Ok(())
72 }
73}
74
75pub(super) struct ConfiguredService {
76 rt: Box<dyn ServiceRuntimeConfiguration>,
77 names: HashMap<Token, (String, net::SocketAddr)>,
78 topics: HashMap<String, Token>,
79 services: Vec<Token>,
80}
81
82impl ConfiguredService {
83 pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
84 ConfiguredService {
85 rt,
86 names: HashMap::new(),
87 topics: HashMap::new(),
88 services: Vec::new(),
89 }
90 }
91
92 pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) {
93 self.names.insert(token, (name.clone(), addr));
94 self.topics.insert(name.clone(), token);
95 self.services.push(token);
96 }
97}
98
99impl InternalServiceFactory for ConfiguredService {
100 fn name(&self, token: Token) -> &str {
101 &self.names[&token].0
102 }
103
104 fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
105 Box::new(Self {
106 rt: self.rt.clone(),
107 names: self.names.clone(),
108 topics: self.topics.clone(),
109 services: self.services.clone(),
110 })
111 }
112
113 fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
114 let mut rt = ServiceRuntime::new(self.topics.clone());
116 self.rt.configure(&mut rt);
117 rt.validate();
118 let mut names = self.names.clone();
119 let tokens = self.services.clone();
120
121 async move {
123 let mut services = rt.services;
124 for f in rt.onstart.into_iter() {
126 f.await;
127 }
128 let mut res = vec![];
129 for token in tokens {
130 if let Some(srv) = services.remove(&token) {
131 let newserv = srv.new_service(());
132 match newserv.await {
133 Ok(serv) => {
134 res.push((token, serv));
135 }
136 Err(_) => {
137 error!("Can not construct service");
138 return Err(());
139 }
140 }
141 } else {
142 let name = names.remove(&token).unwrap().0;
143 res.push((
144 token,
145 Box::new(StreamService::new(scrappy::fn_service(
146 move |_: TcpStream| {
147 error!("Service {:?} is not configured", name);
148 ok::<_, ()>(())
149 },
150 ))),
151 ));
152 };
153 }
154 return Ok(res);
155 }
156 .boxed_local()
157 }
158}
159
160pub(super) trait ServiceRuntimeConfiguration: Send {
161 fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
162
163 fn configure(&self, rt: &mut ServiceRuntime);
164}
165
166impl<F> ServiceRuntimeConfiguration for F
167where
168 F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
169{
170 fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
171 Box::new(self.clone())
172 }
173
174 fn configure(&self, rt: &mut ServiceRuntime) {
175 (self)(rt)
176 }
177}
178
179fn not_configured(_: &mut ServiceRuntime) {
180 error!("Service is not configured");
181}
182
183pub struct ServiceRuntime {
184 names: HashMap<String, Token>,
185 services: HashMap<Token, BoxedNewService>,
186 onstart: Vec<LocalBoxFuture<'static, ()>>,
187}
188
189impl ServiceRuntime {
190 fn new(names: HashMap<String, Token>) -> Self {
191 ServiceRuntime {
192 names,
193 services: HashMap::new(),
194 onstart: Vec::new(),
195 }
196 }
197
198 fn validate(&self) {
199 for (name, token) in &self.names {
200 if !self.services.contains_key(&token) {
201 error!("Service {:?} is not configured", name);
202 }
203 }
204 }
205
206 pub fn service<T, F>(&mut self, name: &str, service: F)
211 where
212 F: scrappy::IntoServiceFactory<T>,
213 T: scrappy::ServiceFactory<Config = (), Request = TcpStream> + 'static,
214 T::Future: 'static,
215 T::Service: 'static,
216 T::InitError: fmt::Debug,
217 {
218 if let Some(token) = self.names.get(name) {
220 self.services.insert(
221 token.clone(),
222 Box::new(ServiceFactory {
223 inner: service.into_factory(),
224 }),
225 );
226 } else {
227 panic!("Unknown service: {:?}", name);
228 }
229 }
230
231 pub fn on_start<F>(&mut self, fut: F)
233 where
234 F: Future<Output = ()> + 'static,
235 {
236 self.onstart.push(fut.boxed_local())
237 }
238}
239
240type BoxedNewService = Box<
241 dyn scrappy::ServiceFactory<
242 Request = (Option<CounterGuard>, ServerMessage),
243 Response = (),
244 Error = (),
245 InitError = (),
246 Config = (),
247 Service = BoxedServerService,
248 Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
249 >,
250>;
251
252struct ServiceFactory<T> {
253 inner: T,
254}
255
256impl<T> scrappy::ServiceFactory for ServiceFactory<T>
257where
258 T: scrappy::ServiceFactory<Config = (), Request = TcpStream>,
259 T::Future: 'static,
260 T::Service: 'static,
261 T::Error: 'static,
262 T::InitError: fmt::Debug + 'static,
263{
264 type Request = (Option<CounterGuard>, ServerMessage);
265 type Response = ();
266 type Error = ();
267 type InitError = ();
268 type Config = ();
269 type Service = BoxedServerService;
270 type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
271
272 fn new_service(&self, _: ()) -> Self::Future {
273 let fut = self.inner.new_service(());
274 async move {
275 return match fut.await {
276 Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
277 Err(e) => {
278 error!("Can not construct service: {:?}", e);
279 Err(())
280 }
281 };
282 }
283 .boxed_local()
284 }
285}