pub use self::file::Location;
use crate::symbology::{Route, Venue};
use anyhow::{anyhow, Result};
use futures::{
channel::mpsc::{self, UnboundedSender},
prelude::*,
select_biased,
};
use fxhash::FxHashMap;
use log::debug;
use netidx::{
config::Config as NetidxCfg,
path::Path,
publisher::{BindCfg, Publisher, PublisherBuilder, UpdateBatch, Val, Value},
subscriber::{DesiredAuth, Subscriber},
};
use once_cell::sync::OnceCell;
use openssl::{pkey::Private, rsa::Rsa, x509::X509};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::{fs, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc, time::Duration};
use tokio::task;
pub mod file {
use super::Component;
use anyhow::{bail, Result};
use netidx::{path::Path, subscriber::DesiredAuth};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Serde default for `Config::component_location_override`: `Core` and
/// `Symbology` are both mapped to `Location::Local`, in that order.
fn default_component_location_override() -> Vec<(Component, Location)> {
    [Component::Core, Component::Symbology]
        .iter()
        .map(|component| (*component, Location::Local))
        .collect()
}
/// Serde default for `Config::local_machine_components`: `Core`,
/// `NetidxResolver`, `WsProxy` and `Symbology`, in that order.
fn default_local_machine_components() -> Vec<Component> {
    let components = [
        Component::Core,
        Component::NetidxResolver,
        Component::WsProxy,
        Component::Symbology,
    ];
    components.to_vec()
}
/// Serde default for `Config::wsproxy_addr`: the loopback address on port 6001.
fn default_wsproxy_addr() -> String {
    String::from("127.0.0.1:6001")
}
/// Serde default for `Config::default_component_location`: components without
/// an explicit override are treated as `Location::Hosted`.
fn default_default_component_location() -> Location {
Location::Hosted
}
fn default_hosted_base() -> Path {
Path::from("/architect")
}
fn default_local_base() -> Path {
Path::from("/local/architect")
}
/// Serde default for `Config::registration_servers`: two fixed HTTPS
/// endpoints on port 5999, returned in a stable order.
fn default_registration_servers() -> Vec<String> {
    const SERVERS: [&str; 2] =
        ["https://54.163.187.179:5999", "https://35.84.43.204:5999"];
    SERVERS.iter().map(|url| url.to_string()).collect()
}
/// Selects which configured base path a component's paths are rooted under.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum Location {
/// Rooted under `Config::hosted_base`.
Hosted,
/// Rooted under `Config::local_base`.
Local,
}
/// On-disk configuration as deserialized from JSON.
///
/// Every field has a serde default, so an empty JSON object deserializes to
/// the same value as `Config::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
// Unknown fields are rejected only when debug assertions are disabled.
#[cfg_attr(not(debug_assertions), serde(deny_unknown_fields))]
pub struct Config {
/// Path of the netidx configuration file; when `None` the netidx default
/// configuration is loaded instead.
#[serde(default)]
pub netidx_config: Option<PathBuf>,
/// Slack passed to the publisher builder; `Common` falls back to 3 when unset.
#[serde(default)]
pub publisher_slack: Option<usize>,
/// Authentication to request; when `None` the netidx configuration's
/// default auth is used.
#[serde(default)]
pub desired_auth: Option<DesiredAuth>,
/// Publisher bind configuration in string form; parsed as a `BindCfg` when
/// set, otherwise the netidx configuration's default bind config is used.
#[serde(default)]
pub bind_config: Option<String>,
/// Per-component location overrides; defaults to `Core` and `Symbology`
/// being `Local`.
#[serde(default = "default_component_location_override")]
pub component_location_override: Vec<(Component, Location)>,
/// Location used for components that have no override; defaults to `Hosted`.
#[serde(default = "default_default_component_location")]
pub default_component_location: Location,
/// Components listed for the local machine (how the list is consumed is
/// not visible in this file).
#[serde(default = "default_local_machine_components")]
pub local_machine_components: Vec<Component>,
/// Websocket proxy address; must parse as a `SocketAddr` when loaded by
/// `Common`. Defaults to `127.0.0.1:6001`.
#[serde(default = "default_wsproxy_addr")]
pub wsproxy_addr: String,
/// Base path for `Location::Hosted` components; defaults to `/architect`.
#[serde(default = "default_hosted_base")]
pub hosted_base: Path,
/// Base path for `Location::Local` components; defaults to `/local/architect`.
#[serde(default = "default_local_base")]
pub local_base: Path,
/// Registration server URLs (consumers are not visible in this file).
#[serde(default = "default_registration_servers")]
pub registration_servers: Vec<String>,
}
/// The in-code default mirrors the serde defaults: optional fields are
/// `None` and every other field comes from its `default_*` function.
impl Default for Config {
    fn default() -> Self {
        Self {
            // Optional settings start out unset.
            netidx_config: None,
            publisher_slack: None,
            desired_auth: None,
            bind_config: None,
            // Remaining fields reuse the same functions serde calls.
            component_location_override: default_component_location_override(),
            default_component_location: default_default_component_location(),
            local_machine_components: default_local_machine_components(),
            wsproxy_addr: default_wsproxy_addr(),
            hosted_base: default_hosted_base(),
            local_base: default_local_base(),
            registration_servers: default_registration_servers(),
        }
    }
}
/// Locating and loading the on-disk configuration.
impl Config {
    /// Returns the `architect` directory inside the configuration directory
    /// reported by `dirs::config_dir`.
    ///
    /// # Errors
    /// Fails when `dirs::config_dir` returns `None`.
    pub fn default_config_dir() -> Result<PathBuf> {
        match dirs::config_dir() {
            Some(base) => Ok(base.join("architect")),
            None => bail!("no default config dir could be found"),
        }
    }

