use std::{future::Future, time::Duration};
use bytes::Bytes;
use crate::{AckError, Headers};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawMessage {
name: String,
payload: Bytes,
headers: Headers,
}
impl RawMessage {
pub fn new(name: impl Into<String>, payload: impl Into<Bytes>) -> Self {
Self {
name: name.into(),
payload: payload.into(),
headers: Headers::new(),
}
}
#[must_use]
pub fn with_headers(mut self, headers: Headers) -> Self {
self.headers = headers;
self
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn payload(&self) -> &[u8] {
&self.payload
}
#[must_use]
pub fn payload_bytes(&self) -> Bytes {
self.payload.clone()
}
#[must_use]
pub fn headers(&self) -> &Headers {
&self.headers
}
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}
}
#[derive(Debug, Clone)]
pub struct OutgoingMessage<'a> {
name: &'a str,
payload: &'a [u8],
headers: Headers,
}
impl<'a> OutgoingMessage<'a> {
#[must_use]
pub fn new(name: &'a str, payload: &'a [u8]) -> Self {
Self {
name,
payload,
headers: Headers::new(),
}
}
#[must_use]
pub fn with_headers(mut self, headers: Headers) -> Self {
self.headers = headers;
self
}
#[must_use]
pub fn name(&self) -> &str {
self.name
}
#[must_use]
pub fn payload(&self) -> &[u8] {
self.payload
}
#[must_use]
pub fn headers(&self) -> &Headers {
&self.headers
}
}
pub trait IncomingMessage: Send + Sync {
fn payload(&self) -> &[u8];
fn headers(&self) -> &Headers;
fn partition_key(&self) -> Option<&[u8]> {
None
}
fn ack(self) -> impl Future<Output = Result<(), AckError>> + Send;
fn nack(self, requeue: bool) -> impl Future<Output = Result<(), AckError>> + Send;
fn nack_after(self, delay: Duration) -> impl Future<Output = Result<(), AckError>> + Send
where
Self: Sized,
{
let _ = delay;
self.nack(true)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn raw_message_construction() {
let msg = RawMessage::new("name.a", b"payload".as_slice());
assert_eq!(msg.name(), "name.a");
assert_eq!(msg.payload(), b"payload");
assert!(msg.headers().is_empty());
}
#[test]
fn raw_message_with_headers() {
let mut headers = Headers::new();
headers.insert("X-Tenant", "acme");
let msg = RawMessage::new("name.a", Bytes::from_static(b"data")).with_headers(headers);
assert_eq!(msg.headers().get_str("x-tenant"), Some("acme"));
}
#[test]
fn outgoing_message_holds_borrows() {
let name = String::from("orders");
let payload = vec![1u8, 2, 3];
let msg = OutgoingMessage::new(&name, &payload);
assert_eq!(msg.name(), "orders");
assert_eq!(msg.payload(), &[1, 2, 3]);
}
#[test]
fn raw_message_payload_bytes_and_headers_mut() {
let mut msg = RawMessage::new("n", b"data".as_slice());
assert_eq!(msg.payload_bytes(), Bytes::from_static(b"data"));
msg.headers_mut().insert("k", "v");
assert_eq!(msg.headers().get_str("k"), Some("v"));
}
#[tokio::test]
async fn incoming_message_defaults_apply_without_override() {
use crate::AckError;
struct Stub {
payload: Vec<u8>,
headers: Headers,
}
impl IncomingMessage for Stub {
fn payload(&self) -> &[u8] {
&self.payload
}
fn headers(&self) -> &Headers {
&self.headers
}
async fn ack(self) -> Result<(), AckError> {
Ok(())
}
async fn nack(self, _requeue: bool) -> Result<(), AckError> {
Ok(())
}
}
let stub = Stub {
payload: b"body".to_vec(),
headers: Headers::new(),
};
assert_eq!(stub.payload(), b"body");
assert!(stub.partition_key().is_none());
stub.nack_after(Duration::from_secs(1)).await.unwrap();
}
}