use ruststream::BuildContext;
use crate::message::NatsMessage;
/// Per-message context carrying JetStream delivery metadata.
///
/// Built from a `NatsMessage` through the `BuildContext` implementation in
/// this file. The metadata is optional: a default-constructed context, or one
/// built from a `NatsMessage::Core` message, holds none, and every key in
/// [`keys`] then reads `None`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct JetStreamContext {
    /// Metadata copied out of the message at build time; `None` when absent.
    info: Option<JetStreamInfo>,
}
/// Owned snapshot of the metadata values exposed through `JetStreamContext`.
///
/// Every field is a plain owned value (`String` or integer), so the snapshot
/// does not borrow from the message it was copied from.
// NOTE(review): the per-field descriptions below are inferred from the field
// names; confirm the exact semantics against the type returned by the
// message's `info()` accessor.
#[derive(Clone, Debug, Eq, PartialEq)]
struct JetStreamInfo {
    /// Stream name (presumably the stream the message was read from).
    stream: String,
    /// Consumer name (presumably the consumer that delivered the message).
    consumer: String,
    /// Sequence number of the message relative to the stream.
    stream_sequence: u64,
    /// Sequence number of the message relative to the consumer.
    consumer_sequence: u64,
    /// Delivery counter; signed because the source value is an `i64`.
    delivered: i64,
    /// Pending counter as reported alongside the message.
    pending: u64,
}
impl BuildContext<NatsMessage> for JetStreamContext {
    /// Builds the context for `msg`.
    ///
    /// * `NatsMessage::Core` — the context holds no metadata.
    /// * `NatsMessage::JetStream` — the context holds a copy of whatever the
    ///   message's `info()` accessor yields, or no metadata when it yields
    ///   `None`.
    ///
    /// The `stream` and `consumer` values are copied into owned `String`s, so
    /// the returned context does not borrow from `msg`.
    fn build(msg: &NatsMessage) -> Self {
        match msg {
            // Core messages are given an empty context.
            NatsMessage::Core(_) => Self { info: None },
            NatsMessage::JetStream(delivery) => {
                let info = delivery.info().map(|meta| JetStreamInfo {
                    stream: meta.stream.to_owned(),
                    consumer: meta.consumer.to_owned(),
                    stream_sequence: meta.stream_sequence,
                    consumer_sequence: meta.consumer_sequence,
                    delivered: meta.delivered,
                    pending: meta.pending,
                });
                Self { info }
            }
        }
    }
}
// Compile-time check only: the closure is bound to `_` and therefore never
// called. It fails to compile if `JetStreamContext` stops implementing
// `BuildContext` for the message type of `crate::NatsSubscriber`.
const _: fn() = || {
    /// Accepts any `C` that can be built from messages of type `M`.
    fn assert_build_context<C, M>()
    where
        C: BuildContext<M>,
    {
    }

    /// Message type produced by the crate's subscriber.
    type SubscriberMessage = <crate::NatsSubscriber as ruststream::Subscriber>::Message;

    assert_build_context::<JetStreamContext, SubscriberMessage>();
};
/// Typed keys for reading individual values out of a [`JetStreamContext`].
///
/// Each key is a zero-sized marker type paired with a constant instance (for
/// example [`STREAM_SEQUENCE`]). Calling the key's `Field::get` with a context
/// yields `Some(value)`, or `None` when the context holds no JetStream
/// metadata.
pub mod keys {
    use ruststream::Field;
    use super::JetStreamContext;

    /// Key for the `stream_sequence` value.
    #[derive(Clone, Copy, Debug)]
    pub struct StreamSequence;

    /// Ready-made [`StreamSequence`] key.
    pub const STREAM_SEQUENCE: StreamSequence = StreamSequence;

    impl Field<JetStreamContext> for StreamSequence {
        type Value<'a> = Option<u64>;

        /// Returns the stored `stream_sequence`, or `None` without metadata.
        fn get(self, cx: &JetStreamContext) -> Option<u64> {
            let info = cx.info.as_ref()?;
            Some(info.stream_sequence)
        }
    }

    /// Key for the `consumer_sequence` value.
    #[derive(Clone, Copy, Debug)]
    pub struct ConsumerSequence;

    /// Ready-made [`ConsumerSequence`] key.
    pub const CONSUMER_SEQUENCE: ConsumerSequence = ConsumerSequence;

    impl Field<JetStreamContext> for ConsumerSequence {
        type Value<'a> = Option<u64>;

        /// Returns the stored `consumer_sequence`, or `None` without metadata.
        fn get(self, cx: &JetStreamContext) -> Option<u64> {
            let info = cx.info.as_ref()?;
            Some(info.consumer_sequence)
        }
    }

    /// Key for the `delivered` value.
    #[derive(Clone, Copy, Debug)]
    pub struct Delivered;

