1use std::collections::HashMap;
6use std::sync::Mutex;
7
8use crate::cache::RemoteLogMetadataCache;
9use crate::dump::{PartitionDump, RlmmCacheDump};
10use crate::error::RemoteStorageError;
11use crate::metadata::{
12 RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemotePartitionDeleteMetadata,
13 RemotePartitionDeleteState, TopicIdPartition,
14};
15use crate::metadata_manager::RemoteLogMetadataManager;
16
17#[derive(Debug, Default)]
23pub struct InmemoryRemoteLogMetadataManager {
24 partitions: Mutex<HashMap<TopicIdPartition, RemoteLogMetadataCache>>,
25}
26
27impl InmemoryRemoteLogMetadataManager {
28 #[must_use]
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 #[must_use]
38 pub fn export(&self) -> RlmmCacheDump {
39 let guard = self.partitions.lock().expect("metadata mutex poisoned");
40 let mut partitions: Vec<PartitionDump> = guard
41 .iter()
42 .map(|(tp, cache)| PartitionDump {
43 topic_id_partition: tp.clone(),
44 segments: cache.dump_segments(),
45 delete_state: cache.delete_state(),
46 })
47 .collect();
48 partitions.sort_by(|a, b| {
50 (
51 a.topic_id_partition.topic_id,
52 a.topic_id_partition.partition,
53 )
54 .cmp(&(
55 b.topic_id_partition.topic_id,
56 b.topic_id_partition.partition,
57 ))
58 });
59 for p in &mut partitions {
62 p.segments.sort_by(|a, b| {
63 a.start_offset().cmp(&b.start_offset()).then_with(|| {
64 a.remote_log_segment_id()
65 .id
66 .cmp(&b.remote_log_segment_id().id)
67 })
68 });
69 }
70 RlmmCacheDump { partitions }
71 }
72
73 pub fn import(&self, dump: RlmmCacheDump) {
77 let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
78 for p in dump.partitions {
79 let cache = guard.entry(p.topic_id_partition).or_default();
80 cache.seed(p.segments, p.delete_state);
81 }
82 }
83}
84
85impl RemoteLogMetadataManager for InmemoryRemoteLogMetadataManager {
86 fn add_remote_log_segment_metadata(
87 &self,
88 metadata: RemoteLogSegmentMetadata,
89 ) -> Result<(), RemoteStorageError> {
90 let tp = metadata.remote_log_segment_id().topic_id_partition.clone();
91 let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
92 guard.entry(tp).or_default().add(metadata)
93 }
94
95 fn update_remote_log_segment_metadata(
96 &self,
97 update: RemoteLogSegmentMetadataUpdate,
98 ) -> Result<(), RemoteStorageError> {
99 let tp = update.remote_log_segment_id.topic_id_partition.clone();
100 let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
101 match guard.get_mut(&tp) {
102 Some(cache) => cache.update(&update),
103 None => Err(RemoteStorageError::SegmentNotFound(
104 update.remote_log_segment_id,
105 )),
106 }
107 }
108
109 fn remote_log_segment_metadata(
110 &self,
111 topic_id_partition: &TopicIdPartition,
112 leader_epoch: i32,
113 offset: i64,
114 ) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError> {
115 let guard = self.partitions.lock().expect("metadata mutex poisoned");
116 Ok(guard
117 .get(topic_id_partition)
118 .and_then(|c| c.segment_for(leader_epoch, offset)))
119 }
120
121 fn highest_offset_for_epoch(
122 &self,
123 topic_id_partition: &TopicIdPartition,
124 leader_epoch: i32,
125 ) -> Result<Option<i64>, RemoteStorageError> {
126 let guard = self.partitions.lock().expect("metadata mutex poisoned");
127 Ok(guard
128 .get(topic_id_partition)
129 .and_then(|c| c.highest_offset_for_epoch(leader_epoch)))
130 }
131
132 fn list_remote_log_segments(
133 &self,
134 topic_id_partition: &TopicIdPartition,
135 ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
136 let guard = self.partitions.lock().expect("metadata mutex poisoned");
137 Ok(guard
138 .get(topic_id_partition)
139 .map(RemoteLogMetadataCache::list)
140 .unwrap_or_default())
141 }
142
143 fn list_remote_log_segments_by_epoch(
144 &self,
145 topic_id_partition: &TopicIdPartition,
146 leader_epoch: i32,
147 ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
148 let guard = self.partitions.lock().expect("metadata mutex poisoned");
149 Ok(guard
150 .get(topic_id_partition)
151 .map(|c| c.list_by_epoch(leader_epoch))
152 .unwrap_or_default())
153 }
154
155 fn put_remote_partition_delete_metadata(
156 &self,
157 metadata: RemotePartitionDeleteMetadata,
158 ) -> Result<(), RemoteStorageError> {
159 let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
160 let cache = guard
161 .entry(metadata.topic_id_partition.clone())
162 .or_default();
163 let from = cache.delete_state();
164 if !RemotePartitionDeleteState::is_valid_transition(from, metadata.state) {
165 return Err(RemoteStorageError::InvalidPartitionDeleteTransition {
166 tp: metadata.topic_id_partition,
167 from,
168 to: metadata.state,
169 });
170 }
171 cache.set_delete_state(metadata.state);
172 Ok(())
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use assert2::assert;
180 use std::collections::BTreeMap;
181 use uuid::Uuid;
182
183 use crate::metadata::{
184 CustomMetadata, RemoteLogSegmentId, RemoteLogSegmentState, RemotePartitionDeleteState,
185 };
186
187 fn tp() -> TopicIdPartition {
188 TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
189 }
190
191 fn started(id: u128, start: i64, end: i64) -> RemoteLogSegmentMetadata {
192 RemoteLogSegmentMetadata::new(
193 RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
194 start,
195 end,
196 end,
197 1,
198 100,
199 2048,
200 RemoteLogSegmentState::CopySegmentStarted,
201 BTreeMap::from([(0, start)]),
202 )
203 .unwrap()
204 }
205
206 fn finish(id: u128) -> RemoteLogSegmentMetadataUpdate {
207 RemoteLogSegmentMetadataUpdate {
208 remote_log_segment_id: RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
209 event_timestamp_ms: 200,
210 custom_metadata: Some(CustomMetadata(vec![7])),
211 state: RemoteLogSegmentState::CopySegmentFinished,
212 broker_id: 1,
213 }
214 }
215
216 #[test]
217 fn add_finish_query_round_trip() {
218 let m = InmemoryRemoteLogMetadataManager::new();
219 m.add_remote_log_segment_metadata(started(10, 0, 99))
220 .unwrap();
221 m.update_remote_log_segment_metadata(finish(10)).unwrap();
222
223 let got = m
224 .remote_log_segment_metadata(&tp(), 0, 42)
225 .unwrap()
226 .expect("segment found");
227 assert!(got.remote_log_segment_id().id == Uuid::from_u128(10));
228 assert!(got.custom_metadata() == Some(&CustomMetadata(vec![7])));
229 assert!(m.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
230 }
231
232 #[test]
233 fn query_unknown_partition_is_none_not_error() {
234 let m = InmemoryRemoteLogMetadataManager::new();
235 let other = TopicIdPartition::new(Uuid::from_u128(999), "nope", 0);
236 assert!(m.remote_log_segment_metadata(&other, 0, 0).unwrap() == None);
237 assert!(m.highest_offset_for_epoch(&other, 0).unwrap() == None);
238 assert!(m.list_remote_log_segments(&other).unwrap().is_empty());
239 }
240
241 #[test]
242 fn inmemory_read_outcomes_are_some_none_never_not_ready() {
243 let m = InmemoryRemoteLogMetadataManager::new();
244 m.add_remote_log_segment_metadata(started(10, 0, 99))
245 .unwrap();
246 m.update_remote_log_segment_metadata(finish(10)).unwrap();
247
248 assert!(matches!(
250 m.remote_log_segment_metadata(&tp(), 0, 42),
251 Ok(Some(_))
252 ));
253 assert!(matches!(
255 m.remote_log_segment_metadata(&tp(), 0, 10_000),
256 Ok(None)
257 ));
258 let other = TopicIdPartition::new(Uuid::from_u128(999), "nope", 0);
260 let got = m.remote_log_segment_metadata(&other, 0, 0);
261 assert!(matches!(got, Ok(None)));
262 assert!(
263 !matches!(got, Err(RemoteStorageError::NotReady { .. })),
264 "in-memory manager has no consumer lag; never NotReady"
265 );
266 }
267
268 #[test]
269 fn update_before_add_errors() {
270 let m = InmemoryRemoteLogMetadataManager::new();
271 let err = m
272 .update_remote_log_segment_metadata(finish(10))
273 .unwrap_err();
274 assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
275 }
276
277 #[test]
278 fn list_returns_all_states_ordered() {
279 let m = InmemoryRemoteLogMetadataManager::new();
280 m.add_remote_log_segment_metadata(started(11, 100, 199))
281 .unwrap();
282 m.add_remote_log_segment_metadata(started(10, 0, 99))
283 .unwrap();
284 m.update_remote_log_segment_metadata(finish(10)).unwrap();
285 let listed = m.list_remote_log_segments(&tp()).unwrap();
286 assert!(listed.len() == 2);
287 assert!(listed[0].start_offset() == 0);
288 assert!(listed[1].start_offset() == 100);
289 }
290
291 #[test]
292 fn partition_delete_lifecycle() {
293 let m = InmemoryRemoteLogMetadataManager::new();
294 for state in [
295 RemotePartitionDeleteState::DeletePartitionMarked,
296 RemotePartitionDeleteState::DeletePartitionStarted,
297 RemotePartitionDeleteState::DeletePartitionFinished,
298 ] {
299 m.put_remote_partition_delete_metadata(RemotePartitionDeleteMetadata {
300 topic_id_partition: tp(),
301 state,
302 event_timestamp_ms: 500,
303 broker_id: 1,
304 })
305 .unwrap();
306 }
307 }
308
309 #[test]
310 fn export_then_import_reproduces_cache() {
311 let m = InmemoryRemoteLogMetadataManager::new();
312 m.add_remote_log_segment_metadata(started(10, 0, 99))
313 .unwrap();
314 m.add_remote_log_segment_metadata(started(11, 100, 199))
315 .unwrap();
316 m.update_remote_log_segment_metadata(finish(10)).unwrap();
317 m.put_remote_partition_delete_metadata(RemotePartitionDeleteMetadata {
318 topic_id_partition: tp(),
319 state: RemotePartitionDeleteState::DeletePartitionMarked,
320 event_timestamp_ms: 500,
321 broker_id: 1,
322 })
323 .unwrap();
324
325 let dump = m.export();
326 let restored = InmemoryRemoteLogMetadataManager::new();
327 restored.import(dump);
328
329 let before = m.list_remote_log_segments(&tp()).unwrap();
331 let after = restored.list_remote_log_segments(&tp()).unwrap();
332 assert!(before == after);
333 assert!(restored.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
335 assert!(m.export() == restored.export());
337 }
338
339 #[test]
340 fn partition_delete_rejects_out_of_order() {
341 let m = InmemoryRemoteLogMetadataManager::new();
342 let err = m
343 .put_remote_partition_delete_metadata(RemotePartitionDeleteMetadata {
344 topic_id_partition: tp(),
345 state: RemotePartitionDeleteState::DeletePartitionFinished,
346 event_timestamp_ms: 500,
347 broker_id: 1,
348 })
349 .unwrap_err();
350 assert!(matches!(
351 err,
352 RemoteStorageError::InvalidPartitionDeleteTransition { .. }
353 ));
354 }
355}