fluss/rpc/message/
list_offsets.rs1use crate::{
19 BucketId, PartitionId, TableId, impl_read_version_type, impl_write_version_type, proto,
20};
21
22use crate::error::Result as FlussResult;
23use crate::error::{Error, FlussError};
24use crate::proto::{ErrorResponse, ListOffsetsResponse};
25use crate::rpc::frame::ReadError;
26
27use crate::rpc::api_key::ApiKey;
28use crate::rpc::api_version::ApiVersion;
29use crate::rpc::frame::WriteError;
30use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
31use std::collections::HashMap;
32
33use bytes::{Buf, BufMut};
34use prost::Message;
35
/// `offset_type` wire value reported by [`OffsetSpec::Earliest`].
pub const LIST_EARLIEST_OFFSET: i32 = 0;
/// `offset_type` wire value reported by [`OffsetSpec::Latest`].
pub const LIST_LATEST_OFFSET: i32 = 1;
/// `offset_type` wire value reported by [`OffsetSpec::Timestamp`].
pub const LIST_OFFSET_FROM_TIMESTAMP: i32 = 2;

/// Value written into `follower_server_id` by [`ListOffsetsRequest::new`].
///
/// NOTE(review): presumably `-1` tells the server the request comes from a
/// client rather than a follower replica (inferred from the name) — confirm
/// against the server-side protocol.
pub const CLIENT_FOLLOWER_SERVER_ID: i32 = -1;
43
/// Selects which offset a `ListOffsets` request asks for.
///
/// `PartialEq`/`Eq` are derived so that specs can be compared directly
/// (for example in tests or when de-duplicating requests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OffsetSpec {
    /// Ask for the earliest offset (`LIST_EARLIEST_OFFSET`).
    Earliest,
    /// Ask for the latest offset (`LIST_LATEST_OFFSET`).
    Latest,
    /// Ask for the offset associated with the given timestamp
    /// (`LIST_OFFSET_FROM_TIMESTAMP`). The value is forwarded unchanged as the
    /// request's `start_timestamp`.
    // NOTE(review): the timestamp unit is not visible in this file — confirm
    // against the protocol definition.
    Timestamp(i64),
}
54
55impl OffsetSpec {
56 pub fn offset_type(&self) -> i32 {
57 match self {
58 OffsetSpec::Earliest => LIST_EARLIEST_OFFSET,
59 OffsetSpec::Latest => LIST_LATEST_OFFSET,
60 OffsetSpec::Timestamp(_) => LIST_OFFSET_FROM_TIMESTAMP,
61 }
62 }
63
64 pub fn start_timestamp(&self) -> Option<i64> {
65 match self {
66 OffsetSpec::Timestamp(ts) => Some(*ts),
67 _ => None,
68 }
69 }
70}
71
72#[derive(Debug)]
73pub struct ListOffsetsRequest {
74 pub inner_request: proto::ListOffsetsRequest,
75}
76
77impl ListOffsetsRequest {
78 pub fn new(
79 table_id: TableId,
80 partition_id: Option<PartitionId>,
81 bucket_ids: Vec<BucketId>,
82 offset_spec: OffsetSpec,
83 ) -> Self {
84 ListOffsetsRequest {
85 inner_request: proto::ListOffsetsRequest {
86 follower_server_id: CLIENT_FOLLOWER_SERVER_ID,
87 offset_type: offset_spec.offset_type(),
88 table_id,
89 partition_id,
90 bucket_id: bucket_ids,
91 start_timestamp: offset_spec.start_timestamp(),
92 },
93 }
94 }
95}
96
97impl RequestBody for ListOffsetsRequest {
98 type ResponseBody = ListOffsetsResponse;
99
100 const API_KEY: ApiKey = ApiKey::ListOffsets;
101
102 const REQUEST_VERSION: ApiVersion = ApiVersion(0);
103}
104
105impl_write_version_type!(ListOffsetsRequest);
106impl_read_version_type!(ListOffsetsResponse);
107
108impl ListOffsetsResponse {
109 pub fn offsets(&self) -> FlussResult<HashMap<i32, i64>> {
110 self.buckets_resp
111 .iter()
112 .map(|resp| {
113 if let Some(error_code) = resp.error_code
114 && error_code != FlussError::None.code()
115 {
116 let api_error = ErrorResponse {
117 error_code,
118 error_message: resp.error_message.clone(),
119 }
120 .into();
121 return Err(Error::FlussAPIError { api_error });
122 }
123 resp.offset
125 .map(|offset| (resp.bucket_id, offset))
126 .ok_or_else(|| Error::UnexpectedError {
127 message: format!(
128 "Missing offset for bucket {} without error code.",
129 resp.bucket_id
130 ),
131 source: None,
132 })
133 })
134 .collect()
135 }
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::{ListOffsetsResponse, PbListOffsetsRespForBucket};

    /// A bucket that reports a real error code must surface as
    /// `Error::FlussAPIError` rather than as an offsets map.
    #[test]
    fn offsets_returns_api_error_on_error_code() {
        let failed_bucket = PbListOffsetsRespForBucket {
            bucket_id: 1,
            error_code: Some(FlussError::TableNotExist.code()),
            error_message: Some("missing".to_string()),
            offset: None,
        };
        let response = ListOffsetsResponse {
            buckets_resp: vec![failed_bucket],
        };

        let outcome = response.offsets();

        assert!(matches!(outcome, Err(Error::FlussAPIError { .. })));
    }
}