// fluvio_spu_schema/server/consumer_offset.rs

1use fluvio_protocol::api::Request;
2use fluvio_protocol::record::{Offset, ReplicaKey};
3use fluvio_protocol::{Encoder, Decoder};
4use fluvio_types::PartitionId;
5
6use crate::COMMON_VERSION;
7use crate::errors::ErrorCode;
8use super::SpuServerApiKey;
9
10#[derive(Decoder, Encoder, Default, Debug)]
11pub struct UpdateConsumerOffsetRequest {
12    pub offset: Offset,
13    pub session_id: u32,
14}
15
16impl Request for UpdateConsumerOffsetRequest {
17    const API_KEY: u16 = SpuServerApiKey::UpdateConsumerOffset as u16;
18    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
19    type Response = UpdateConsumerOffsetResponse;
20}
21
22impl UpdateConsumerOffsetRequest {
23    pub fn new(offset: Offset, session_id: u32) -> Self {
24        Self { offset, session_id }
25    }
26}
27
28#[derive(Encoder, Decoder, Default, Debug)]
29pub struct UpdateConsumerOffsetResponse {
30    pub offset: Offset,
31    pub error_code: ErrorCode,
32}
33
34#[derive(Decoder, Encoder, Default, Debug)]
35pub struct DeleteConsumerOffsetRequest {
36    pub replica_id: ReplicaKey,
37    pub consumer_id: String,
38}
39
40impl DeleteConsumerOffsetRequest {
41    pub fn new(
42        topic: impl Into<String>,
43        partition: PartitionId,
44        consumer_id: impl Into<String>,
45    ) -> Self {
46        let replica_id = ReplicaKey::new(topic, partition);
47        Self {
48            replica_id,
49            consumer_id: consumer_id.into(),
50        }
51    }
52}
53
54impl Request for DeleteConsumerOffsetRequest {
55    const API_KEY: u16 = SpuServerApiKey::DeleteConsumerOffset as u16;
56    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
57    type Response = DeleteConsumerOffsetResponse;
58}
59
60#[derive(Encoder, Decoder, Default, Debug)]
61pub struct DeleteConsumerOffsetResponse {
62    pub error_code: ErrorCode,
63}
64
65#[derive(Decoder, Encoder, Default, Debug)]
66pub struct FetchConsumerOffsetsRequest;
67
68impl Request for FetchConsumerOffsetsRequest {
69    const API_KEY: u16 = SpuServerApiKey::FetchConsumerOffsets as u16;
70    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
71    type Response = FetchConsumerOffsetsResponse;
72}
73
74#[derive(Encoder, Decoder, Default, Debug)]
75pub struct FetchConsumerOffsetsResponse {
76    pub error_code: ErrorCode,
77    pub consumers: Vec<ConsumerOffset>,
78}
79
80#[derive(Encoder, Decoder, Default, Debug)]
81pub struct ConsumerOffset {
82    pub consumer_id: String,
83    pub replica_id: ReplicaKey,
84    pub offset: Offset,
85    pub modified_time: u64,
86}
87
88impl ConsumerOffset {
89    pub fn new(
90        consumer_id: impl Into<String>,
91        replica_id: impl Into<ReplicaKey>,
92        offset: Offset,
93        modified_time: u64,
94    ) -> Self {
95        Self {
96            consumer_id: consumer_id.into(),
97            replica_id: replica_id.into(),
98            offset,
99            modified_time,
100        }
101    }
102}