    /// Ready-made [`Delivered`] key.
    pub const DELIVERED: Delivered = Delivered;

    impl Field<JetStreamContext> for Delivered {
        type Value<'a> = Option<i64>;

        /// Returns the stored `delivered` value, or `None` without metadata.
        fn get(self, cx: &JetStreamContext) -> Option<i64> {
            let info = cx.info.as_ref()?;
            Some(info.delivered)
        }
    }

    /// Key for the `pending` value.
    #[derive(Clone, Copy, Debug)]
    pub struct Pending;

    /// Ready-made [`Pending`] key.
    pub const PENDING: Pending = Pending;

    impl Field<JetStreamContext> for Pending {
        type Value<'a> = Option<u64>;

        /// Returns the stored `pending` value, or `None` without metadata.
        fn get(self, cx: &JetStreamContext) -> Option<u64> {
            let info = cx.info.as_ref()?;
            Some(info.pending)
        }
    }

    /// Key for the `stream` name.
    #[derive(Clone, Copy, Debug)]
    pub struct Stream;

    /// Ready-made [`Stream`] key.
    pub const STREAM: Stream = Stream;

    impl Field<JetStreamContext> for Stream {
        type Value<'a> = Option<&'a str>;

        /// Returns the stored `stream` name borrowed from `cx`, or `None`
        /// without metadata.
        fn get(self, cx: &JetStreamContext) -> Option<&str> {
            let info = cx.info.as_ref()?;
            Some(info.stream.as_str())
        }
    }

    /// Key for the `consumer` name.
    #[derive(Clone, Copy, Debug)]
    pub struct Consumer;

    /// Ready-made [`Consumer`] key.
    pub const CONSUMER: Consumer = Consumer;

    impl Field<JetStreamContext> for Consumer {
        type Value<'a> = Option<&'a str>;

        /// Returns the stored `consumer` name borrowed from `cx`, or `None`
        /// without metadata.
        fn get(self, cx: &JetStreamContext) -> Option<&str> {
            let info = cx.info.as_ref()?;
            Some(info.consumer.as_str())
        }
    }
}
#[cfg(test)]
mod tests {
    use ruststream::{BuildContext, Field};

    use super::keys::{CONSUMER, CONSUMER_SEQUENCE, DELIVERED, PENDING, STREAM, STREAM_SEQUENCE};
    use super::{JetStreamContext, JetStreamInfo};
    use crate::message::{CoreMessage, NatsMessage};

    /// Context whose metadata fields all hold distinct, recognisable values.
    fn populated() -> JetStreamContext {
        let info = JetStreamInfo {
            stream: "ORDERS".to_owned(),
            consumer: "orders-worker".to_owned(),
            stream_sequence: 42,
            consumer_sequence: 7,
            delivered: 3,
            pending: 5,
        };
        JetStreamContext { info: Some(info) }
    }

    /// Core (non-JetStream) message with a two-byte JSON payload.
    fn core_message() -> NatsMessage {
        let raw = async_nats::Message {
            subject: "orders.created".into(),
            reply: None,
            payload: bytes::Bytes::from_static(b"{}"),
            headers: None,
            status: None,
            description: None,
            length: 2,
        };
        NatsMessage::Core(Box::new(CoreMessage::new(raw)))
    }

    /// Every key returns the value stored in a populated context.
    #[test]
    fn keys_read_populated_jetstream_fields() {
        let context = populated();

        // Name keys.
        assert_eq!(STREAM.get(&context), Some("ORDERS"));
        assert_eq!(CONSUMER.get(&context), Some("orders-worker"));

        // Numeric keys.
        assert_eq!(STREAM_SEQUENCE.get(&context), Some(42));
        assert_eq!(CONSUMER_SEQUENCE.get(&context), Some(7));
        assert_eq!(DELIVERED.get(&context), Some(3));
        assert_eq!(PENDING.get(&context), Some(5));
    }

    /// Every key returns `None` for a context without metadata.
    #[test]
    fn keys_read_none_without_jetstream_info() {
        let context = JetStreamContext::default();

        assert!(STREAM.get(&context).is_none());
        assert!(CONSUMER.get(&context).is_none());
        assert!(STREAM_SEQUENCE.get(&context).is_none());
        assert!(CONSUMER_SEQUENCE.get(&context).is_none());
        assert!(DELIVERED.get(&context).is_none());
        assert!(PENDING.get(&context).is_none());
    }

    /// Building from a core message yields the default (empty) context.
    #[test]
    fn build_over_core_message_has_no_info() {
        let message = core_message();
        let context = JetStreamContext::build(&message);

        assert_eq!(context, JetStreamContext::default());
        assert_eq!(STREAM_SEQUENCE.get(&context), None);
    }
}