use core::str::FromStr;
use std::prelude::v1::*;
use std::collections::BTreeMap;
use std::sync::Arc;
use serde::{ Deserialize, Serialize };
use product_os_capabilities::{Features, ServiceError, Services, What};
use product_os_security::{AsByteVector, DHKeyStore, RandomGenerator, certificates::Certificates, RandomGeneratorTemplate, RNG, StdRng, SeedableRng};
use product_os_store::ProductOSKeyValueStore;
use chrono::{DateTime, Utc };
use parking_lot::Mutex;
use product_os_request::Uri;
/// One participant known to the registry.
///
/// Serialized and deserialized with camelCase field names.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Node {
/// Unique identifier; `Node::default` assigns a random v4 UUID.
id: uuid::Uuid,
/// Hash of the host's machine uid (see `Node::default`).
machine_id: String,
/// Address of the node, stored as a string and re-parsed into a `Uri` on demand.
uri: String,
/// Operating-system process id of the process that created the node.
process_id: u32,
/// Certificate bytes; `Node::default` takes the first entry of the supplied certificates.
certificate: Vec<u8>,
/// Capability names consulted by the `"capability"` selector in `match_node`.
capabilities: Vec<String>,
/// Services registered on this node.
services: Services,
/// Features registered on this node.
features: Features,
/// Failure counter; reset to 0 on a successful status update.
failures: u8,
/// Time the node value was created.
created_at: DateTime<Utc>,
/// Time of the last accepted status update.
updated_at: DateTime<Utc>
}
impl AsByteVector for &Node {
    /// Flattens the node into one byte buffer.
    ///
    /// The buffer is the concatenation, in this order, of: the id bytes, the
    /// machine id, the uri, the certificate, the failure counter (one byte) and
    /// the textual forms of `created_at` and `updated_at`. The process id,
    /// capabilities, services and features are not included.
    fn as_byte_vector(&self) -> Vec<u8> {
        let created = self.created_at.to_string();
        let updated = self.updated_at.to_string();

        let mut buffer: Vec<u8> = Vec::new();
        buffer.extend_from_slice(self.id.as_bytes());
        buffer.extend_from_slice(self.machine_id.as_bytes());
        buffer.extend_from_slice(self.uri.as_bytes());
        buffer.extend_from_slice(&self.certificate);
        buffer.push(self.failures);
        buffer.extend_from_slice(created.as_bytes());
        buffer.extend_from_slice(updated.as_bytes());
        buffer
    }
}
impl Node {
/// Builds the node that describes the current process.
///
/// The id is a fresh v4 UUID, the uri comes from `config.url_address()`, the
/// machine id is a hash of the host's machine uid and the certificate is the
/// first entry of `certificates`. Collections start empty and the failure
/// counter starts at 0.
///
/// # Panics
/// Panics when the machine uid cannot be read, when the configured address
/// does not parse as a `Uri`, or when `certificates` holds no certificate.
pub fn default(config: &product_os_configuration::Configuration, certificates: Certificates) -> Self {
    let machine_uid = machine_uid::get()
        .unwrap_or_else(|e| panic!("Unable to generate machine id: {}", e));

    // Built in the same order as the original struct literal so that the first
    // failing step (and therefore the panic raised) is unchanged.
    let id = uuid::Uuid::new_v4();
    let uri = Uri::from_str(config.url_address().as_str()).unwrap().to_string();
    let process_id = std::process::id();
    let machine_id = product_os_security::create_string_hash(machine_uid.as_str());
    let certificate = certificates.certificates.first().unwrap().to_owned();

    Self {
        id,
        machine_id,
        uri,
        process_id,
        certificate,
        capabilities: Vec::new(),
        services: Services::new(),
        features: Features::new(),
        failures: 0,
        created_at: Utc::now(),
        updated_at: Utc::now()
    }
}
/// Returns the node id rendered as a string.
pub fn get_identifier(&self) -> String {
self.id.to_string()
}
/// Returns the scheme of the node's uri, or an empty string when the uri has
/// no scheme.
///
/// # Panics
/// Panics when the stored uri string does not parse as a `Uri`.
pub fn get_protocol(&self) -> String {
    let parsed = Uri::from_str(self.uri.as_str()).unwrap();
    parsed
        .scheme()
        .map(|scheme| scheme.to_string())
        .unwrap_or_default()
}
/// Parses the stored uri string and returns it as a `Uri`.
///
/// # Panics
/// Panics when the stored uri string does not parse.
pub fn get_address(&self) -> Uri {
Uri::from_str(self.uri.as_str()).unwrap()
}
/// Returns the process id recorded for this node.
pub fn get_process_id(&self) -> u32 {
self.process_id
}
/// Returns a copy of the node's certificate bytes.
pub fn get_certificate(&self) -> Vec<u8> {
self.certificate.to_owned()
}
/// Returns the current failure counter.
pub fn get_failures(&self) -> u8 {
self.failures
}
/// Borrows the features registered on this node.
pub fn get_features(&self) -> &Features {
&self.features
}
/// Borrows the services registered on this node.
pub fn get_services(&self) -> &Services {
&self.services
}
/// Tests this node against a single `selector` / `search_value` pair.
///
/// Supported selectors:
/// * `"feature"` - the node has a feature registered under `search_value`.
/// * `"capability"` - the node lists `search_value` among its capabilities.
/// * `"service.kind"` - `services.find` returns a service for `search_value`.
/// * `"service.enabled"` - every service's `enabled` flag, rendered as a
///   string, equals `search_value` (true when the node has no services).
/// * `"service.active"` - same as above for the `active` flag.
///
/// Any other selector is ignored and the node is treated as matching.
///
/// The capability check previously failed as soon as *any* capability
/// differed from the value, so a node with several capabilities could never
/// match and a node with none matched everything. It now succeeds exactly
/// when the requested capability is present.
pub fn match_node(&self, selector: &str, search_value: &str) -> bool {
    match selector {
        "feature" => self.features.get(search_value).is_some(),
        "capability" => self
            .capabilities
            .iter()
            .any(|capability| capability == search_value),
        "service.kind" => self.services.find(search_value.to_string()).is_some(),
        "service.enabled" => self
            .services
            .list()
            .into_iter()
            .all(|(_, service)| service.enabled.to_string() == search_value),
        "service.active" => self
            .services
            .list()
            .into_iter()
            .all(|(_, service)| service.active.to_string() == search_value),
        // Unknown selectors do not restrict the result.
        _ => true
    }
}
/// Returns `true` when the node satisfies every selector/value pair in
/// `query` (see `match_node`). An empty query matches every node.
pub fn match_node_query(&self, query: &BTreeMap<&str, &str>) -> bool {
    query
        .iter()
        .all(|(selector, expected)| self.match_node(selector, expected))
}
/// Returns the creation timestamp of the node.
pub fn get_created_at(&self) -> DateTime<Utc> {
self.created_at
}
/// Returns the timestamp of the last accepted status update.
pub fn get_last_updated_at(&self) -> DateTime<Utc> {
self.updated_at
}
}
/// Registry of nodes: the local node, a local cache of known nodes and the
/// shared key-value store they are published to.
pub struct Registry {
/// The node describing the current process.
me: Node,
/// Local cache of nodes, keyed by the node id rendered as a string.
nodes: BTreeMap<String, Node>,
/// Key store used for per-node keys and key sessions.
key_store: DHKeyStore,
/// Shared key-value store used by the `group_*` calls to publish and read nodes.
store: Arc<ProductOSKeyValueStore>,
/// Failure threshold; a status update is rejected once the counter would reach it.
max_failures: u8
}
impl Registry {
pub fn new(config: &product_os_configuration::Configuration, key_value_store: Arc<ProductOSKeyValueStore>, certificates: Certificates) -> Self {
let me = Node::default(config, certificates);
let registry = Registry {
me,
nodes: BTreeMap::new(),
key_store: DHKeyStore::new(),
store: key_value_store,
max_failures: config.get_cc_max_failures(),
};
registry
}
/// Returns the configured failure threshold.
pub fn get_max_failures(&self) -> u8 {
    // `u8` is `Copy`; no explicit copy call is needed.
    self.max_failures
}
async fn upsert_me_remote(&mut self) {
self.store.group_set(self.me.id.to_string().as_str(), serde_json::to_string(&self.me).unwrap().as_str()).unwrap_or_default()
}
async fn upsert_node_remote(&mut self, node: &Node) {
tracing::info!("Upserting node: {:?}", node);
self.store.group_set(node.id.to_string().as_str(), serde_json::to_string(node).unwrap().as_str()).unwrap_or_default();
}
async fn remove_node_remote(&mut self, identifier: &str) {
self.store.group_remove(identifier).unwrap_or_default()
}
async fn get_node_remote(&mut self, id: &str) -> Option<Node> {
match self.store.group_get(id) {
Ok(v) => {
let mut node: Node = serde_json::from_str(v.as_str()).unwrap();
let _ = node.features.setup_router(); Some(node)
},
Err(_) => None
}
}
/// Checks that the local node is present in the shared store.
///
/// Returns the local node when the store holds an entry under its id whose
/// id field equals the local id, and `None` otherwise.
pub async fn check_me_remote(&mut self) -> Option<&Node> {
    let my_id = self.me.id.to_string();
    let remote = self.get_node_remote(my_id.as_str()).await?;
    if remote.id == self.me.id {
        Some(&self.me)
    } else {
        None
    }
}
/// Borrows the node describing the current process.
pub fn get_me(&self) -> &Node {
&self.me
}
/// Publishes the current state of the local node to the shared store.
pub async fn update_me(&mut self) {
self.upsert_me_remote().await;
}
/// Records the outcome of a status check for the local node.
///
/// A success resets the failure counter to 0; a failure increments it. When
/// the new counter is below `max_failures` it is stored together with a fresh
/// `updated_at` and `true` is returned. Otherwise nothing is changed and
/// `false` is returned. Only the in-memory node is modified.
pub fn update_me_status(&mut self, success: bool) -> bool {
    let next_failures = if success { 0 } else { self.me.failures + 1 };

    if next_failures >= self.max_failures {
        return false;
    }

    self.me.failures = next_failures;
    self.me.updated_at = Utc::now();
    true
}
/// Records the outcome of a pulse check for the node stored under `id`.
///
/// The node is read from the shared store; `false` is returned when it
/// cannot be read. A success resets its failure counter to 0, a failure
/// increments it. While the counter stays below `max_failures` the node is
/// written back to the store and to the local cache and `true` is returned.
/// Otherwise the node is removed from both and `false` is returned.
///
/// The increment saturates, so a stored counter of 255 no longer overflows.
/// The remote entry is purged even when the node is missing from the local
/// cache (previously it was only purged when a local entry existed).
pub async fn update_pulse_status(&mut self, id: &str, success: bool) -> bool {
    let mut node = match self.get_node_remote(id).await {
        Some(node) => node,
        None => return false
    };

    let failures = if success { 0 } else { node.failures.saturating_add(1) };

    if failures < self.max_failures {
        node.failures = failures;
        node.updated_at = Utc::now();
        self.upsert_node_remote(&node).await;
        self.nodes.insert(node.id.to_string(), node);
        true
    } else {
        tracing::info!("Removing node due to failures count {}: {:?}", failures, node);
        let key = node.id.to_string();
        self.nodes.remove(key.as_str());
        self.remove_node_remote(key.as_str()).await;
        false
    }
}
/// Inserts or replaces `node` in the local cache under `identifier`.
///
/// The node's feature router is set up first; the result of that setup is
/// ignored. The shared store is not touched.
pub fn upsert_node_local(&mut self, identifier: String, mut node: Node) {
let _ = node.features.setup_router(); self.nodes.insert(identifier, node);
}
/// Returns the locally cached nodes that satisfy every pair in `query`,
/// keyed by node id.
///
/// When `exclude_me` is set, the entry whose key equals the local node's id
/// is left out.
pub fn find_nodes(&self, query: BTreeMap<&str, &str>, exclude_me: bool) -> BTreeMap<String, &Node> {
    let my_id = self.me.id.to_string();
    self.nodes
        .iter()
        .filter(|(id, node)| !(exclude_me && my_id == **id) && node.match_node_query(&query))
        .map(|(id, node)| (id.clone(), node))
        .collect()
}
/// Looks up a node in the local cache by id; the shared store is not consulted.
pub fn get_node(&self, id: &str) -> Option<&Node> {
self.nodes.get(id)
}
/// Returns the locally cached nodes, keyed by node id.
///
/// The first `skip` entries (in key order, counting every entry including
/// the local node) are left out. When `exclude_me` is set, the entry whose
/// key equals the local node's id is left out as well.
///
/// The position is tracked by the iterator rather than a `u8` counter, so a
/// cache of 256 or more nodes no longer overflows.
pub fn get_nodes(&self, skip: u8, exclude_me: bool) -> BTreeMap<String, &Node> {
    let me = self.me.id.to_string();
    self.nodes
        .iter()
        .skip(usize::from(skip))
        .filter(|(id, _)| !(exclude_me && me == **id))
        .map(|(id, node)| (id.clone(), node))
        .collect()
}
/// Returns a copy of the certificate of each locally cached node.
///
/// The first `skip` entries (in key order, counting every entry including
/// the local node) are left out. When `exclude_me` is set, the entry whose
/// key equals the local node's id is left out as well.
///
/// The position is tracked by the iterator rather than a `u8` counter, so a
/// cache of 256 or more nodes no longer overflows.
pub fn get_nodes_certificates(&self, skip: u8, exclude_me: bool) -> Vec<Vec<u8>> {
    let me = self.me.id.to_string();
    self.nodes
        .iter()
        .skip(usize::from(skip))
        .filter(|(id, _)| !(exclude_me && me == **id))
        .map(|(_, node)| node.get_certificate())
        .collect()
}
/// Returns a copy of the certificate of each locally cached node.
///
/// The first `skip` entries (in key order, counting every entry including
/// the local node) are left out. When `exclude_me` is set, the entry whose
/// key equals the local node's id is left out as well.
///
/// The position is tracked by the iterator rather than a `u8` counter, so a
/// cache of 256 or more nodes no longer overflows.
///
/// NOTE(review): this produces the same result as `get_nodes_certificates`;
/// confirm whether a distinct "raw" form was intended.
pub fn get_nodes_raw_certificates(&self, skip: u8, exclude_me: bool) -> Vec<Vec<u8>> {
    let me = self.me.id.to_string();
    self.nodes
        .iter()
        .skip(usize::from(skip))
        .filter(|(id, _)| !(exclude_me && me == **id))
        .map(|(_, node)| node.get_certificate())
        .collect()
}
/// Returns, for each locally cached node, its address and the key held for
/// it in the key store (if any), keyed by node id.
///
/// The first `skip` entries (in key order, counting every entry including
/// the local node) are left out. When `exclude_me` is set, the entry whose
/// key equals the local node's id is left out as well.
///
/// The position is tracked by the iterator rather than a `u8` counter, so a
/// cache of 256 or more nodes no longer overflows.
///
/// # Panics
/// Panics when a selected node's stored uri does not parse (see
/// `Node::get_address`).
pub fn get_nodes_endpoints(&self, skip: u8, exclude_me: bool) -> BTreeMap<String, (Uri, Option<Vec<u8>>)> {
    let me = self.me.id.to_string();
    self.nodes
        .iter()
        .skip(usize::from(skip))
        .filter(|(id, _)| !(exclude_me && me == **id))
        .map(|(id, node)| (id.clone(), (node.get_address(), self.get_key(id))))
        .collect()
}
/// Removes the node stored under `identifier` from the local cache and, when
/// it was present, from the shared store as well.
///
/// Returns the removed node, or `None` when the local cache had no such
/// entry (in which case the shared store is not touched).
pub async fn remove_node(&mut self, identifier: &str) -> Option<Node> {
    let removed = self.nodes.remove(identifier)?;
    let remote_key = removed.id.to_string();
    self.remove_node_remote(remote_key.as_str()).await;
    Some(removed)
}
/// Picks one locally cached node matching `query` at random.
///
/// The local node is a candidate too. Returns `None` when no node matches;
/// in that case the random generator is no longer asked for a value from the
/// empty range `(0, 0)`.
///
/// NOTE(review): whether `get_random_usize` treats its upper bound as
/// exclusive is not visible here. If it is inclusive, an index equal to the
/// candidate count can be drawn and this returns `None` even though
/// candidates exist - confirm against `RandomGenerator`.
pub fn pick_node(&self, query: BTreeMap<&str, &str>) -> Option<&Node> {
    let eligible_nodes: Vec<&Node> = self
        .find_nodes(query, false)
        .into_iter()
        .map(|(_, node)| node)
        .collect();

    if eligible_nodes.is_empty() {
        return None;
    }

    let select = RandomGenerator::new(Some(RNG::Std(StdRng::from_entropy()))).get_random_usize(0, eligible_nodes.len());
    eligible_nodes.get(select).copied()
}
pub fn pick_node_for_capability(&self, capability: &str) -> Option<&Node> {
let mut query = BTreeMap::new();
query.insert("capability", capability);
self.pick_node(query)
}
/// Registers `feature` on the local node under `base_path` using `router`,
/// then publishes the local node to the shared store.
///
/// The result of the registration is ignored; the node is published either way.
pub async fn add_feature(&mut self, feature: Arc<dyn product_os_capabilities::Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
let _ = self.me.features.add(feature, base_path, router).await; self.update_me().await;
}
/// Registers a mutex-wrapped `feature` on the local node under `base_path`
/// using `router`, then publishes the local node to the shared store.
///
/// The result of the registration is ignored; the node is published either way.
pub async fn add_feature_mut(&mut self, feature: Arc<Mutex<dyn product_os_capabilities::Feature>>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
let _ = self.me.features.add_mut(feature, base_path, router).await; self.update_me().await;
}
pub fn pick_node_for_feature(&self, feature: &str) -> Option<&Node> {
let mut query = BTreeMap::new();
query.insert("feature", feature);
self.pick_node(query)
}
/// Removes the feature `identifier` from the local node, then publishes the
/// local node to the shared store.
///
/// The result of the removal is ignored; the node is published either way.
pub async fn remove_feature(&mut self, identifier: &str) {
let _ = self.me.features.remove(identifier); self.update_me().await;
}
/// Adds `service` to the local node, then publishes the local node to the
/// shared store.
pub async fn add_service(&mut self, service: Arc<dyn product_os_capabilities::Service>) {
self.me.services.add(service).await;
self.update_me().await;
}
/// Adds a mutex-wrapped `service` to the local node, then publishes the
/// local node to the shared store.
///
/// The result of the addition is ignored; the node is published either way.
pub async fn add_service_mut(&mut self, service: Arc<Mutex<dyn product_os_capabilities::Service>>) {
let _ = self.me.services.add_mut(service).await; self.update_me().await;
}
/// Sets the `active` flag of the local service `identifier` to `status` and
/// publishes the local node to the shared store.
///
/// Does nothing when the local node has no such service.
pub async fn set_service_active(&mut self, identifier: String, status: bool) {
    if let Some(service) = self.me.services.get_mut(identifier.as_str()) {
        service.active = status;
        self.update_me().await;
    }
}
/// Removes the service `identifier` from the local node, then publishes the
/// local node to the shared store.
pub async fn remove_service(&mut self, identifier: &str) {
self.me.services.remove(identifier);
self.update_me().await;
}
/// Collects the ids of the cached nodes (excluding the local one) that match
/// `query`, calls `remove_service` with each of those ids, and publishes the
/// local node once more when at least one id was collected.
///
/// NOTE(review): the identifiers passed to `remove_service` are *node* ids,
/// while `remove_service` removes a *service* from the local node - confirm
/// that this is the intended lookup.
pub async fn remove_inactive_services(&mut self, query: BTreeMap<&str, &str>) {
    let matched_ids: Vec<String> = self
        .find_nodes(query, true)
        .into_iter()
        .map(|(identifier, _)| identifier)
        .collect();

    for identifier in &matched_ids {
        self.remove_service(identifier).await;
    }

    if !matched_ids.is_empty() {
        self.update_me().await;
    }
}
/// Starts every service of the local node in turn.
///
/// Stops at the first service that fails to start and returns `Err(())`;
/// services after it are not started. Returns `Ok(())` when all start.
pub async fn start_services(&mut self) -> Result<(), ()> {
    for (_, service) in self.me.services.list_mut() {
        if service.start().await.is_err() {
            return Err(());
        }
    }
    Ok(())
}
/// Starts the local service `identifier`.
///
/// Returns `Err(())` when the local node has no such service; otherwise
/// returns the result of the service's `start`.
pub async fn start_service(&mut self, identifier: &str) -> Result<(), ()> {
    let service = self.me.services.get_mut(identifier).ok_or(())?;
    service.start().await
}
/// Stops the local service `identifier`.
///
/// Returns `Err(())` when the local node has no such service; otherwise
/// returns the result of the service's `stop`.
pub async fn stop_service(&mut self, identifier: &str) -> Result<(), ()> {
    let service = self.me.services.get_mut(identifier).ok_or(())?;
    service.stop().await
}
/// Restarts the local service `identifier`.
///
/// Returns `Err(())` when the local node has no such service; otherwise
/// returns the result of the service's `restart`.
pub async fn restart_service(&mut self, identifier: &str) -> Result<(), ()> {
    let service = self.me.services.get_mut(identifier).ok_or(())?;
    service.restart().await
}
/// Invokes `action` with `input` on the local service `identifier`.
///
/// Returns the service's own result, or a `ServiceError::GenericError`
/// naming the identifier when the local node has no such service.
pub async fn call_service(&mut self, identifier: &str, action: &What, input: &Option<serde_json::Value>) -> Result<Option<serde_json::Value>, ServiceError> {
    if let Some(service) = self.me.services.get_mut(identifier) {
        return service.call(action, input).await;
    }
    Err(ServiceError::GenericError(format!("Service {} not found", identifier)))
}
/// Loads every node entry from the shared store into the local cache.
///
/// A failed store lookup is logged and treated as an empty result. Each
/// entry that deserializes into a `Node` is inserted into the local cache
/// under the node's own id; an entry that does not deserialize is logged
/// and removed from the shared store.
pub async fn discover_nodes(&mut self) {
    let remote_entries = match self.store.group_find(None) {
        Ok(entries) => entries,
        Err(_) => {
            tracing::error!("Error getting nodes from store store");
            BTreeMap::new()
        }
    };

    for (id, raw) in remote_entries {
        match serde_json::from_str::<Node>(raw.as_str()) {
            Ok(mut node) => {
                // `upsert_node_local` sets the router up again; the first call
                // is kept so the sequence of calls is unchanged.
                let _ = node.features.setup_router();
                tracing::trace!("Importing remote node: {:?}", node.id);
                self.upsert_node_local(node.id.to_string(), node);
            }
            Err(e) => {
                tracing::error!("Error importing remote node {} - purging: {:?}", id, e);
                self.remove_node_remote(id.as_str()).await;
            }
        }
    }
}
/// Returns a copy of the key held in the key store for `identifier`, or
/// `None` when the key store has no key for it.
pub fn get_key(&self, identifier: &str) -> Option<Vec<u8>> {
    self.key_store
        .get_key(identifier)
        .map(|key| key.to_vec())
}
/// Creates a new session in the key store and returns what the key store
/// returns: a string and a 32-byte array.
// NOTE(review): presumably the session identifier and the local public key - confirm against `DHKeyStore::create_session`.
pub fn create_key_session(&mut self) -> (String, [u8; 32]) {
self.key_store.create_session()
}
/// Forwards to the key store's `generate_key` with the given session
/// identifier, remote public key, association and optional remote session
/// identifier. Any value returned by the key store is discarded.
// NOTE(review): `association` is presumably the identifier later passed to `get_key` - confirm against `DHKeyStore`.
pub fn generate_key(&mut self, session_identifier: &str, remote_public_key: &[u8], association: String, remote_session_identifier: Option<String>) {
self.key_store.generate_key(session_identifier, remote_public_key, association, remote_session_identifier);
}
}