product-os-command-control 0.0.14

Product OS : Command and Control provides a set of tools for running command and control across a distributed set of Product OS : Servers.
Documentation
use std::prelude::v1::*;

use headers::{ Header, HeaderName };

use serde::{ Deserialize, Serialize };
use lazy_static::lazy_static;
use product_os_request::ProductOSClient;


#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommandControlAuthenticateError {
    pub error: CommandControlAuthenticateErrorState
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CommandControlAuthenticateErrorState {
    KeyError(String),
    None
}

impl std::error::Error for CommandControlAuthenticateError {}

impl std::fmt::Display for CommandControlAuthenticateError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match &self.error {
            CommandControlAuthenticateErrorState::KeyError(m) => write!(f, "{}", m),
            CommandControlAuthenticateErrorState::None => write!(f, "No error")
        }
    }
}

lazy_static! {
    static ref X_INTERACT_COMMAND: HeaderName = HeaderName::from_static("x-product-os-command");
    static ref X_INTERACT_CONTROL: HeaderName = HeaderName::from_static("x-product-os-control");
    static ref X_INTERACT_VERIFY: HeaderName = HeaderName::from_static("x-product-os-verify");
}

pub struct XProductOSCommandHeader(String);

impl XProductOSCommandHeader {
    pub fn value(self) -> String {
        self.0
    }
}

impl Header for XProductOSCommandHeader {
    fn name() -> &'static HeaderName {
        &X_INTERACT_COMMAND
    }

    fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
        where
            I: Iterator<Item = &'i headers::HeaderValue> {
        let value = values
            .next()
            .ok_or_else(headers::Error::invalid)?;

        Ok(XProductOSCommandHeader(value.to_str().unwrap().to_string()))
        // Err(headers::Error::invalid())
    }

    fn encode<E>(&self, values: &mut E)
        where
            E: Extend<headers::HeaderValue> {
        let value = headers::HeaderValue::from_str(self.0.as_str()).unwrap();
        values.extend(std::iter::once(value));
    }
}

pub struct XProductOSControlHeader(String);

impl XProductOSControlHeader {
    pub fn value(self) -> String {
        self.0
    }
}

impl Header for XProductOSControlHeader {
    fn name() -> &'static HeaderName {
        &X_INTERACT_CONTROL
    }

    fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
        where
            I: Iterator<Item = &'i headers::HeaderValue> {
        let value = values
            .next()
            .ok_or_else(headers::Error::invalid)?;

        Ok(XProductOSControlHeader(value.to_str().unwrap().to_string()))
        // Err(headers::Error::invalid())
    }

    fn encode<E>(&self, values: &mut E)
        where
            E: Extend<headers::HeaderValue> {
        let value = headers::HeaderValue::from_str(self.0.as_str()).unwrap();
        values.extend(std::iter::once(value));
    }
}

pub struct XProductOSVerifyHeader(String);

impl XProductOSVerifyHeader {
    pub fn value(self) -> String {
        self.0
    }
}

impl Header for XProductOSVerifyHeader {
    fn name() -> &'static HeaderName {
        &X_INTERACT_VERIFY
    }

    fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
        where
            I: Iterator<Item = &'i headers::HeaderValue> {
        let value = values
            .next()
            .ok_or_else(headers::Error::invalid)?;

        Ok(XProductOSVerifyHeader(value.to_str().unwrap().to_string()))
        // Err(headers::Error::invalid())
    }

    fn encode<E>(&self, values: &mut E)
        where
            E: Extend<headers::HeaderValue> {
        let value = headers::HeaderValue::from_str(self.0.as_str()).unwrap();
        values.extend(std::iter::once(value));
    }
}

#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthExchangeKeyData {
    pub identifier: String,
    pub session: String,
    pub public_key: Vec<u8>
}


pub fn perform_self_trust(controller: &mut crate::ProductOSController) {
    tracing::info!("Generating own key exchange...");

    let self_identifier = controller.registry.get_me().get_identifier();

    let (self_key_session, self_public_key) = controller.create_key_session();
    let key_exchange = AuthExchangeKeyData {
        identifier: self_identifier.clone(),
        session: self_key_session,
        public_key: self_public_key.to_vec()
    };
    controller.generate_key(key_exchange.session, key_exchange.public_key.as_slice(), key_exchange.identifier, None);

    let mut certificates = vec!();
    for cert in &controller.certificates.certificates {
        certificates.push(cert.as_slice())
    }

    for cert in certificates {
        controller.requester.add_trusted_certificate_pem(cert.to_vec());
        controller.client.build(&controller.requester);
    }
}




pub async fn perform_key_exchange(controller: &mut crate::ProductOSController) {
    tracing::info!("Performing key exchanges...");

    let self_identifier = controller.registry.get_me().get_identifier();
    let control_url = controller.registry.get_me().get_address();

    let nodes = controller.registry.get_nodes_endpoints(0, true);

    tracing::info!("Performing key exchange {:?}", nodes);
    for (identifier, (url, key)) in nodes {
        if url != control_url {
            match key {
                Some(_) => (),
                None => {
                    tracing::info!("Authenticating {}: {}", identifier, url);

                    let (key_session, public_key) = controller.create_key_session();
                    let key_exchange = AuthExchangeKeyData {
                        identifier: self_identifier.clone(),
                        session: key_session,
                        public_key: public_key.to_vec()
                    };

                    tracing::trace!("Sending authentication exchange {:?}", key_exchange);

                    match crate::commands::command(&controller.client, url.clone(), vec!(), "auth".to_string(), "exchange".to_string(), Some(serde_json::value::to_value(key_exchange).unwrap())).await {
                        Ok(response) => {
                            let status = response.status();

                            match status {
                                product_os_request::StatusCode::CONFLICT => {
                                    let auth: CommandControlAuthenticateError = match serde_json::from_str(controller.client.text(response).await.unwrap().as_str()) {
                                        Ok(auth_error) => auth_error,
                                        Err(_) => CommandControlAuthenticateError { error: CommandControlAuthenticateErrorState::None }
                                    };

                                    tracing::error!("Error object auth {:?}", auth);

                                    match auth.error {
                                        CommandControlAuthenticateErrorState::KeyError(_) => {
                                            tracing::info!("Error from remote node - keys already exist - problem {}", identifier);
                                        },
                                        CommandControlAuthenticateErrorState::None => ()
                                    };
                                },
                                product_os_request::StatusCode::OK => {
                                    let body = controller.client.text(response).await.unwrap();
                                    tracing::info!("Response received from {}: {} {:?}", url, status, body);

                                    let key_exchange: AuthExchangeKeyData = serde_json::from_str(body.as_str()).unwrap();
                                    controller.generate_key(key_exchange.session, key_exchange.public_key.as_slice(), key_exchange.identifier, None);
                                }
                                _ => {
                                    let body = controller.client.bytes(response).await.unwrap();
                                    tracing::error!("Error response received from {}: {} {:?}", url, status, body);
                                }
                            }
                        },
                        Err(e) => {
                            tracing::error!("Error encountered {:?} from {}", e, url);
                        }
                    }
                }
            }
        }
    }

    tracing::info!("Finished key exchanges...");
}