1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
//! Hedwig is a message bus library that works with arbitrary pubsub services such as AWS SNS/SQS
//! or Google Cloud Pubsub. Messages are validated before they are published. The publisher and
//! consumer are de-coupled and fan-out is supported out of the box.
//!
//! The Rust library supports both publishing and consuming messages.
//!
//! # Examples
//!
//! Publish a message and consume it again. The payload is encoded and validated with protobuf.
//!
//! ```
//! use hedwig::{validators, Publisher, Consumer};
//! # use uuid::Uuid;
//! # use std::{path::Path, time::SystemTime};
//! # use futures_util::{sink::SinkExt, stream::StreamExt};
//! # #[cfg(not(all(feature = "protobuf", feature = "mock")))]
//! # fn main() {}
//! # #[cfg(all(feature = "protobuf", feature = "mock"))] // example uses a protobuf validator.
//! # #[tokio::main(flavor = "current_thread")]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!
//! #[derive(Clone, PartialEq, Eq, prost::Message)]
//! struct UserCreatedMessage {
//! #[prost(string, tag = "1")]
//! user_id: String,
//! }
//!
//! impl<'a> hedwig::EncodableMessage for UserCreatedMessage {
//! type Error = validators::ProstValidatorError;
//! type Validator = validators::ProstValidator;
//! fn topic(&self) -> hedwig::Topic {
//! "user.created".into()
//! }
//! fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
//! Ok(validator.validate(
//! uuid::Uuid::new_v4(),
//! SystemTime::now(),
//! "user.created/1.0",
//! Default::default(),
//! self,
//! )?)
//! }
//! }
//!
//! impl hedwig::DecodableMessage for UserCreatedMessage {
//! type Error = validators::ProstDecodeError<validators::prost::SchemaMismatchError>;
//! type Decoder =
//! validators::ProstDecoder<validators::prost::ExactSchemaMatcher<UserCreatedMessage>>;
//!
//! fn decode(msg: hedwig::ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error> {
//! decoder.decode(msg)
//! }
//! }
//!
//!
//! let publisher = /* Some publisher */
//! # hedwig::mock::MockPublisher::new();
//! let consumer = /* Consumer associated to that publisher */
//! # publisher.new_consumer("user.created", "example_subscription");
//!
//! let mut publish_sink = Publisher::<UserCreatedMessage>::publish_sink(publisher, validators::ProstValidator::new());
//! let mut consumer_stream = consumer.consume::<UserCreatedMessage>(
//! validators::ProstDecoder::new(validators::prost::ExactSchemaMatcher::new("user.created/1.0")),
//! );
//!
//! publish_sink.send(UserCreatedMessage { user_id: String::from("U_123") }).await?;
//!
//! assert_eq!(
//! "U_123",
//! consumer_stream.next().await.unwrap()?.ack().await?.user_id
//! );
//!
//! # Ok(())
//! # }
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{borrow::Cow, collections::BTreeMap, time::SystemTime};
pub use topic::Topic;
use bytes::Bytes;
use uuid::Uuid;
mod backends;
mod consumer;
mod publisher;
mod tests;
mod topic;
pub mod validators;
pub use backends::*;
pub use consumer::*;
pub use publisher::*;
// TODO make these public somewhere?
// All of the constants below are crate-private and only compiled when the `google` feature is
// enabled.
// NOTE(review): they look like the attribute/metadata key names the Google backend uses to carry
// message metadata alongside the payload — confirm against the `backends` module.
/// The string `"hedwig_id"`; presumably the key for the message identifier (TODO confirm).
#[cfg(feature = "google")]
pub(crate) const HEDWIG_ID: &str = "hedwig_id";
/// The string `"hedwig_message_timestamp"`; presumably the key for the message timestamp (TODO
/// confirm).
#[cfg(feature = "google")]
pub(crate) const HEDWIG_MESSAGE_TIMESTAMP: &str = "hedwig_message_timestamp";
/// The string `"hedwig_schema"`; presumably the key for the message schema (TODO confirm).
#[cfg(feature = "google")]
pub(crate) const HEDWIG_SCHEMA: &str = "hedwig_schema";
/// The string `"hedwig_publisher"`; presumably the key naming the publisher (TODO confirm).
#[cfg(feature = "google")]
pub(crate) const HEDWIG_PUBLISHER: &str = "hedwig_publisher";
/// The string `"hedwig_format_version"`; presumably the key for the message format version (TODO
/// confirm).
#[cfg(feature = "google")]
pub(crate) const HEDWIG_FORMAT_VERSION: &str = "hedwig_format_version";
/// All errors that may be returned when operating top level APIs.
///
/// The enum is marked `#[non_exhaustive]`, so code matching on it from outside this crate must
/// include a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
/// Unable to encode message payload
///
/// The boxed underlying error is tagged `#[source]`, so it is reported as this error's
/// `source()`; the `Display` output is only the fixed message in `#[error(...)]`.
#[error("Unable to encode message payload")]
EncodeMessage(#[source] Box<dyn std::error::Error + Send + Sync>),
}
/// Custom headers associated with a message.
///
/// An ordered map from `String` keys to `String` values; as a `BTreeMap` it iterates in sorted
/// key order.
pub type Headers = BTreeMap<String, String>;
/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
///
/// All fields are private: build a value with [`ValidatedMessage::new`] and read it back through
/// the accessor methods on this type.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage {
/// Unique message identifier.
id: Uuid,
/// The timestamp when message was created in the publishing service.
timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
///
/// Stored as a `Cow<'static, str>`, so it may hold either a borrowed `'static` string or an
/// owned `String`.
schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
headers: Headers,
/// The encoded message data.
data: Bytes,
}
impl ValidatedMessage {
    /// Assemble a validated message from its constituent parts.
    ///
    /// `schema` may be anything convertible into a `Cow<'static, str>` and `data` anything
    /// convertible into [`Bytes`]; both conversions are performed here, schema first.
    pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
    where
        S: Into<Cow<'static, str>>,
        D: Into<Bytes>,
    {
        let schema = schema.into();
        let data = data.into();
        Self {
            id,
            timestamp,
            schema,
            headers,
            data,
        }
    }

    /// Borrow the unique identifier of this message.
    pub fn uuid(&self) -> &Uuid {
        let Self { id, .. } = self;
        id
    }

    /// Borrow the time at which the publishing service created this message.
    pub fn timestamp(&self) -> &SystemTime {
        let Self { timestamp, .. } = self;
        timestamp
    }

    /// Borrow the URI of the schema that validates this message.
    ///
    /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
    pub fn schema(&self) -> &str {
        self.schema.as_ref()
    }

    /// Borrow the custom headers attached to this message.
    ///
    /// Headers may be used to track a request_id, for example.
    pub fn headers(&self) -> &Headers {
        let Self { headers, .. } = self;
        headers
    }

    /// Borrow the custom headers mutably, allowing entries to be added, changed or removed.
    pub fn headers_mut(&mut self) -> &mut Headers {
        let Self { headers, .. } = self;
        headers
    }

    /// Borrow the encoded payload as a byte slice.
    pub fn data(&self) -> &[u8] {
        self.data.as_ref()
    }

    /// Consume the message, keeping only its encoded payload.
    pub fn into_data(self) -> Bytes {
        let Self { data, .. } = self;
        data
    }
}