samsa/protocol/list_offsets/request.rs
1//! Encoding and creation for List Offsets requests.
2//!
3//! Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset. This applies to all versions of the API. Note that because offsets are pulled in descending order, asking for the earliest offset will always return you a single element.
4//!
5//! ### Example
6//! ```rust
7//! let mut list_offsets_request = protocol::ListOffsetsRequest::new(correlation_id, client_id, -1);
8//! broker_conn.send_request(&list_offsets_request).await?;
9//! ```
10//!
11//! ### Protocol Def
12//! ```text
13//! ListOffsets Request (Version: 1) => replica_id [topics]
14//! replica_id => INT32
15//! topics => name [partitions]
16//! name => STRING
17//! partitions => partition_index timestamp
18//! partition_index => INT32
19//! timestamp => INT64
20//! ```
21//!
22//! Note we are using version 1 of the request.
23
24use crate::{encode::ToByte, protocol::HeaderRequest};
25
/// Version of the List Offsets request layout that this module encodes.
const API_VERSION: i16 = 1;
/// API key written into the header of every List Offsets request.
///
/// NOTE(review): the name says `METADATA`, but this value is only used to
/// build List Offsets headers, and the Kafka protocol assigns key 2 to
/// ListOffsets. Consider renaming it together with its use in
/// `ListOffsetsRequest::new`.
const API_KEY_METADATA: i16 = 2;
28
29/// The base List Offsets request object.
30///
31/// ### Example
32/// ```rust
33/// let mut list_offsets_request = protocol::ListOffsetsRequest::new(correlation_id, client_id, -1);
34/// broker_conn.send_request(&list_offsets_request).await?;
35/// ```
36#[derive(Debug)]
37pub struct ListOffsetsRequest<'a> {
38 pub header: HeaderRequest<'a>,
39 /// The broker ID of the requester, or -1 if this request is being made by a normal consumer.
40 pub replica_id: i32,
41 /// Each topic in the request.
42 pub topics: Vec<Topic<'a>>,
43}
44
45/// Each topic in the request.
46#[derive(Debug)]
47pub struct Topic<'a> {
48 /// The topic name.
49 pub name: &'a str,
50 /// Each partition in the request.
51 pub partitions: Vec<Partition>,
52}
53
/// One partition entry of a List Offsets request.
#[derive(Debug)]
pub struct Partition {
    /// Index of the partition within its topic.
    pub partition_index: i32,
    /// Timestamp (ms) to look offsets up for. Per the module documentation,
    /// -1 asks for the latest offset and -2 for the earliest available one.
    pub timestamp: i64,
}
62
63impl<'a> ListOffsetsRequest<'a> {
64 pub fn new(correlation_id: i32, client_id: &'a str, replica_id: i32) -> Self {
65 let header = HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id);
66 Self {
67 header,
68 replica_id,
69 topics: vec![],
70 }
71 }
72
73 pub fn add(&mut self, topic_name: &'a str, partition_index: i32, timestamp: i64) {
74 match self
75 .topics
76 .iter_mut()
77 .find(|topic| topic.name == topic_name)
78 {
79 None => self.topics.push(Topic {
80 name: topic_name,
81 partitions: vec![Partition {
82 partition_index,
83 timestamp,
84 }],
85 }),
86 Some(topic) => {
87 if !topic
88 .partitions
89 .iter_mut()
90 .any(|partition| partition.partition_index == partition_index)
91 {
92 topic.partitions.push(Partition {
93 partition_index,
94 timestamp,
95 })
96 }
97 }
98 }
99 }
100}
101
102impl ToByte for ListOffsetsRequest<'_> {
103 fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
104 tracing::trace!("Encoding ListOffsetRequest {:?}", self);
105 self.header.encode(buffer)?;
106 self.replica_id.encode(buffer)?;
107 self.topics.encode(buffer)?;
108 Ok(())
109 }
110}
111
112impl ToByte for Topic<'_> {
113 fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
114 self.name.encode(buffer)?;
115 self.partitions.encode(buffer)?;
116 Ok(())
117 }
118}
119
120impl ToByte for Partition {
121 fn encode<T: bytes::BufMut>(&self, buffer: &mut T) -> crate::error::Result<()> {
122 self.partition_index.encode(buffer)?;
123 self.timestamp.encode(buffer)?;
124 Ok(())
125 }
126}