mod registry;
use std::collections::HashMap;
use std::sync::{ Arc };
use parking_lot::{ Mutex, MutexGuard };
use std::str::FromStr;
use std::time::Duration;
use product_os_security::certificates::Certificates;
use serde::{ Deserialize, Serialize };
use url::Url;
use product_os_configuration::Configuration;
use product_os_store::{ProductOSKeyValueStore, ProductOSRelationalStore};
use registry::Registry;
pub use product_os_request::{ ProductOSResponse };
pub mod authentication;
mod commands;
pub use product_os_request::Method;
pub use registry::Node;
/// Payload exchanged between nodes while negotiating a shared verification key.
///
/// Serialised with camelCase field names (`publicKey`, ...).
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthExchangeKeyData {
    /// Identifier of the node that produced this payload.
    pub identifier: String,
    /// Identifier of the key session the public key belongs to.
    pub session: String,
    /// Raw public key bytes offered for the exchange.
    pub public_key: Vec<u8>,
}
/// A fully prepared command addressed to a specific node.
///
/// Instances are produced by the controller's `*_prepare_command` methods and
/// sent with [`Command::command`].
pub struct Command {
    /// Requester used to send the command.
    requester: product_os_request::ProductOSRequester,
    /// Address of the target node.
    node_url: Url,
    /// Key registered for the target node, handed to the command layer.
    verify_key: Vec<u8>,
    /// Module name passed to the command layer.
    module: String,
    /// Instruction name passed to the command layer.
    instruction: String,
    /// Optional JSON payload sent along with the instruction.
    data: Option<serde_json::Value>,
}
impl Command {
    /// Sends the prepared command to its target node and returns the response.
    ///
    /// Every field is cloned for the call, so the same `Command` can be sent
    /// more than once.
    pub async fn command(&self) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
        let node_url = self.node_url.clone();
        let verify_key = self.verify_key.clone();
        let module = self.module.clone();
        let instruction = self.instruction.clone();
        let data = self.data.clone();

        commands::command(&self.requester, node_url, verify_key, module, instruction, data).await
    }
}
/// A fully prepared request ("ask") addressed to a specific node.
///
/// Instances are produced by the controller's `*_prepare_ask` methods and
/// sent with [`Ask::ask`].
pub struct Ask {
    /// Requester used to send the request.
    requester: product_os_request::ProductOSRequester,
    /// Address of the target node.
    node_url: Url,
    /// Key registered for the target node, handed to the command layer.
    verify_key: Vec<u8>,
    /// Path passed to the command layer for this request.
    path: String,
    /// Optional JSON body.
    data: Option<serde_json::Value>,
    /// Headers supplied by the caller.
    headers: HashMap<String, String>,
    /// Parameters supplied by the caller.
    params: HashMap<String, String>,
    /// HTTP method to use.
    method: product_os_request::Method,
}
impl Ask {
    /// Sends the prepared request to its target node and returns the response.
    ///
    /// Every field is cloned for the call, so the same `Ask` can be sent more
    /// than once.
    pub async fn ask(&self) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
        let node_url = self.node_url.clone();
        let verify_key = self.verify_key.clone();
        let path = self.path.clone();
        let data = self.data.clone();
        let headers = self.headers.clone();
        let params = self.params.clone();
        let method = self.method.clone();

