//!
//! # Fetch Topic Offsets
//!
//! API that allows the CLI to fetch topic offsets.
use std::fmt;

use fluvio_protocol::api::Request;
use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::record::PartitionOffset;
use fluvio_protocol::record::ReplicaKey;

use fluvio_types::PartitionId;

use crate::COMMON_VERSION;
use crate::errors::ErrorCode;
use super::SpuServerApiKey;

// -----------------------------------
// FetchOffsetsRequest
// -----------------------------------

/// Request to fetch offsets for a list of topics and their partitions.
///
/// The paired response type is [`FetchOffsetsResponse`].
#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchOffsetsRequest {
    /// Each topic in the request.
    pub topics: Vec<FetchOffsetTopic>,

    /// Optional consumer identifier supplied by the caller.
    ///
    /// NOTE(review): the `min_version = 23` attribute presumably means this
    /// field is only encoded/decoded for API version 23 and later — confirm
    /// against the `fluvio_protocol` derive documentation.
    #[fluvio(min_version = 23)]
    pub consumer_id: Option<String>,
}

/// Registers [`FetchOffsetsRequest`] as a protocol request.
impl Request for FetchOffsetsRequest {
    /// API key: the numeric value of `SpuServerApiKey::FetchOffsets`.
    const API_KEY: u16 = SpuServerApiKey::FetchOffsets as u16;
    /// Default API version: the crate-wide `COMMON_VERSION`.
    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
    /// Response type returned for this request.
    type Response = FetchOffsetsResponse;
}

impl FetchOffsetsRequest {
    /// Builds a request that targets exactly one partition of one topic.
    ///
    /// * `topic` - name of the topic to query.
    /// * `partition` - index of the partition within that topic.
    /// * `consumer_id` - stored on the request unchanged.
    pub fn new(topic: String, partition: u32, consumer_id: Option<String>) -> Self {
        // Assemble the request from the innermost element outwards.
        let single_partition = FetchOffsetPartition {
            partition_index: partition,
        };
        let single_topic = FetchOffsetTopic {
            name: topic,
            partitions: vec![single_partition],
        };

        Self {
            topics: vec![single_topic],
            consumer_id,
        }
    }
}

/// One topic entry of a [`FetchOffsetsRequest`], listing the partitions to query.
#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchOffsetTopic {
    /// The topic name.
    pub name: String,

    /// Each partition in the request.
    pub partitions: Vec<FetchOffsetPartition>,
}

/// One partition entry of a [`FetchOffsetTopic`].
#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchOffsetPartition {
    /// The partition index.
    pub partition_index: PartitionId,
}

// -----------------------------------
// FetchOffsetsResponse
// -----------------------------------

/// Response to a fetch-offsets request: per-topic, per-partition offsets.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchOffsetsResponse {
    /// Each topic offset in the response.
    pub topics: Vec<FetchOffsetTopicResponse>,
}

impl FetchOffsetsResponse {
    /// Consumes the response and returns the first partition entry that
    /// matches `replica`.
    ///
    /// Topics are scanned in order; only topics whose name equals
    /// `replica.topic` are searched, and within those the first partition
    /// whose index equals `replica.partition` is returned. Yields `None`
    /// when no such entry exists.
    pub fn find_partition(self, replica: &ReplicaKey) -> Option<FetchOffsetPartitionResponse> {
        self.topics
            .into_iter()
            .filter(|topic| topic.name == replica.topic)
            .flat_map(|topic| topic.partitions)
            .find(|partition| partition.partition_index == replica.partition)
    }
}

/// One topic entry of a [`FetchOffsetsResponse`], holding its partition results.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchOffsetTopicResponse {
    /// The topic name
    pub name: String,

    /// Each partition in the response.
    pub partitions: Vec<FetchOffsetPartitionResponse>,
}

/// Offsets reported for a single partition, together with an error code.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchOffsetPartitionResponse {
    /// The partition error code.
    ///
    /// NOTE(review): the original comment said "None for no error"; the field
    /// is not an `Option`, so this presumably refers to a `None` variant of
    /// `ErrorCode` — confirm against the `ErrorCode` definition.
    pub error_code: ErrorCode,

    /// The partition index.
    pub partition_index: PartitionId,

    /// First readable offset.
    pub start_offset: i64,

    /// Last readable offset
    pub last_stable_offset: i64,
}

impl fmt::Display for FetchOffsetPartitionResponse {
    /// Writes a one-line summary: the error code (pretty `Debug` form),
    /// the partition index, and the start and last stable offsets.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure once so the format arguments read as plain names.
        let Self {
            error_code,
            partition_index,
            start_offset,
            last_stable_offset,
        } = self;

        write!(
            f,
            "error: {:#?}, partition: {}, start: {}, last: {}",
            error_code, partition_index, start_offset, last_stable_offset
        )
    }
}

/// Exposes the stored offsets through the `PartitionOffset` trait.
impl PartitionOffset for FetchOffsetPartitionResponse {
    /// Returns the `start_offset` field.
    fn start_offset(&self) -> i64 {
        self.start_offset
    }

    /// Returns the `last_stable_offset` field.
    fn last_stable_offset(&self) -> i64 {
        self.last_stable_offset
    }
}