use std::{fs, io};
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use clap::crate_version;
use daemonbase::error::Failed;
use log::error;
use serde::Deserialize;
use tokio::runtime;
use crate::{http, metrics};
use crate::comms::{Gate, GateAgent, Link};
use crate::config::{Config, ConfigFile, Marked};
use crate::targets::Target;
use crate::units::Unit;
#[derive(Clone, Debug, Default, Deserialize)]
pub struct HttpClientConfig {
#[cfg(feature = "socks")]
#[serde(default, rename = "http-proxies")]
proxies: Vec<String>,
#[serde(default, rename = "http-root-certs")]
root_certs: Vec<PathBuf>,
#[serde(rename = "http-user-agent")]
user_agent: Option<String>,
#[serde(rename = "http-client-addr")]
local_addr: Option<IpAddr>,
}
#[derive(Debug)]
pub struct Component {
name: Arc<str>,
http_config: Arc<HttpClientConfig>,
metrics: metrics::Collection,
http_resources: http::Resources,
}
impl Component {
fn new(
name: String,
http_config: Arc<HttpClientConfig>,
metrics: metrics::Collection,
http_resources: http::Resources,
) -> Self {
Component {
name: name.into(), http_config, metrics, http_resources,
}
}
pub fn name(&self) -> &Arc<str> {
&self.name
}
pub fn register_metrics(&mut self, source: Arc<dyn metrics::Source>) {
self.metrics.register(self.name.clone(), Arc::downgrade(&source));
}
pub fn register_http_resource(
&mut self, process: Arc<dyn http::ProcessRequest>
) {
self.http_resources.register(Arc::downgrade(&process))
}
pub fn http_client(&self) -> Result<reqwest::ClientBuilder, String> {
let mut builder = reqwest::Client::builder();
#[cfg(feature = "socks")]
for proxy in &self.http_config.proxies {
let proxy = match reqwest::Proxy::all(proxy) {
Ok(proxy) => proxy,
Err(err) => {
return Err(format!(
"Invalid rrdp-proxy '{proxy}': {err}"
));
}
};
builder = builder.proxy(proxy);
}
for path in &self.http_config.root_certs {
builder = builder.add_root_certificate(
Self::load_cert(path)?
);
}
builder = builder.user_agent(
match self.http_config.user_agent.as_ref() {
Some(agent) => agent.as_str(),
None => concat!("RTRTR ", crate_version!()),
}
);
#[cfg(feature = "native-tls")]
{
builder = builder.use_native_tls();
}
if let Some(addr) = self.http_config.local_addr {
builder = builder.local_address(addr)
}
Ok(builder)
}
fn load_cert(path: &Path) -> Result<reqwest::Certificate, String> {
let mut file = match fs::File::open(path) {
Ok(file) => file,
Err(err) => {
return Err(format!(
"Cannot open rrdp-root-cert file '{}': {}'",
path.display(), err
));
}
};
let mut data = Vec::new();
if let Err(err) = io::Read::read_to_end(&mut file, &mut data) {
return Err(format!(
"Cannot read rrdp-root-cert file '{}': {}'",
path.display(), err
));
}
reqwest::Certificate::from_pem(&data).map_err(|err| {
format!(
"Cannot decode rrdp-root-cert file '{}': {}'",
path.display(), err
)
})
}
}
#[derive(Default)]
pub struct Manager {
units: HashMap<String, GateAgent>,
pending: HashMap<String, Gate>,
http_config: Arc<HttpClientConfig>,
metrics: metrics::Collection,
http_resources: http::Resources,
}
impl Manager {
pub fn new(http_config: &HttpClientConfig) -> Self {
Self {
http_config: http_config.clone().into(),
.. Default::default()
}
}
pub fn load(
file: ConfigFile
) -> Result<(Self, Config), Failed> {
GATES.with(|gates| {
gates.replace(Some(Default::default()))
});
let config = match Config::from_toml(file.bytes(), file.dir()) {
Ok(config) => config,
Err(err) => {
match file.path() {
Some(path) => error!("{}: {}", path.display(), err),
None => error!("{}", err)
}
return Err(Failed)
}
};
let mut manager = Self::new(&config.http_client);
let gates = GATES.with(|gates| gates.replace(None) ).unwrap();
let mut errs = Vec::new();
for (name, load) in gates {
if let Some(gate) = load.gate {
if !config.units.units.contains_key(&name) {
for mut link in load.links {
link.resolve_config(&file);
errs.push(link.mark(
format!("unresolved link to unit '{name}'")
))
}
}
else {
manager.units.insert(name.clone(), load.agent);
manager.pending.insert(name, gate);
}
}
}
if !errs.is_empty() {
for err in errs {
error!("{}", err);
}
return Err(Failed)
}
Ok((manager, config))
}
pub fn add_components<F, T>(
&mut self, runtime: &runtime::Handle, op: F
) -> Result<T, Failed>
where
F: FnOnce(&mut UnitSet, &mut TargetSet) -> T
{
GATES.with(|gates| {
gates.replace(
Some(self.units.iter().map(|(key, value)| {
(key.clone(), value.clone().into())
}).collect())
)
});
let mut units = UnitSet::new();
let mut targets = TargetSet::new();
let res = op(&mut units, &mut targets);
let gates = GATES.with(|gates| gates.replace(None)).unwrap();
let mut errs = Vec::new();
for (name, load) in gates {
if let Some(gate) = load.gate {
if !units.units.contains_key(&name) {
errs.push(
format!("unresolved link to unit '{name}'")
)
}
else {
self.units.insert(name.clone(), load.agent);
self.pending.insert(name, gate);
}
}
}
if !errs.is_empty() {
for err in errs {
error!("{}", err);
}
return Err(Failed)
}
self.spawn(&mut units, &mut targets, runtime);
Ok(res)
}
pub fn spawn(
&mut self,
units: &mut UnitSet,
targets: &mut TargetSet,
runtime: &runtime::Handle,
) {
for (name, unit) in units.units.drain() {
let gate = match self.pending.remove(&name) {
Some(gate) => gate,
None => {
error!("Unit {} is unused and will not be started.", name);
continue
}
};
let controller = Component::new(
name, self.http_config.clone(), self.metrics.clone(),
self.http_resources.clone()
);
runtime.spawn(unit.run(controller, gate));
}
for (name, target) in targets.targets.drain() {
let controller = Component::new(
name, self.http_config.clone(), self.metrics.clone(),
self.http_resources.clone()
);
runtime.spawn(target.run(controller));
}
}
pub fn metrics(&self) -> metrics::Collection {
self.metrics.clone()
}
pub fn http_resources(&self) -> http::Resources {
self.http_resources.clone()
}
}
#[derive(Default, Deserialize)]
#[serde(transparent)]
pub struct UnitSet {
units: HashMap<String, Unit>,
}
impl UnitSet {
pub fn new() -> Self {
Default::default()
}
pub fn insert(&mut self, name: impl Into<String>, unit: Unit) {
self.units.insert(name.into(), unit);
}
}
#[derive(Default, Deserialize)]
#[serde(transparent)]
pub struct TargetSet {
targets: HashMap<String, Target>,
}
impl TargetSet {
pub fn new() -> Self {
Default::default()
}
pub fn insert(&mut self, name: impl Into<String>, target: Target) {
self.targets.insert(name.into(), target);
}
}
struct LoadUnit {
gate: Option<Gate>,
agent: GateAgent,
links: Vec<Marked<()>>,
}
impl Default for LoadUnit {
fn default() -> Self {
let (gate, agent) = Gate::new();
LoadUnit {
gate: Some(gate),
agent,
links: Vec::new()
}
}
}
impl From<GateAgent> for LoadUnit {
fn from(agent: GateAgent) -> Self {
LoadUnit {
gate: None,
agent,
links: Vec::new()
}
}
}
thread_local!(
static GATES: RefCell<Option<HashMap<String, LoadUnit>>> = const {
RefCell::new(None)
}
);
pub fn load_link(name: Marked<String>) -> Link {
GATES.with(|gates| {
let mut gates = gates.borrow_mut();
let gates = gates.as_mut().unwrap();
let mark = name.mark(());
let name = name.into_inner();
let unit = gates.entry(name).or_default();
unit.links.push(mark);
unit.agent.create_link()
})
}