Skip to main content

crabka_remote_storage/
inmemory.rs

//! [`InmemoryRemoteLogMetadataManager`] — an in-process, memory-backed reference
//! implementation of [`RemoteLogMetadataManager`], mirroring Kafka's test fixture
//! of the same name. Tiered-storage tests run against this manager.
4
5use std::collections::HashMap;
6use std::sync::Mutex;
7
8use crate::cache::RemoteLogMetadataCache;
9use crate::dump::{PartitionDump, RlmmCacheDump};
10use crate::error::RemoteStorageError;
11use crate::metadata::{
12    RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemotePartitionDeleteMetadata,
13    RemotePartitionDeleteState, TopicIdPartition,
14};
15use crate::metadata_manager::RemoteLogMetadataManager;
16
17/// In-memory [`RemoteLogMetadataManager`]: one
18/// `RemoteLogMetadataCache` per partition behind a single
19/// mutex. Not durable — state is lost on restart — but enforces the full
20/// lifecycle state machine, so it is a faithful stand-in for the
21/// topic-backed production manager in tests and single-process setups.
22#[derive(Debug, Default)]
23pub struct InmemoryRemoteLogMetadataManager {
24    partitions: Mutex<HashMap<TopicIdPartition, RemoteLogMetadataCache>>,
25}
26
27impl InmemoryRemoteLogMetadataManager {
28    /// Construct an empty manager.
29    #[must_use]
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Dump every partition's segment + partition-delete metadata for
35    /// snapshotting. The result is order-independent;
36    /// [`Self::import`] re-derives ordering and the epoch index.
37    #[must_use]
38    pub fn export(&self) -> RlmmCacheDump {
39        let guard = self.partitions.lock().expect("metadata mutex poisoned");
40        let mut partitions: Vec<PartitionDump> = guard
41            .iter()
42            .map(|(tp, cache)| PartitionDump {
43                topic_id_partition: tp.clone(),
44                segments: cache.dump_segments(),
45                delete_state: cache.delete_state(),
46            })
47            .collect();
48        // Stable order so `export()` is deterministic and comparable.
49        partitions.sort_by(|a, b| {
50            (
51                a.topic_id_partition.topic_id,
52                a.topic_id_partition.partition,
53            )
54                .cmp(&(
55                    b.topic_id_partition.topic_id,
56                    b.topic_id_partition.partition,
57                ))
58        });
59        // Within a partition, sort segments by (start_offset, id) so the
60        // dump is canonical regardless of HashMap iteration order.
61        for p in &mut partitions {
62            p.segments.sort_by(|a, b| {
63                a.start_offset().cmp(&b.start_offset()).then_with(|| {
64                    a.remote_log_segment_id()
65                        .id
66                        .cmp(&b.remote_log_segment_id().id)
67                })
68            });
69        }
70        RlmmCacheDump { partitions }
71    }
72
73    /// Seed the cache from a dump, bypassing transition validation
74    /// Intended for a freshly-constructed manager during
75    /// snapshot restore; existing partitions are overwritten.
76    pub fn import(&self, dump: RlmmCacheDump) {
77        let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
78        for p in dump.partitions {
79            let cache = guard.entry(p.topic_id_partition).or_default();
80            cache.seed(p.segments, p.delete_state);
81        }
82    }
83}
84
85impl RemoteLogMetadataManager for InmemoryRemoteLogMetadataManager {
86    fn add_remote_log_segment_metadata(
87        &self,
88        metadata: RemoteLogSegmentMetadata,
89    ) -> Result<(), RemoteStorageError> {
90        let tp = metadata.remote_log_segment_id().topic_id_partition.clone();
91        let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
92        guard.entry(tp).or_default().add(metadata)
93    }
94
95    fn update_remote_log_segment_metadata(
96        &self,
97        update: RemoteLogSegmentMetadataUpdate,
98    ) -> Result<(), RemoteStorageError> {
99        let tp = update.remote_log_segment_id.topic_id_partition.clone();
100        let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
101        match guard.get_mut(&tp) {
102            Some(cache) => cache.update(&update),
103            None => Err(RemoteStorageError::SegmentNotFound(
104                update.remote_log_segment_id,
105            )),
106        }
107    }
108
109    fn remote_log_segment_metadata(
110        &self,
111        topic_id_partition: &TopicIdPartition,
112        leader_epoch: i32,
113        offset: i64,
114    ) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError> {
115        let guard = self.partitions.lock().expect("metadata mutex poisoned");
116        Ok(guard
117            .get(topic_id_partition)
118            .and_then(|c| c.segment_for(leader_epoch, offset)))
119    }
120
121    fn highest_offset_for_epoch(
122        &self,
123        topic_id_partition: &TopicIdPartition,
124        leader_epoch: i32,
125    ) -> Result<Option<i64>, RemoteStorageError> {
126        let guard = self.partitions.lock().expect("metadata mutex poisoned");
127        Ok(guard
128            .get(topic_id_partition)
129            .and_then(|c| c.highest_offset_for_epoch(leader_epoch)))
130    }
131
132    fn list_remote_log_segments(
133        &self,
134        topic_id_partition: &TopicIdPartition,
135    ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
136        let guard = self.partitions.lock().expect("metadata mutex poisoned");
137        Ok(guard
138            .get(topic_id_partition)
139            .map(RemoteLogMetadataCache::list)
140            .unwrap_or_default())
141    }
142
143    fn list_remote_log_segments_by_epoch(
144        &self,
145        topic_id_partition: &TopicIdPartition,
146        leader_epoch: i32,
147    ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
148        let guard = self.partitions.lock().expect("metadata mutex poisoned");
149        Ok(guard
150            .get(topic_id_partition)
151            .map(|c| c.list_by_epoch(leader_epoch))
152            .unwrap_or_default())
153    }
154
155    fn put_remote_partition_delete_metadata(
156        &self,
157        metadata: RemotePartitionDeleteMetadata,
158    ) -> Result<(), RemoteStorageError> {
159        let mut guard = self.partitions.lock().expect("metadata mutex poisoned");
160        let cache = guard
161            .entry(metadata.topic_id_partition.clone())
162            .or_default();
163        let from = cache.delete_state();
164        if !RemotePartitionDeleteState::is_valid_transition(from, metadata.state) {
165            return Err(RemoteStorageError::InvalidPartitionDeleteTransition {
166                tp: metadata.topic_id_partition,
167                from,
168                to: metadata.state,
169            });
170        }
171        cache.set_delete_state(metadata.state);
172        Ok(())
173    }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use std::collections::BTreeMap;
    use uuid::Uuid;

    use crate::metadata::{
        CustomMetadata, RemoteLogSegmentId, RemoteLogSegmentState, RemotePartitionDeleteState,
    };

    /// The partition almost every test writes to.
    fn tp() -> TopicIdPartition {
        TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
    }

    /// A partition no test ever writes to.
    fn unknown_tp() -> TopicIdPartition {
        TopicIdPartition::new(Uuid::from_u128(999), "nope", 0)
    }

    /// Segment id `id` on [`tp`].
    fn segment_id(id: u128) -> RemoteLogSegmentId {
        RemoteLogSegmentId::new(tp(), Uuid::from_u128(id))
    }

    /// Segment `id` covering `start..=end` in `CopySegmentStarted`, with the
    /// leader-epoch map `{0: start}`.
    fn started(id: u128, start: i64, end: i64) -> RemoteLogSegmentMetadata {
        RemoteLogSegmentMetadata::new(
            segment_id(id),
            start,
            end,
            end,
            1,
            100,
            2048,
            RemoteLogSegmentState::CopySegmentStarted,
            BTreeMap::from([(0, start)]),
        )
        .unwrap()
    }

    /// Update moving segment `id` to `CopySegmentFinished`, attaching custom
    /// metadata `[7]`.
    fn finish(id: u128) -> RemoteLogSegmentMetadataUpdate {
        RemoteLogSegmentMetadataUpdate {
            remote_log_segment_id: segment_id(id),
            event_timestamp_ms: 200,
            custom_metadata: Some(CustomMetadata(vec![7])),
            state: RemoteLogSegmentState::CopySegmentFinished,
            broker_id: 1,
        }
    }

    /// Partition-delete event for [`tp`] requesting `state`.
    fn delete_event(state: RemotePartitionDeleteState) -> RemotePartitionDeleteMetadata {
        RemotePartitionDeleteMetadata {
            topic_id_partition: tp(),
            state,
            event_timestamp_ms: 500,
            broker_id: 1,
        }
    }

    /// Manager holding segment 10 (`0..=99`), added and then finished.
    fn with_finished_segment() -> InmemoryRemoteLogMetadataManager {
        let manager = InmemoryRemoteLogMetadataManager::new();
        manager.add_remote_log_segment_metadata(started(10, 0, 99)).unwrap();
        manager.update_remote_log_segment_metadata(finish(10)).unwrap();
        manager
    }

    #[test]
    fn add_finish_query_round_trip() {
        let manager = with_finished_segment();

        let found = manager
            .remote_log_segment_metadata(&tp(), 0, 42)
            .unwrap()
            .expect("segment found");
        assert!(found.remote_log_segment_id().id == Uuid::from_u128(10));
        assert!(found.custom_metadata() == Some(&CustomMetadata(vec![7])));
        assert!(manager.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
    }

    #[test]
    fn query_unknown_partition_is_none_not_error() {
        let manager = InmemoryRemoteLogMetadataManager::new();
        let other = unknown_tp();
        assert!(manager.remote_log_segment_metadata(&other, 0, 0).unwrap().is_none());
        assert!(manager.highest_offset_for_epoch(&other, 0).unwrap().is_none());
        assert!(manager.list_remote_log_segments(&other).unwrap().is_empty());
    }

    #[test]
    fn inmemory_read_outcomes_are_some_none_never_not_ready() {
        let manager = with_finished_segment();

        // Found.
        assert!(matches!(manager.remote_log_segment_metadata(&tp(), 0, 42), Ok(Some(_))));
        // Caught up, no covering segment → genuine miss.
        assert!(matches!(manager.remote_log_segment_metadata(&tp(), 0, 10_000), Ok(None)));
        // Unknown partition → genuine miss, NOT NotReady.
        let outcome = manager.remote_log_segment_metadata(&unknown_tp(), 0, 0);
        assert!(matches!(outcome, Ok(None)));
        assert!(
            !matches!(outcome, Err(RemoteStorageError::NotReady { .. })),
            "in-memory manager has no consumer lag; never NotReady"
        );
    }

    #[test]
    fn update_before_add_errors() {
        let manager = InmemoryRemoteLogMetadataManager::new();
        let err = manager.update_remote_log_segment_metadata(finish(10)).unwrap_err();
        assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
    }

    #[test]
    fn list_returns_all_states_ordered() {
        let manager = InmemoryRemoteLogMetadataManager::new();
        // Added out of offset order; the listing must still come back sorted.
        manager.add_remote_log_segment_metadata(started(11, 100, 199)).unwrap();
        manager.add_remote_log_segment_metadata(started(10, 0, 99)).unwrap();
        manager.update_remote_log_segment_metadata(finish(10)).unwrap();

        let starts: Vec<_> = manager
            .list_remote_log_segments(&tp())
            .unwrap()
            .iter()
            .map(|segment| segment.start_offset())
            .collect();
        assert!(starts == [0, 100]);
    }

    #[test]
    fn partition_delete_lifecycle() {
        let manager = InmemoryRemoteLogMetadataManager::new();
        let lifecycle = [
            RemotePartitionDeleteState::DeletePartitionMarked,
            RemotePartitionDeleteState::DeletePartitionStarted,
            RemotePartitionDeleteState::DeletePartitionFinished,
        ];
        for state in lifecycle {
            manager.put_remote_partition_delete_metadata(delete_event(state)).unwrap();
        }
    }

    #[test]
    fn export_then_import_reproduces_cache() {
        let source = InmemoryRemoteLogMetadataManager::new();
        source.add_remote_log_segment_metadata(started(10, 0, 99)).unwrap();
        source.add_remote_log_segment_metadata(started(11, 100, 199)).unwrap();
        source.update_remote_log_segment_metadata(finish(10)).unwrap();
        source
            .put_remote_partition_delete_metadata(delete_event(
                RemotePartitionDeleteState::DeletePartitionMarked,
            ))
            .unwrap();

        let restored = InmemoryRemoteLogMetadataManager::new();
        restored.import(source.export());

        // Segment listings agree across the partition.
        let source_listing = source.list_remote_log_segments(&tp()).unwrap();
        let restored_listing = restored.list_remote_log_segments(&tp()).unwrap();
        assert!(source_listing == restored_listing);
        // The finished segment is still queryable after import.
        assert!(restored.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
        // Re-exporting yields the same dump (idempotent round trip).
        assert!(source.export() == restored.export());
    }

    #[test]
    fn partition_delete_rejects_out_of_order() {
        let manager = InmemoryRemoteLogMetadataManager::new();
        let err = manager
            .put_remote_partition_delete_metadata(delete_event(
                RemotePartitionDeleteState::DeletePartitionFinished,
            ))
            .unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidPartitionDeleteTransition { .. }));
    }
}