ruststream_fred/
message.rs1use std::fmt::{Debug, Formatter};
4
5use bytes::Bytes;
6use fred::clients::Pool;
7use fred::interfaces::StreamsInterface;
8use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
9
10use crate::convert::fields_for_publish;
11
/// Name of the message header that carries the partition key.
///
/// [`RedisMessage`]'s `partition_key` looks this header up to find the key.
pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
19
20struct AckHandle {
22 pool: Pool,
23 key: String,
24 group: String,
25 id: String,
26}
27
28pub struct RedisMessage {
35 payload: Bytes,
36 headers: Headers,
37 ack: Option<AckHandle>,
38}
39
40impl Debug for RedisMessage {
41 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42 let mut s = f.debug_struct("RedisMessage");
43 s.field("payload_len", &self.payload.len());
44 if let Some(ack) = &self.ack {
45 s.field("key", &ack.key).field("id", &ack.id);
46 }
47 s.finish_non_exhaustive()
48 }
49}
50
51impl RedisMessage {
52 pub(crate) fn new(
53 pool: Pool,
54 key: String,
55 group: String,
56 id: String,
57 payload: Bytes,
58 headers: Headers,
59 ) -> Self {
60 Self {
61 payload,
62 headers,
63 ack: Some(AckHandle {
64 pool,
65 key,
66 group,
67 id,
68 }),
69 }
70 }
71
72 #[must_use]
74 pub fn id(&self) -> Option<&str> {
75 self.ack.as_ref().map(|a| a.id.as_str())
76 }
77}
78
79impl Partitioned for RedisMessage {
80 fn partition_key(&self) -> Option<&[u8]> {
81 self.headers().get(PARTITION_KEY_HEADER)
82 }
83}
84
85impl IncomingMessage for RedisMessage {
86 fn payload(&self) -> &[u8] {
87 &self.payload
88 }
89
90 fn headers(&self) -> &Headers {
91 &self.headers
92 }
93
94 async fn ack(mut self) -> Result<(), AckError> {
95 let handle = self.ack.take().expect("RedisMessage settled twice");
96 xack(&handle).await
97 }
98
99 async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
100 let handle = self.ack.take().expect("RedisMessage settled twice");
101 if requeue {
102 let fields = fields_for_publish(&self.payload, &self.headers);
105 let _: String = handle
106 .pool
107 .xadd(handle.key.as_str(), false, None::<()>, "*", fields)
108 .await
109 .map_err(|err| AckError::Broker(Box::new(err)))?;
110 }
111 xack(&handle).await
112 }
113}
114
115async fn xack(handle: &AckHandle) -> Result<(), AckError> {
116 let _: i64 = handle
117 .pool
118 .xack(
119 handle.key.as_str(),
120 handle.group.as_str(),
121 handle.id.as_str(),
122 )
123 .await
124 .map_err(|err| AckError::Broker(Box::new(err)))?;
125 Ok(())
126}