tansu_storage/service/list_offsets.rs
1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, ops::Deref as _};
16
17use rama::{Context, Service};
18use tansu_sans_io::{
19 ApiKey, IsolationLevel, ListOffset, ListOffsetsRequest, ListOffsetsResponse,
20 list_offsets_response::{ListOffsetsPartitionResponse, ListOffsetsTopicResponse},
21};
22use tracing::{debug, error, instrument};
23
24use crate::{Error, Result, Storage, Topition};
25
26/// A [`Service`] using [`Storage`] as [`Context`] taking [`ListOffsetsRequest`] returning [`ListOffsetsResponse`].
27/// ```
28/// use rama::{Context, Layer as _, Service, layer::MapStateLayer};
29/// use tansu_sans_io::{
30/// ErrorCode, IsolationLevel, ListOffset, ListOffsetsRequest,
31/// list_offsets_request::{ListOffsetsPartition, ListOffsetsTopic},
32/// };
33/// use tansu_storage::{Error, ListOffsetsService, StorageContainer};
34/// use url::Url;
35///
36/// # #[tokio::main]
37/// # async fn main() -> Result<(), Error> {
38/// const HOST: &str = "localhost";
39/// const PORT: i32 = 9092;
40/// const NODE_ID: i32 = 111;
41///
42/// let storage = StorageContainer::builder()
43/// .cluster_id("tansu")
44/// .node_id(NODE_ID)
45/// .advertised_listener(Url::parse(&format!("tcp://{HOST}:{PORT}"))?)
46/// .storage(Url::parse("memory://tansu/")?)
47/// .build()
48/// .await?;
49///
50/// let service = MapStateLayer::new(|_| storage).into_layer(ListOffsetsService);
51///
52/// let topic = "abcba";
53///
54/// let response = service
55/// .serve(
56/// Context::default(),
57/// ListOffsetsRequest::default()
58/// .isolation_level(Some(IsolationLevel::ReadUncommitted.into()))
59/// .replica_id(NODE_ID)
60/// .topics(Some(
61/// [ListOffsetsTopic::default()
62/// .name(topic.into())
63/// .partitions(Some(
64/// [ListOffsetsPartition::default()
65/// .current_leader_epoch(Some(-1))
66/// .max_num_offsets(Some(3))
67/// .partition_index(0)
68/// .timestamp(ListOffset::Earliest.try_into()?)]
69/// .into(),
70/// ))]
71/// .into(),
72/// )),
73/// )
74/// .await?;
75///
76/// let topics = response.topics.as_deref().unwrap_or_default();
77/// assert_eq!(1, topics.len());
78/// assert_eq!(topic, topics[0].name);
79///
80/// let partitions = topics[0].partitions.as_deref().unwrap_or_default();
81/// assert_eq!(1, partitions.len());
82/// assert_eq!(0, partitions[0].partition_index);
83/// assert!(partitions[0].old_style_offsets.is_none());
84/// assert_eq!(
85/// ErrorCode::None,
86/// ErrorCode::try_from(partitions[0].error_code)?
87/// );
88/// assert_eq!(Some(-1), partitions[0].timestamp);
89/// assert_eq!(Some(0), partitions[0].offset);
90/// assert_eq!(Some(0), partitions[0].leader_epoch);
91/// # Ok(())
92/// # }
93/// ```
94#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
95pub struct ListOffsetsService;
96
97impl ApiKey for ListOffsetsService {
98 const KEY: i16 = ListOffsetsRequest::KEY;
99}
100
101impl<G> Service<G, ListOffsetsRequest> for ListOffsetsService
102where
103 G: Storage,
104{
105 type Response = ListOffsetsResponse;
106 type Error = Error;
107
108 #[instrument(skip(ctx, req))]
109 async fn serve(
110 &self,
111 ctx: Context<G>,
112 req: ListOffsetsRequest,
113 ) -> Result<Self::Response, Self::Error> {
114 let throttle_time_ms = Some(0);
115
116 let isolation_level = req
117 .isolation_level
118 .map_or(Ok(IsolationLevel::ReadUncommitted), |isolation_level| {
119 IsolationLevel::try_from(isolation_level)
120 })?;
121
122 let topics = if let Some(topics) = req.topics {
123 let mut offsets = vec![];
124
125 for topic in topics {
126 if let Some(ref partitions) = topic.partitions {
127 for partition in partitions {
128 let tp = Topition::new(topic.name.clone(), partition.partition_index);
129 let offset = ListOffset::try_from(partition.timestamp)?;
130
131 offsets.push((tp, offset));
132 }
133 }
134 }
135
136 ctx.state()
137 .list_offsets(isolation_level, offsets.deref())
138 .await
139 .inspect(|r| debug!(?r, ?offsets))
140 .inspect_err(|err| error!(?err, ?offsets))
141 .map(|offsets| {
142 offsets
143 .iter()
144 .fold(BTreeSet::new(), |mut topics, (topition, _)| {
145 _ = topics.insert(topition.topic());
146 topics
147 })
148 .iter()
149 .map(|topic_name| {
150 ListOffsetsTopicResponse::default()
151 .name((*topic_name).into())
152 .partitions(Some(
153 offsets
154 .iter()
155 .filter_map(|(topition, offset)| {
156 if topition.topic() == *topic_name {
157 Some(
158 ListOffsetsPartitionResponse::default()
159 .partition_index(topition.partition())
160 .error_code(offset.error_code().into())
161 .old_style_offsets(None)
162 .timestamp(
163 offset
164 .timestamp()
165 .unwrap_or(Some(-1))
166 .or(Some(-1)),
167 )
168 .offset(offset.offset().or(Some(0)))
169 .leader_epoch(Some(0)),
170 )
171 } else {
172 None
173 }
174 })
175 .collect(),
176 ))
177 })
178 .collect()
179 })
180 .map(Some)?
181 } else {
182 None
183 };
184
185 Ok(ListOffsetsResponse::default()
186 .throttle_time_ms(throttle_time_ms)
187 .topics(topics))
188 .inspect(|r| debug!(?r))
189 }
190}