use crate::client::Client;
use crate::server::{Server, MSG_RETAINED_PER_CONNECTION_MAX, MSG_RETAINED_PER_WORKER_MAX};
use crate::websocket;
use crate::zhttpsocket;
use crate::zmq::SpecInfo;
use ipnet::IpNet;
use log::info;
use signal_hook;
use signal_hook::consts::TERM_SIGNALS;
use signal_hook::iterator::Signals;
use std::cmp;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
const INIT_HWM: usize = 128;
fn make_specs(base: &str, is_server: bool) -> Result<(String, String, String), String> {
if base.starts_with("ipc:") {
if is_server {
Ok((
format!("{}-{}", base, "in"),
format!("{}-{}", base, "in-stream"),
format!("{}-{}", base, "out"),
))
} else {
Ok((
format!("{}-{}", base, "out"),
format!("{}-{}", base, "out-stream"),
format!("{}-{}", base, "in"),
))
}
} else if base.starts_with("tcp:") {
match base.rfind(':') {
Some(pos) => match base[(pos + 1)..base.len()].parse::<u16>() {
Ok(port) => Ok((
format!("{}:{}", &base[..pos], port),
format!("{}:{}", &base[..pos], port + 1),
format!("{}:{}", &base[..pos], port + 2),
)),
Err(e) => Err(format!("error parsing tcp port in base spec: {}", e)),
},
None => Err("tcp base spec must specify port".into()),
}
} else {
Err("base spec must be ipc or tcp".into())
}
}
pub enum ListenSpec {
Tcp {
addr: std::net::SocketAddr,
tls: bool,
default_cert: Option<String>,
},
Local {
path: PathBuf,
mode: Option<u32>,
user: Option<String>,
group: Option<String>,
},
}
pub struct ListenConfig {
pub spec: ListenSpec,
pub stream: bool,
}
pub struct Config {
pub instance_id: String,
pub workers: usize,
pub req_maxconn: usize,
pub stream_maxconn: usize,
pub buffer_size: usize,
pub body_buffer_size: usize,
pub messages_max: usize,
pub req_timeout: Duration,
pub stream_timeout: Duration,
pub listen: Vec<ListenConfig>,
pub zclient_req: Vec<String>,
pub zclient_stream: Vec<String>,
pub zclient_connect: bool,
pub zserver_req: Vec<String>,
pub zserver_stream: Vec<String>,
pub zserver_connect: bool,
pub ipc_file_mode: u32,
pub certs_dir: PathBuf,
pub allow_compression: bool,
pub deny: Vec<IpNet>,
}
pub struct App {
_server: Option<Server>,
_client: Option<Client>,
}
impl App {
pub fn new(config: &Config) -> Result<Self, String> {
if config.req_maxconn < config.workers {
return Err("req maxconn must be >= workers".into());
}
if config.stream_maxconn < config.workers {
return Err("stream maxconn must be >= workers".into());
}
let zmq_context = Arc::new(zmq::Context::new());
let other_hwm = cmp::max((config.req_maxconn + config.stream_maxconn) / 20, 1);
let handle_bound = cmp::max(other_hwm / config.workers, 1);
let maxconn = config.req_maxconn + config.stream_maxconn;
let server = if !config.listen.is_empty() {
let mut any_req = false;
let mut any_stream = false;
for lc in config.listen.iter() {
if lc.stream {
any_stream = true;
} else {
any_req = true;
}
}
let mut zsockman = zhttpsocket::ClientSocketManager::new(
Arc::clone(&zmq_context),
&config.instance_id,
(MSG_RETAINED_PER_CONNECTION_MAX * maxconn)
+ (MSG_RETAINED_PER_WORKER_MAX * config.workers),
INIT_HWM,
other_hwm,
handle_bound,
);
if any_req {
let mut specs = Vec::new();
for spec in config.zclient_req.iter() {
if config.zclient_connect {
info!("zhttp client connect {}", spec);
} else {
info!("zhttp client bind {}", spec);
}
specs.push(SpecInfo {
spec: spec.clone(),
bind: !config.zclient_connect,
ipc_file_mode: config.ipc_file_mode,
});
}
if let Err(e) = zsockman.set_client_req_specs(&specs) {
return Err(format!("failed to set zhttp client req specs: {}", e));
}
}
if any_stream {
let mut out_specs = Vec::new();
let mut out_stream_specs = Vec::new();
let mut in_specs = Vec::new();
for spec in config.zclient_stream.iter() {
let (out_spec, out_stream_spec, in_spec) = make_specs(spec, false)?;
if config.zclient_connect {
info!(
"zhttp client connect {} {} {}",
out_spec, out_stream_spec, in_spec
);
} else {
info!(
"zhttp client bind {} {} {}",
out_spec, out_stream_spec, in_spec
);
}
out_specs.push(SpecInfo {
spec: out_spec,
bind: !config.zclient_connect,
ipc_file_mode: config.ipc_file_mode,
});
out_stream_specs.push(SpecInfo {
spec: out_stream_spec,
bind: !config.zclient_connect,
ipc_file_mode: config.ipc_file_mode,
});
in_specs.push(SpecInfo {
spec: in_spec,
bind: !config.zclient_connect,
ipc_file_mode: config.ipc_file_mode,
});
}
if let Err(e) =
zsockman.set_client_stream_specs(&out_specs, &out_stream_specs, &in_specs)
{
return Err(format!("failed to set zhttp client stream specs: {}", e));
}
}
Some(Server::new(
&config.instance_id,
config.workers,
config.req_maxconn,
config.stream_maxconn,
config.buffer_size,
config.body_buffer_size,
config.messages_max,
config.req_timeout,
config.stream_timeout,
&config.listen,
config.certs_dir.as_path(),
config.allow_compression,
zsockman,
handle_bound,
)?)
} else {
None
};
let client = if !config.zserver_req.is_empty() || !config.zserver_stream.is_empty() {
let mut zsockman = zhttpsocket::ServerSocketManager::new(
Arc::clone(&zmq_context),
&config.instance_id,
(MSG_RETAINED_PER_CONNECTION_MAX * maxconn)
+ (MSG_RETAINED_PER_WORKER_MAX * config.workers),
INIT_HWM,
other_hwm,
handle_bound,
config.stream_maxconn,
);
if !config.zserver_req.is_empty() {
let mut specs = Vec::new();
for spec in config.zserver_req.iter() {
if config.zserver_connect {
info!("zhttp server connect {}", spec);
} else {
info!("zhttp server bind {}", spec);
}
specs.push(SpecInfo {
spec: spec.clone(),
bind: !config.zserver_connect,
ipc_file_mode: config.ipc_file_mode,
});
}
if let Err(e) = zsockman.set_server_req_specs(&specs) {
return Err(format!("failed to set zhttp server req specs: {}", e));
}
}
let zsockman = Arc::new(zsockman);
let client = Client::new(
&config.instance_id,
config.workers,
config.req_maxconn,
config.stream_maxconn,
config.buffer_size,
config.body_buffer_size,
config.messages_max,
config.req_timeout,
config.stream_timeout,
config.allow_compression,
&config.deny,
zsockman.clone(),
handle_bound,
)?;
if !config.zserver_stream.is_empty() {
let mut in_specs = Vec::new();
let mut in_stream_specs = Vec::new();
let mut out_specs = Vec::new();
for spec in config.zserver_stream.iter() {
let (in_spec, in_stream_spec, out_spec) = make_specs(spec, true)?;
if config.zserver_connect {
info!(
"zhttp server connect {} {} {}",
in_spec, in_stream_spec, out_spec
);
} else {
info!(
"zhttp server bind {} {} {}",
in_spec, in_stream_spec, out_spec
);
}
in_specs.push(SpecInfo {
spec: in_spec,
bind: !config.zserver_connect,
ipc_file_mode: config.ipc_file_mode,
});
in_stream_specs.push(SpecInfo {
spec: in_stream_spec,
bind: !config.zserver_connect,
ipc_file_mode: config.ipc_file_mode,
});
out_specs.push(SpecInfo {
spec: out_spec,
bind: !config.zserver_connect,
ipc_file_mode: config.ipc_file_mode,
});
}
if let Err(e) =
zsockman.set_server_stream_specs(&in_specs, &in_stream_specs, &out_specs)
{
return Err(format!("failed to set zhttp server stream specs: {}", e));
}
}
Some(client)
} else {
None
};
Ok(Self {
_server: server,
_client: client,
})
}
pub fn wait_for_term(&self) {
let mut signals = Signals::new(TERM_SIGNALS).unwrap();
let term_now = Arc::new(AtomicBool::new(false));
for signal_type in TERM_SIGNALS {
signal_hook::flag::register_conditional_shutdown(
*signal_type,
1, Arc::clone(&term_now),
)
.unwrap();
signal_hook::flag::register(*signal_type, Arc::clone(&term_now)).unwrap();
}
for signal in &mut signals {
match signal {
signal_type if TERM_SIGNALS.contains(&signal_type) => break,
_ => unreachable!(),
}
}
}
pub fn sizes() -> Vec<(String, usize)> {
let mut out = Vec::new();
out.extend(Server::task_sizes());
out.extend(Client::task_sizes());
out.push((
"deflate_codec_state".to_string(),
websocket::deflate_codec_state_size(),
));
out
}
}