fluvio_spu_schema/server/
consumer_offset.rs

1use fluvio_protocol::api::Request;
2use fluvio_protocol::record::{Offset, ReplicaKey};
3use fluvio_protocol::{Encoder, Decoder};
4use fluvio_types::PartitionId;
5
6use crate::COMMON_VERSION;
7use crate::errors::ErrorCode;
8use super::SpuServerApiKey;
9
10#[derive(Decoder, Encoder, Default, Debug)]
11pub struct UpdateConsumerOffsetRequest {
12    pub offset: Offset,
13    pub session_id: u32,
14}
15
16impl Request for UpdateConsumerOffsetRequest {
17    const API_KEY: u16 = SpuServerApiKey::UpdateConsumerOffset as u16;
18    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
19    type Response = UpdateConsumerOffsetResponse;
20}
21
22impl UpdateConsumerOffsetRequest {
23    pub fn new(offset: Offset, session_id: u32) -> Self {
24        Self { offset, session_id }
25    }
26}
27
28#[derive(Encoder, Decoder, Default, Debug)]
29pub struct UpdateConsumerOffsetResponse {
30    pub offset: Offset,
31    pub error_code: ErrorCode,
32}
33
34#[derive(Decoder, Encoder, Default, Debug)]
35pub struct DeleteConsumerOffsetRequest {
36    pub replica_id: ReplicaKey,
37    pub consumer_id: String,
38}
39
40impl DeleteConsumerOffsetRequest {
41    pub fn new(
42        topic: impl Into<String>,
43        partition: PartitionId,
44        consumer_id: impl Into<String>,
45    ) -> Self {
46        let replica_id = ReplicaKey::new(topic, partition);
47        Self {
48            replica_id,
49            consumer_id: consumer_id.into(),
50        }
51    }
52}
53
54impl Request for DeleteConsumerOffsetRequest {
55    const API_KEY: u16 = SpuServerApiKey::DeleteConsumerOffset as u16;
56    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
57    type Response = DeleteConsumerOffsetResponse;
58}
59
60#[derive(Encoder, Decoder, Default, Debug)]
61pub struct DeleteConsumerOffsetResponse {
62    pub error_code: ErrorCode,
63}
64
65#[derive(Encoder, Decoder, Default, Debug)]
66pub struct FilterOpts {
67    pub replica_id: Option<ReplicaKey>,
68    pub consumer_id: Option<String>,
69}
70
71#[derive(Decoder, Encoder, Default, Debug)]
72pub struct FetchConsumerOffsetsRequest {
73    #[fluvio(min_version = 24)]
74    pub filter_opts: Option<FilterOpts>,
75}
76
77impl FetchConsumerOffsetsRequest {
78    pub fn with_opts(replica_id: Option<ReplicaKey>, consumer_id: Option<String>) -> Self {
79        Self {
80            filter_opts: Some(FilterOpts {
81                replica_id,
82                consumer_id,
83            }),
84        }
85    }
86}
87
88impl Request for FetchConsumerOffsetsRequest {
89    const API_KEY: u16 = SpuServerApiKey::FetchConsumerOffsets as u16;
90    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
91    type Response = FetchConsumerOffsetsResponse;
92}
93
94#[derive(Encoder, Decoder, Default, Debug)]
95pub struct FetchConsumerOffsetsResponse {
96    pub error_code: ErrorCode,
97    pub consumers: Vec<ConsumerOffset>,
98}
99
100#[derive(Encoder, Decoder, Default, Debug)]
101pub struct ConsumerOffset {
102    pub consumer_id: String,
103    pub replica_id: ReplicaKey,
104    pub offset: Offset,
105    pub modified_time: u64,
106}
107
108impl ConsumerOffset {
109    pub fn new(
110        consumer_id: impl Into<String>,
111        replica_id: impl Into<ReplicaKey>,
112        offset: Offset,
113        modified_time: u64,
114    ) -> Self {
115        Self {
116            consumer_id: consumer_id.into(),
117            replica_id: replica_id.into(),
118            offset,
119            modified_time,
120        }
121    }
122}