        commands::ask(&self.requester, node_url, verify_key, path, data, headers, params, method).await
    }
}
pub struct ProductOSController {
configuration: Configuration,
certificates: Certificates,
max_servers: u8,
registry: registry::Registry,
pulse_check: bool,
pulse_check_cron: String,
monitor: bool,
monitor_cron: String,
client: product_os_request::ProductOSRequester,
relational_store: Arc<ProductOSRelationalStore>,
key_value_store: Arc<ProductOSKeyValueStore>
}
impl ProductOSController {
pub fn new(configuration: product_os_configuration::Configuration, certificates: Certificates, key_value_store: Arc<ProductOSKeyValueStore>, relational_store: Arc<ProductOSRelationalStore>) -> Self {
let registry = registry::Registry::new(&configuration, key_value_store.clone(), certificates.certificates.to_owned());
let mut requester = product_os_request::ProductOSRequester::new();
requester.add_header("x-product-os-command".to_string(), registry.get_me().get_identifier(), false);
Self {
certificates,
max_servers: configuration.get_cc_max_servers(),
pulse_check: configuration.is_pulse_check_enabled(),
pulse_check_cron: configuration.get_hearbeat_cron().to_string(),
monitor: configuration.is_monitor_enabled(),
monitor_cron: configuration.get_monitor_cron().to_string(),
configuration,
relational_store,
key_value_store,
registry,
client: requester
}
}
/// Returns a shared reference to the node registry owned by this controller.
pub fn get_registry(&self) -> &Registry {
&self.registry
}
/// Refreshes the registry's view of the other nodes and then adds every raw
/// certificate the registry reports to the outbound client's trusted set.
pub async fn discover_nodes(&mut self) {
    self.registry.discover_nodes().await;

    let node_certificates = self.registry.get_nodes_raw_certificates(0, true);
    for pem in node_certificates {
        self.client.add_trusted_certificate_pem(pem);
    }
}
/// Returns the maximum server count read from the configuration at construction.
pub fn get_max_servers(&self) -> u8 {
self.max_servers
}
/// Forwards `identifier` and `node` to the registry's `upsert_node_local`.
// NOTE(review): presumably inserts or replaces the node in the local registry only — confirm in `registry`.
pub fn upsert_node_local(&mut self, identifier: String, node: Node) {
self.registry.upsert_node_local(identifier, node);
}
/// Returns the raw bytes of every certificate held by this controller, as
/// slices borrowed from the controller, in stored order.
pub fn get_certificates(&self) -> Vec<&[u8]> {
    self.certificates
        .certificates
        .iter()
        .map(|certificate| certificate.0.as_slice())
        .collect()
}
/// Returns the raw bytes of this controller's private key, borrowed from the controller.
pub fn get_private_key(&self) -> &[u8] {
self.certificates.private.0.as_slice()
}
/// Returns an owned copy of the certificates and private key held by this controller.
pub fn get_certificates_and_private_key(&self) -> Certificates {
    self.certificates.clone()
}
/// Returns `true` when `certificate` is byte-for-byte equal to one of the
/// certificates held by this controller, `false` otherwise.
pub fn validate_certificate(&self, certificate: Vec<u8>) -> bool {
    tracing::trace!("Checking certificate stored {:?} vs given {:?}", self.certificates.certificates, certificate);

    let candidate = certificate.as_slice();
    self.get_certificates()
        .into_iter()
        .any(|stored| stored == candidate)
}
/// Checks a request's verification tag against the key registered for the sender.
///
/// * `query`, `payload`, `headers` - the request parts handed to
///   `product_os_security::verify_auth_request`.
/// * `verify_identifier` - identifier used to look up the key in the registry.
/// * `verify_tag` - the tag supplied with the request.
///
/// Returns `false` when no key is registered for `verify_identifier`; otherwise
/// returns the result of `verify_auth_request`.
pub fn validate_verify_tag<T>(&self, query: Option<&HashMap<String, String>>, payload: Option<T>,
                              headers: Option<&HashMap<String, String>>, verify_identifier: String, verify_tag: String) -> bool
    where T: product_os_security::AsByteVector
{
    tracing::trace!("Checking verification provided {} from {}", verify_tag, verify_identifier);

    let key = self.registry.get_key(verify_identifier);

    // Never write the key itself to the logs: it is the secret used to
    // authenticate requests. Only record whether one was found.
    tracing::trace!("Checking verification with key present: {}", key.is_some());

    match key {
        Some(verify_key) => {
            product_os_security::verify_auth_request(query, false, payload, headers,
                                                     &["x-product-os-verify", "x-product-os-command", "x-product-os-control"],
                                                     verify_tag, Some(verify_key.as_slice()))
        },
        None => false
    }
}
/// Authenticates a command/control request.
///
/// Succeeds with `Ok(true)` only when both `verify_identifier` and
/// `verify_tag` are present and `validate_verify_tag` accepts them. In every
/// other case a `KeyError` is returned.
pub fn authenticate_command_control<T>(&self, query: Option<&HashMap<String, String>>, payload: Option<T>, headers: Option<&HashMap<String, String>>,
                                       verify_identifier: Option<String>, verify_tag: Option<String>) -> Result<bool, authentication::CommandControlAuthenticateError>
    where T: product_os_security::AsByteVector
{
    tracing::trace!("Verify identifier {:?} and tag {:?} verify result", &verify_identifier, &verify_tag);

    let verified = match (verify_identifier, verify_tag) {
        (Some(identifier), Some(tag)) => self.validate_verify_tag(query, payload, headers, identifier, tag),
        _ => false
    };

    if !verified {
        return Err(authentication::CommandControlAuthenticateError {
            error: authentication::CommandControlAuthenticateErrorState::KeyError(String::from("One of the keys provided was not valid"))
        });
    }

    Ok(true)
}
pub fn search_and_prepare_command(&self, query: HashMap<String, String>, module: String, instruction: String, data: Option<serde_json::Value>) -> Result<Command, product_os_request::ProductOSRequestError> {
let matching_node = self.registry.pick_node(query);
let client = &self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestError {
error: product_os_request::ProductOSRequestErrorState::Error("No key found for matching node".to_string()),
generated_error: None
}),
Some(key) => Ok(Command {
requester: client.clone(),
node_url: node.get_address(),
verify_key: key,
module,
instruction,
data
})
}
},
None => Err(product_os_request::ProductOSRequestError {
error: product_os_request::ProductOSRequestErrorState::Error("No matching node found".to_string()),
generated_error: None
})
}
}
/// Prepares a command for a node with an enabled manager of kind `manager`
/// and key `key`. The manager kind is also used as the command's module name.
///
/// # Errors
///
/// Propagates the errors of `search_and_prepare_command`.
pub fn find_and_prepare_command(&self, key: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<Command, product_os_request::ProductOSRequestError> {
    let mut query = HashMap::with_capacity(3);
    query.insert("manager.key".to_string(), key);
    query.insert("manager.kind".to_string(), manager.clone());
    query.insert("manager.enabled".to_string(), true.to_string());

    self.search_and_prepare_command(query, manager, instruction, data)
}
/// Prepares a command for any node offering `capability` with an enabled
/// manager. `manager` is used as the command's module name.
///
/// # Errors
///
/// Propagates the errors of `search_and_prepare_command`.
pub fn find_any_and_prepare_command(&self, capability: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<Command, product_os_request::ProductOSRequestError> {
    let mut query = HashMap::with_capacity(2);
    query.insert("capability".to_string(), capability);
    query.insert("manager.enabled".to_string(), true.to_string());

    self.search_and_prepare_command(query, manager, instruction, data)
}
/// Prepares a command for any node of the given `kind` with an enabled
/// manager. `manager` is used as the command's module name.
///
/// The function is `async` but performs no awaiting itself.
///
/// # Errors
///
/// Propagates the errors of `search_and_prepare_command`.
pub async fn find_kind_and_prepare_command(&self, kind: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<Command, product_os_request::ProductOSRequestError> {
    let mut query = HashMap::with_capacity(2);
    query.insert("kind".to_string(), kind);
    query.insert("manager.enabled".to_string(), true.to_string());

    self.search_and_prepare_command(query, manager, instruction, data)
}
pub fn search_and_prepare_ask(&self, query: HashMap<String, String>, path: String, data: Option<serde_json::Value>, headers: HashMap<String, String>,
params: HashMap<String, String>, method: product_os_request::Method) -> Result<Ask, product_os_request::ProductOSRequestError> {
let matching_node = self.registry.pick_node(query);
let client = &self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestError {
error: product_os_request::ProductOSRequestErrorState::Error("No key found for matching node".to_string()),
generated_error: None
}),
Some(key) => Ok(Ask {
requester: client.clone(),
node_url: node.get_address(),
verify_key: key,
path,
data,
headers,
params,
method
})
}
},
None => Err(product_os_request::ProductOSRequestError {
error: product_os_request::ProductOSRequestErrorState::Error("No matching node found".to_string()),
generated_error: None
})
}
}
pub fn find_feature_and_prepare_ask(&self, feature: String, path: String, data: Option<serde_json::Value>, headers: HashMap<String, String>,
params: HashMap<String, String>, method: product_os_request::Method) -> Result<Ask, product_os_request::ProductOSRequestError> {
let matching_node = self.registry.pick_node_for_feature(feature);
let client = &self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestError {
error: product_os_request::ProductOSRequestErrorState::Error("No key found for matching node".to_string()),
generated_error: None
}),
Some(key) => Ok(Ask {
requester: client.clone(),
node_url: node.get_address(),
verify_key: key,
path,
data,
headers,
params,
method
})
}
},
None => Err(product_os_request::ProductOSRequestError {
error: product_os_request::ProductOSRequestErrorState::Error("No matching node found".to_string()),
generated_error: None
})
}
}
pub fn find_feature_and_ask_sync(&mut self, feature: String, path: String, data: Option<serde_json::Value>, headers: HashMap<String, String>,
params: HashMap<String, String>, method: product_os_request::Method) -> Result<ProductOSResponse, product_os_request::ProductOSRequestSyncError> {
let matching_node = self.registry.pick_node_for_feature(feature);
let client = &mut self.client;
match matching_node {
Some(node) => {
match self.registry.get_key(node.get_identifier()) {
None => Err(product_os_request::ProductOSRequestSyncError {
error: product_os_request::ProductOSRequestErrorState::Error("No key found for matching node".to_string()),
generated_error: None
}),
Some(key) => commands::ask_node_sync(client, node, key, path, data, headers, params, method)
}
},
None => Err(product_os_request::ProductOSRequestSyncError {
error: product_os_request::ProductOSRequestErrorState::Error("No matching node found".to_string()),
generated_error: None
})
}
}
/// Returns a clone of the configuration this controller was created with.
pub fn get_configuration(&self) -> product_os_configuration::Configuration {
self.configuration.clone()
}
/// Looks up the key registered for `identifier` in the registry.
///
/// Returns `None` when the registry has no key for that identifier.
pub fn get_key(&self, identifier: String) -> Option<Vec<u8>> {
self.registry.get_key(identifier)
}
/// Forwards to the registry's `create_key_session` and returns its result.
///
/// Callers in this file treat the tuple as (session identifier, public key bytes).
pub fn create_key_session(&mut self) -> (String, [u8; 32]) {
self.registry.create_key_session()
}
/// Forwards all arguments unchanged to the registry's `generate_key`.
// NOTE(review): presumably derives and stores a shared key for `association` from the
// local session and the remote public key — confirm in `registry`.
pub fn generate_key(&mut self, session_identifier: String, remote_public_key: &[u8], association: String, remote_session_identifier: Option<String>) {
self.registry.generate_key(session_identifier, remote_public_key, association, remote_session_identifier);
}
/// Returns another shared handle to the relational store.
///
/// Only the `Arc` is cloned, so a shared borrow of the controller is enough;
/// existing callers holding a mutable borrow continue to work.
pub fn get_relational_store(&self) -> Arc<ProductOSRelationalStore> {
    Arc::clone(&self.relational_store)
}
/// Returns another shared handle to the key-value store.
///
/// Only the `Arc` is cloned, so a shared borrow of the controller is enough;
/// existing callers holding a mutable borrow continue to work.
pub fn get_key_value_store(&self) -> Arc<ProductOSKeyValueStore> {
    Arc::clone(&self.key_value_store)
}
/// Forwards `feature`, `base_path` and `router` to the registry's `add_feature`.
// NOTE(review): presumably registers the feature and mounts its routes under `base_path` — confirm in `registry`.
pub async fn add_feature(&mut self, feature: Arc<dyn product_os_capabilities::Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
self.registry.add_feature(feature, base_path, router).await;
}
/// Forwards `identifier` to the registry's `remove_feature`.
pub async fn remove_feature(&mut self, identifier: String) {
self.registry.remove_feature(identifier).await;
}
/// Forwards the shared, mutex-protected `service` to the registry's `add_service`.
pub async fn add_service(&mut self, service: Arc<Mutex<dyn product_os_capabilities::Service>>) {
self.registry.add_service(service).await;
}
/// Forwards `identifier` and `status` to the registry's `set_service_active`.
pub async fn set_service_active(&mut self, identifier: String, status: bool) {
self.registry.set_service_active(identifier, status).await;
}
/// Forwards `identifier` to the registry's `remove_service`.
pub async fn remove_service(&mut self, identifier: String) {
self.registry.remove_service(identifier).await;
}
/// Forwards `query` to the registry's `remove_inactive_services`.
// NOTE(review): presumably `query` narrows which inactive services are removed — confirm in `registry`.
pub async fn remove_inactive_services(&mut self, query: HashMap<String, String>) {
self.registry.remove_inactive_services(query).await;
}
/// Forwards to the registry's `start_services`.
pub async fn start_services(&mut self) {
self.registry.start_services().await;
}
}
pub async fn pulse_run(controller_unlocked: Arc<Mutex<ProductOSController>>) {
tracing::info!("Starting pulse run...");
let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
match controller_locked {
Some(mut controller) => {
controller.discover_nodes().await;
let self_identifier = controller.registry.get_me().get_identifier();
let control_url = controller.registry.get_me().get_address();
let nodes = controller.registry.get_nodes_endpoints(0, true);
match controller.registry.check_me_remote().await {
Some(_) => { controller.registry.update_me_status(true); },
None => {
let alive = controller.registry.update_me_status(false);
if !alive {
tracing::error!("Terminating server due to lost presence on remote registry");
std::process::exit(8);
};
}
}
let client = controller.client.clone();
let services = controller.get_registry().get_me().get_services().list();
for (_, arc_service) in services {
let service = arc_service.service.lock();
match service.status().await {
Ok(_) => {}
Err(_) => {}
}
}
std::mem::drop(controller);
for (identifier, (url, key)) in nodes {
if url != control_url {
match key {
Some(verify_key) => {
match commands::command(&client, url.clone(), verify_key, "status".to_string(), "ping".to_string(), None).await {
Ok(response) => {
let status = response.status();
match status {
product_os_request::StatusCode::UNAUTHORIZED => {
let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(response.text().await.unwrap().as_str()) {
Ok(auth_error) => auth_error,
Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
};
tracing::error!("Error object auth {:?}", auth);
match auth.error {
authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
match controller_locked {
Some(mut controller) => {
controller.registry.remove_node(identifier).await;
}
None => tracing::error!("Failed to lock controller")
}
},
authentication::CommandControlAuthenticateErrorState::None => ()
};
},
product_os_request::StatusCode::OK => {
let body = response.bytes().await.unwrap();
tracing::info!("Response received from {}: {} {:?}", url, status, body);
}
_ => {
let body = response.bytes().await.unwrap();
tracing::error!("Error response received from {}: {} {:?}", url, status, body);
tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
match controller_locked {
Some(mut controller) => {
controller.registry.update_pulse_status(identifier, false).await;
}
None => tracing::error!("Failed to lock controller")
}
}
}
},
Err(e) => {
tracing::error!("Error encountered {:?} from {}", e, url);
match e.generated_error {
None => {}
Some(e) => {
if e.is_connect() || e.is_timeout() {
let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
match controller_locked {
Some(mut controller) => {
controller.registry.update_pulse_status(identifier, false).await;
}
None => tracing::error!("Failed to lock controller")
}
}
}
}
}
}
},
None => ()
}
}
else {
if self_identifier != identifier {
tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
match controller_locked {
Some(mut controller) => {
controller.registry.remove_node(identifier).await;
}
None => tracing::error!("Failed to lock controller")
}
}
}
}
tracing::info!("Finished pulse run...");
}
None => tracing::error!("Failed to lock controller")
}
}
/// Establishes trust in this node itself.
///
/// A key session is created and immediately used to generate a key associated
/// with this node's own identifier; afterwards every certificate held by the
/// controller is added to the outbound client's trusted set.
fn perform_self_trust(controller: &mut ProductOSController) {
    tracing::info!("Generating own key exchange...");

    let self_identifier = controller.registry.get_me().get_identifier();
    let (session, public_key) = controller.create_key_session();
    controller.generate_key(session, &public_key, self_identifier, None);

    // `certificates` and `client` are distinct fields, so they can be borrowed side by side.
    for certificate in &controller.certificates.certificates {
        controller.client.add_trusted_certificate_pem(certificate.0.to_vec());
    }
}
pub async fn perform_key_exchange(controller: &mut ProductOSController) {
tracing::info!("Performing key exchanges...");
let self_identifier = controller.registry.get_me().get_identifier();
let control_url = controller.registry.get_me().get_address();
let nodes = controller.registry.get_nodes_endpoints(0, true);
tracing::info!("Performing key exchange {:?}", nodes);
for (identifier, (url, key)) in nodes {
if url != control_url {
match key {
Some(_) => (),
None => {
tracing::info!("Authenticating {}: {}", identifier, url);
let (key_session, public_key) = controller.create_key_session();
let key_exchange = AuthExchangeKeyData {
identifier: self_identifier.clone(),
session: key_session,
public_key: public_key.to_vec()
};
tracing::trace!("Sending authentication exchange {:?}", key_exchange);
match commands::command(&controller.client, url.clone(), vec!(), "auth".to_string(), "exchange".to_string(), Some(serde_json::value::to_value(key_exchange).unwrap())).await {
Ok(response) => {
let status = response.status();
match status {
product_os_request::StatusCode::CONFLICT => {
let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(response.text().await.unwrap().as_str()) {
Ok(auth_error) => auth_error,
Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
};
tracing::error!("Error object auth {:?}", auth);
match auth.error {
authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
tracing::info!("Error from remote node - keys already exist - problem {}", identifier);
},
authentication::CommandControlAuthenticateErrorState::None => ()
};
},
product_os_request::StatusCode::OK => {
let body = response.text().await.unwrap();
tracing::info!("Response received from {}: {} {:?}", url, status, body);
let key_exchange: AuthExchangeKeyData = serde_json::from_str(body.as_str()).unwrap();
controller.generate_key(key_exchange.session, key_exchange.public_key.as_slice(), key_exchange.identifier, None);
}
_ => {
let body = response.bytes().await.unwrap();
tracing::error!("Error response received from {}: {} {:?}", url, status, body);
}
}
},
Err(e) => {
tracing::error!("Error encountered {:?} from {}", e, url);
}
}
}
}
}
}
tracing::info!("Finished key exchanges...");
}
pub async fn perform_announce(controller: &mut MutexGuard<'_, ProductOSController>) {
tracing::info!("Announce searching for nodes...");
perform_key_exchange(controller).await;
let self_identifier = controller.registry.get_me().get_identifier();
let control_url = controller.registry.get_me().get_address();
let nodes = controller.registry.get_nodes_endpoints(0, true);
tracing::info!("Starting announce...");
tracing::info!("Nodes data {:?}", nodes);
for (identifier, (url, key)) in nodes {
if url != control_url {
match key {
Some(verify_key) => {
tracing::info!("Announcing {}: {}", identifier, url);
match commands::command(&controller.client, url.clone(), verify_key, "status".to_string(), "announce".to_string(), Some(serde_json::value::to_value(controller.get_registry().get_me()).unwrap())).await {
Ok(response) => {
let status = response.status();
match status {
product_os_request::StatusCode::UNAUTHORIZED => {
let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(response.text().await.unwrap().as_str()) {
Ok(auth_error) => auth_error,
Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
};
tracing::error!("Error object auth {:?}", auth);
match auth.error {
authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
tracing::info!("Error from remote node - keys don't match {}", identifier);
},
authentication::CommandControlAuthenticateErrorState::None => ()
};
},
product_os_request::StatusCode::OK => {
let body = response.bytes().await.unwrap();
tracing::info!("Response received from {}: {} {:?}", url, status, body);
}
_ => {
let body = response.bytes().await.unwrap();
tracing::error!("Error response received from {}: {} {:?}", url, status, body);
}
}
},
Err(e) => {
tracing::error!("Error encountered {:?} from {}", e, url);
}
};
},
None => ()
}
}
else {
if identifier != self_identifier {
tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
controller.registry.remove_node(identifier).await;
}
}
}
tracing::info!("Finished announcing...");
}
pub async fn run_controller(controller_mutex: Arc<Mutex<ProductOSController>>) {
let controller_unlocked = controller_mutex.clone();
let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
match controller_locked {
Some(mut controller) => {
perform_self_trust(&mut controller);
controller.registry.update_me().await;
tracing::info!("Added self...");
tracing::info!("Command and Control initialized for {}", controller.registry.get_me().get_identifier());
controller.discover_nodes().await;
perform_announce(&mut controller).await;
controller.start_services().await;
let pulse_check = controller.pulse_check.clone();
let pulse_check_cron = cron::Schedule::from_str(controller.pulse_check_cron.as_str()).unwrap();
let pulse_check_next = pulse_check_cron.upcoming(chrono::Utc).nth(0).unwrap();
let pulse_check_following = pulse_check_cron.upcoming(chrono::Utc).nth(1).unwrap();
let monitor = controller.monitor.clone();
let monitor_cron = cron::Schedule::from_str(controller.monitor_cron.as_str()).unwrap();
let monitor_next = monitor_cron.upcoming(chrono::Utc).nth(0).unwrap();
let monitor_following = monitor_cron.upcoming(chrono::Utc).nth(1).unwrap();
std::mem::drop(controller);
if pulse_check {
tokio::spawn(async move {
tracing::info!("Pulse check service starting...");
let start_duration = match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
Ok(d) => d,
Err(_) => Duration::new(1, 0)
};
let interval_duration = match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
Ok(d) => d,
Err(_) => Duration::new(1, 0)
};
let mut delay = tokio::time::interval(start_duration);
let mut interval = tokio::time::interval(interval_duration);
delay.tick().await;
loop {
interval.tick().await;
tracing::debug!("Pulse check service running");
pulse_run(controller_mutex.clone()).await;
}
});
}
if monitor {
tokio::spawn(async move {
tracing::info!("Monitor service starting...");
let start_duration = match monitor_next.signed_duration_since(chrono::Utc::now()).to_std() {
Ok(d) => d,
Err(_) => Duration::new(1, 0)
};
let interval_duration = match monitor_following.signed_duration_since(monitor_next).to_std() {
Ok(d) => d,
Err(_) => Duration::new(1, 0)
};
let mut delay = tokio::time::interval(start_duration);
let mut interval = tokio::time::interval(interval_duration);
delay.tick().await;
loop {
interval.tick().await;
tracing::debug!("Monitor service running");
product_os_monitoring::process_statistics(None);
}
}).await.unwrap(); }
}
None => tracing::error!("Failed to lock controller")
}
}