    /// File name of the API configuration file.
    pub fn api_file() -> &'static str {
        "api.json"
    }

    /// File name of the core configuration file.
    pub fn core_file() -> &'static str {
        "core.json"
    }

    /// File name of the limits configuration file.
    pub fn limits_file() -> &'static str {
        "limits.json"
    }

    /// Loads the configuration from the JSON file at `path`.
    ///
    /// A missing file is not an error: the default configuration is returned.
    /// The file is read in a single asynchronous step rather than being
    /// probed with the blocking `Path::exists` first, so there is no window
    /// between the check and the read, and the async executor is not blocked.
    ///
    /// # Errors
    /// Fails when the file exists but cannot be read (for example because of
    /// missing permissions) or when its contents are not valid JSON for
    /// `Config`. Only a "not found" error falls back to the defaults, so an
    /// unreadable configuration is reported instead of silently ignored.
    pub async fn load<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        match tokio::fs::read(path).await {
            Ok(bytes) => Ok(serde_json::from_slice(&bytes)?),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
            Err(e) => Err(e.into()),
        }
    }

    /// Loads `api.json` from the default configuration directory.
    ///
    /// # Errors
    /// Fails when the default directory cannot be determined or when `load`
    /// fails.
    pub async fn load_default() -> Result<Self> {
        let dir = Self::default_config_dir()?;
        Self::load(dir.join(Self::api_file())).await
    }
}
}
// A counterparty: a venue paired with a route.
//
// Both fields are (de)serialized through the by-name helpers named in the
// serde attributes below rather than through the types' own serde impls.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives
// `JsonSchema`, and doc comments would presumably be copied into the
// generated schema as descriptions - confirm before converting to `///`.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct Cpty {
// Venue part of the pair.
#[serde(
serialize_with = "crate::symbology::Venue::serialize_by_name",
deserialize_with = "crate::symbology::Venue::deserialize_by_name_default"
)]
pub venue: Venue,
// Route part of the pair.
#[serde(
serialize_with = "crate::symbology::Route::serialize_by_name",
deserialize_with = "crate::symbology::Route::deserialize_by_name_default"
)]
pub route: Route,
}
/// Identifies a component of the system.
///
/// Used as the key of the per-component location overrides in `Paths` and
/// turned into a path segment by `Common::component_part` when publishing
/// admin values. Variants carrying a `Cpty` are specific to one venue/route
/// pair.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub enum Component {
/// Path segment `symbology`.
Symbology,
/// Path segment `qf/<venue>/<route>`.
Qf(Cpty),
/// Path segment `hist-qf/<venue>/<route>`.
HistoricalQf(Cpty),
/// Path segment `ohlc/<venue>/<route>`.
Candles(Cpty),
/// Path segment `hist-ohlc/<venue>/<route>`.
HistoricalCandles(Cpty),
/// Path segment `core`.
Core,
/// Path segment `qf/api/<venue>/<route>`.
QfApi(Cpty),
/// Path segment `userdb`.
UserDb,
/// Path segment `netidx-resolver`.
NetidxResolver,
/// Path segment `wsproxy`.
WsProxy,
/// Path segment `blockchain`.
BlockchainDb,
}
/// The inputs needed to compute where each component lives in the netidx
/// namespace: two base paths plus the rules choosing between them.
#[derive(Debug, Clone)]
pub struct Paths {
/// Base path used for `Location::Hosted`.
pub hosted_base: Path,
/// Base path used for `Location::Local`.
pub local_base: Path,
/// Location assumed for any component without an override.
pub default_component_location: Location,
/// Explicit location for individual components.
pub component_location_override: FxHashMap<Component, Location>,
}
impl Paths {
fn base(&self, location: Option<&Location>) -> &Path {
match location.unwrap_or(&self.default_component_location) {
Location::Hosted => &self.hosted_base,
Location::Local => &self.local_base,
}
}
pub fn sym_hosted(&self) -> Path {
self.base(Some(&Location::Hosted)).append("symbology")
}
pub fn sym(&self) -> Path {
let loc = self.component_location_override.get(&Component::Symbology);
let base = self.base(loc);
base.append("symbology")
}
pub fn sym_api(&self) -> Path {
self.sym().append("api")
}
pub fn sym_hosted_api(&self) -> Path {
self.sym_hosted().append("api")
}
pub fn sym_db(&self) -> Path {
self.sym().append("db")
}
pub fn sym_stats(&self) -> Path {
self.sym().append("stats")
}
pub fn sym_log(&self) -> Path {
self.sym().append("log")
}
pub fn sym_coingecko(&self) -> Path {
self.sym().append("coingecko")
}
pub fn core(&self) -> Path {
let loc = self.component_location_override.get(&Component::Core);
let base = self.base(loc);
base.append("core")
}
pub fn core_flow(&self) -> Path {
self.core().append("orderflow")
}
pub fn core_limit(&self) -> Path {
self.core().append("limits")
}
pub fn core_halts(&self) -> Path {
self.core().append("halts")
}
pub fn core_pos(&self) -> Path {
self.core().append("pos")
}
pub fn core_clock(&self) -> Path {
self.core().append("clock")
}
pub fn core_balances(&self) -> Path {
self.core().append("balances")
}
pub fn core_fees(&self) -> Path {
self.core().append("fees")
}
pub fn core_users(&self) -> Path {
self.core().append("users")
}
pub fn core_cpty(&self) -> Path {
self.core().append("cpty")
}
pub fn core_cpty_pings(&self) -> Path {
self.core().append("cpty_pings")
}
pub fn core_tms(&self) -> Path {
self.core().append("tms")
}
pub fn core_tms_pings(&self) -> Path {
self.core().append("tms_pings")
}
pub fn core_stats(&self) -> Path {
self.core().append("stats")
}
pub fn core_log(&self) -> Path {
self.core().append("log")
}
pub fn core_alerts(&self) -> Path {
self.core().append("alerts")
}
pub fn core_secrets(&self) -> Path {
self.core().append("secrets")
}
pub fn qf(&self, cpty: Option<Cpty>) -> Path {
let loc =
cpty.and_then(|c| self.component_location_override.get(&Component::Qf(c)));
let base = self.base(loc);
base.append("qf")
}
pub fn qf_rt(&self, cpty: Option<Cpty>) -> Path {
self.qf(cpty).append("rt")
}
pub fn qf_rt_status(&self, cpty: Cpty) -> Path {
self.qf_rt(Some(cpty))
.append("status")
.append(&cpty.venue.name)
.append(&cpty.route.name)
}
pub fn qf_ohlc(&self, cpty: Option<Cpty>) -> Path {
self.qf(cpty).append("ohlc")
}
pub fn qf_hist_rt(&self, cpty: Option<Cpty>) -> Path {
self.qf(cpty).append("hist/rt")
}
pub fn qf_hist_ohlc(&self, cpty: Option<Cpty>) -> Path {
self.qf(cpty).append("hist/ohlc")
}
pub fn qf_api(&self, cpty: Option<Cpty>) -> Path {
let loc =
cpty.and_then(|c| self.component_location_override.get(&Component::QfApi(c)));
let base = self.base(loc);
base.append("qf/api")
}
pub fn userdb_api(&self) -> Path {
let loc = self.component_location_override.get(&Component::UserDb);
self.base(loc).append("userdb")
}
pub fn blockchain_api(&self, blockchain: Venue) -> Path {
let loc = self.component_location_override.get(&Component::BlockchainDb);
self.base(loc).append("blockchain").append(&blockchain.name)
}
}
/// A command applied to a published stat value by the stats task.
pub enum StatCmd {
/// Replace the stat with the given value.
Set(Value),
/// Add the value to the current stat; a stat with no current value starts
/// at the given value.
AddAcc(Value),
/// Subtract the value from the current stat; a stat with no current value
/// starts at `Value::I64(0)` minus the given value.
SubAcc(Value),
/// Multiply the current stat by the value; a stat with no current value
/// starts at `Value::I64(0)`.
MulAcc(Value),
/// Divide the current stat by the value; a stat with no current value
/// starts at `Value::I64(0)`.
DivAcc(Value),
}
/// Shared state built once by `Common::load_int` and accessed through `Common`.
#[derive(Debug)]
pub struct CommonInner {
/// The file configuration exactly as loaded.
pub config: file::Config,
/// The netidx configuration that was loaded.
pub netidx_config: NetidxCfg,
/// Path the netidx configuration was loaded from, or `None` when the
/// netidx default was used.
pub netidx_config_path: Option<PathBuf>,
/// Auth from the file configuration, or the netidx default auth.
pub desired_auth: DesiredAuth,
/// Bind config parsed from the file configuration, or the netidx default.
pub bind_config: BindCfg,
/// Publisher built from the settings above.
pub publisher: Publisher,
/// Subscriber built from the netidx configuration and desired auth.
pub subscriber: Subscriber,
/// Path rules derived from the file configuration.
pub paths: Paths,
/// Parsed form of the configured `wsproxy_addr`.
pub wsproxy_addr: SocketAddr,
/// Copied from the file configuration.
pub registration_servers: Vec<String>,
/// Copied from the file configuration.
pub local_machine_components: Vec<Component>,
/// Sender feeding the stats task; empty until `Common::init_stats` succeeds.
pub stats: OnceCell<UnboundedSender<(Path, StatCmd)>>,
}
/// Handle to the shared `CommonInner`; cloning it clones the `Arc`, not the
/// underlying state.
#[derive(Debug, Clone)]
pub struct Common(pub Arc<CommonInner>);
/// Lets the fields of `CommonInner` be read directly through a `Common`.
impl Deref for Common {
    type Target = CommonInner;

