use std::{cell::RefCell, fmt, io, marker, mem, net, rc::Rc};
use ntex_io::Io;
use ntex_service::{IntoServiceFactory, ServiceFactory, cfg::SharedCfg};
use ntex_util::{HashMap, future::BoxFuture, future::Ready};
use super::factory::{
self, BoxServerService, FactoryService, FactoryServiceType, NetService,
};
use super::{Token, builder::bind_addr, socket::Listener};
/// Clonable handle to a shared, mutable [`SharedCfg`] value.
///
/// The handle wraps an `Rc`, so every clone refers to the same stored
/// configuration: a value set through one clone is observed by all others.
#[derive(Clone, Debug)]
pub struct Config(Rc<InnerServiceConfig>);
/// Shared state behind a [`Config`] handle.
#[derive(Debug)]
pub(super) struct InnerServiceConfig {
/// Currently stored configuration; interior mutability lets it be replaced
/// through a shared reference.
pub(super) config: RefCell<SharedCfg>,
}
impl Default for Config {
fn default() -> Self {
Self(Rc::new(InnerServiceConfig {
config: RefCell::new(SharedCfg::default()),
}))
}
}
impl Config {
pub fn config<T: Into<SharedCfg>>(&self, cfg: T) -> &Self {
*self.0.config.borrow_mut() = cfg.into();
self
}
pub(super) fn get_config(&self) -> SharedCfg {
self.0.config.borrow().clone()
}
}
/// Clonable handle used to declare named sockets and worker-start callbacks.
///
/// All clones share the same `Rc<RefCell<..>>` state, so sockets and callbacks
/// registered through one clone are visible through the others.
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);
/// A named group of listeners registered by a single `bind` or `listen` call.
#[derive(Debug)]
struct Socket {
/// Name the listeners were registered under.
name: String,
/// Configuration stored for the group as a whole.
config: SharedCfg,
/// Listeners of the group, each with its token and its own configuration.
sockets: Vec<(Token, Listener, SharedCfg)>,
}
/// Mutable state shared by all clones of a [`ServiceConfig`].
pub(super) struct ServiceConfigInner {
/// Token state; `next()` is called on it once per registered listener.
token: Token,
/// `true` once `on_worker_start` has replaced the placeholder callback.
on_start_set: bool,
/// Worker-start callbacks; initially holds a single placeholder callback.
on_start: Vec<Box<dyn OnWorkerStart>>,
/// Socket groups in registration order.
sockets: Vec<Socket>,
/// Backlog value forwarded to `bind_addr` by `bind`.
backlog: i32,
}
impl fmt::Debug for ServiceConfigInner {
    /// Formats the token, backlog and sockets.
    ///
    /// The worker-start callbacks and the `on_start_set` flag are left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            token,
            backlog,
            sockets,
            ..
        } = self;

        let mut out = f.debug_struct("ServiceConfigInner");
        out.field("token", token);
        out.field("backlog", backlog);
        out.field("sockets", sockets);
        out.finish()
    }
}
impl ServiceConfig {
pub(super) fn new(token: Token, backlog: i32) -> Self {
ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
token,
backlog,
sockets: Vec::new(),
on_start_set: false,
on_start: vec![OnWorkerStartWrapper::create(|_| {
not_configured();
Ready::Ok::<_, &str>(())
})],
})))
}
pub fn bind<U, N: AsRef<str>>(&self, name: N, addr: U) -> io::Result<&Self>
where
U: net::ToSocketAddrs,
{
let mut inner = self.0.borrow_mut();
let sockets = bind_addr(addr, inner.backlog)?;
let socket = Socket {
name: name.as_ref().to_string(),
config: SharedCfg::default(),
sockets: sockets
.into_iter()
.map(|lst| {
(
inner.token.next(),
Listener::from_tcp(lst),
SharedCfg::default(),
)
})
.collect(),
};
inner.sockets.push(socket);
Ok(self)
}
pub fn listen<N: AsRef<str>>(&self, name: N, lst: net::TcpListener) -> &Self {
let mut inner = self.0.borrow_mut();
let socket = Socket {
name: name.as_ref().to_string(),
config: SharedCfg::default(),
sockets: vec![(
inner.token.next(),
Listener::from_tcp(lst),
SharedCfg::default(),
)],
};
inner.sockets.push(socket);
self
}
pub fn config<N, T>(&self, name: N, cfg: T) -> &Self
where
N: AsRef<str>,
T: Into<SharedCfg>,
{
let cfg = cfg.into();
let mut inner = self.0.borrow_mut();
for sock in &mut inner.sockets {
if sock.name == name.as_ref() {
sock.config = cfg.clone();
for item in &mut sock.sockets {
item.2 = cfg.clone();
}
}
}
self
}
pub fn on_worker_start<F, E>(&self, f: F) -> &Self
where
F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
E: fmt::Display + 'static,
{
let mut inner = self.0.borrow_mut();
if !inner.on_start_set {
inner.on_start.clear();
inner.on_start_set = true;
}
inner.on_start.push(OnWorkerStartWrapper::create(f));
self
}
pub(super) fn into_factory(
self,
) -> (Token, Vec<(Token, String, Listener)>, FactoryServiceType) {
let mut inner = self.0.borrow_mut();
let mut sockets = Vec::new();
let mut names = HashMap::default();
for (idx, s) in mem::take(&mut inner.sockets).into_iter().enumerate() {
names.insert(
s.name.clone(),
Entry {
idx,
name: s.name.clone(),
config: s.config,
tokens: s
.sockets
.iter()
.map(|(token, _, cfg)| (*token, cfg.clone()))
.collect(),
},
);
sockets.extend(
s.sockets
.into_iter()
.map(|(token, lst, _)| (token, s.name.clone(), lst)),
);
}
(
inner.token,
sockets,
Box::new(ConfiguredService {
names,
on_start: mem::take(&mut inner.on_start),
}),
)
}
}
/// Service factory produced by `ServiceConfig::into_factory`.
struct ConfiguredService {
/// Per-name socket entries, keyed by socket name.
names: HashMap<String, Entry>,
/// Worker-start callbacks run by `create`.
on_start: Vec<Box<dyn OnWorkerStart>>,
}
impl FactoryService for ConfiguredService {
fn clone_factory(&self) -> FactoryServiceType {
Box::new(Self {
names: self.names.clone(),
on_start: self.on_start.iter().map(|cb| (*cb).clone()).collect(),
})
}
fn create(&self) -> BoxFuture<'static, Result<Vec<NetService>, ()>> {
let rt = ServiceRuntime::new(self.names.clone());
let on_start: Vec<_> = self
.on_start
.iter()
.map(|cb| cb.run(ServiceRuntime(rt.0.clone())))
.collect();
Box::pin(async move {
for fut in on_start {
fut.await?;
}
rt.validate();
let names = mem::take(&mut rt.0.borrow_mut().names);
let mut services = mem::take(&mut rt.0.borrow_mut().services);
let mut res = Vec::new();
while let Some(svc) = services.pop() {
if let Some(svc) = svc {
for entry in names.values() {
if entry.idx == services.len() {
res.push(NetService {
config: entry.config.clone(),
name: std::sync::Arc::from(entry.name.clone()),
tokens: entry.tokens.clone(),
factory: svc,
});
break;
}
}
}
}
Ok(res)
})
}
}
/// Logs an error stating that no service has been configured.
///
/// Called by the placeholder worker-start callback installed in
/// `ServiceConfig::new`.
fn not_configured() {
log::error!("Service is not configured");
}
/// Handle passed to worker-start callbacks for attaching service factories
/// and configurations to named sockets.
///
/// The state lives behind an `Rc<RefCell<..>>`; handles wrapping the same
/// `Rc` share it.
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>);
/// Per-name record describing the listeners a service is attached to.
#[derive(Debug, Clone)]
struct Entry {
/// Index of this entry's slot in `ServiceRuntimeInner::services`.
idx: usize,
/// Socket name this entry was registered under.
name: String,
/// Entry-level configuration, copied into `NetService::config`.
config: SharedCfg,
/// Tokens of the listeners under this name, each with its own configuration.
tokens: Vec<(Token, SharedCfg)>,
}
/// State shared by `ServiceRuntime` handles.
struct ServiceRuntimeInner {
/// Known socket entries, keyed by name.
names: HashMap<String, Entry>,
/// One slot per entry, indexed by `Entry::idx`; `None` until a service is
/// registered for that entry.
services: Vec<Option<BoxServerService>>,
}
impl fmt::Debug for ServiceRuntime {
    /// Formats the known socket entries and the registered service slots.
    ///
    /// The struct is reported under its real name, `ServiceRuntime`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner = self.0.borrow();
        f.debug_struct("ServiceRuntime")
            .field("names", &inner.names)
            .field("services", &inner.services)
            .finish()
    }
}
impl ServiceRuntime {
fn new(names: HashMap<String, Entry>) -> Self {
ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner {
services: (0..names.len()).map(|_| None).collect(),
names,
})))
}
fn validate(&self) {
let inner = self.0.as_ref().borrow();
for (name, item) in &inner.names {
if inner.services[item.idx].is_none() {
log::error!("Service {name:?} is not configured");
}
}
}
pub fn service<T, F>(&self, name: &str, service: F) -> &Self
where
F: IntoServiceFactory<T, Io, SharedCfg>,
T: ServiceFactory<Io, SharedCfg> + 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
{
let mut inner = self.0.borrow_mut();
if let Some(entry) = inner.names.get_mut(name) {
let idx = entry.idx;
inner.services[idx] = Some(factory::create_boxed_factory(
name.to_string(),
service.into_factory(),
));
} else {
panic!("Unknown service: {name:?}");
}
self
}
pub fn config<T: Into<SharedCfg>>(&self, name: &str, cfg: T) -> &Self {
let mut inner = self.0.borrow_mut();
if let Some(entry) = inner.names.get_mut(name) {
entry.config = cfg.into();
}
self
}
}
/// Type-erased worker-start callback that can be cloned through a trait object.
trait OnWorkerStart: Send {
/// Returns a boxed copy of this callback.
fn clone(&self) -> Box<dyn OnWorkerStart>;
/// Invokes the callback with `rt` and returns its future; the callback's own
/// error type is erased to `()`.
fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>>;
}
/// Adapter that stores a user callback `F` and remembers its error type `E`.
struct OnWorkerStartWrapper<F, E> {
/// The wrapped callback.
pub(super) f: F,
/// Marker tying the wrapper to the callback's error type; holds no data.
pub(super) _t: marker::PhantomData<E>,
}
impl<F, E> OnWorkerStartWrapper<F, E>
where
F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
E: fmt::Display + 'static,
{
pub(super) fn create(f: F) -> Box<dyn OnWorkerStart + Send> {
Box::new(Self {
f,
_t: marker::PhantomData,
})
}
}
// SAFETY: `E` only appears in the `PhantomData<E>` marker field, so no value of
// type `E` is ever stored in the wrapper. The only stored data is `f: F`, and
// this impl requires `F: Send`.
unsafe impl<F, E> Send for OnWorkerStartWrapper<F, E> where F: Send {}
impl<F, E> OnWorkerStart for OnWorkerStartWrapper<F, E>
where
F: AsyncFn(ServiceRuntime) -> Result<(), E> + Send + Clone + 'static,
E: fmt::Display + 'static,
{
fn clone(&self) -> Box<dyn OnWorkerStart> {
Box::new(Self {
f: self.f.clone(),
_t: marker::PhantomData,
})
}
fn run(&self, rt: ServiceRuntime) -> BoxFuture<'static, Result<(), ()>> {
let f = self.f.clone();
Box::pin(async move {
(f)(rt).await.map_err(|e| {
log::error!("On worker start callback failed: {e}");
})
})
}
}