1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
//! Hedwig is a message bus library that works with arbitrary pubsub services such as AWS SNS/SQS
//! or Google Cloud Pubsub. Messages are validated before they are published. The publisher and
//! consumer are de-coupled and fan-out is supported out of the box.
//!
//! The Rust library currently only supports publishing.
//!
//! # Examples
//!
//! Publish a message. Payload encoded with JSON and validated using a JSON Schema.
//!
//! ```
//! use uuid::Uuid;
//! use std::{path::Path, time::SystemTime};
//! use futures_util::stream::StreamExt;
//!
//! # #[cfg(not(feature = "json-schema"))]
//! # fn main() {}
//!
//! # #[cfg(feature = "json-schema")] // example uses a JSON Schema validator.
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = r#"{
//!     "$id": "https://hedwig.corp/schema",
//!     "$schema": "https://json-schema.org/draft-04/schema#",
//!     "description": "Example Schema",
//!     "schemas": {
//!         "user-created": {
//!             "1.*": {
//!                 "description": "A new user was created",
//!                 "type": "object",
//!                 "x-versions": [
//!                     "1.0"
//!                 ],
//!                 "required": [
//!                     "user_id"
//!                 ],
//!                 "properties": {
//!                     "user_id": {
//!                         "$ref": "https://hedwig.corp/schema#/definitions/UserId/1.0"
//!                     }
//!                 }
//!             }
//!         }
//!     },
//!     "definitions": {
//!         "UserId": {
//!             "1.0": {
//!                 "type": "string"
//!             }
//!         }
//!     }
//! }"#;
//!
//! #[derive(serde::Serialize)]
//! struct UserCreatedMessage {
//!     user_id: String,
//! }
//!
//! impl<'a> hedwig::publish::EncodableMessage for &'a UserCreatedMessage {
//!     type Error = hedwig::validators::JsonSchemaValidatorError;
//!     type Validator = hedwig::validators::JsonSchemaValidator;
//!     fn topic(&self) -> hedwig::Topic { "user.created".into() }
//!     fn encode(self, validator: &Self::Validator)
//!     -> Result<hedwig::ValidatedMessage, Self::Error> {
//!         validator.validate(
//!             Uuid::new_v4(),
//!             SystemTime::now(),
//!             "https://hedwig.corp/schema#/schemas/user.created/1.0",
//!             hedwig::Headers::new(),
//!             self,
//!         )
//!     }
//! }
//!
//! let publisher = /* Some publisher */
//! # hedwig::publish::NullPublisher;
//! let validator = hedwig::validators::JsonSchemaValidator::new(schema)?;
//! let mut batch = hedwig::publish::PublishBatch::new();
//! batch.message(&validator, &UserCreatedMessage { user_id: String::from("U_123") });
//! let mut result_stream = batch.publish(&publisher);
//! let mut next_batch = hedwig::publish::PublishBatch::new();
//! async {
//!     while let Some(result) = result_stream.next().await {
//!         match result {
//!             (Ok(id), _, msg) => {
//!                 println!("message {} published successfully: {:?}", msg.uuid(), id);
//!             }
//!             (Err(e), topic, msg) => {
//!                 eprintln!("failed to publish {}: {}", msg.uuid(), e);
//!                 next_batch.push(topic, msg);
//!             }
//!         }
//!     }
//! };
//! # Ok(())
//! # }
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{borrow::Cow, collections::BTreeMap, time::SystemTime};
pub use topic::Topic;

use bytes::Bytes;
use uuid::Uuid;

#[cfg(feature = "publish")]
#[cfg_attr(docsrs, doc(cfg(feature = "publish")))]
pub mod publish;

#[cfg(feature = "consume")]
#[cfg_attr(docsrs, doc(cfg(feature = "consume")))]
pub mod consume;

#[cfg(test)]
mod tests;
mod topic;
pub mod validators;

/// All errors that may be returned when operating top level APIs.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Unable to encode message payload
    #[error("Unable to encode message payload")]
    EncodeMessage(#[source] Box<dyn std::error::Error + Send + Sync>),
}

/// Custom headers associated with a message.
pub type Headers = BTreeMap<String, String>;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage {
    /// Unique message identifier.
    id: Uuid,
    /// The timestamp when message was created in the publishing service.
    timestamp: SystemTime,
    /// URI of the schema validating this message.
    ///
    /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
    schema: Cow<'static, str>,
    /// Custom message headers.
    ///
    /// This may be used to track request_id, for example.
    headers: Headers,
    /// The encoded message data.
    data: Bytes,
}

impl ValidatedMessage {
    /// Create a new validated message
    pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
    where
        S: Into<Cow<'static, str>>,
        D: Into<Bytes>,
    {
        Self {
            id,
            timestamp,
            schema: schema.into(),
            headers,
            data: data.into(),
        }
    }

    /// Unique message identifier.
    pub fn uuid(&self) -> &Uuid {
        &self.id
    }

    /// The timestamp when message was created in the publishing service.
    pub fn timestamp(&self) -> &SystemTime {
        &self.timestamp
    }

    /// URI of the schema validating this message.
    ///
    /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
    pub fn schema(&self) -> &str {
        &self.schema
    }

    /// Custom message headers.
    ///
    /// This may be used to track request_id, for example.
    pub fn headers(&self) -> &Headers {
        &self.headers
    }

    /// The encoded message data.
    pub fn data(&self) -> &[u8] {
        &self.data
    }
}