fluvio_spu_schema/server/
consumer_offset.rs1use fluvio_protocol::api::Request;
2use fluvio_protocol::record::{Offset, ReplicaKey};
3use fluvio_protocol::{Encoder, Decoder};
4use fluvio_types::PartitionId;
5
6use crate::COMMON_VERSION;
7use crate::errors::ErrorCode;
8use super::SpuServerApiKey;
9
/// Request payload for the SPU `UpdateConsumerOffset` API.
///
/// NOTE(review): the derived `Encoder`/`Decoder` presumably serialize fields
/// in declaration order, so reordering fields would change the wire format —
/// confirm against `fluvio_protocol` before touching the layout.
#[derive(Decoder, Encoder, Default, Debug)]
pub struct UpdateConsumerOffsetRequest {
    /// Offset value carried by the request.
    pub offset: Offset,
    /// Numeric session identifier sent alongside the offset.
    /// NOTE(review): presumably identifies the consumer stream session — confirm with the handler.
    pub session_id: u32,
}
15
16impl Request for UpdateConsumerOffsetRequest {
17 const API_KEY: u16 = SpuServerApiKey::UpdateConsumerOffset as u16;
18 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
19 type Response = UpdateConsumerOffsetResponse;
20}
21
22impl UpdateConsumerOffsetRequest {
23 pub fn new(offset: Offset, session_id: u32) -> Self {
24 Self { offset, session_id }
25 }
26}
27
/// Response to an [`UpdateConsumerOffsetRequest`].
///
/// NOTE(review): field order presumably defines the wire layout via the
/// derived `Encoder`/`Decoder` — confirm before reordering.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct UpdateConsumerOffsetResponse {
    /// Offset reported back by the server.
    /// NOTE(review): presumably the offset that was recorded — confirm with the handler.
    pub offset: Offset,
    /// Outcome of the update, expressed as an `ErrorCode`.
    pub error_code: ErrorCode,
}
33
/// Request payload for the SPU `DeleteConsumerOffset` API.
///
/// NOTE(review): field order presumably defines the wire layout via the
/// derived `Encoder`/`Decoder` — confirm before reordering.
#[derive(Decoder, Encoder, Default, Debug)]
pub struct DeleteConsumerOffsetRequest {
    /// Replica (topic + partition) the consumer offset belongs to.
    pub replica_id: ReplicaKey,
    /// Identifier of the consumer whose offset is targeted.
    pub consumer_id: String,
}
39
40impl DeleteConsumerOffsetRequest {
41 pub fn new(
42 topic: impl Into<String>,
43 partition: PartitionId,
44 consumer_id: impl Into<String>,
45 ) -> Self {
46 let replica_id = ReplicaKey::new(topic, partition);
47 Self {
48 replica_id,
49 consumer_id: consumer_id.into(),
50 }
51 }
52}
53
54impl Request for DeleteConsumerOffsetRequest {
55 const API_KEY: u16 = SpuServerApiKey::DeleteConsumerOffset as u16;
56 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
57 type Response = DeleteConsumerOffsetResponse;
58}
59
/// Response to a [`DeleteConsumerOffsetRequest`]; carries only the outcome.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct DeleteConsumerOffsetResponse {
    /// Outcome of the delete, expressed as an `ErrorCode`.
    pub error_code: ErrorCode,
}
64
/// Optional filters attached to a [`FetchConsumerOffsetsRequest`].
///
/// Each filter is independently optional.
/// NOTE(review): presumably a `None` filter means "do not restrict on this
/// dimension" — confirm with the server-side handler.
/// NOTE(review): field order presumably defines the wire layout via the
/// derived `Encoder`/`Decoder` — confirm before reordering.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FilterOpts {
    /// Restrict results to this replica, when set.
    pub replica_id: Option<ReplicaKey>,
    /// Restrict results to this consumer id, when set.
    pub consumer_id: Option<String>,
}
70
/// Request payload for the SPU `FetchConsumerOffsets` API.
///
/// The derived `Default` yields a request with `filter_opts` set to `None`.
#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchConsumerOffsetsRequest {
    /// Optional result filters.
    ///
    /// NOTE(review): `min_version = 24` presumably means this field is only
    /// encoded/decoded when the negotiated API version is at least 24 —
    /// confirm against the `fluvio_protocol` derive documentation.
    #[fluvio(min_version = 24)]
    pub filter_opts: Option<FilterOpts>,
}
76
77impl FetchConsumerOffsetsRequest {
78 pub fn with_opts(replica_id: Option<ReplicaKey>, consumer_id: Option<String>) -> Self {
79 Self {
80 filter_opts: Some(FilterOpts {
81 replica_id,
82 consumer_id,
83 }),
84 }
85 }
86}
87
88impl Request for FetchConsumerOffsetsRequest {
89 const API_KEY: u16 = SpuServerApiKey::FetchConsumerOffsets as u16;
90 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
91 type Response = FetchConsumerOffsetsResponse;
92}
93
/// Response to a [`FetchConsumerOffsetsRequest`].
///
/// NOTE(review): field order presumably defines the wire layout via the
/// derived `Encoder`/`Decoder` — confirm before reordering.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchConsumerOffsetsResponse {
    /// Outcome of the fetch, expressed as an `ErrorCode`.
    pub error_code: ErrorCode,
    /// Consumer offset entries returned by the server.
    pub consumers: Vec<ConsumerOffset>,
}
99
/// A single consumer-offset entry, as listed in
/// [`FetchConsumerOffsetsResponse::consumers`].
///
/// NOTE(review): field order presumably defines the wire layout via the
/// derived `Encoder`/`Decoder` — confirm before reordering.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct ConsumerOffset {
    /// Identifier of the consumer this entry belongs to.
    pub consumer_id: String,
    /// Replica (topic + partition) the offset refers to.
    pub replica_id: ReplicaKey,
    /// Offset stored for this consumer on the replica.
    pub offset: Offset,
    /// Time of the last modification.
    /// NOTE(review): unit and epoch are not visible here — confirm with the producer of this value.
    pub modified_time: u64,
}
107
108impl ConsumerOffset {
109 pub fn new(
110 consumer_id: impl Into<String>,
111 replica_id: impl Into<ReplicaKey>,
112 offset: Offset,
113 modified_time: u64,
114 ) -> Self {
115 Self {
116 consumer_id: consumer_id.into(),
117 replica_id: replica_id.into(),
118 offset,
119 modified_time,
120 }
121 }
122}