1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use azure_sdk_core::{
errors::{check_status_extract_body, AzureError},
COMPLETE_ENCODE_SET,
};
use futures::future::{self, Future};
use hyper::{self, header, StatusCode};
use hyper_rustls::HttpsConnector;
use ring::hmac;
use std::ops::Add;
use time::Duration;
mod client;
pub use self::client::Client;
/// Shorthand for the hyper client type used by the functions in this module:
/// a `hyper::Client` whose connector is an `HttpsConnector` wrapping hyper's
/// `HttpConnector`.
type HttpClient = hyper::Client<HttpsConnector<hyper::client::HttpConnector>>;
/// Builds the "send event" POST request and hands it to `http_client`.
///
/// The target is `https://{namespace}.servicebus.windows.net/{event_hub}/messages`.
/// The `Authorization` header carries the string produced by
/// `generate_signature` for that URL, `policy_name`, `signing_key` and
/// `duration`. `event_body` is converted into a `String` and used as the
/// request body.
///
/// # Errors
///
/// Returns an `AzureError` when the request cannot be assembled (the error
/// from the request builder is converted through `?`).
///
/// On success the returned value is the not-yet-resolved
/// `hyper::client::ResponseFuture`; nothing here waits for the response.
#[inline]
fn send_event_prepare<B: Into<String>>(
    http_client: &HttpClient,
    namespace: &str,
    event_hub: &str,
    policy_name: &str,
    signing_key: &hmac::SigningKey,
    event_body: B,
    duration: Duration,
) -> Result<hyper::client::ResponseFuture, AzureError> {
    let endpoint = format!("https://{}.servicebus.windows.net/{}/messages", namespace, event_hub);
    debug!("url == {:?}", endpoint);

    // The signature is computed over the very same URL the request is sent to.
    let auth_token = generate_signature(policy_name, signing_key, &endpoint, duration);
    debug!("sas == {}", auth_token);

    let payload: String = event_body.into();

    let request = hyper::Request::post(endpoint)
        .header(header::AUTHORIZATION, ::bytes::Bytes::from(auth_token))
        .body(hyper::Body::from(payload))?;

    Ok(http_client.request(request))
}
/// Sends `event_body` to the given event hub and reports completion.
///
/// The request is created by `send_event_prepare`; a failure at that stage is
/// surfaced as the error of the returned future rather than as an immediate
/// return value. The response future is then passed to
/// `check_status_extract_body` together with `StatusCode::CREATED`, and
/// whatever that step yields on success is discarded, so the future resolves
/// to `()`.
fn send_event(
    http_client: &HttpClient,
    namespace: &str,
    event_hub: &str,
    policy_name: &str,
    hmac: &hmac::SigningKey,
    event_body: &str,
    duration: Duration,
) -> impl Future<Item = (), Error = AzureError> {
    let prepared = send_event_prepare(
        http_client,
        namespace,
        event_hub,
        policy_name,
        hmac,
        event_body,
        duration,
    );

    // Lift the `Result` into a future so preparation errors travel down the
    // same chain as transport/status errors.
    future::result(prepared)
        .from_err()
        .and_then(|response| check_status_extract_body(response, StatusCode::CREATED))
        .map(|_| ())
}
/// Produces the `SharedAccessSignature ...` authorization string for `url`.
///
/// Steps, as performed below:
/// 1. `expiry` is the timestamp of the current UTC time plus `ttl`.
/// 2. `url` is percent-encoded with `COMPLETE_ENCODE_SET`.
/// 3. The string `"<encoded url>\n<expiry>"` is signed with `signing_key`.
/// 4. The signature bytes are base64-encoded and then form-url-encoded as the
///    pair `sig=<value>`.
///
/// The result has the shape
/// `SharedAccessSignature sr=<encoded url>&sig=<...>&se=<expiry>&skn=<policy_name>`.
/// `policy_name` is inserted verbatim, without any encoding.
fn generate_signature(policy_name: &str, signing_key: &hmac::SigningKey, url: &str, ttl: Duration) -> String {
    use url::{form_urlencoded::Serializer, percent_encoding::utf8_percent_encode};

    let expiry = ::chrono::Utc::now().add(ttl).timestamp();
    debug!("expiry == {:?}", expiry);

    let encoded_resource = utf8_percent_encode(url, COMPLETE_ENCODE_SET);
    let str_to_sign = format!("{}\n{}", encoded_resource, expiry);
    debug!("str_to_sign == {:?}", str_to_sign);

    let digest = hmac::sign(signing_key, str_to_sign.as_bytes());
    let digest_b64 = ::base64::encode(digest.as_ref());
    debug!("sig == {}", digest_b64);

    // Form-url-encode the base64 text; the output already contains the
    // leading `sig=` key.
    let sig_pair = Serializer::new(String::new())
        .append_pair("sig", &digest_b64)
        .finish();
    debug!("sig == {}", sig_pair);
    debug!("sig == {:?}", sig_pair);

    format!(
        "SharedAccessSignature sr={}&{}&se={}&skn={}",
        encoded_resource, sig_pair, expiry, policy_name
    )
}