fluvio_spu_schema/server/
fetch_offset.rs

//!
//! # Fetch Topic Offsets
//!
//! API that allows the CLI to fetch topic offsets.
5use std::fmt;
6
7use fluvio_protocol::api::Request;
8use fluvio_protocol::{Encoder, Decoder};
9use fluvio_protocol::record::PartitionOffset;
10use fluvio_protocol::record::ReplicaKey;
11
12use fluvio_types::PartitionId;
13
14use crate::COMMON_VERSION;
15use crate::errors::ErrorCode;
16use super::SpuServerApiKey;
17
// -----------------------------------
// FetchOffsetsRequest
// -----------------------------------
21
22/// Fetch offsets
23#[derive(Decoder, Encoder, Default, Debug)]
24pub struct FetchOffsetsRequest {
25    /// Each topic in the request.
26    pub topics: Vec<FetchOffsetTopic>,
27
28    #[fluvio(min_version = 23)]
29    pub consumer_id: Option<String>,
30}
31
32impl Request for FetchOffsetsRequest {
33    const API_KEY: u16 = SpuServerApiKey::FetchOffsets as u16;
34    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
35    type Response = FetchOffsetsResponse;
36}
37
38impl FetchOffsetsRequest {
39    /// create request with a single topic and partition
40    pub fn new(topic: String, partition: u32, consumer_id: Option<String>) -> Self {
41        Self {
42            topics: vec![FetchOffsetTopic {
43                name: topic,
44                partitions: vec![FetchOffsetPartition {
45                    partition_index: partition,
46                }],
47            }],
48            consumer_id,
49        }
50    }
51}
52
53#[derive(Decoder, Encoder, Default, Debug)]
54pub struct FetchOffsetTopic {
55    /// The topic name.
56    pub name: String,
57
58    /// Each partition in the request.
59    pub partitions: Vec<FetchOffsetPartition>,
60}
61
62#[derive(Decoder, Encoder, Default, Debug)]
63pub struct FetchOffsetPartition {
64    /// The partition index.
65    pub partition_index: PartitionId,
66}
67
// -----------------------------------
// FetchOffsetsResponse
// -----------------------------------
71
72#[derive(Encoder, Decoder, Default, Debug)]
73pub struct FetchOffsetsResponse {
74    /// Each topic offset in the response.
75    pub topics: Vec<FetchOffsetTopicResponse>,
76}
77
78impl FetchOffsetsResponse {
79    pub fn find_partition(self, replica: &ReplicaKey) -> Option<FetchOffsetPartitionResponse> {
80        for topic_res in self.topics {
81            if topic_res.name == replica.topic {
82                for partition_res in topic_res.partitions {
83                    if partition_res.partition_index == replica.partition {
84                        return Some(partition_res);
85                    }
86                }
87            }
88        }
89
90        None
91    }
92}
93
94#[derive(Encoder, Decoder, Default, Debug)]
95pub struct FetchOffsetTopicResponse {
96    /// The topic name
97    pub name: String,
98
99    /// Each partition in the response.
100    pub partitions: Vec<FetchOffsetPartitionResponse>,
101}
102
103#[derive(Encoder, Decoder, Default, Debug)]
104pub struct FetchOffsetPartitionResponse {
105    /// The partition error code, None for no error
106    pub error_code: ErrorCode,
107
108    /// The partition index.
109    pub partition_index: PartitionId,
110
111    /// First readable offset.
112    pub start_offset: i64,
113
114    /// Last readable offset
115    pub last_stable_offset: i64,
116}
117
118impl fmt::Display for FetchOffsetPartitionResponse {
119    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
120        write!(
121            f,
122            "error: {:#?}, partition: {}, start: {}, last: {}",
123            self.error_code, self.partition_index, self.start_offset, self.last_stable_offset
124        )
125    }
126}
127
128impl PartitionOffset for FetchOffsetPartitionResponse {
129    fn last_stable_offset(&self) -> i64 {
130        self.last_stable_offset
131    }
132
133    fn start_offset(&self) -> i64 {
134        self.start_offset
135    }
136}