#![no_std]
extern crate no_std_compat as std;
use std::prelude::v1::*;
pub mod authentication;
mod registry;
mod commands;
use std::collections::BTreeMap;
use std::sync::Arc;
use parking_lot::{ Mutex, MutexGuard };
use product_os_configuration::{KeyValueKind, KeyValueStore, RelationalKind, RelationalStore, Stores};
use std::str::FromStr;
use std::time::Duration;
use serde::{ Deserialize, Serialize };
use std::num::TryFromIntError;
pub use product_os_request::Method;
use product_os_request::{ProductOSClient, ProductOSResponse};
pub use registry::Node;
/// Command-and-control state for a single node: its configuration,
/// certificates, node registry, outbound request client and backing stores.
///
/// Instances are built with [`ProductOSController::new`]; the free functions
/// in this file operate on it behind an `Arc<Mutex<ProductOSController>>`.
pub struct ProductOSController {
/// Configuration supplied at construction; cloned out by `get_configuration`.
configuration: product_os_configuration::Configuration,
/// Certificates and private key supplied at construction.
certificates: product_os_security::certificates::Certificates,
/// Value of `configuration.get_cc_max_servers()` captured at construction.
max_servers: u8,
/// Node registry built in `new` from the configuration, key-value store and certificates.
registry: registry::Registry,
/// Whether `run_controller` spawns the periodic pulse check task.
pulse_check: bool,
/// Cron expression for the pulse check, parsed in `run_controller`.
pulse_check_cron: String,
/// Whether `run_controller` spawns the periodic monitor task.
#[cfg(feature = "monitor")]
monitor: bool,
/// Cron expression for the monitor task, parsed in `run_controller`.
#[cfg(feature = "monitor")]
monitor_cron: String,
/// Requester settings: carries the `x-product-os-command` header and the
/// trusted certificates added during node discovery.
requester: product_os_request::ProductOSRequester,
/// Request client built from `requester`; rebuilt by `discover_nodes`.
client: product_os_request::ProductOSRequestClient,
/// Shared relational store handle (a disabled `Sink` store when none was supplied).
#[cfg(feature = "relational_store")]
relational_store: Arc<product_os_store::ProductOSRelationalStore>,
/// Shared key-value store handle (a disabled `Sink` store when none was supplied).
key_value_store: Arc<product_os_store::ProductOSKeyValueStore>,
}
impl ProductOSController {
/// Builds a controller from the given configuration and certificates.
///
/// When a store is not supplied, a disabled `Sink` store is created in its
/// place. The registry is created from the configuration, the key-value store
/// and one `Certificate` per supplied certificate (each paired with the same
/// private key). The request client is built with the `x-product-os-command`
/// header set to this node's identifier.
pub fn new(
    configuration: product_os_configuration::Configuration,
    certificates: product_os_security::certificates::Certificates,
    key_value_store: Option<Arc<product_os_store::ProductOSKeyValueStore>>,
    #[cfg(feature = "relational_store")] relational_store: Option<Arc<product_os_store::ProductOSRelationalStore>>,
) -> Self {
    // Fall back to a disabled sink store when the caller did not provide one.
    let key_value_store: Arc<product_os_store::ProductOSKeyValueStore> = key_value_store.unwrap_or_else(|| {
        Arc::new(product_os_store::ProductOSKeyValueStore::new(&KeyValueStore {
            enabled: false,
            kind: KeyValueKind::Sink,
            host: String::new(),
            port: 0,
            secure: false,
            db_number: 0,
            username: None,
            password: None,
            pool_size: 0,
            default_limit: 0,
            default_offset: 0,
            prefix: None,
        }))
    });

    #[cfg(feature = "relational_store")]
    let relational_store: Arc<product_os_store::ProductOSRelationalStore> = relational_store.unwrap_or_else(|| {
        Arc::new(product_os_store::ProductOSRelationalStore::new(&RelationalStore {
            enabled: false,
            kind: RelationalKind::Sink,
            host: String::new(),
            port: 0,
            secure: false,
            db_name: String::new(),
            username: None,
            password: None,
            pool_size: 0,
            default_limit: 0,
            default_offset: 0,
            prefix: None,
        }))
    });

    // Pair every certificate with the shared private key for the registry.
    let registry_certificates: Vec<product_os_security::certificates::Certificate> = certificates
        .certificates
        .iter()
        .map(|cert| product_os_security::certificates::Certificate {
            certificate: cert.to_owned(),
            private: certificates.private.to_owned(),
        })
        .collect();

    let registry = registry::Registry::new(&configuration, key_value_store.clone(), registry_certificates);

    let mut requester = product_os_request::ProductOSRequester::new();
    requester.add_header("x-product-os-command".to_string(), registry.get_me().get_identifier(), false);

    let mut client = product_os_request::ProductOSRequestClient::new();
    client.build(&requester);

    // Read everything needed from the configuration before it is moved into the struct.
    let max_servers = configuration.get_cc_max_servers();
    let pulse_check = configuration.is_pulse_check_enabled();
    let pulse_check_cron = configuration.get_hearbeat_cron().to_string();
    #[cfg(feature = "monitor")]
    let monitor = configuration.is_monitor_enabled();
    #[cfg(feature = "monitor")]
    let monitor_cron = configuration.get_monitor_cron().to_string();

    Self {
        certificates,
        max_servers,
        pulse_check,
        pulse_check_cron,
        #[cfg(feature = "monitor")]
        monitor,
        #[cfg(feature = "monitor")]
        monitor_cron,
        configuration,
        #[cfg(feature = "relational_store")]
        relational_store,
        key_value_store,
        registry,
        requester,
        client,
    }
}
pub fn get_registry(&self) -> ®istry::Registry {
&self.registry
}
/// Runs node discovery on the registry, adds every raw node certificate it
/// reports to the requester's trusted set, then rebuilds the request client
/// from the updated requester.
pub async fn discover_nodes(&mut self) {
    self.registry.discover_nodes().await;

    let node_certificates = self.registry.get_nodes_raw_certificates(0, true);
    let requester = &mut self.requester;
    node_certificates.into_iter().for_each(|pem| {
        requester.add_trusted_certificate_pem(pem);
    });

    self.client.build(&self.requester);
}
/// Returns the maximum server count captured from the configuration at construction.
pub fn get_max_servers(&self) -> u8 {
    // `u8` is `Copy`, so the field can be returned directly.
    self.max_servers
}
/// Forwards `identifier` and `node` to the registry's `upsert_node_local`.
pub fn upsert_node_local(&mut self, identifier: String, node: Node) {
    let registry = &mut self.registry;
    registry.upsert_node_local(identifier, node);
}
/// Returns a borrowed byte slice for each certificate held by the controller,
/// in stored order.
pub fn get_certificates(&self) -> Vec<&[u8]> {
    self.certificates
        .certificates
        .iter()
        .map(|cert| cert.as_slice())
        .collect()
}
/// Returns the controller's private key as a borrowed byte slice.
pub fn get_private_key(&self) -> &[u8] {
    let private_key = &self.certificates.private;
    private_key.as_slice()
}
/// Returns an owned copy of the certificates together with the private key.
pub fn get_certificates_and_private_key(&self) -> product_os_security::certificates::Certificates {
    self.certificates.clone()
}
/// Returns `true` when `certificate` is byte-for-byte equal to one of the
/// certificates held by the controller.
///
/// Both the stored certificates and the supplied one are written to the
/// trace log.
pub fn validate_certificate(&self, certificate: Vec<u8>) -> bool {
    tracing::trace!("Checking certificate stored {:?} vs given {:?}", self.certificates.certificates, certificate);

    let candidate: &[u8] = certificate.as_slice();
    self.get_certificates()
        .into_iter()
        .any(|known| known == candidate)
}
/// Verifies `verify_tag` for a request using the key the registry holds for
/// `verify_identifier`.
///
/// Returns `false` when the registry has no key for the identifier; otherwise
/// returns the result of `product_os_security::verify_auth_request`, called
/// with the `x-product-os-verify`, `x-product-os-command` and
/// `x-product-os-control` header names.
pub fn validate_verify_tag<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>,
    headers: Option<&BTreeMap<String, String>>, verify_identifier: String, verify_tag: String) -> bool
    where T: product_os_security::AsByteVector
{
    tracing::trace!("Checking verification provided {} from {}", verify_tag, verify_identifier);

    let key = self.registry.get_key(verify_identifier);
    tracing::trace!("Checking verification with key {:?}", key);

    key.map_or(false, |verify_key| {
        product_os_security::verify_auth_request(
            query,
            false,
            payload,
            headers,
            &["x-product-os-verify", "x-product-os-command", "x-product-os-control"],
            verify_tag,
            Some(verify_key.as_slice()),
        )
    })
}
/// Authenticates a command/control request.
///
/// Returns `Ok(true)` only when both `verify_identifier` and `verify_tag` are
/// present and `validate_verify_tag` accepts them. In every other case a
/// `KeyError` authentication error is returned; `Ok(false)` is never produced.
pub fn authenticate_command_control<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>, headers: Option<&BTreeMap<String, String>>,
    verify_identifier: Option<String>, verify_tag: Option<String>) -> Result<bool, authentication::CommandControlAuthenticateError>
    where T: product_os_security::AsByteVector
{
    tracing::trace!("Verify identifier {:?} and tag {:?} verify result", &verify_identifier, &verify_tag);

    // Validation is only attempted when both pieces are supplied.
    let verified = match (verify_identifier, verify_tag) {
        (Some(identifier), Some(tag)) => self.validate_verify_tag(query, payload, headers, identifier, tag),
        _ => false,
    };

    if !verified {
        return Err(authentication::CommandControlAuthenticateError {
            error: authentication::CommandControlAuthenticateErrorState::KeyError(String::from("One of the keys provided was not valid"))
        });
    }

    Ok(true)
}
pub fn search_and_prepare_command(&self, query: BTreeMap<String, String>, module: String, instruction: String, data: Option<serde_json::Value>) -> Result<crate::commands::Command, product_os_request::ProductOSRequestError> {
let matching_node = self.registry.pick_node(query);
let client = &self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
Some(key) => Ok(commands::Command {
requester: client.clone(),
node_url: node.get_address(),
verify_key: key,
module,
instruction,
data
})
}
},
None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
}
}
/// Prepares a command for a node whose enabled manager of kind `manager` has
/// the given `key`. The manager kind is also used as the command's module.
///
/// # Errors
/// Propagates the error from `search_and_prepare_command`.
pub fn find_and_prepare_command(&self, key: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
    let mut query = BTreeMap::new();
    query.insert(String::from("manager.key"), key);
    query.insert(String::from("manager.kind"), manager.clone());
    query.insert(String::from("manager.enabled"), String::from("true"));

    self.search_and_prepare_command(query, manager, instruction, data)
}
/// Prepares a command for any node advertising `capability` with an enabled
/// manager. `manager` is used as the command's module.
///
/// # Errors
/// Propagates the error from `search_and_prepare_command`.
pub fn find_any_and_prepare_command(&self, capability: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
    let mut query = BTreeMap::new();
    query.insert(String::from("capability"), capability);
    query.insert(String::from("manager.enabled"), String::from("true"));

    self.search_and_prepare_command(query, manager, instruction, data)
}
/// Prepares a command for any node of the given `kind` with an enabled
/// manager. `manager` is used as the command's module.
///
/// The function is `async` but performs no awaiting itself.
///
/// # Errors
/// Propagates the error from `search_and_prepare_command`.
pub async fn find_kind_and_prepare_command(&self, kind: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
    let mut query = BTreeMap::new();
    query.insert(String::from("kind"), kind);
    query.insert(String::from("manager.enabled"), String::from("true"));

    self.search_and_prepare_command(query, manager, instruction, data)
}
pub fn search_and_prepare_ask(&self, query: BTreeMap<String, String>, path: String, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
params: BTreeMap<String, String>, method: product_os_request::Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
let matching_node = self.registry.pick_node(query);
let client = &self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
Some(key) => Ok(commands::Ask {
requester: client.clone(),
node_url: node.get_address(),
verify_key: key,
path,
data,
headers,
params,
method
})
}
},
None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
}
}
pub async fn find_feature_and_ask(&self, feature: String, path: String, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
params: BTreeMap<String, String>, method: product_os_request::Method) -> Result<ProductOSResponse<product_os_request::BodyBytes>, product_os_request::ProductOSRequestError> {
let matching_node = self.registry.pick_node_for_feature(feature);
let client = &self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
Some(key) => commands::ask_node(client, node, key, path, data, headers, params, method).await
}
},
None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
}
}
pub fn find_feature_and_prepare_ask(&self, feature: String, path: String, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
params: BTreeMap<String, String>, method: product_os_request::Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
let matching_node = self.registry.pick_node_for_feature(feature);
let client = &self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
Some(key) => Ok(commands::Ask {
requester: client.clone(),
node_url: node.get_address(),
verify_key: key,
path,
data,
headers,
params,
method
})
}
},
None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
}
}
/// Returns an owned copy of the controller's configuration.
pub fn get_configuration(&self) -> product_os_configuration::Configuration {
    self.configuration.to_owned()
}
/// Looks up the key the registry holds for `identifier`, if any.
pub fn get_key(&self, identifier: String) -> Option<Vec<u8>> {
    let registry = &self.registry;
    registry.get_key(identifier)
}
/// Asks the registry to create a key session and returns the `(String, [u8; 32])`
/// pair it produces.
pub fn create_key_session(&mut self) -> (String, [u8; 32]) {
    let registry = &mut self.registry;
    registry.create_key_session()
}
/// Forwards the session identifier, remote public key, association and
/// optional remote session identifier to the registry's `generate_key`.
pub fn generate_key(&mut self, session_identifier: String, remote_public_key: &[u8], association: String, remote_session_identifier: Option<String>) {
    let registry = &mut self.registry;
    registry.generate_key(session_identifier, remote_public_key, association, remote_session_identifier);
}
/// Returns another handle to the shared relational store.
#[cfg(feature = "relational_store")]
pub fn get_relational_store(&mut self) -> Arc<product_os_store::ProductOSRelationalStore> {
    // Only the reference count is bumped; the store itself is shared.
    Arc::clone(&self.relational_store)
}
/// Returns another handle to the shared key-value store.
pub fn get_key_value_store(&mut self) -> Arc<product_os_store::ProductOSKeyValueStore> {
    // Only the reference count is bumped; the store itself is shared.
    Arc::clone(&self.key_value_store)
}
/// Forwards a shared feature, its base path and the router to the registry's
/// `add_feature` and awaits it.
pub async fn add_feature(&mut self, feature: Arc<dyn product_os_capabilities::Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
    let registry = &mut self.registry;
    registry.add_feature(feature, base_path, router).await;
}
/// Forwards a mutex-guarded feature, its base path and the router to the
/// registry's `add_feature_mut` and awaits it.
pub async fn add_feature_mut(&mut self, feature: Arc<Mutex<dyn product_os_capabilities::Feature>>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
    let registry = &mut self.registry;
    registry.add_feature_mut(feature, base_path, router).await;
}
/// Forwards `identifier` to the registry's `remove_feature` and awaits it.
pub async fn remove_feature(&mut self, identifier: String) {
    let registry = &mut self.registry;
    registry.remove_feature(identifier).await;
}
/// Forwards a shared service to the registry's `add_service` and awaits it.
pub async fn add_service(&mut self, service: Arc<dyn product_os_capabilities::Service>) {
    let registry = &mut self.registry;
    registry.add_service(service).await;
}
/// Forwards a mutex-guarded service to the registry's `add_service_mut` and
/// awaits it.
pub async fn add_service_mut(&mut self, service: Arc<Mutex<dyn product_os_capabilities::Service>>) {
    let registry = &mut self.registry;
    registry.add_service_mut(service).await;
}
/// Forwards `identifier` and `status` to the registry's `set_service_active`
/// and awaits it.
pub async fn set_service_active(&mut self, identifier: String, status: bool) {
    let registry = &mut self.registry;
    registry.set_service_active(identifier, status).await;
}
/// Forwards `identifier` to the registry's `remove_service` and awaits it.
pub async fn remove_service(&mut self, identifier: String) {
    let registry = &mut self.registry;
    registry.remove_service(identifier).await;
}
/// Forwards `query` to the registry's `remove_inactive_services` and awaits it.
pub async fn remove_inactive_services(&mut self, query: BTreeMap<String, String>) {
    let registry = &mut self.registry;
    registry.remove_inactive_services(query).await;
}
/// Calls the registry's `start_services` and awaits it.
pub async fn start_services(&mut self) {
    let registry = &mut self.registry;
    registry.start_services().await;
}
}
/// Runs a single pulse cycle against the shared controller.
///
/// The controller lock is taken with a 10 second timeout; if it cannot be
/// acquired an error is logged and nothing else happens. While locked, the
/// status of every local service is polled (results are ignored).
///
/// With the `distributed` feature enabled the cycle additionally:
/// * re-runs node discovery and checks this node's presence on the remote
///   registry, terminating the process with exit code 8 when
///   `update_me_status(false)` reports the node as no longer alive;
/// * releases the lock, then pings every other known node with a
///   `status`/`ping` command, re-locking only to update the registry:
///   a `KeyError` on `401 Unauthorized` removes the node, any other non-OK
///   status or a connect/timeout error marks its pulse as failed;
/// * removes registry entries that share this node's address but carry a
///   different identifier.
pub async fn pulse_run(controller_unlocked: Arc<Mutex<ProductOSController>>) {
    tracing::info!("Starting pulse run...");
    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
    match controller_locked {
        Some(mut controller) => {
            #[cfg(feature = "distributed")]
            controller.discover_nodes().await;
            #[cfg(feature = "distributed")]
            let self_identifier = controller.registry.get_me().get_identifier();
            #[cfg(feature = "distributed")]
            let control_url = controller.registry.get_me().get_address();
            #[cfg(feature = "distributed")]
            let nodes = controller.registry.get_nodes_endpoints(0, true);
            #[cfg(feature = "distributed")] {
                match controller.registry.check_me_remote().await {
                    Some(_) => { controller.registry.update_me_status(true); },
                    None => {
                        let alive = controller.registry.update_me_status(false);
                        if !alive {
                            tracing::error!("Terminating server due to lost presence on remote registry");
                            std::process::exit(8);
                        };
                    }
                }
            }
            let services = controller.get_registry().get_me().get_services().list();
            for (_, service) in services {
                // The status call is made for every service; its outcome is deliberately ignored here.
                match service.status().await {
                    Ok(_) => {}
                    Err(_) => {}
                }
            }
            #[cfg(feature = "distributed")]
            let client = controller.client.clone();
            // Release the lock before contacting remote nodes; it is re-acquired
            // briefly below only when the registry has to be updated.
            std::mem::drop(controller);
            #[cfg(feature = "distributed")]
            for (identifier, (url, key)) in nodes {
                if url != control_url {
                    match key {
                        Some(verify_key) => {
                            match commands::command(&client, url.clone(), verify_key, "status".to_string(), "ping".to_string(), None).await {
                                Ok(response) => {
                                    let status = response.status();
                                    match status {
                                        product_os_request::StatusCode::UNAUTHORIZED => {
                                            // NOTE(review): reading the body is unwrapped and will panic on failure — confirm this is acceptable.
                                            let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(response.text().await.unwrap().as_str()) {
                                                Ok(auth_error) => auth_error,
                                                Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
                                            };
                                            tracing::error!("Error object auth {:?}", auth);
                                            match auth.error {
                                                authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
                                                    tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
                                                    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
                                                    match controller_locked {
                                                        Some(mut controller) => {
                                                            controller.registry.remove_node(identifier).await;
                                                        }
                                                        None => tracing::error!("Failed to lock controller")
                                                    }
                                                },
                                                authentication::CommandControlAuthenticateErrorState::None => ()
                                            };
                                        },
                                        product_os_request::StatusCode::OK => {
                                            // NOTE(review): body read is unwrapped and will panic on failure.
                                            let body = client.bytes(response).await.unwrap();
                                            tracing::info!("Response received from {}: {} {:?}", url, status, body);
                                        }
                                        _ => {
                                            // NOTE(review): body read is unwrapped and will panic on failure.
                                            let body = client.bytes(response).await.unwrap();
                                            tracing::error!("Error response received from {}: {} {:?}", url, status, body);
                                            // This branch does not purge the node; it only records a failed pulse.
                                            tracing::info!("Marking remote node pulse as failed - unexpected response status {}", identifier);
                                            let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
                                            match controller_locked {
                                                Some(mut controller) => {
                                                    controller.registry.update_pulse_status(identifier, false).await;
                                                }
                                                None => tracing::error!("Failed to lock controller")
                                            }
                                        }
                                    }
                                },
                                Err(e) => {
                                    tracing::error!("Error encountered {:?} from {}", e, url);
                                    match e.generated_error {
                                        None => {}
                                        Some(e) => {
                                            // Only connectivity problems count as a failed pulse.
                                            if e.is_connect() || e.is_timeout() {
                                                let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
                                                match controller_locked {
                                                    Some(mut controller) => {
                                                        controller.registry.update_pulse_status(identifier, false).await;
                                                    }
                                                    None => tracing::error!("Failed to lock controller")
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        },
                        None => ()
                    }
                } else {
                    // Same address as this node but a different identifier: treat as a stale duplicate.
                    if self_identifier != identifier {
                        tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
                        let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
                        match controller_locked {
                            Some(mut controller) => {
                                controller.registry.remove_node(identifier).await;
                            }
                            None => tracing::error!("Failed to lock controller")
                        }
                    }
                }
            }
            tracing::info!("Finished pulse run...");
        }
        None => tracing::error!("Failed to lock controller")
    }
}
/// Announces this node to every other node known to the registry.
///
/// Performs the key exchange first, then sends a `status`/`announce` command
/// carrying this node's serialized registry entry to each node whose address
/// differs from this node's and for which a key is available. Responses are
/// only logged. Registry entries that share this node's address but carry a
/// different identifier are removed.
///
/// # Panics
/// Panics if this node's registry entry cannot be serialized to JSON, or if a
/// response body cannot be read (both are unwrapped).
#[cfg(feature = "distributed")]
pub async fn perform_announce(controller: &mut MutexGuard<'_, ProductOSController>) {
    tracing::info!("Announce searching for nodes...");
    authentication::perform_key_exchange(controller).await;
    let self_identifier = controller.registry.get_me().get_identifier();
    let control_url = controller.registry.get_me().get_address();
    let nodes = controller.registry.get_nodes_endpoints(0, true);
    tracing::info!("Starting announce...");
    tracing::info!("Nodes data {:?}", nodes);
    for (identifier, (url, key)) in nodes {
        if url != control_url {
            match key {
                Some(verify_key) => {
                    tracing::info!("Announcing {}: {}", identifier, url);
                    match commands::command(&controller.client, url.clone(), verify_key, "status".to_string(), "announce".to_string(), Some(serde_json::value::to_value(controller.get_registry().get_me()).unwrap())).await {
                        Ok(response) => {
                            let status = response.status();
                            match status {
                                product_os_request::StatusCode::UNAUTHORIZED => {
                                    let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(response.text().await.unwrap().as_str()) {
                                        Ok(auth_error) => auth_error,
                                        Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
                                    };
                                    tracing::error!("Error object auth {:?}", auth);
                                    match auth.error {
                                        authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
                                            tracing::info!("Error from remote node - keys don't match {}", identifier);
                                        },
                                        authentication::CommandControlAuthenticateErrorState::None => ()
                                    };
                                },
                                product_os_request::StatusCode::OK => {
                                    // Read the body through the controller's client; there is no
                                    // separate `client` binding in this function.
                                    let body = controller.client.bytes(response).await.unwrap();
                                    tracing::info!("Response received from {}: {} {:?}", url, status, body);
                                }
                                _ => {
                                    let body = controller.client.bytes(response).await.unwrap();
                                    tracing::error!("Error response received from {}: {} {:?}", url, status, body);
                                }
                            }
                        },
                        Err(e) => {
                            tracing::error!("Error encountered {:?} from {}", e, url);
                        }
                    };
                },
                None => ()
            }
        }
        else {
            // Same address as this node but a different identifier: treat as a stale duplicate.
            if identifier != self_identifier {
                tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
                controller.registry.remove_node(identifier).await;
            }
        }
    }
    tracing::info!("Finished announcing...");
}
/// Starts the controller's background activity on the given executor.
///
/// The controller lock is taken with a 10 second timeout; if it cannot be
/// acquired an error is logged and nothing is started. With the `distributed`
/// feature the node first trusts itself, registers itself, discovers and
/// announces to other nodes, and starts its services.
///
/// Two periodic tasks may then be spawned, each waiting until the first
/// upcoming occurrence of its cron schedule and then ticking at the gap
/// between the first and second occurrences:
/// * the pulse check (when enabled), which calls [`pulse_run`];
/// * the monitor (feature `monitor`, when enabled), which calls
///   `product_os_monitoring::process_statistics(None)` on its own schedule.
///
/// # Panics
/// Panics if a cron expression fails to parse or yields fewer than two
/// upcoming occurrences. The spawned tasks panic if a computed period does
/// not fit in a `u32` number of milliseconds.
pub async fn run_controller<X, E: product_os_async_executor::Executor<X> + product_os_async_executor::Timer>(controller_mutex: Arc<Mutex<ProductOSController>>, executor: &impl product_os_async_executor::Executor<X>) {
    let controller_unlocked = controller_mutex.clone();
    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
    match controller_locked {
        Some(mut controller) => {
            #[cfg(feature = "distributed")] {
                authentication::perform_self_trust(&mut controller);
                controller.registry.update_me().await;
                tracing::info!("Added self...");
                tracing::info!("Command and Control registered for {}", controller.registry.get_me().get_identifier());
                controller.discover_nodes().await;
                perform_announce(&mut controller).await;
                controller.start_services().await;
            }
            let pulse_check = controller.pulse_check.clone();
            let pulse_check_cron = cron::Schedule::from_str(controller.pulse_check_cron.as_str()).unwrap();
            let pulse_check_next = pulse_check_cron.upcoming(chrono::Utc).nth(0).unwrap();
            let pulse_check_following = pulse_check_cron.upcoming(chrono::Utc).nth(1).unwrap();
            #[cfg(feature = "monitor")]
            let monitor = controller.monitor.clone();
            #[cfg(feature = "monitor")]
            let monitor_cron = cron::Schedule::from_str(controller.monitor_cron.as_str()).unwrap();
            #[cfg(feature = "monitor")]
            let monitor_next = monitor_cron.upcoming(chrono::Utc).nth(0).unwrap();
            #[cfg(feature = "monitor")]
            let monitor_following = monitor_cron.upcoming(chrono::Utc).nth(1).unwrap();
            // Release the lock before spawning: the pulse task locks the controller itself.
            std::mem::drop(controller);
            if pulse_check {
                let _ = E::spawn_from_executor(executor, async move {
                    tracing::info!("Pulse check service starting...");
                    // Time until the first scheduled run; falls back to one second if it is already in the past.
                    let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
                        Ok(d) => d,
                        Err(_) => Duration::new(1, 0)
                    }.as_millis()) {
                        Ok(u) => u,
                        Err(_) => panic!("Period defined is too large for timer")
                    };
                    // Gap between the first two scheduled runs, used as the repeat interval.
                    let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
                        Ok(d) => d,
                        Err(_) => Duration::new(1, 0)
                    }.as_millis()) {
                        Ok(u) => u,
                        Err(_) => panic!("Period defined is too large for timer")
                    };
                    let mut delay = E::once(start_duration).await;
                    let mut interval = E::interval(interval_duration).await;
                    delay.tick().await;
                    loop {
                        interval.tick().await;
                        tracing::debug!("Pulse check service running");
                        pulse_run(controller_mutex.clone()).await;
                    }
                });
            }
            #[cfg(feature = "monitor")]
            if monitor {
                let _ = E::spawn_from_executor(executor, async move {
                    tracing::info!("Monitor service starting...");
                    // The monitor task is timed from the monitor cron schedule, not the pulse check one.
                    let start_duration = match u32::try_from(match monitor_next.signed_duration_since(chrono::Utc::now()).to_std() {
                        Ok(d) => d,
                        Err(_) => Duration::new(1, 0)
                    }.as_millis()) {
                        Ok(u) => u,
                        Err(_) => panic!("Period defined is too large for timer")
                    };
                    let interval_duration = match u32::try_from(match monitor_following.signed_duration_since(monitor_next).to_std() {
                        Ok(d) => d,
                        Err(_) => Duration::new(1, 0)
                    }.as_millis()) {
                        Ok(u) => u,
                        Err(_) => panic!("Period defined is too large for timer")
                    };
                    let mut delay = E::once(start_duration).await;
                    let mut interval = E::interval(interval_duration).await;
                    delay.tick().await;
                    loop {
                        interval.tick().await;
                        tracing::debug!("Monitor service running");
                        product_os_monitoring::process_statistics(None);
                    }
                }); }
        }
        None => tracing::error!("Failed to lock controller")
    }
}