fluvio_spu_schema/server/
consumer_offset.rs1use fluvio_protocol::api::Request;
2use fluvio_protocol::record::{Offset, ReplicaKey};
3use fluvio_protocol::{Encoder, Decoder};
4use fluvio_types::PartitionId;
5
6use crate::COMMON_VERSION;
7use crate::errors::ErrorCode;
8use super::SpuServerApiKey;
9
10#[derive(Decoder, Encoder, Default, Debug)]
11pub struct UpdateConsumerOffsetRequest {
12 pub offset: Offset,
13 pub session_id: u32,
14}
15
16impl Request for UpdateConsumerOffsetRequest {
17 const API_KEY: u16 = SpuServerApiKey::UpdateConsumerOffset as u16;
18 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
19 type Response = UpdateConsumerOffsetResponse;
20}
21
22impl UpdateConsumerOffsetRequest {
23 pub fn new(offset: Offset, session_id: u32) -> Self {
24 Self { offset, session_id }
25 }
26}
27
28#[derive(Encoder, Decoder, Default, Debug)]
29pub struct UpdateConsumerOffsetResponse {
30 pub offset: Offset,
31 pub error_code: ErrorCode,
32}
33
34#[derive(Decoder, Encoder, Default, Debug)]
35pub struct DeleteConsumerOffsetRequest {
36 pub replica_id: ReplicaKey,
37 pub consumer_id: String,
38}
39
40impl DeleteConsumerOffsetRequest {
41 pub fn new(
42 topic: impl Into<String>,
43 partition: PartitionId,
44 consumer_id: impl Into<String>,
45 ) -> Self {
46 let replica_id = ReplicaKey::new(topic, partition);
47 Self {
48 replica_id,
49 consumer_id: consumer_id.into(),
50 }
51 }
52}
53
54impl Request for DeleteConsumerOffsetRequest {
55 const API_KEY: u16 = SpuServerApiKey::DeleteConsumerOffset as u16;
56 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
57 type Response = DeleteConsumerOffsetResponse;
58}
59
60#[derive(Encoder, Decoder, Default, Debug)]
61pub struct DeleteConsumerOffsetResponse {
62 pub error_code: ErrorCode,
63}
64
65#[derive(Decoder, Encoder, Default, Debug)]
66pub struct FetchConsumerOffsetsRequest;
67
68impl Request for FetchConsumerOffsetsRequest {
69 const API_KEY: u16 = SpuServerApiKey::FetchConsumerOffsets as u16;
70 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
71 type Response = FetchConsumerOffsetsResponse;
72}
73
74#[derive(Encoder, Decoder, Default, Debug)]
75pub struct FetchConsumerOffsetsResponse {
76 pub error_code: ErrorCode,
77 pub consumers: Vec<ConsumerOffset>,
78}
79
80#[derive(Encoder, Decoder, Default, Debug)]
81pub struct ConsumerOffset {
82 pub consumer_id: String,
83 pub replica_id: ReplicaKey,
84 pub offset: Offset,
85 pub modified_time: u64,
86}
87
88impl ConsumerOffset {
89 pub fn new(
90 consumer_id: impl Into<String>,
91 replica_id: impl Into<ReplicaKey>,
92 offset: Offset,
93 modified_time: u64,
94 ) -> Self {
95 Self {
96 consumer_id: consumer_id.into(),
97 replica_id: replica_id.into(),
98 offset,
99 modified_time,
100 }
101 }
102}