//! Hedwig message publishing.
//!
//! Messages are validated against a JSON schema, mapped to a route by a user-supplied
//! router function, and then handed to a `Publisher` implementation for delivery.
#![deny(missing_docs, unused_import_braces, unused_qualifications)]
#![warn(
missing_debug_implementations,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unsafe_code,
unstable_features
)]
#[cfg(feature = "mock")]
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::From;
#[cfg(feature = "google")]
use std::default::Default;
use std::fmt;
use std::io;
#[cfg(feature = "google")]
use std::path::Path;
use std::result::Result;
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(feature = "google")]
use base64;
use failure::Fail;
#[cfg(feature = "google")]
use google_pubsub1::{self as pubsub1, Pubsub};
#[cfg(feature = "google")]
use hyper::{self, net::HttpsConnector, Client};
#[cfg(feature = "google")]
use hyper_rustls;
use serde::Serialize;
use serde_json;
use uuid::Uuid;
use valico::json_schema::{SchemaError, Scope, ValidationState};
#[cfg(feature = "google")]
use yup_oauth2 as oauth2;
/// Envelope format version stamped into the `format_version` field of every
/// `ValidatedMessage` built by `HedwigPublishBuilder::message`.
const FORMAT_VERSION_V1: Version = Version(MajorVersion(1), MinorVersion(0));
/// Errors that can occur while constructing a publisher or a `Hedwig` instance.
#[allow(missing_docs)]
#[derive(Debug, Fail)]
pub enum HedwigError {
/// The credentials file at the given path could not be opened for reading.
#[fail(display = "Credentials file {:?} couldn't be read", _1)]
CannotOpenCredentialsFile(#[cause] io::Error, std::path::PathBuf),
/// The credentials file at the given path was opened but its contents could not be parsed.
#[fail(display = "Credentials file {:?} couldn't be parsed", _1)]
CannotParseCredentialsFile(#[cause] serde_json::Error, std::path::PathBuf),
/// The schema text handed to the validator is not valid JSON.
#[fail(display = "Unable to deserialize schema")]
DeserializationError(#[cause] serde_json::Error),
/// The schema is valid JSON but the JSON-schema compiler rejected it.
#[fail(display = "Schema failed to compile")]
SchemaCompileError(#[cause] SchemaError),
}
/// Lets `?` turn a schema compilation failure into a `HedwigError`.
impl From<SchemaError> for HedwigError {
    /// Wraps the schema error in `HedwigError::SchemaCompileError`.
    fn from(schema_error: SchemaError) -> HedwigError {
        HedwigError::SchemaCompileError(schema_error)
    }
}
/// Errors that can occur while validating, routing or publishing a message.
#[allow(missing_docs)]
#[derive(Debug, Fail)]
pub enum PublishError {
    /// The message, or its data payload, could not be serialized to JSON.
    #[fail(display = "Unable to serialize message")]
    SerializationError(#[cause] serde_json::Error),
    /// The message router returned no route for the message with the given id.
    #[fail(display = "Message {} is not routable", _0)]
    RouteError(Uuid),
    /// The call to the publishing API failed.
    #[fail(display = "API failure occurred when publishing message")]
    PublishAPIFailure(#[cause] failure::Error),
    /// The publishing API answered, but the response carried no message id.
    // The display text previously read "Invalid from publish API", missing the word
    // "response".
    #[fail(display = "Invalid response from publish API: can't find published message id")]
    InvalidResponseNoMessageId,
    /// The schema URL built for the message is not a syntactically valid URL.
    #[fail(display = "Could not parse `{}` as a schema URL", _1)]
    InvalidSchemaUrl(#[cause] url::ParseError, String),
    /// The schema URL is valid but does not point at any schema known to the validator.
    #[fail(display = "Could not resolve `{}` to a schema", _0)]
    UnresolvableSchemaUrl(url::Url),
    /// The message data does not strictly validate against its schema.
    #[fail(display = "Message data doesn't validate per the schema")]
    DataValidationError(#[cause] failure::Error),
}
/// A destination that validated messages can be published to.
pub trait Publisher {
/// Value returned by a successful call to `publish`.
type MessageIds;
/// Publishes a batch of validated messages.
///
/// Each entry pairs a message with the `&'static str` route it was assigned by the
/// message router. Returns `Self::MessageIds` on success and a `PublishError` otherwise.
fn publish(
&self,
messages: Vec<(&'static str, ValidatedMessage)>,
) -> Result<Self::MessageIds, PublishError>;
}
/// A `Publisher` that sends messages through the Google Pub/Sub API client.
///
/// Only available when the `google` feature is enabled.
#[cfg(feature = "google")]
#[allow(missing_debug_implementations)]
pub struct GooglePublisher {
// Pub/Sub API hub, authenticated with a service account.
client: Pubsub<Client, oauth2::ServiceAccountAccess<Client>>,
// Project name interpolated into the topic path when publishing.
google_cloud_project: String,
}
#[cfg(feature = "google")]
impl GooglePublisher {
    /// Creates a publisher for `google_cloud_project`, authenticated with the service
    /// account key stored in the JSON file at `google_application_credentials`.
    ///
    /// # Errors
    ///
    /// * `HedwigError::CannotOpenCredentialsFile` if the file cannot be opened.
    /// * `HedwigError::CannotParseCredentialsFile` if its contents cannot be parsed as a
    ///   service account key.
    pub fn new<P>(
        google_application_credentials: P,
        google_cloud_project: String,
    ) -> Result<GooglePublisher, HedwigError>
    where
        P: AsRef<Path>,
    {
        let credentials_path = google_application_credentials.as_ref();
        let credentials_file = std::fs::File::open(credentials_path).map_err(|err| {
            HedwigError::CannotOpenCredentialsFile(err, credentials_path.to_path_buf())
        })?;
        let service_account_key: oauth2::ServiceAccountKey =
            serde_json::from_reader(credentials_file).map_err(|err| {
                HedwigError::CannotParseCredentialsFile(err, credentials_path.to_path_buf())
            })?;

        // The authenticator and the API hub each get their own HTTPS client.
        let access = oauth2::ServiceAccountAccess::new(
            service_account_key,
            hyper::Client::with_connector(HttpsConnector::new(hyper_rustls::TlsClient::new())),
        );
        let client = Pubsub::new(
            hyper::Client::with_connector(HttpsConnector::new(hyper_rustls::TlsClient::new())),
            access,
        );

        Ok(GooglePublisher {
            client,
            google_cloud_project,
        })
    }

    /// Publishes `batch` to the topic `projects/<project>/topics/hedwig-<topic>` and appends
    /// the message ids found in the response to `id_vec`.
    ///
    /// A response without message ids is accepted silently: nothing is appended and `Ok(())`
    /// is still returned.
    ///
    /// # Errors
    ///
    /// `PublishError::PublishAPIFailure` (carrying the API error rendered as text) if the
    /// API call fails.
    fn publish_batch(
        &self,
        topic: &str,
        batch: Vec<pubsub1::PubsubMessage>,
        id_vec: &mut <Self as Publisher>::MessageIds,
    ) -> Result<(), PublishError> {
        let request = pubsub1::PublishRequest {
            messages: Some(batch),
        };
        let topic_path = format!(
            "projects/{}/topics/hedwig-{}",
            self.google_cloud_project, topic
        );
        let (_, response) = self
            .client
            .projects()
            .topics_publish(request, topic_path.as_str())
            .doit()
            .map_err(|err| PublishError::PublishAPIFailure(failure::err_msg(err.to_string())))?;
        if let Some(ids) = response.message_ids {
            id_vec.extend(ids);
        }
        Ok(())
    }
}
#[cfg(feature = "google")]
impl Publisher for GooglePublisher {
    /// Ids reported by the API for the published messages.
    type MessageIds = Vec<String>;

    /// Publishes `messages`, issuing one API request per distinct topic.
    ///
    /// Messages are sorted by topic first so that all messages for one topic end up in the
    /// same request. Each message is serialized to JSON and base64-encoded; its headers are
    /// sent as message attributes.
    ///
    /// # Errors
    ///
    /// Stops at the first serialization or API failure. Batches that were sent before the
    /// failure stay published.
    fn publish(
        &self,
        mut messages: Vec<(&'static str, ValidatedMessage)>,
    ) -> Result<Self::MessageIds, PublishError> {
        let mut published_ids = Vec::with_capacity(messages.len());
        messages.sort_by_key(|&(topic, _)| topic);

        let mut batch_topic = "";
        let mut pending = Vec::new();
        for (topic, message) in messages {
            let topic_changed = batch_topic != topic;
            if topic_changed && !pending.is_empty() {
                // Flush everything collected for the previous topic before starting a new batch.
                let finished = std::mem::replace(&mut pending, Vec::new());
                self.publish_batch(batch_topic, finished, &mut published_ids)?;
            }
            batch_topic = topic;

            let payload =
                serde_json::to_string(&message).map_err(PublishError::SerializationError)?;
            pending.push(pubsub1::PubsubMessage {
                data: Some(base64::encode(&payload)),
                attributes: Some(message.metadata.headers),
                ..Default::default()
            });
        }

        // Flush the batch for the last topic.
        if !pending.is_empty() {
            self.publish_batch(batch_topic, pending, &mut published_ids)?;
        }
        Ok(published_ids)
    }
}
/// Custom string key/value headers attached to a message.
type Headers = HashMap<String, String>;
/// A `Publisher` that records messages in memory instead of sending them anywhere.
///
/// Only available when the `mock` feature is enabled.
#[cfg(feature = "mock")]
#[derive(Debug, Default)]
pub struct MockPublisher {
// Recorded messages keyed by message id; the value is the serialized JSON message
// together with its headers.
published_messages: RefCell<HashMap<Uuid, (String, Headers)>>,
}
#[cfg(feature = "mock")]
impl Publisher for MockPublisher {
    /// Nothing is reported back for recorded messages.
    type MessageIds = ();

    /// Records every message under its id, as serialized JSON plus its headers.
    ///
    /// The route of each message is ignored. A message whose id was already recorded
    /// replaces the earlier entry.
    ///
    /// # Errors
    ///
    /// `PublishError::SerializationError` if a message cannot be serialized. Messages
    /// recorded before the failure are kept.
    fn publish(
        &self,
        messages: Vec<(&'static str, ValidatedMessage)>,
    ) -> Result<Self::MessageIds, PublishError> {
        for (_route, message) in messages {
            let body =
                serde_json::to_string(&message).map_err(PublishError::SerializationError)?;
            let ValidatedMessage { id, metadata, .. } = message;
            self.published_messages
                .borrow_mut()
                .insert(id, (body, metadata.headers));
        }
        Ok(())
    }
}
/// Validates message data against a compiled JSON schema.
struct Validator {
// Scope holding the compiled schema.
scope: Scope,
// Id returned by the scope when the schema was compiled; used as the base of the
// per-message schema URLs.
schema_id: url::Url,
}
impl Validator {
    /// Parses `schema` as JSON and compiles it into a fresh scope.
    ///
    /// # Errors
    ///
    /// * `HedwigError::DeserializationError` if `schema` is not valid JSON.
    /// * `HedwigError::SchemaCompileError` if the schema fails to compile.
    fn new(schema: &str) -> Result<Validator, HedwigError> {
        let parsed: serde_json::Value = match serde_json::from_str(schema) {
            Ok(value) => value,
            Err(err) => return Err(HedwigError::DeserializationError(err)),
        };
        let mut scope = Scope::new();
        let schema_id = scope
            .compile(parsed, false)
            .map_err(HedwigError::SchemaCompileError)?;
        Ok(Validator { scope, schema_id })
    }

    /// Validates the data of `message` against the schema addressed by `schema`.
    ///
    /// Trailing numeric characters of `schema` are replaced by `*` before the lookup, so a
    /// URL ending in `1.0` is resolved as `1.*`.
    ///
    /// # Errors
    ///
    /// * `PublishError::InvalidSchemaUrl` if the rewritten URL cannot be parsed.
    /// * `PublishError::UnresolvableSchemaUrl` if no schema is found at that URL.
    /// * `PublishError::SerializationError` if the message data cannot be turned into JSON.
    /// * `PublishError::DataValidationError` if the data is not strictly valid.
    fn validate<D, T>(
        &self,
        message: &Message<D, T>,
        schema: &str,
    ) -> Result<ValidationState, PublishError>
    where
        D: Serialize,
    {
        // Swap the minor version digits for a wildcard.
        let mut wildcard_ptr = schema.trim_end_matches(char::is_numeric).to_owned();
        wildcard_ptr.push('*');

        let wildcard_url = url::Url::parse(&wildcard_ptr)
            .map_err(|err| PublishError::InvalidSchemaUrl(err, wildcard_ptr))?;
        let scoped_schema = self
            .scope
            .resolve(&wildcard_url)
            .ok_or_else(|| PublishError::UnresolvableSchemaUrl(wildcard_url))?;

        let data_json =
            serde_json::to_value(&message.data).map_err(PublishError::SerializationError)?;
        let state = scoped_schema.validate(&data_json);
        if state.is_strictly_valid() {
            Ok(state)
        } else {
            Err(PublishError::DataValidationError(failure::err_msg(
                format!("{:?}", state),
            )))
        }
    }
}
/// Major component of a `Version`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub struct MajorVersion(pub u8);

impl fmt::Display for MajorVersion {
    /// Writes the wrapped number.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let MajorVersion(major) = *self;
        write!(f, "{}", major)
    }
}
/// Minor component of a `Version`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub struct MinorVersion(pub u8);

impl fmt::Display for MinorVersion {
    /// Writes the wrapped number.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let MinorVersion(minor) = *self;
        write!(f, "{}", minor)
    }
}
/// A two-part version made of a major and a minor component.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Version(pub MajorVersion, pub MinorVersion);

impl Serialize for Version {
    /// Serializes the version as the string `"<major>.<minor>"`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Version(major, minor) = *self;
        let rendered = format!("{}.{}", major, minor);
        serializer.serialize_str(&rendered)
    }
}
/// Function that maps a message data type and a major version to the route the message
/// should be published on.
///
/// Returning `None` makes `HedwigPublishBuilder::message` fail with
/// `PublishError::RouteError`.
pub type MessageRouter<T> = fn(T, MajorVersion) -> Option<&'static str>;
/// Central object that validates, routes and publishes messages.
///
/// `T` is the type used to tag the kind of data a message carries and `P` is the
/// publisher that receives the validated messages.
#[allow(missing_debug_implementations)]
pub struct Hedwig<T, P> {
// Validator built from the schema passed to `Hedwig::new`.
validator: Validator,
// Name recorded as `publisher` in the metadata of every message.
publisher_name: String,
// Function that picks the route for a message.
message_router: MessageRouter<T>,
// Publisher that receives the validated messages.
publisher: P,
}
impl<T, P> Hedwig<T, P>
where
    P: Publisher,
{
    /// Creates a new `Hedwig` instance.
    ///
    /// * `schema` - JSON schema text that message data is validated against.
    /// * `publisher_name` - name recorded in the metadata of every message.
    /// * `publisher` - publisher that receives the validated messages.
    /// * `message_router` - function that picks the route for a message.
    ///
    /// # Errors
    ///
    /// Returns a `HedwigError` if the schema cannot be parsed or compiled.
    pub fn new(
        schema: &str,
        publisher_name: &str,
        publisher: P,
        message_router: MessageRouter<T>,
    ) -> Result<Hedwig<T, P>, HedwigError> {
        let validator = Validator::new(schema)?;
        Ok(Hedwig {
            validator,
            publisher_name: publisher_name.to_owned(),
            message_router,
            publisher,
        })
    }

    /// Starts an empty batch; add messages to it and publish them all in one call.
    pub fn build_publish(&self) -> HedwigPublishBuilder<T, P> {
        let messages = Vec::new();
        HedwigPublishBuilder {
            hedwig: self,
            messages,
        }
    }

    /// Validates and publishes a single message.
    ///
    /// Equivalent to building a batch that contains only `msg` and publishing it.
    ///
    /// # Errors
    ///
    /// Returns a `PublishError` if the message fails validation, cannot be routed or the
    /// publisher reports a failure.
    pub fn publish<D>(&self, msg: Message<D, T>) -> Result<P::MessageIds, PublishError>
    where
        D: Serialize,
        T: Copy + Into<&'static str>,
    {
        let mut single_message_batch = self.build_publish();
        single_message_batch.message(msg)?;
        single_message_batch.publish()
    }
}
/// A batch of validated messages waiting to be published through a `Hedwig` instance.
///
/// Created by `Hedwig::build_publish`.
#[allow(missing_debug_implementations)]
pub struct HedwigPublishBuilder<'hedwig, T, P> {
// The instance providing the validator, router and publisher.
hedwig: &'hedwig Hedwig<T, P>,
// Validated messages paired with the route chosen for them.
messages: Vec<(&'static str, ValidatedMessage)>,
}
impl<'hedwig, T, P> HedwigPublishBuilder<'hedwig, T, P> {
pub fn message<D>(&mut self, msg: Message<D, T>) -> Result<&mut Self, PublishError>
where
D: Serialize,
T: Copy + Into<&'static str>,
{
let data_type = msg.data_type;
let schema_version = msg.data_schema_version;
let data_type_str = msg.data_type.into();
let schema_url = format!(
"{}#/schemas/{}/{}.{}",
self.hedwig.validator.schema_id, data_type_str, schema_version.0, schema_version.1,
);
self.hedwig.validator.validate(&msg, &schema_url)?;
let converted = msg
.into_schema(
self.hedwig.publisher_name.clone(),
schema_url,
FORMAT_VERSION_V1,
)
.map_err(PublishError::SerializationError)?;
let route = (self.hedwig.message_router)(data_type, converted.format_version.0)
.ok_or_else(|| PublishError::RouteError(converted.id))?;
self.messages.push((route, converted));
Ok(self)
}
pub fn publish(self) -> Result<P::MessageIds, PublishError>
where
P: Publisher,
{
self.hedwig.publisher.publish(self.messages)
}
}
/// A message that has not been validated yet.
///
/// `D` is the type of the data payload and `T` the type used to tag the kind of data.
#[derive(Clone, Debug, PartialEq)]
pub struct Message<D, T> {
// Explicit message id; when `None`, a random v4 UUID is generated in `into_schema`.
id: Option<Uuid>,
// Time elapsed since the unix epoch, captured when the message was constructed.
timestamp: std::time::Duration,
// Custom headers; `None` until `headers` or `header` is called.
headers: Option<Headers>,
// Payload that is validated against the schema.
data: D,
// Tag identifying the kind of data carried.
data_type: T,
// Version of the schema the data is expected to follow.
data_schema_version: Version,
}
impl<D, T> Message<D, T> {
    /// Creates a message with the current time as its timestamp, no id and no headers.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the unix epoch.
    pub fn new(data_type: T, data_schema_version: Version, data: D) -> Self {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time is before the unix epoch");
        Message {
            id: None,
            timestamp: since_epoch,
            headers: None,
            data,
            data_type,
            data_schema_version,
        }
    }

    /// Replaces all custom headers of the message with `headers`.
    pub fn headers(self, headers: Headers) -> Self {
        Message {
            headers: Some(headers),
            ..self
        }
    }

    /// Sets a single custom header, keeping the headers that are already present.
    pub fn header<H, V>(mut self, header: H, value: V) -> Self
    where
        H: Into<String>,
        V: Into<String>,
    {
        self.headers
            .get_or_insert_with(HashMap::new)
            .insert(header.into(), value.into());
        self
    }

    /// Sets an explicit id for the message.
    pub fn id(self, id: Uuid) -> Self {
        Message {
            id: Some(id),
            ..self
        }
    }

    /// Converts the message into the form that is serialized and published.
    ///
    /// A random v4 UUID is used when no id was set, and an empty header map when no
    /// headers were set. The timestamp is stored in milliseconds.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` raised when the data cannot be converted to JSON.
    fn into_schema(
        self,
        publisher_name: String,
        schema: String,
        format_version: Version,
    ) -> Result<ValidatedMessage, serde_json::Error>
    where
        D: Serialize,
    {
        let Message {
            id,
            timestamp,
            headers,
            data,
            ..
        } = self;
        let id = id.unwrap_or_else(Uuid::new_v4);
        let metadata = Metadata {
            timestamp: timestamp.as_millis(),
            publisher: publisher_name,
            headers: headers.unwrap_or_default(),
        };
        let data = serde_json::to_value(data)?;
        Ok(ValidatedMessage {
            id,
            metadata,
            schema,
            format_version,
            data,
        })
    }
}
/// Metadata that accompanies the data of a validated message.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Metadata {
// Message timestamp in milliseconds since the unix epoch.
timestamp: u128,
// Name of the publisher, as passed to `Hedwig::new`.
publisher: String,
// Custom headers of the message; empty when none were set.
headers: Headers,
}
/// A message whose data passed schema validation, in the form handed to a `Publisher`.
#[derive(Debug, Serialize)]
pub struct ValidatedMessage {
// Unique id of the message.
id: Uuid,
// Timestamp, publisher name and headers.
metadata: Metadata,
// URL of the schema the data was validated against.
schema: String,
// Version of the message envelope format.
format_version: Version,
// The message data converted to a JSON value.
data: serde_json::Value,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use strum_macros::IntoStaticStr;

    #[cfg(feature = "mock")]
    impl MockPublisher {
        /// Asserts that `message` was recorded by this publisher with exactly `headers`.
        ///
        /// The expected serialization is rebuilt from `message` with the publisher name
        /// `myapp`, schema version `1.0` and format version `1.0`, so it only matches
        /// messages published under those settings.
        ///
        /// Panics if `message` has no explicit id or was never published.
        pub fn assert_message_published<D, T>(&self, message: &Message<D, T>, headers: &Headers)
        where
            D: Serialize + Clone,
            T: Copy + Into<&'static str>,
        {
            let published_messages = self.published_messages.borrow();
            let (published, published_headers) = published_messages
                .get(
                    &message
                        .id
                        .expect("asserted messages should have specified uuid"),
                )
                .expect("message not found");
            let data_type_str = message.data_type.into();
            let encoded = message.clone().into_schema(
                String::from("myapp"),
                format!(
                    "https://hedwig.standard.ai/schema#/schemas/{}/1.0",
                    data_type_str
                ),
                VERSION_1_0,
            );
            let serialized = serde_json::to_string(&encoded.unwrap()).unwrap();
            assert_eq!(published, &serialized);
            assert_eq!(published_headers, headers);
        }
    }

    /// Message types used by the tests.
    #[derive(Clone, Copy, Debug, IntoStaticStr, Hash, PartialEq, Eq)]
    enum MessageType {
        /// Present in the schema and known to the router.
        #[strum(serialize = "user.created")]
        UserCreated,
        /// Known to the router but absent from the schema.
        #[strum(serialize = "invalid.schema")]
        #[cfg(feature = "mock")]
        InvalidSchema,
        /// Present in the schema but unknown to the router.
        #[strum(serialize = "invalid.route")]
        #[cfg(feature = "mock")]
        InvalidRoute,
    }

    /// Payload matching the `user.created` schema.
    #[derive(Clone, Debug, Serialize, PartialEq)]
    struct UserCreatedData {
        user_id: String,
    }

    /// Data schema version used by every test message.
    const VERSION_1_0: Version = Version(MajorVersion(1), MinorVersion(0));

    /// Schema used by the tests that publish through the mock publisher.
    #[cfg(feature = "mock")]
    const SCHEMA: &str = r#"
{
"$id": "https://hedwig.standard.ai/schema",
"$schema": "https://json-schema.org/draft-04/schema#",
"description": "Example Schema",
"schemas": {
"user.created": {
"1.*": {
"description": "A new user was created",
"type": "object",
"x-versions": [
"1.0"
],
"required": [
"user_id"
],
"properties": {
"user_id": {
"$ref": "https://hedwig.standard.ai/schema#/definitions/UserId/1.0"
}
}
}
},
"invalid.route": {
"1.*": {}
}
},
"definitions": {
"UserId": {
"1.0": {
"type": "string"
}
}
}
}"#;

    /// Router used by the tests; `InvalidRoute` deliberately has no route.
    #[cfg(feature = "mock")]
    fn router(t: MessageType, v: MajorVersion) -> Option<&'static str> {
        match (t, v) {
            (MessageType::UserCreated, MajorVersion(1)) => Some("dev-user-created-v1"),
            (MessageType::InvalidSchema, MajorVersion(1)) => Some("invalid-schema"),
            _ => None,
        }
    }

    /// Builds a `Hedwig` instance over `SCHEMA` that records into a `MockPublisher`.
    #[cfg(feature = "mock")]
    fn mock_hedwig() -> Hedwig<MessageType, MockPublisher> {
        Hedwig::new(SCHEMA, "myapp", MockPublisher::default(), router).unwrap()
    }

    /// A freshly constructed message carries the given values and no headers.
    #[test]
    fn message_constructor() {
        let data = UserCreatedData {
            user_id: "U_123".into(),
        };
        let message = Message::new(MessageType::UserCreated, VERSION_1_0, data.clone());
        assert_eq!(None, message.headers);
        assert_eq!(data, message.data);
        assert_eq!(MessageType::UserCreated, message.data_type);
        assert_eq!(VERSION_1_0, message.data_schema_version);
    }

    /// `header` stores the value under the given key.
    #[test]
    fn message_set_headers() {
        let request_id = Uuid::new_v4().to_string();
        let message = Message::new(
            MessageType::UserCreated,
            VERSION_1_0,
            UserCreatedData {
                user_id: "U_123".into(),
            },
        )
        .header("request_id", request_id.clone());
        assert_eq!(
            request_id,
            message
                .headers
                .unwrap()
                .get("request_id")
                .unwrap()
                .as_str()
        );
    }

    /// `id` stores the explicit id.
    #[test]
    fn message_with_id() {
        let id = uuid::Uuid::new_v4();
        let message = Message::new(
            MessageType::UserCreated,
            VERSION_1_0,
            UserCreatedData {
                user_id: "U_123".into(),
            },
        )
        .id(id);
        assert_eq!(id, message.id.unwrap());
    }

    /// A valid message reaches the publisher together with its headers.
    #[test]
    #[cfg(feature = "mock")]
    fn publish() {
        let hedwig = mock_hedwig();
        let mut custom_headers = Headers::new();
        let request_id = Uuid::new_v4().to_string();
        let msg_id = Uuid::new_v4();
        let message = Message::new(
            MessageType::UserCreated,
            VERSION_1_0,
            UserCreatedData {
                user_id: "U_123".into(),
            },
        )
        .header("request_id", request_id.clone())
        .id(msg_id);
        custom_headers.insert("request_id".to_owned(), request_id);
        let mut builder = hedwig.build_publish();
        builder.message(message.clone()).unwrap();
        builder.publish().unwrap();
        hedwig
            .publisher
            .assert_message_published(&message, &custom_headers);
    }

    /// A credentials path that does not exist is reported as an open failure.
    #[test]
    #[cfg(feature = "google")]
    fn google_publisher_credentials_error() {
        let r = GooglePublisher::new("path does not exist", "myproject".into());
        assert_matches!(r.err(), Some(HedwigError::CannotOpenCredentialsFile(_, _)));
    }

    /// Schema text that is not JSON is reported as a deserialization error.
    #[test]
    fn validator_deserialization_error() {
        let r = Validator::new("bad json");
        assert_matches!(r.err(), Some(HedwigError::DeserializationError(_)));
    }

    /// A schema with an unknown `type` value is reported as a compile error.
    #[test]
    fn validator_schema_compile_error() {
        const BAD_SCHEMA: &str = r#"
{
"$schema": "https://json-schema.org/draft-04/schema#",
"definitions": {
"UserId": {
"1.0": {
"type": "bad value"
}
}
}
}"#;
        let r = Validator::new(BAD_SCHEMA);
        assert_matches!(r.err(), Some(HedwigError::SchemaCompileError(_)));
    }

    /// Data that cannot be converted to JSON (a map with non-string keys) is rejected
    /// with a serialization error.
    #[test]
    #[cfg(feature = "mock")]
    fn message_serialization_error() {
        let hedwig = mock_hedwig();
        #[derive(Serialize)]
        struct BadUserCreatedData {
            user_ids: HashMap<Vec<i32>, String>,
        }
        let mut user_ids = HashMap::new();
        user_ids.insert(vec![32, 64], "U_123".to_owned());
        let data = BadUserCreatedData { user_ids };
        let m = Message::new(MessageType::UserCreated, VERSION_1_0, data);
        let mut builder = hedwig.build_publish();
        assert_matches!(
            builder.message(m).err(),
            Some(PublishError::SerializationError(_))
        );
    }

    /// A message type the router does not know is rejected with a route error.
    #[test]
    #[cfg(feature = "mock")]
    fn message_router_error() {
        let hedwig = mock_hedwig();
        let m = Message::new(MessageType::InvalidRoute, VERSION_1_0, ());
        let mut builder = hedwig.build_publish();
        assert_matches!(builder.message(m).err(), Some(PublishError::RouteError(_)));
    }

    /// A message type that has no entry in the schema cannot be resolved.
    #[test]
    #[cfg(feature = "mock")]
    fn message_invalid_schema_error() {
        let hedwig = mock_hedwig();
        let m = Message::new(MessageType::InvalidSchema, VERSION_1_0, ());
        let mut builder = hedwig.build_publish();
        assert_matches!(
            builder.message(m).err(),
            Some(PublishError::UnresolvableSchemaUrl(_))
        );
    }

    /// Data lacking the required `user_id` field fails schema validation.
    #[test]
    #[cfg(feature = "mock")]
    fn message_data_validation_error() {
        let hedwig = mock_hedwig();
        #[derive(Serialize)]
        struct BadUserCreatedData {
            user_ids: Vec<i32>,
        }
        let data = BadUserCreatedData { user_ids: vec![1] };
        let m = Message::new(MessageType::UserCreated, VERSION_1_0, data);
        let mut builder = hedwig.build_publish();
        assert_matches!(
            builder.message(m).err(),
            Some(PublishError::DataValidationError(_))
        );
    }
}