use reqwest::Client as ReqwestClient;
use serde_json::Value;
use thiserror::Error;
pub struct LogClient {
api_version: String,
inner_client: ReqwestClient,
key: String,
url: url::Url,
workspace_id: String,
}
impl LogClient {
pub fn new(
api_version: String,
workspace_id: String,
key: String,
timeout_in_seconds: u64,
) -> Result<Self, NewLogClientError> {
use std::time::Duration;
let inner_client = ReqwestClient::builder()
.timeout(Duration::from_secs(timeout_in_seconds))
.build()?;
let url = Self::create_url(&workspace_id, &api_version)?;
Ok(Self {
api_version,
inner_client,
key,
url,
workspace_id,
})
}
pub fn url(&self) -> &url::Url {
&self.url
}
pub fn api_version(&self) -> &str {
&self.api_version
}
pub fn workspace_id(&self) -> &str {
&self.workspace_id
}
fn key(&self) -> &str {
&self.key
}
fn create_url(
workspace_id: &str,
api_version: &str,
) -> Result<url::Url, url::ParseError> {
let url = format!(
"https://{wsid}.ods.opinsights.azure.com/api/logs?api-version={ver}",
wsid = workspace_id,
ver = api_version
);
url::Url::parse(&url)
}
fn build_log_request(
&self,
key: &str,
workspace_id: &str,
records: Records,
) -> Result<reqwest::Request, RequestBuildError> {
let body = serde_json::to_string(&records.records)?;
let content_length = body.as_bytes().len();
let timestamp = rfc_1123_utc_now_timestamp();
let auth_header_val =
auth_header_value(key, workspace_id, content_length, ×tamp)?;
let mut req_builder = self
.inner_client
.post(self.url().clone())
.header("Authorization", auth_header_val)
.header("Content-Type", "application/json")
.header("Log-Type", records.log_type)
.header("x-ms-date", timestamp)
.body(body);
if let Some(resource_id) = records.azure_resource_id {
req_builder =
req_builder.header("x-ms-AzureResourceId", resource_id);
}
if let Some(time_generated_field) = records.time_generated_field {
req_builder = req_builder
.header("time-generated-field", time_generated_field);
}
req_builder.build().map_err(|err| err.into())
}
pub async fn log(&self, records: Records) -> Result<(), LogError> {
let req =
self.build_log_request(self.key(), self.workspace_id(), records)?;
let res = self.inner_client.execute(req).await?;
check_response(res)
}
}
fn check_response(res: reqwest::Response) -> Result<(), LogError> {
let status = res.status();
if status.is_success() {
Ok(())
} else {
use reqwest::StatusCode;
match status {
StatusCode::BAD_REQUEST => {
Err(LogError::BadRequest { response: res })
}
StatusCode::FORBIDDEN => Err(LogError::Forbidden),
StatusCode::NOT_FOUND => Err(LogError::NotFound),
StatusCode::TOO_MANY_REQUESTS => Err(LogError::TooManyRequests),
StatusCode::INTERNAL_SERVER_ERROR => {
Err(LogError::InternalServerError)
}
StatusCode::SERVICE_UNAVAILABLE => {
Err(LogError::ServiceUnavailable)
}
_ => Err(LogError::Other { response: res }),
}
}
}
fn rfc_1123_utc_now_timestamp() -> String {
format!(
"{}",
chrono::offset::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT")
)
}
fn auth_header_value(
key: &str,
workspace_id: &str,
content_length: usize,
x_ms_date: &str,
) -> Result<String, base64::DecodeError> {
Ok(format!(
"SharedKey {ws_id}:{sig}",
ws_id = workspace_id,
sig = signature(key, content_length, x_ms_date)?
))
}
fn signature(
key: &str,
content_length: usize,
x_ms_date: &str,
) -> Result<String, base64::DecodeError> {
use hmac::{Hmac, Mac};
type HmacSha256 = Hmac<sha2::Sha256>;
let str_to_sign = string_to_sign(content_length, x_ms_date);
let decoded_key = base64::decode(key)?;
let mut hmac = HmacSha256::new_varkey(&decoded_key)
.expect("HMAC can take key of any size");
hmac.input(str_to_sign.as_bytes());
let result = hmac.result();
let hmac_bytes = result.code();
let b64_str = base64::encode(&hmac_bytes);
Ok(b64_str)
}
fn string_to_sign(content_length: usize, x_ms_date: &str) -> String {
format!(
"POST\n{content_length}\napplication/json\nx-ms-date:{date}\n/api/logs",
content_length = content_length,
date = x_ms_date
)
}
#[derive(Clone, Debug, PartialEq)]
pub struct Records {
log_type: String,
records: Value,
azure_resource_id: Option<String>,
time_generated_field: Option<String>,
}
impl Records {
pub fn new(
log_type: String,
records: Value,
azure_resource_id: Option<String>,
time_generated_field: Option<String>,
) -> Result<Self, NewRecordsError> {
Self::check_json_value(&records).map_err(NewRecordsError::new)?;
Ok(Self {
log_type,
records,
azure_resource_id,
time_generated_field,
})
}
fn check_json_value(val: &Value) -> Result<(), String> {
if let Some(array) = val.as_array() {
for elem in array {
if !elem.is_object() {
return Err(
"Each element of the JSON array must be an object"
.into(),
);
}
}
Ok(())
} else {
Err("JSON value must be an array".into())
}
}
}
#[derive(Clone, Debug, Error)]
#[error("Could not create a valid `Records` struct: {msg}")]
pub struct NewRecordsError {
msg: String,
}
impl NewRecordsError {
fn new(msg: String) -> Self {
Self { msg }
}
}
#[derive(Debug, Error)]
pub enum RequestBuildError {
#[error("Could not decode base64")]
Base64Decode(#[from] base64::DecodeError),
#[error("HTTP error")]
Http(#[from] reqwest::Error),
#[error("Could not convert JSON value to a UTF-8 string")]
JsonToString(#[from] serde_json::Error),
}
#[derive(Debug, Error)]
pub enum NewLogClientError {
#[error("HTTP Error")]
Http(#[from] reqwest::Error),
#[error("Could not create a valid URL for this client")]
ParseUrl(#[from] url::ParseError),
}
#[derive(Debug, Error)]
pub enum LogError {
#[error("HTTP Error")]
Http(#[from] reqwest::Error),
#[error("Could not build a HTTP request")]
RequestBuilder(#[from] RequestBuildError),
#[error("The HTTP request was not valid")]
BadRequest { response: reqwest::Response },
#[error("The service failed to authenticate the request")]
Forbidden,
#[error(
"Either the URL provided is incorrect, or the request is too large"
)]
NotFound,
#[error(
"The service is experiencing a high volume of data from your account"
)]
TooManyRequests,
#[error("The service encountered an internal error")]
InternalServerError,
#[error("The service currently is unavailable to receive requests")]
ServiceUnavailable,
#[error("The HTTP reponse had an unexpected status code")]
Other { response: reqwest::Response },
}
impl LogError {
pub fn is_retryable(&self) -> bool {
match self {
Self::TooManyRequests => true,
Self::InternalServerError => true,
Self::ServiceUnavailable => true,
_ => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
const API_VERSION: &str = "2016-04-01";
#[tokio::test]
async fn log_works_with_some_records() {
let client = client();
let records = records_from_value(json!(
[
{
"slot_ID": 12345,
"ID": "5cdad72f-c848-4df0-8aaa-ffe033e75d57",
"availability_Value": 100,
"performance_Value": 6.954,
"measurement_Name": "last_one_hour",
"duration": 3600,
"warning_Threshold": 0,
"critical_Threshold": 0,
"IsActive": "true"
},
{
"slot_ID": 67890,
"ID": "b6bee458-fb65-492e-996d-61c4d7fbb942",
"availability_Value": 100,
"performance_Value": 3.379,
"measurement_Name": "last_one_hour",
"duration": 3600,
"warning_Threshold": 0,
"critical_Threshold": 0,
"IsActive": "false"
}
]
));
client.log(records).await.unwrap();
}
#[tokio::test]
async fn log_works_with_no_records() {
let client = client();
let records = records_from_value(json!([]));
client.log(records).await.unwrap();
}
#[test]
fn string_to_sign() {
assert_eq!(
super::string_to_sign(1024, "Mon, 04 Apr 2016 08:00:00 GMT"),
"POST\n1024\napplication/json\nx-ms-date:Mon, 04 Apr 2016 08:00:00 GMT\n/api/logs"
)
}
fn records_from_value(val: serde_json::Value) -> Records {
Records::new("testlog".into(), val, None, None).unwrap()
}
fn client() -> LogClient {
let api_version = API_VERSION.into();
let timeout = 60;
LogClient::new(api_version, workspace_id(), key(), timeout).unwrap()
}
fn workspace_id() -> String {
std::env::var("AZURE_LOG_ANALYTICS_WORKSPACES_WORKSPACE_ID").unwrap()
}
fn key() -> String {
std::env::var("AZURE_LOG_ANALYTICS_WORKSPACES_PRIMARY_KEY").unwrap()
}
}