use openssl::hash::MessageDigest;
use openssl::nid::Nid;
use openssl::pkey::PKey;
use openssl::rsa::Rsa;
use openssl::x509::{X509NameBuilder, X509Req, X509ReqBuilder};
use std::str;
use std::sync::Arc;
use std::vec::Vec;
use rumqttc::{self, Packet, QoS};
use crate::config::{Config, IotRegistrationStatus};
use crate::error::IoTError::{
self, AWSClientError, AWSCreateCertificateFromCsrError, AWSCreateCsrError,
AWSRegisterThingError, AWSResponseError,
};
use crate::mqtt_client::{self, AsyncClient, ConnectionSettings};
use log::{debug, error};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::fs::File;
use std::io::prelude::*;
// ---------------------------------------------------------------------------
// MQTT topics of the certificate-from-CSR exchange.
// ---------------------------------------------------------------------------

/// Topic the certificate signing request is published to.
const CREATE_FROM_CSR_REQUEST_TOPIC: &str = "$aws/certificates/create-from-csr/json";
/// Topic on which the accepted answer to the CSR request is received.
const CREATE_FROM_CSR_RESPONSE_TOPIC: &str = "$aws/certificates/create-from-csr/json/accepted";
/// Topic on which the rejected answer to the CSR request is received.
const CREATE_FROM_CSR_ERROR_TOPIC: &str = "$aws/certificates/create-from-csr/json/rejected";

// ---------------------------------------------------------------------------
// MQTT topic patterns of the RegisterThing exchange. The literal
// `templateName` segment is a placeholder that
// `prepare_register_thing_topics` substitutes with the configured
// provisioning template name.
// ---------------------------------------------------------------------------

/// Pattern of the topic the RegisterThing request is published to.
const REGISTER_THING_REQUEST_TOPIC: &str =
    "$aws/provisioning-templates/templateName/provision/json";
/// Pattern of the topic carrying the accepted RegisterThing answer.
const REGISTER_THING_RESPONSE_TOPIC: &str =
    "$aws/provisioning-templates/templateName/provision/json/accepted";
/// Pattern of the topic carrying the rejected RegisterThing answer.
const REGISTER_THING_ERROR_TOPIC: &str =
    "$aws/provisioning-templates/templateName/provision/json/rejected";

// ---------------------------------------------------------------------------
// Fixed subject fields written into every generated CSR.
// ---------------------------------------------------------------------------

