1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
//! Hedwig is a message bus library that works with arbitrary pubsub services such as AWS SNS/SQS
//! or Google Cloud Pubsub. Messages are validated before they are published. The publisher and
//! consumer are de-coupled and fan-out is supported out of the box.
//!
//! The Rust library currently only supports publishing.
//!
//! # Examples
//!
//! Publish a message. Payload encoded with JSON and validated using a JSON Schema.
//!
//! ```
//! use hedwig::{validators, Publisher, Consumer};
//! # use uuid::Uuid;
//! # use std::{path::Path, time::SystemTime};
//! # use futures_util::{sink::SinkExt, stream::StreamExt};
//! # #[cfg(not(all(feature = "protobuf", feature = "mock")))]
//! # fn main() {}
//! # #[cfg(all(feature = "protobuf", feature = "mock"))] // example uses a protobuf validator.
//! # #[tokio::main(flavor = "current_thread")]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!
//! #[derive(Clone, PartialEq, Eq, prost::Message)]
//! struct UserCreatedMessage {
//!     #[prost(string, tag = "1")]
//!     user_id: String,
//! }
//!
//! impl<'a> hedwig::EncodableMessage for UserCreatedMessage {
//!     type Error = validators::ProstValidatorError;
//!     type Validator = validators::ProstValidator;
//!     fn topic(&self) -> hedwig::Topic {
//!         "user.created".into()
//!     }
//!     fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
//!         Ok(validator.validate(
//!             uuid::Uuid::new_v4(),
//!             SystemTime::now(),
//!             "user.created/1.0",
//!             Default::default(),
//!             self,
//!         )?)
//!     }
//! }
//!
//! impl hedwig::DecodableMessage for UserCreatedMessage {
//!     type Error = validators::ProstDecodeError<validators::prost::SchemaMismatchError>;
//!     type Decoder =
//!         validators::ProstDecoder<validators::prost::ExactSchemaMatcher<UserCreatedMessage>>;
//!
//!     fn decode(msg: hedwig::ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error> {
//!         decoder.decode(msg)
//!     }
//! }
//!
//!
//! let publisher = /* Some publisher */
//! # hedwig::mock::MockPublisher::new();
//! let consumer = /* Consumer associated to that publisher */
//! # publisher.new_consumer("user.created", "example_subscription");
//!
//! let mut publish_sink = Publisher::<UserCreatedMessage>::publish_sink(publisher, validators::ProstValidator::new());
//! let mut consumer_stream = consumer.consume::<UserCreatedMessage>(
//!     validators::ProstDecoder::new(validators::prost::ExactSchemaMatcher::new("user.created/1.0")),
//! );
//!
//! publish_sink.send(UserCreatedMessage { user_id: String::from("U_123") }).await?;
//!
//! assert_eq!(
//!     "U_123",
//!     consumer_stream.next().await.unwrap()?.ack().await?.user_id
//! );
//!
//! # Ok(())
//! # }
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{borrow::Cow, collections::BTreeMap, time::SystemTime};
pub use topic::Topic;

use bytes::Bytes;
use uuid::Uuid;

mod backends;
mod consumer;
mod publisher;
mod tests;
mod topic;
pub mod validators;

pub use backends::*;
pub use consumer::*;
pub use publisher::*;

// TODO make these public somewhere?
#[cfg(feature = "google")]
pub(crate) const HEDWIG_ID: &str = "hedwig_id";
#[cfg(feature = "google")]
pub(crate) const HEDWIG_MESSAGE_TIMESTAMP: &str = "hedwig_message_timestamp";
#[cfg(feature = "google")]
pub(crate) const HEDWIG_SCHEMA: &str = "hedwig_schema";
#[cfg(feature = "google")]
pub(crate) const HEDWIG_PUBLISHER: &str = "hedwig_publisher";
#[cfg(feature = "google")]
pub(crate) const HEDWIG_FORMAT_VERSION: &str = "hedwig_format_version";

/// All errors that may be returned when operating top level APIs.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Unable to encode message payload
    #[error("Unable to encode message payload")]
    EncodeMessage(#[source] Box<dyn std::error::Error + Send + Sync>),
}

/// Custom headers associated with a message.
pub type Headers = BTreeMap<String, String>;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage {
    /// Unique message identifier.
    id: Uuid,
    /// The timestamp when message was created in the publishing service.
    timestamp: SystemTime,
    /// URI of the schema validating this message.
    ///
    /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
    schema: Cow<'static, str>,
    /// Custom message headers.
    ///
    /// This may be used to track request_id, for example.
    headers: Headers,
    /// The encoded message data.
    data: Bytes,
}

impl ValidatedMessage {
    /// Create a new validated message
    pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
    where
        S: Into<Cow<'static, str>>,
        D: Into<Bytes>,
    {
        Self {
            id,
            timestamp,
            schema: schema.into(),
            headers,
            data: data.into(),
        }
    }

    /// Unique message identifier.
    pub fn uuid(&self) -> &Uuid {
        &self.id
    }

    /// The timestamp when message was created in the publishing service.
    pub fn timestamp(&self) -> &SystemTime {
        &self.timestamp
    }

    /// URI of the schema validating this message.
    ///
    /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
    pub fn schema(&self) -> &str {
        &self.schema
    }

    /// Custom message headers.
    ///
    /// This may be used to track request_id, for example.
    pub fn headers(&self) -> &Headers {
        &self.headers
    }

    /// Mutable access to the message headers
    pub fn headers_mut(&mut self) -> &mut Headers {
        &mut self.headers
    }

    /// The encoded message data.
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Destructure this message into just the contained data
    pub fn into_data(self) -> Bytes {
        self.data
    }
}