    fn deref(&self) -> &CommonInner {
        // Go through the Arc to the shared inner value.
        &*self.0
    }
}
impl Common {
/// Builds a `Common` from the file configuration at `path`, or from the
/// default configuration file when `path` is `None`.
///
/// Steps, in order: load the file configuration, load the netidx
/// configuration (from the configured path or the netidx default), resolve
/// auth and bind settings, create the publisher and the subscriber, then
/// assemble the shared state.
///
/// # Errors
/// Fails if either configuration cannot be loaded, the bind config or the
/// wsproxy address does not parse, or the publisher/subscriber cannot be
/// created.
async fn load_int<P: AsRef<std::path::Path>>(path: Option<P>) -> Result<Self> {
    let loaded = if let Some(path) = path {
        debug!("loading config from {:?}", path.as_ref());
        file::Config::load(path).await?
    } else {
        debug!("loading config from default");
        file::Config::load_default().await?
    };
    // Keep an untouched copy for `CommonInner::config`; the original is taken
    // apart field by field below.
    let config = loaded.clone();
    let file::Config {
        netidx_config: netidx_config_path,
        publisher_slack,
        desired_auth,
        bind_config,
        component_location_override,
        default_component_location,
        local_machine_components,
        wsproxy_addr,
        hosted_base,
        local_base,
        registration_servers,
    } = loaded;
    let netidx_config = task::block_in_place(|| {
        if let Some(p) = &netidx_config_path {
            debug!("loading netidx config from {}", p.display());
            NetidxCfg::load(p)
        } else {
            debug!("loading default netidx config");
            NetidxCfg::load_default()
        }
    })?;
    let desired_auth = desired_auth.unwrap_or_else(|| netidx_config.default_auth());
    let bind_config = match bind_config {
        Some(spec) => spec.parse::<BindCfg>()?,
        None => netidx_config.default_bind_config.clone(),
    };
    debug!("creating publisher");
    let builder = PublisherBuilder::new(netidx_config.clone())
        .slack(publisher_slack.unwrap_or(3))
        .bind_cfg(Some(bind_config.clone()))
        .desired_auth(desired_auth.clone());
    let publisher = builder.build().await?;
    debug!("creating subscriber");
    let subscriber = Subscriber::new(netidx_config.clone(), desired_auth.clone())?;
    let paths = Paths {
        component_location_override: component_location_override.into_iter().collect(),
        default_component_location,
        hosted_base,
        local_base,
    };
    // The address is parsed last, after the publisher and subscriber exist.
    let wsproxy_addr = wsproxy_addr.parse::<SocketAddr>()?;
    let inner = CommonInner {
        config,
        netidx_config,
        netidx_config_path,
        desired_auth,
        bind_config,
        publisher,
        subscriber,
        paths,
        wsproxy_addr,
        registration_servers,
        local_machine_components,
        stats: OnceCell::new(),
    };
    Ok(Self(Arc::new(inner)))
}
/// Builds a `Common` from the configuration file at `path`.
///
/// # Errors
/// Propagates any failure from `load_int`.
pub async fn load<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
    let explicit = Some(path);
    Self::load_int(explicit).await
}
/// Builds a `Common` from the default configuration file.
///
/// # Errors
/// Propagates any failure from `load_int`.
pub async fn load_default() -> Result<Self> {
    // The path type only needs naming so the generic parameter is inferred.
    let no_path: Option<&str> = None;
    Self::load_int(no_path).await
}
/// Returns the TLS section of the netidx configuration together with the
/// identity stored under the key `xyz.architect.`.
///
/// # Errors
/// Fails when the netidx configuration has no TLS section or the identity
/// is not present.
pub fn get_tls_identity(
    &self,
) -> Result<(&netidx::config::Tls, &netidx::config::TlsIdentity)> {
    let tls = match self.netidx_config.tls.as_ref() {
        Some(tls) => tls,
        None => return Err(anyhow!("no tls config")),
    };
    match tls.identities.get("xyz.architect.") {
        Some(identity) => Ok((tls, identity)),
        None => Err(anyhow!("architect.xyz identity not found")),
    }
}
/// Reads and decrypts the RSA private key of the TLS identity.
///
/// The passphrase is obtained through `netidx::tls::load_key_password`,
/// which is given the configured askpass value (if any) and the key path.
///
/// # Errors
/// Fails when the identity is missing, the passphrase cannot be obtained,
/// the key file cannot be read, or the PEM cannot be decoded with that
/// passphrase.
pub fn load_private_key(&self) -> Result<Rsa<Private>> {
    let (tls, identity) = self.get_tls_identity()?;
    let askpass = tls.askpass.as_ref().map(|s| s.as_str());
    let key_path = &identity.private_key;
    let password = netidx::tls::load_key_password(askpass, key_path)?;
    let pem = fs::read(key_path)?;
    let key = Rsa::private_key_from_pem_passphrase(&pem, password.as_bytes())?;
    Ok(key)
}
/// Reads the PEM certificate of the TLS identity.
///
/// # Errors
/// Fails when the identity is missing, the certificate file cannot be read,
/// or its contents are not a valid PEM certificate.
pub fn load_certificate(&self) -> Result<X509> {
    let (_tls, identity) = self.get_tls_identity()?;
    let pem = fs::read(&identity.certificate)?;
    let certificate = X509::from_pem(&pem)?;
    Ok(certificate)
}
/// Publishes the `log-level` control value for `component` and starts the
/// background task that applies stat commands.
///
/// The current maximum log level is published under the component's
/// by-service admin path and aliased under the by-host admin path (see
/// `full_and_alias_path`). The spawned task then serves two streams:
/// writes to the log-level value, which change the process-wide max log
/// level, and `(Path, StatCmd)` messages sent through `stat_cmd`.
///
/// # Errors
/// Fails when the admin paths cannot be computed, when publishing or
/// aliasing the log-level value fails, or when `stats` was already set.
///
/// NOTE(review): the log-level value is published and the task is spawned
/// before `stats` is checked, so a failing second call has already done
/// that work - confirm whether an early check is wanted.
pub fn init_stats(&self, component: Component) -> Result<()> {
// Clones moved into the spawned task.
let publisher = self.publisher.clone();
let paths = self.paths.clone();
// Bounded channel on which the publisher delivers writes to log-level.
let (log_tx, mut log_rx) = futures::channel::mpsc::channel(3);
let max_level = log::max_level();
let log_level_rel = Path::from("log-level");
let (log_level_path, log_level_path_alias) =
Self::full_and_alias_path(&log_level_rel, &paths, component)?;
let log_level_val = publisher.publish(log_level_path, max_level.to_string())?;
let () = publisher.alias(log_level_val.id(), log_level_path_alias)?;
publisher.writes(log_level_val.id(), log_tx);
// Unbounded so that `stat_cmd` can send without awaiting.
let (stats_tx, mut stats_rx) = mpsc::unbounded();
// Timeout passed to every batch commit below.
let timeout = Some(Duration::from_secs(30));
tokio::spawn(async move {
// Stats published so far, keyed by the caller-supplied relative path:
// the last computed value plus its publisher handle.
let mut values: FxHashMap<Path, (Value, Val)> = FxHashMap::default();
loop {
// Biased select: the log-level branch is listed first and so is
// preferred when both streams are ready.
select_biased! {
mut r = log_rx.select_next_some() => {
// Only the last write in a delivered batch is applied.
if let Some(wr) = r.drain(..).last() {
debug!("subscriber submitted log-level: {}", wr.value);
// Writes that are not strings or not a known level name are ignored.
if let Ok(value) = wr.value.cast_to::<String>() {
if let Some(new_level) = Self::log_level_of_string(value) {
log::set_max_level(new_level);
debug!("new max log level: {}", new_level);
// Reflect the accepted level back to subscribers.
let mut batch = publisher.start_batch();
log_level_val.update(&mut batch, new_level.to_string());
batch.commit(timeout).await
}
}
}
},
(path, cmd) = stats_rx.select_next_some().fuse() => {
let mut batch = publisher.start_batch();
Self::process_stat(
&publisher,
&paths,
component,
path,
cmd,
&mut values,
&mut batch);
// Fold every command that is already queued into the same batch so
// they are committed together.
while let Ok(Some((path, cmd))) = stats_rx.try_next() {
Self::process_stat(
&publisher,
&paths,
component,
path,
cmd,
&mut values,
&mut batch,
)
}
batch.commit(timeout).await
},
// Both streams have terminated: stop the task.
complete => break,
}
}
});
// Hand the sender to `stat_cmd`; an already-set cell is reported as an error.
self.stats.set(stats_tx).map_err(|_| anyhow!("stats was already initialized"))
}
/// Sends `cmd` for `path` to the stats task.
///
/// Best effort: if `init_stats` has not been called, or the send fails,
/// the command is dropped and a debug message is logged.
fn stat_cmd(&self, path: impl Into<Path>, cmd: StatCmd) {
    if let Some(tx) = self.stats.get() {
        if let Err(e) = tx.unbounded_send((path.into(), cmd)) {
            debug!("couldn't tx.send stat: {}", e);
        }
    } else {
        debug!("stat not initialized. call init_stat");
    }
}
/// Queues a command that sets the stat at `path` to `stat`.
pub fn stat(&self, path: impl Into<Path>, stat: impl Into<Value>) {
    let value: Value = stat.into();
    self.stat_cmd(path, StatCmd::Set(value));
}
/// Queues a command that adds `stat` to the stat at `path`.
pub fn stat_add_acc(&self, path: impl Into<Path>, stat: impl Into<Value>) {
    let value: Value = stat.into();
    self.stat_cmd(path, StatCmd::AddAcc(value));
}
/// Queues a command that subtracts `stat` from the stat at `path`.
pub fn stat_sub_acc(&self, path: impl Into<Path>, stat: impl Into<Value>) {
    let value: Value = stat.into();
    self.stat_cmd(path, StatCmd::SubAcc(value));
}
/// Queues a command that multiplies the stat at `path` by `stat`.
pub fn stat_mul_acc(&self, path: impl Into<Path>, stat: impl Into<Value>) {
    let value: Value = stat.into();
    self.stat_cmd(path, StatCmd::MulAcc(value));
}
/// Queues a command that divides the stat at `path` by `stat`.
pub fn stat_div_acc(&self, path: impl Into<Path>, stat: impl Into<Value>) {
    let value: Value = stat.into();
    self.stat_cmd(path, StatCmd::DivAcc(value));
}
fn log_level_of_string(s: String) -> Option<log::LevelFilter> {
match s.to_ascii_lowercase().as_str() {
"error" => Some(log::LevelFilter::Error),
"warn" => Some(log::LevelFilter::Warn),
"info" => Some(log::LevelFilter::Info),
"debug" => Some(log::LevelFilter::Debug),
"trace" => Some(log::LevelFilter::Trace),
"off" => Some(log::LevelFilter::Off),
_ => None,
}
}
/// Computes the two admin paths under which a value for `component` is
/// exposed:
///
/// * full path: `<base>/admin/by-service/<component>/<host>/<path>`
/// * alias path: `<base>/admin/by-host/<host>/<component>/<path>`
///
/// `<base>` is the base path for the component's location, `<component>`
/// comes from `component_part` and `<host>` from `my_hostname`.
///
/// # Errors
/// Fails when the hostname cannot be determined.
fn full_and_alias_path(
    path: &Path,
    paths: &Paths,
    component: Component,
) -> Result<(Path, Path)> {
    let location = paths.component_location_override.get(&component);
    let admin_base = paths.base(location).append("admin");
    let service = Self::component_part(component);
    let host = Self::my_hostname()?;
    let by_service = admin_base
        .append("by-service")
        .append(&service)
        .append(&host)
        .append(&path);
    let by_host = admin_base
        .append("by-host")
        .append(&host)
        .append(&service)
        .append(&path);
    Ok((by_service, by_host))
}
/// Applies one stat command and records the resulting update in `batch`.
///
/// When `path` already has a published value, the command is applied to it
/// and the change is queued via `update_changed`. Otherwise the command's
/// initial value is published under the component's full admin path,
/// aliased under the by-host path, and remembered in `values`.
///
/// Failures to compute the paths, publish, or alias are logged at debug
/// level and the command is dropped.
fn process_stat(
    publisher: &Publisher,
    paths: &Paths,
    component: Component,
    path: Path,
    cmd: StatCmd,
    values: &mut FxHashMap<Path, (Value, Val)>,
    batch: &mut UpdateBatch,
) {
    /// Result of applying `cmd` to `current` (`None` means the stat has no
    /// value yet).
    fn apply(current: Option<&Value>, cmd: StatCmd) -> Value {
        match (cmd, current) {
            (StatCmd::Set(v), _) => v,
            (StatCmd::AddAcc(v), None) => v,
            (StatCmd::AddAcc(v), Some(acc)) => acc.clone() + v,
            (StatCmd::SubAcc(v), None) => Value::I64(0) - v,
            (StatCmd::SubAcc(v), Some(acc)) => acc.clone() - v,
            (StatCmd::MulAcc(_), None) => Value::I64(0),
            (StatCmd::MulAcc(v), Some(acc)) => acc.clone() * v,
            (StatCmd::DivAcc(_), None) => Value::I64(0),
            (StatCmd::DivAcc(v), Some(acc)) => acc.clone() / v,
        }
    }

    // Known stat: update in place and queue the change.
    if let Some(entry) = values.get_mut(&path) {
        let updated = apply(Some(&entry.0), cmd);
        entry.0 = updated;
        entry.1.update_changed(batch, entry.0.clone());
        return;
    }

    // First command for this path: publish it and remember the handle.
    let initial = apply(None, cmd);
    let (full_path, alias_path) = match Self::full_and_alias_path(&path, paths, component) {
        Err(e) => {
            debug!("failed to compute paths: {}", e);
            return;
        }
        Ok(pair) => pair,
    };
    let handle = match publisher.publish(full_path.clone(), initial.clone()) {
        Ok(handle) => handle,
        Err(e) => {
            debug!("failed to publish stat {}: {}", full_path, e);
            return;
        }
    };
    if let Err(e) = publisher.alias(handle.id(), alias_path.clone()) {
        debug!("failed to publish stat alias {}: {}", alias_path, e);
        return;
    }
    if let Some(_previous) = values.insert(path, (initial, handle)) {
        debug!("unexpected existing stat at path {}", full_path)
    }
}
fn cpty_parts(cpty: Cpty) -> String {
format!("{}/{}", cpty.venue, cpty.route)
}
fn component_part(component: Component) -> String {
match component {
Component::Symbology => "symbology".to_string(),
Component::Core => "core".to_string(),
Component::UserDb => "userdb".to_string(),
Component::BlockchainDb => "blockchain".to_string(),
Component::Candles(c) => format!("ohlc/{}", Self::cpty_parts(c)),
Component::QfApi(c) => format!("qf/api/{}", Self::cpty_parts(c)),
Component::Qf(c) => format!("qf/{}", Self::cpty_parts(c)),
Component::NetidxResolver => "netidx-resolver".to_string(),
Component::WsProxy => "wsproxy".to_string(),
Component::HistoricalQf(c) => {
format!("hist-qf/{}", Self::cpty_parts(c))
}
Component::HistoricalCandles(c) => {
format!("hist-ohlc/{}", Self::cpty_parts(c))
}
}
}
fn my_hostname() -> Result<String> {
use sys_info::hostname;
hostname().map_err(|e| anyhow!("could not determine hostname: {}", e))
}
}