use std::future::Future;
use bytes::Bytes;
use crate::{AckError, Headers};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawMessage {
topic: String,
payload: Bytes,
headers: Headers,
}
impl RawMessage {
pub fn new(topic: impl Into<String>, payload: impl Into<Bytes>) -> Self {
Self {
topic: topic.into(),
payload: payload.into(),
headers: Headers::new(),
}
}
#[must_use]
pub fn with_headers(mut self, headers: Headers) -> Self {
self.headers = headers;
self
}
#[must_use]
pub fn topic(&self) -> &str {
&self.topic
}
#[must_use]
pub fn payload(&self) -> &[u8] {
&self.payload
}
#[must_use]
pub fn payload_bytes(&self) -> Bytes {
self.payload.clone()
}
#[must_use]
pub fn headers(&self) -> &Headers {
&self.headers
}
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}
}
#[derive(Debug, Clone)]
pub struct OutgoingMessage<'a> {
topic: &'a str,
payload: &'a [u8],
headers: Headers,
}
impl<'a> OutgoingMessage<'a> {
#[must_use]
pub fn new(topic: &'a str, payload: &'a [u8]) -> Self {
Self {
topic,
payload,
headers: Headers::new(),
}
}
#[must_use]
pub fn with_headers(mut self, headers: Headers) -> Self {
self.headers = headers;
self
}
#[must_use]
pub fn topic(&self) -> &str {
self.topic
}
#[must_use]
pub fn payload(&self) -> &[u8] {
self.payload
}
#[must_use]
pub fn headers(&self) -> &Headers {
&self.headers
}
}
pub trait IncomingMessage: Send + Sync {
fn payload(&self) -> &[u8];
fn headers(&self) -> &Headers;
fn ack(self) -> impl Future<Output = Result<(), AckError>> + Send;
fn nack(self, requeue: bool) -> impl Future<Output = Result<(), AckError>> + Send;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn raw_message_construction() {
let msg = RawMessage::new("topic.a", b"payload".as_slice());
assert_eq!(msg.topic(), "topic.a");
assert_eq!(msg.payload(), b"payload");
assert!(msg.headers().is_empty());
}
#[test]
fn raw_message_with_headers() {
let mut headers = Headers::new();
headers.insert("X-Tenant", "acme");
let msg = RawMessage::new("topic.a", Bytes::from_static(b"data")).with_headers(headers);
assert_eq!(msg.headers().get_str("x-tenant"), Some("acme"));
}
#[test]
fn outgoing_message_holds_borrows() {
let topic = String::from("orders");
let payload = vec![1u8, 2, 3];
let msg = OutgoingMessage::new(&topic, &payload);
assert_eq!(msg.topic(), "orders");
assert_eq!(msg.payload(), &[1, 2, 3]);
}
}