/// Subject common name (CN).
const COMMONNAME: &str = "digitaldevice.link";
/// Subject country (C).
const COUNTRYNAME: &str = "CH";
/// Subject state or province (ST).
const STATEORPROVINCENAME: &str = "ZG";
/// Subject locality (L).
const LOCALITYNAME: &str = "Zug";
/// Subject organization (O).
const ORGANIZATIONNAME: &str = "digital device link";
/// Accepted answer to a certificate-from-CSR request.
///
/// Deserialized from JSON whose keys are the camelCase form of the field names.
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateCertificateFromCsrResponse {
    /// Token that is forwarded in the subsequent RegisterThing request.
    certificate_ownership_token: String,
    /// Identifier of the issued certificate; used to derive the certificate file name.
    certificate_id: String,
    /// PEM text of the issued certificate, written to disk as received.
    certificate_pem: String,
}
/// Rejection payload received on one of the `.../rejected` topics.
///
/// Deserialized from JSON whose keys are the camelCase form of the field names;
/// all three fields are only used for logging.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AWSErrorResponse {
    /// Numeric status reported with the rejection.
    status_code: i32,
    /// Short error identifier reported with the rejection.
    error_code: String,
    /// Human-readable error description reported with the rejection.
    error_message: String,
}
/// Template parameters sent inside a RegisterThing request.
///
/// Serialized with camelCase keys unless a field carries an explicit rename.
#[derive(Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct RegisterThingParameters {
    /// Serialized as `serialNumber`.
    serial_number: String,
    /// Serialized as `materialNumber`.
    material_number: String,
    /// Serialized under the explicit key `GTIN`.
    #[serde(rename(serialize = "GTIN"))]
    gtin: String,
    /// Serialized under the explicit key `IoTEndpointUrl`.
    #[serde(rename(serialize = "IoTEndpointUrl"))]
    iot_endpoint: String,
}
/// Body of a RegisterThing request, serialized with camelCase keys.
#[derive(Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct RegisterThingRequest {
    /// Ownership token taken from the preceding certificate-from-CSR answer.
    certificate_ownership_token: String,
    /// Values handed to the provisioning template.
    parameters: RegisterThingParameters,
}
/// Writes PEM data to `path`, creating the file or truncating an existing one.
///
/// Both parameters are generic so callers may pass owned or borrowed values
/// (`Vec<u8>`, `&[u8]`, `&String`, `&str`, `&Path`, ...) without conversions.
///
/// # Errors
///
/// Returns the underlying I/O error, converted into [`IoTError`], when the
/// file cannot be created or the data cannot be written completely.
fn store_pem(
    pem: impl AsRef<[u8]>,
    path: impl AsRef<std::path::Path>,
) -> Result<(), IoTError> {
    let mut file = File::create(path)?;
    file.write_all(pem.as_ref())?;
    Ok(())
}
/// Generates a fresh 2048-bit RSA key pair, persists it, and builds a signed CSR.
///
/// The private key is written to `priv_key_path` as PKCS#8 PEM and the public
/// key to `pub_key_path` as PEM before the request is assembled. The CSR
/// subject is built from the fixed `COMMONNAME`, `COUNTRYNAME`,
/// `STATEORPROVINCENAME`, `LOCALITYNAME` and `ORGANIZATIONNAME` constants and
/// the request is signed with the new key using SHA-256.
///
/// # Errors
///
/// Returns an [`IoTError`] when any OpenSSL operation fails (including the PEM
/// serialization of the keys, which previously panicked) or when one of the
/// key files cannot be written.
fn generate_key_and_csr(
    priv_key_path: &String,
    pub_key_path: &String,
) -> Result<X509Req, IoTError> {
    let key_pair = PKey::from_rsa(Rsa::generate(2048)?)?;

    // Persist both halves of the key pair first; the paths are the ones the
    // caller will later use to connect with the issued certificate.
    store_pem(key_pair.private_key_to_pem_pkcs8()?, priv_key_path)?;
    store_pem(key_pair.public_key_to_pem()?, pub_key_path)?;

    let mut subject = X509NameBuilder::new()?;
    subject.append_entry_by_nid(Nid::COMMONNAME, COMMONNAME)?;
    subject.append_entry_by_nid(Nid::COUNTRYNAME, COUNTRYNAME)?;
    subject.append_entry_by_nid(Nid::STATEORPROVINCENAME, STATEORPROVINCENAME)?;
    subject.append_entry_by_nid(Nid::LOCALITYNAME, LOCALITYNAME)?;
    subject.append_entry_by_nid(Nid::ORGANIZATIONNAME, ORGANIZATIONNAME)?;
    let subject = subject.build();

    let mut req_builder = X509ReqBuilder::new()?;
    req_builder.set_pubkey(&key_pair)?;
    req_builder.set_subject_name(&subject)?;
    req_builder.sign(&key_pair, MessageDigest::sha256())?;
    Ok(req_builder.build())
}
/// Builds the JSON body of a certificate-from-CSR request.
///
/// A new key pair and CSR are generated through `generate_key_and_csr` (which
/// also writes the key files); the CSR is PEM-encoded and wrapped as
/// `{"certificateSigningRequest": "<pem>"}`.
///
/// # Errors
///
/// Propagates every failure of key/CSR generation or PEM encoding as an
/// [`IoTError`] instead of panicking, so callers can react to it.
fn create_csr_request(priv_key_path: &String, pub_key_path: &String) -> Result<Vec<u8>, IoTError> {
    let csr_pem = generate_key_and_csr(priv_key_path, pub_key_path)?.to_pem()?;
    // PEM output is ASCII, so the lossy conversion never substitutes a
    // character; it only removes the panic path of a strict UTF-8 unwrap.
    let message_json: serde_json::Value = json!({
        "certificateSigningRequest": String::from_utf8_lossy(&csr_pem)
    });
    Ok(message_json.to_string().into_bytes())
}
async fn request_new_cert_from_csr(
iot_client: &crate::mqtt_client::AsyncClient,
priv_key_path: &String,
pub_key_path: &String,
) -> Result<(), IoTError> {
iot_client
.subscribe(CREATE_FROM_CSR_RESPONSE_TOPIC.to_string(), QoS::AtMostOnce)
.await
.unwrap();
iot_client
.subscribe(CREATE_FROM_CSR_ERROR_TOPIC.to_string(), QoS::AtMostOnce)
.await
.unwrap();
let message_res = create_csr_request(priv_key_path, pub_key_path);
match message_res {
Ok(message) => {
match iot_client
.publish(
CREATE_FROM_CSR_REQUEST_TOPIC.to_string(),
QoS::AtMostOnce,
message,
)
.await
{
Ok(_) => {
debug!("CSR Request PUBLISHED");
Ok(())
}
Err(e) => {
debug!("CSR Request publishing failed. {}", e);
Err(AWSClientError)
}
}
}
Err(_) => {
debug!("Incorrect CSR Request message: {:?}", message_res);
Err(AWSCreateCsrError)
}
}
}
async fn unsubscribe_csr_flow(iot_client: &crate::mqtt_client::AsyncClient) {
iot_client
.unsubscribe(CREATE_FROM_CSR_RESPONSE_TOPIC.to_string())
.await
.unwrap();
iot_client
.unsubscribe(CREATE_FROM_CSR_ERROR_TOPIC.to_string())
.await
.unwrap();
}
/// Derives the concrete RegisterThing topics for the configured template.
///
/// Every occurrence of the `templateName` placeholder in the three topic
/// patterns is replaced by `config.iot.provisioning_template_name`.
///
/// Returns `(request_topic, accepted_topic, rejected_topic)`.
fn prepare_register_thing_topics(config: &Config) -> (String, String, String) {
    let template = config.iot.provisioning_template_name.as_str();
    let fill_in = |pattern: &str| pattern.replace("templateName", template);

    (
        fill_in(REGISTER_THING_REQUEST_TOPIC),
        fill_in(REGISTER_THING_RESPONSE_TOPIC),
        fill_in(REGISTER_THING_ERROR_TOPIC),
    )
}
/// Builds the JSON body of a RegisterThing request.
///
/// The template parameters are taken from `config`: the instrument serial
/// number, `rudi_ref` (sent as the material number), `rudi_gtin` and the IoT
/// endpoint. The serialized message is also written to the debug log.
///
/// # Errors
///
/// Returns an [`IoTError`] when the request cannot be serialized to JSON.
fn create_thing_request(
    certificate_ownership_token: &str,
    config: &Config,
) -> Result<Vec<u8>, IoTError> {
    let rt_payload = RegisterThingRequest {
        certificate_ownership_token: certificate_ownership_token.to_owned(),
        parameters: RegisterThingParameters {
            serial_number: config.device.instrument_serial_number.clone(),
            material_number: config.device.rudi_ref.clone(),
            gtin: config.device.rudi_gtin.clone(),
            iot_endpoint: config.iot.endpoint.clone(),
        },
    };
    let message = serde_json::to_vec(&rt_payload)?;
    // NOTE(review): the logged message contains the certificate ownership
    // token -- confirm that this is acceptable for debug logs.
    debug!(
        "RegisterThing message: {}",
        String::from_utf8_lossy(&message)
    );
    Ok(message)
}
async fn request_to_register_thing(
iot_client: &crate::mqtt_client::AsyncClient,
config: &Config,
certificate_ownership_token: &String,
) -> Result<(), IoTError> {
let (register_thing_request_topic, register_thing_response_topic, register_thing_error_topic) =
prepare_register_thing_topics(&config);
iot_client
.subscribe(register_thing_response_topic, QoS::AtMostOnce)
.await
.unwrap();
iot_client
.subscribe(register_thing_error_topic, QoS::AtMostOnce)
.await
.unwrap();
let message_res = create_thing_request(&certificate_ownership_token, &config);
match message_res {
Ok(message) => {
match iot_client
.publish(
register_thing_request_topic.to_string(),
QoS::AtMostOnce,
message,
)
.await
{
Ok(_) => Ok(()),
Err(e) => {
debug!("ThingRequest publishing failed. {}", e);
Err(AWSClientError)
}
}
}
Err(_) => {
debug!("Incorrect ThingRequest message: {:?}", message_res);
Err(AWSCreateCsrError)
}
}
}
async fn process_fleet_provisioning_response(
event: rumqttc::Publish,
iot_client: &crate::mqtt_client::AsyncClient,
config: &mut Config,
) -> Result<IotRegistrationStatus, IoTError> {
let (_, register_thing_response_topic, register_thing_error_topic) =
prepare_register_thing_topics(&config);
match event.topic.as_str() {
CREATE_FROM_CSR_RESPONSE_TOPIC => {
let r: CreateCertificateFromCsrResponse =
serde_json::from_str(str::from_utf8(&event.payload).unwrap()).unwrap();
debug!(
"CreateCertificateFromCsrResponse - certificate id: {}",
r.certificate_id
);
config.iot.client_cert_path = format!("{}-certificate.pem.crt", r.certificate_id);
config.set_abs_path_client_cert();
let cert_pem = r.certificate_pem;
store_pem(cert_pem.into_bytes(), &config.iot.client_cert_path)
.expect("problem storing certificate");
unsubscribe_csr_flow(&iot_client).await;
request_to_register_thing(&iot_client, &config, &r.certificate_ownership_token).await?;
debug!("ThingRequest published");
Ok(IotRegistrationStatus::CertReceived)
}
CREATE_FROM_CSR_ERROR_TOPIC => {
let err: AWSErrorResponse =
serde_json::from_str(str::from_utf8(&event.payload).unwrap()).unwrap();
error!("Error response to CreateCertificateFromCsr -- status: {} -- errorCode: {} -- errorMsg: {}",
err.status_code, err.error_code, err.error_message);
config.iot.client_registration_status = IotRegistrationStatus::CsrError.to_string();
config
.store_config_to_yaml()
.expect("problem storing \"config.yaml\"");
Err(AWSCreateCertificateFromCsrError)
}
register_topic if register_topic == register_thing_response_topic => {
let r = str::from_utf8(&event.payload).unwrap().to_string();
debug!("RegisterThingResponse: \n{}", r);
let v: Value = serde_json::from_str(r.as_str()).unwrap();
let thing_name = String::from(v["thingName"].to_string().replace("\"", ""));
config.iot.client_registration_status =
IotRegistrationStatus::Registered(thing_name.clone()).to_string();
config.iot.client_id = thing_name.clone();
config
.store_config_to_yaml()
.expect("problem storing \"config.yaml\"");
serde_json::to_writer(&File::create(&"thing_config.json".to_string())?, &r)?;
Ok(IotRegistrationStatus::Registered(thing_name))
}
register_topic if register_topic == register_thing_error_topic => {
let err: AWSErrorResponse =
serde_json::from_str(str::from_utf8(&event.payload).unwrap()).unwrap();
error!(
"Error response to RegisterThing -- status: {} -- errorCode: {} -- errorMsg: {}",
err.status_code, err.error_code, err.error_message
);
config.iot.client_registration_status =
IotRegistrationStatus::RegistrationError.to_string();
config
.store_config_to_yaml()
.expect("problem storing \"config.yaml\"");
Err(AWSRegisterThingError)
}
_ => {
debug!("Received unexpected event from AWS: {:?}", event);
Err(AWSResponseError)
}
}
}
/// Runs the complete fleet-provisioning flow.
///
/// Connects with the claim certificate from `config.yaml`, requests a device
/// certificate through a CSR and then processes incoming messages until the
/// thing is registered or the MQTT event loop terminates.
///
/// # Returns
///
/// * `Ok(Registered(thing_name))` once registration was confirmed; the client
///   is disconnected before returning.
/// * `Ok(Unknown)` when the event-loop monitor finished first.
///
/// # Errors
///
/// * Any error of client creation or of the initial CSR request.
/// * `AWSClientError` when the task processing the answers terminated
///   abnormally (for example by panicking). This used to be reported as a
///   successful registration.
///
/// # Panics
///
/// Panics when `config.yaml` cannot be read.
pub async fn fleet_provisioning() -> Result<IotRegistrationStatus, IoTError> {
    let mut config = Config::get_config_from_yaml().expect("Problem reading \"config.yaml\"");
    let aws_settings = ConnectionSettings::new_tls(
        config.iot.client_id.to_owned(),
        config.iot.endpoint.to_owned(),
        config.iot.port.to_owned(),
        config.iot.ca_path.to_owned(),
        config.iot.claim_cert_path.to_owned(),
        config.iot.claim_priv_key_path.to_owned(),
        None,
    );
    let (iot_client, iot_eventloop) = AsyncClient::new(aws_settings).await?;
    let iot_client_1 = Arc::new(iot_client);
    let iot_client_2 = Arc::clone(&iot_client_1);

    request_new_cert_from_csr(
        &iot_client_1,
        &config.iot.client_priv_key_path,
        &config.iot.client_pub_key_path,
    )
    .await?;

    let mut iot_receiver = iot_client_1.get_receiver().await;
    // The task yields the final registration status so it can be returned to
    // the caller instead of a placeholder.
    let iot_receiver_thread = tokio::spawn(async move {
        loop {
            match iot_receiver.recv().await {
                Ok(Packet::Publish(p)) => {
                    match process_fleet_provisioning_response(p, &iot_client_1, &mut config).await
                    {
                        Ok(status) => {
                            debug!("Got process response: {:?}", status);
                            if matches!(status, IotRegistrationStatus::Registered(_)) {
                                break status;
                            }
                        }
                        // NOTE(review): rejections are terminal for the flow,
                        // yet the loop keeps waiting after them -- confirm
                        // whether it should stop here instead.
                        Err(_) => {
                            debug!("Failed response processing from AWS");
                        }
                    }
                }
                Ok(event) => debug!("Got event on iot_receiver: {:?}", event),
                // NOTE(review): receive errors are ignored; if the receiver
                // reports a permanently closed channel this loop would spin
                // -- confirm against the receiver type.
                Err(_) => (),
            }
        }
    });
    let iot_monitor_thread = tokio::spawn(async move {
        match mqtt_client::eventloop_monitor(iot_eventloop).await {
            Err(e) => {
                debug!("IoT event loop error: {}", e)
            }
            Ok(_) => {
                debug!("Unknown termination of IoT event loop")
            }
        };
    });

    tokio::select! {
        joined = iot_receiver_thread => {
            // Best effort: a failed disconnect must not mask the outcome.
            if let Err(e) = iot_client_2.disconnect().await {
                error!("Disconnecting the provisioning client failed: {:?}", e);
            }
            match joined {
                Ok(status) => Ok(status),
                Err(e) => {
                    error!("Fleet provisioning receiver task terminated abnormally: {}", e);
                    Err(AWSClientError)
                }
            }
        },
        _ = iot_monitor_thread => Ok(IotRegistrationStatus::Unknown),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DeviceConfig, IoTConfig};
    use openssl::nid::Nid;
    use std::fs;

    /// Configuration fixture shared by the tests below.
    fn sample_config() -> Config {
        let dev_config = DeviceConfig {
            device_topic_prefix: String::from("SPDIF/X320/Poke/"),
            shadow_name: String::from("c16a8_shadow"),
            client_id: String::from("adapterClient"),
            endpoint: String::from("127.0.0.1"),
            port: 1883,
            username: String::from("guest"),
            password: String::from("guest"),
            spec_version: String::from("1.0"),
            privacy: false,
            rudi_gtin: String::from("8724447281187"),
            rudi_ref: String::from("9818575112"),
            instrument_type: String::from("16A8"),
            instrument_name: String::from("Friendly_Name"),
            instrument_serial_number: String::from("99998"),
            source_id_type: String::from("CONFIG"),
        };
        let iot_config = IoTConfig {
            iot_topic_prefix: String::from("SPDIF/X320/16A8/"),
            shadow_name: String::from("iot_shadow"),
            client_registration_status: String::from("INITIAL"),
            client_id: String::from("16A8_99998"),
            endpoint: String::from("a2hcybvqw6738p-ats.iot.eu-central-1.amazonaws.com"),
            port: 8883,
            ca_path: String::from("AmazonRootCA1.pem"),
            client_cert_path: String::from("IotCertificate.pem"),
            client_priv_key_path: String::from("IotPrivateKey.pem"),
            client_pub_key_path: String::from("IotPubKey.pem"),
            claim_cert_path: String::from("ClaimCertificate.pem"),
            claim_priv_key_path: String::from("ClaimPrivateKey.pem"),
            claim_pub_key_path: String::from("ClaimPubKey.pem"),
            provisioning_template_name: String::from("iot-16A8-prov-templ"),
        };
        Config {
            device: dev_config,
            iot: iot_config,
        }
    }

    /// Returns a per-process path in the system temp directory.
    ///
    /// Each test passes its own `name`, so tests running in parallel never
    /// write the same file and no key material lands in the working directory.
    fn temp_pem_path(name: &str) -> String {
        std::env::temp_dir()
            .join(format!(
                "fleet-provisioning-{}-{}",
                std::process::id(),
                name
            ))
            .to_string_lossy()
            .into_owned()
    }

    #[test]
    fn generate_key_and_csr_test() {
        let priv_key_path = temp_pem_path("generate-private-key.pem");
        let pub_key_path = temp_pem_path("generate-public-key.pem");
        let req = generate_key_and_csr(&priv_key_path, &pub_key_path).unwrap();
        let cn = req
            .subject_name()
            .entries_by_nid(Nid::COMMONNAME)
            .next()
            .unwrap()
            .data();
        let cn_str = cn.as_utf8().unwrap().to_string();

        let priv_key = fs::read_to_string(&priv_key_path).unwrap();
        let pub_key = fs::read_to_string(&pub_key_path).unwrap();
        // Clean up before asserting so a failing assertion leaves nothing behind.
        let _ = fs::remove_file(&priv_key_path);
        let _ = fs::remove_file(&pub_key_path);

        assert_eq!(cn_str, COMMONNAME);
        println!("generate_key_and_csr -- commonName: {}\n", cn_str);
        assert!(priv_key.contains("-----END PRIVATE KEY-----"));
        assert!(pub_key.contains("-----END PUBLIC KEY-----"));
        println!("create_csr_request -- pub_key: \n{}", pub_key);
    }

    #[test]
    fn create_csr_request_test() {
        let priv_key_path = temp_pem_path("request-private-key.pem");
        let pub_key_path = temp_pem_path("request-public-key.pem");
        let msg = create_csr_request(&priv_key_path, &pub_key_path).unwrap();
        let _ = fs::remove_file(&priv_key_path);
        let _ = fs::remove_file(&pub_key_path);

        let msg_str = str::from_utf8(&msg).unwrap();
        assert!(
            msg_str.contains("certificateSigningRequest")
                && msg_str.contains("-----END CERTIFICATE REQUEST-----")
        );
        println!("create_csr_request -- message: \n{}", msg_str);
    }

    #[test]
    fn create_thing_request_test() {
        let rsp = CreateCertificateFromCsrResponse {
            certificate_ownership_token: String::from("CertOwnershipToken"),
            certificate_id: String::from("CertId"),
            certificate_pem: String::from("CertPem"),
        };
        let config = sample_config();
        let msg = create_thing_request(&rsp.certificate_ownership_token, &config).unwrap();
        let msg_str = str::from_utf8(&msg).unwrap();
        assert!(msg_str.contains("certificateOwnershipToken") && msg_str.contains("8724447281187"));
        println!("create_register_thing_request -- message: \n{}", msg_str);
    }

    #[test]
    fn prepare_register_thing_topics_test() {
        let config = sample_config();
        let register_thing_request_topic_test =
            "$aws/provisioning-templates/iot-16A8-prov-templ/provision/json";
        let register_thing_response_topic_test =
            "$aws/provisioning-templates/iot-16A8-prov-templ/provision/json/accepted";
        let register_thing_error_topic_test =
            "$aws/provisioning-templates/iot-16A8-prov-templ/provision/json/rejected";
        let (
            register_thing_request_topic,
            register_thing_response_topic,
            register_thing_error_topic,
        ) = prepare_register_thing_topics(&config);
        println!(
            "register_thing_request_topic: \t{}, \nregister_thing_response_topic: \t{}, \nregister_thing_error_topic: \t{}",
            register_thing_request_topic,
            register_thing_response_topic,
            register_thing_error_topic
        );
        assert_eq!(
            register_thing_request_topic,
            register_thing_request_topic_test
        );
        assert_eq!(
            register_thing_response_topic,
            register_thing_response_topic_test
        );
        assert_eq!(register_thing_error_topic, register_thing_error_topic_test);
    }
}