fluvio_spu_schema/server/
fetch_offset.rs

1//!
2//! # Fetch Topic Offsets
3//!
4//! API that allows CLI to fetch topic offsets.
5use std::fmt;
6
7use fluvio_protocol::api::Request;
8use fluvio_protocol::{Encoder, Decoder};
9use fluvio_protocol::record::PartitionOffset;
10use fluvio_protocol::record::ReplicaKey;
11
12use fluvio_types::PartitionId;
13
14use crate::COMMON_VERSION;
15use crate::errors::ErrorCode;
16use super::SpuServerApiKey;
17
18// -----------------------------------
19// FlvFetchOffsetsRequest
20// -----------------------------------
21
22/// Fetch offsets
23#[derive(Decoder, Encoder, Default, Debug)]
24pub struct FetchOffsetsRequest {
25    /// Each topic in the request.
26    pub topics: Vec<FetchOffsetTopic>,
27
28    /// The consumer id (DEPRECATED)
29    #[deprecated(note = "to get consumer offest use `FetchConsumerOffsetsRequest` instead")]
30    #[fluvio(min_version = 23, max_version = 23)]
31    pub consumer_id: Option<String>,
32}
33
34impl Request for FetchOffsetsRequest {
35    const API_KEY: u16 = SpuServerApiKey::FetchOffsets as u16;
36    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
37    type Response = FetchOffsetsResponse;
38}
39
40impl FetchOffsetsRequest {
41    /// create request with a single topic and partition
42    pub fn new(topic: String, partition: u32) -> Self {
43        Self {
44            topics: vec![FetchOffsetTopic {
45                name: topic,
46                partitions: vec![FetchOffsetPartition {
47                    partition_index: partition,
48                }],
49            }],
50            ..Default::default()
51        }
52    }
53}
54
55#[derive(Decoder, Encoder, Default, Debug)]
56pub struct FetchOffsetTopic {
57    /// The topic name.
58    pub name: String,
59
60    /// Each partition in the request.
61    pub partitions: Vec<FetchOffsetPartition>,
62}
63
64#[derive(Decoder, Encoder, Default, Debug)]
65pub struct FetchOffsetPartition {
66    /// The partition index.
67    pub partition_index: PartitionId,
68}
69
70// -----------------------------------
71// FlvFetchOffsetsResponse
72// -----------------------------------
73
74#[derive(Encoder, Decoder, Default, Debug)]
75pub struct FetchOffsetsResponse {
76    /// Each topic offset in the response.
77    pub topics: Vec<FetchOffsetTopicResponse>,
78}
79
80impl FetchOffsetsResponse {
81    pub fn find_partition(self, replica: &ReplicaKey) -> Option<FetchOffsetPartitionResponse> {
82        for topic_res in self.topics {
83            if topic_res.name == replica.topic {
84                for partition_res in topic_res.partitions {
85                    if partition_res.partition_index == replica.partition {
86                        return Some(partition_res);
87                    }
88                }
89            }
90        }
91
92        None
93    }
94}
95
96#[derive(Encoder, Decoder, Default, Debug)]
97pub struct FetchOffsetTopicResponse {
98    /// The topic name
99    pub name: String,
100
101    /// Each partition in the response.
102    pub partitions: Vec<FetchOffsetPartitionResponse>,
103}
104
105#[derive(Encoder, Decoder, Default, Debug)]
106pub struct FetchOffsetPartitionResponse {
107    /// The partition error code, None for no error
108    pub error_code: ErrorCode,
109
110    /// The partition index.
111    pub partition_index: PartitionId,
112
113    /// First readable offset.
114    pub start_offset: i64,
115
116    /// Last readable offset
117    pub last_stable_offset: i64,
118}
119
120impl fmt::Display for FetchOffsetPartitionResponse {
121    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122        write!(
123            f,
124            "error: {:#?}, partition: {}, start: {}, last: {}",
125            self.error_code, self.partition_index, self.start_offset, self.last_stable_offset
126        )
127    }
128}
129
130impl PartitionOffset for FetchOffsetPartitionResponse {
131    fn last_stable_offset(&self) -> i64 {
132        self.last_stable_offset
133    }
134
135    fn start_offset(&self) -> i64 {
136        self.start_offset
137    }
138}