streambed_logged/compaction.rs

use std::{error::Error, fmt::Debug};

use super::*;

use log::{debug, error, warn};
use smol_str::SmolStr;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;

const ACTIVE_FILE_CONSUMER_IDLE_TIMEOUT: Duration = Duration::from_millis(10);

/// A map of keys to offsets
pub type CompactionMap = HashMap<Key, Offset>;

/// Returned by a compaction reducer function when there is no capacity
/// to process any more keys. A compactor will use this information to
/// determine whether another compaction pass is required.
#[derive(Debug, PartialEq, Eq)]
pub struct MaxKeysReached(pub bool);

/// A compactor strategy's role is to be fed consumer records for a single topic
/// and ultimately determine, for each record key, what the earliest offset is
/// that may be retained. Upon the consumer completing, logged will then proceed
/// to remove unwanted records from the commit log.
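///
/// A minimal sketch (not part of this module) of a strategy that retains the
/// latest offset per record key with no cap on distinct keys, so a single
/// compaction pass always suffices:
///
/// ```ignore
/// struct LatestPerKey;
///
/// #[async_trait]
/// impl CompactionStrategy for LatestPerKey {
///     type S = CompactionMap;
///     type KS = ();
///
///     fn key_init(&self) -> Self::KS {}
///
///     fn key(_key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
///         // Use the record's own key field directly.
///         Some(r.key)
///     }
///
///     async fn init(&self) -> Self::S {
///         CompactionMap::new()
///     }
///
///     fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached {
///         // Later records overwrite earlier ones, so only the latest
///         // offset survives for each key.
///         state.insert(key, record.offset);
///         MaxKeysReached(false)
///     }
///
///     fn collect(state: Self::S) -> CompactionMap {
///         state
///     }
/// }
/// ```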
#[async_trait]
pub trait CompactionStrategy {
    /// The state to manage throughout a compaction run.
    type S: Debug + Send;

    /// The type of state required to manage keys. It may be that no state is
    /// required, i.e. if the key field from the record is used directly.
    type KS: Debug + Send + Clone;

    /// Produce the initial key state for the compaction.
    fn key_init(&self) -> Self::KS;

    /// The key function computes a key for subsequent use by the compactor.
    /// In simple scenarios, this key can be the key field from
    /// the topic's record itself, which is the default assumption here.
    /// Returning None will result in this record being excluded from compaction.
    fn key(key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key>;

    /// Produce the initial state for the reducer function.
    async fn init(&self) -> Self::S;

    /// The reducer function receives a mutable state reference, a key and a consumer
    /// record, and returns whether the reducer has reached its maximum number
    /// of distinct keys. If it has then
    /// compaction may occur again once the current consumption of records
    /// has finished. The goal is to avoid needing to process every
    /// topic/partition/key in one go, i.e. a subset can be processed, then another
    /// subset, and so on. This strategy helps to manage memory.
    fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached;

    /// The collect function is responsible for mapping the state into
    /// a map of keys and their minimum offsets. The compactor will filter out
    /// the first n keys in a subsequent run, where n is the number of keys
    /// found on the first run. This permits memory to be controlled given
    /// a large number of distinct keys. The cost of the re-run strategy is that
    /// additional compaction scans will be required, resulting in more I/O.
    fn collect(state: Self::S) -> CompactionMap;
}

/// A qualified id is a string that identifies a type of a record and some key derived from the record, e.g.
/// an entity and a primary key of the entity.
pub type Qid = SmolStr;

/// The goal of key-based retention is to keep the latest record for a given
/// key within a topic. This is the same as Kafka's key-based retention and
/// effectively makes the commit log a key value store.
///
/// KeyBasedRetention is guaranteed to return a key that is the record's key.
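///
/// A minimal usage sketch (the cap of 1024 distinct keys per pass is
/// illustrative only):
///
/// ```ignore
/// let strategy = KeyBasedRetention::new(1024);
/// let mut key_state = strategy.key_init();
/// let mut state = strategy.init().await;
/// // Feed each consumer record through `key` and `reduce`; `collect` then
/// // yields a map from each key to the offset of its latest record.
/// ```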
pub struct KeyBasedRetention {
    max_compaction_keys: usize,
    qid_from_record: Option<fn(&ConsumerRecord) -> Option<Qid>>,
}

impl KeyBasedRetention {
    /// A max_compaction_keys parameter is used to limit the number of distinct topic/partition/keys
    /// processed in a single run of the compactor.
    pub fn new(max_compaction_keys: usize) -> Self {
        Self {
            qid_from_record: None,
            max_compaction_keys,
        }
    }

    /// Similar to the above, but the qualified id of a record will be determined by a function.
    /// A max_compaction_keys parameter is used to limit the number of distinct topic/partition/keys
    /// processed in a single run of the compactor.
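    ///
    /// For example, a sketch that derives the qualified id from a UTF-8 record
    /// value (mirroring this module's tests; the key cap is illustrative):
    ///
    /// ```ignore
    /// let strategy = KeyBasedRetention::with_qids(
    ///     |r| std::str::from_utf8(&r.value).map(|s| s.into()).ok(),
    ///     1024,
    /// );
    /// ```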
    pub fn with_qids(
        qid_from_record: fn(&ConsumerRecord) -> Option<Qid>,
        max_compaction_keys: usize,
    ) -> Self {
        Self {
            qid_from_record: Some(qid_from_record),
            max_compaction_keys,
        }
    }
}

/// The state associated with key based retention.
pub type KeyBasedRetentionState = (CompactionMap, usize);

#[async_trait]
impl CompactionStrategy for KeyBasedRetention {
    type S = KeyBasedRetentionState;
    type KS = Option<(HashMap<Qid, Key>, fn(&ConsumerRecord) -> Option<Qid>)>;

    fn key_init(&self) -> Self::KS {
        self.qid_from_record.map(|id_from_record| {
            (
                HashMap::with_capacity(self.max_compaction_keys),
                id_from_record,
            )
        })
    }

    fn key(key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
        if let Some((key_map, id_from_record)) = key_state {
            let id = (id_from_record)(r)?;
            if let Some(key) = key_map.get(id.as_str()) {
                Some(*key)
            } else {
                let key = key_map.len() as Key;
                let _ = key_map.insert(id, key);
                Some(key)
            }
        } else {
            Some(r.key)
        }
    }

    async fn init(&self) -> Self::S {
        (
            CompactionMap::with_capacity(self.max_compaction_keys),
            self.max_compaction_keys,
        )
    }

    fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached {
        let (compaction_map, max_keys) = state;

        let l = compaction_map.len();
        match compaction_map.entry(key) {
            Entry::Occupied(mut e) => {
                *e.get_mut() = record.offset;
                MaxKeysReached(false)
            }
            Entry::Vacant(e) if l < *max_keys => {
                e.insert(record.offset);
                MaxKeysReached(false)
            }
            Entry::Vacant(_) => MaxKeysReached(true),
        }
    }

    fn collect(state: Self::S) -> CompactionMap {
        let (compaction_map, _) = state;
        compaction_map
    }
}

/// Similar to [KeyBasedRetention], but instead of retaining only the latest record
/// for a key, this strategy retains the latest `max_records_per_key` records
/// associated with a key.
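///
/// A sketch of the retention semantics (limits are illustrative): with a
/// max_records_per_key of 2, a key seen at offsets 0, 2 and 3 collects to
/// offset 2, so only the record at offset 0 becomes eligible for removal.
///
/// ```ignore
/// let strategy = NthKeyBasedRetention::new(1024, 2);
/// ```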
pub struct NthKeyBasedRetention {
    max_compaction_keys: usize,
    max_records_per_key: usize,
    qid_from_record: Option<fn(&ConsumerRecord) -> Option<Qid>>,
}

impl NthKeyBasedRetention {
    /// A max_compaction_keys parameter is used to limit the number of distinct topic/partition/keys
    /// processed in a single run of the compactor. The max_records_per_key parameter determines
    /// how many of the latest records are retained per key.
    pub fn new(max_compaction_keys: usize, max_records_per_key: usize) -> Self {
        Self {
            max_compaction_keys,
            max_records_per_key,
            qid_from_record: None,
        }
    }

    /// Similar to the above, but the qualified id of a record will be determined by a function.
    /// A max_compaction_keys parameter is used to limit the number of distinct topic/partition/keys
    /// processed in a single run of the compactor. The max_records_per_key parameter determines
    /// how many of the latest records are retained per key.
    pub fn with_qids(
        max_compaction_keys: usize,
        max_records_per_key: usize,
        qid_from_record: fn(&ConsumerRecord) -> Option<Qid>,
    ) -> Self {
        Self {
            qid_from_record: Some(qid_from_record),
            max_compaction_keys,
            max_records_per_key,
        }
    }
}

/// The state associated with nth key based retention.
pub type NthKeyBasedRetentionState = (HashMap<Key, VecDeque<Offset>>, usize, usize);

#[async_trait]
impl CompactionStrategy for NthKeyBasedRetention {
    type S = NthKeyBasedRetentionState;
    type KS = Option<(HashMap<Qid, Key>, fn(&ConsumerRecord) -> Option<Qid>)>;

    fn key_init(&self) -> Self::KS {
        self.qid_from_record.map(|id_from_record| {
            (
                HashMap::with_capacity(self.max_compaction_keys),
                id_from_record,
            )
        })
    }

    fn key(key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
        if let Some((key_map, id_from_record)) = key_state {
            let id = (id_from_record)(r)?;
            if let Some(key) = key_map.get(id.as_str()) {
                Some(*key)
            } else {
                let key = key_map.len() as Key;
                let _ = key_map.insert(id, key);
                Some(key)
            }
        } else {
            Some(r.key)
        }
    }

    async fn init(&self) -> Self::S {
        (
            HashMap::with_capacity(self.max_compaction_keys),
            self.max_compaction_keys,
            self.max_records_per_key,
        )
    }

    fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached {
        let (compaction_map, max_keys, max_records_per_key) = state;

        let l = compaction_map.len();
        match compaction_map.entry(key) {
            Entry::Occupied(mut e) => {
                let offsets = e.get_mut();
                if offsets.len() == *max_records_per_key {
                    offsets.pop_front();
                }
                offsets.push_back(record.offset);
                MaxKeysReached(false)
            }
            Entry::Vacant(e) if l < *max_keys => {
                let mut offsets = VecDeque::with_capacity(*max_records_per_key);
                offsets.push_back(record.offset);
                e.insert(offsets);
                MaxKeysReached(false)
            }
            Entry::Vacant(_) => MaxKeysReached(true),
        }
    }

    fn collect(state: Self::S) -> CompactionMap {
        let (compaction_map, _, _) = state;
        compaction_map
            .into_iter()
            .flat_map(|(k, mut v)| v.pop_front().map(|v| (k, v)))
            .collect::<CompactionMap>()
    }
}

/// Responsible for interacting with a commit log for the purposes of
/// subscribing to a provided topic.
#[derive(Clone)]
pub(crate) struct ScopedTopicSubscriber<CL>
where
    CL: CommitLog,
{
    commit_log: CL,
    subscriptions: Vec<Subscription>,
}

impl<CL> ScopedTopicSubscriber<CL>
where
    CL: CommitLog,
{
    pub fn new(commit_log: CL, topic: Topic) -> Self {
        Self {
            commit_log,
            subscriptions: vec![Subscription { topic }],
        }
    }

    pub fn subscribe<'a>(&'a self) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
        self.commit_log.scoped_subscribe(
            "compactor",
            vec![],
            self.subscriptions.clone(),
            Some(ACTIVE_FILE_CONSUMER_IDLE_TIMEOUT),
        )
    }
}

/// Responsible for performing operations on topic storage.
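///
/// A construction sketch with mostly no-op operations (the work file path is
/// illustrative; see the tests below for a fuller example):
///
/// ```ignore
/// let ops = TopicStorageOps::new(
///     || Ok(Some(1)),                     // age the active file, yielding its end offset
///     || File::create("some-work-file"),  // provide a writer for a new work file
///     || Ok(()),                          // recover any partially compacted history files
///     || Ok(()),                          // replace history files with the compacted work file
/// );
/// ```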
pub(crate) struct TopicStorageOps<E, W>
where
    E: Error,
    W: Write,
{
    age_active: Box<dyn FnMut() -> Result<Option<Offset>, E> + Send>,
    new_work_writer: Box<dyn FnMut() -> Result<W, E> + Send>,
    replace_history_files: Box<dyn FnMut() -> Result<(), E> + Send>,
}

impl<E, W> TopicStorageOps<E, W>
where
    E: Error,
    W: Write,
{
    pub fn new<AA, NWF, RCHF, RPHF>(
        age_active: AA,
        new_work_file: NWF,
        mut recover_history_files: RCHF,
        replace_history_files: RPHF,
    ) -> Self
    where
        AA: FnMut() -> Result<Option<Offset>, E> + Send + 'static,
        NWF: FnMut() -> Result<W, E> + Send + 'static,
        RCHF: FnMut() -> Result<(), E> + Send + 'static,
        RPHF: FnMut() -> Result<(), E> + Send + 'static,
    {
        // Recovery of any partially compacted history files happens once, up front.
        let _ = recover_history_files();

        Self {
            age_active: Box::new(age_active),
            new_work_writer: Box::new(new_work_file),
            replace_history_files: Box::new(replace_history_files),
        }
    }

    pub fn age_active(&mut self) -> Result<Option<Offset>, E> {
        (self.age_active)()
    }

    pub fn new_work_writer(&mut self) -> Result<W, E> {
        (self.new_work_writer)()
    }

    pub fn replace_history_files(&mut self) -> Result<(), E> {
        (self.replace_history_files)()
    }
}

/// A compactor actually performs the work of compaction. It is a state
/// machine with the following major transitions:
///
/// Idle -> Analyzing -> Compacting -> Idle
///
/// An async function named `step` is provided that will step through
/// the state machine. Its present strategy is to apply back-pressure by not
/// returning while compaction is already in progress and the size of the
/// active file has reached the threshold for compaction. If a producer
/// is sensitive to back-pressure (this should be rare given the correct
/// dimensioning of the compactor's configuration) then awaiting on
/// producing messages can be avoided. The primary aim of the compactor is to
/// manage storage space. Exhausting storage can perhaps create a
/// similar number of problems upstream as back-pressuring and awaiting
/// a reply to producing a message. It is left to the application developer
/// to decide which strategy should be adopted, and the choice will depend
/// on the real-time consequences of being back-pressured.
///
/// While idle, we are notified with the file size of the active portion
/// of the commit log. If the active size exceeds a provided threshold of
/// bytes, e.g. the erase size of a flash drive, then we move to the
/// Analyzing stage.
///
/// During analysis, we call upon the compaction strategy to collect a
/// map of offsets. These offsets are then supplied to the Compacting stage.
///
/// The Compacting stage will perform the work of producing a new history
/// file given the map of offsets.
///
/// If the analysis stage did not finish entirely then analysis is run
/// again until it does. Otherwise, back to idle.
///
/// A note on running the strategy again: It is not scalable to store a
/// set of all of the keys we have previously encountered so as to avoid
/// using them again on a subsequent run. Instead, we note the record
/// offset of the first record where the compaction strategy detects
/// that the max number of compaction keys has been reached. We then
/// begin our subsequent scan from that offset. We are therefore
/// guaranteed to encounter a key that could not be processed on
/// the run so far. The worst-case scenario is that we would re-run
/// compaction having only ever discovered one new key to process, given
/// a heavy presence of prior-run keys being detected again before other
/// keys become apparent. Detecting only one new key at a time would
/// slow down compaction overall, and back-pressure would ultimately
/// occur on producing to the log. Compaction should eventually finish
/// though. We also expect the worst-case scenario to be avoidable
/// given consideration by an application developer in terms of the
/// number of keys that can be processed by a strategy in one run. Application
/// developers should at least strive to dimension their compaction
/// strategies with a number of keys that are sufficient to require
/// only a single compaction pass.
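///
/// A minimal driving sketch (the `current_active_file_size` helper is
/// hypothetical; callers are expected to invoke `step` with the active
/// file's size whenever it may have grown):
///
/// ```ignore
/// loop {
///     compactor.step(current_active_file_size()).await;
/// }
/// ```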
pub(crate) struct Compactor<E, W, CL, CS>
where
    E: Error,
    W: Write,
    CL: CommitLog,
    CS: CompactionStrategy + Send + 'static,
{
    compaction_strategy: CS,
    compaction_threshold: u64,
    scoped_topic_subscriber: ScopedTopicSubscriber<CL>,
    topic_storage_ops: TopicStorageOps<E, W>,

    state: State<CS>,
}

#[derive(Debug)]
enum CompactionError {
    CannotSerialize,
    #[allow(dead_code)]
    IoError(io::Error),
}

enum State<CS>
where
    CS: CompactionStrategy,
{
    Idle,
    PreparingAnalyze(Option<Offset>),
    Analyzing(JoinHandle<(CS::KS, CS::S, Option<Offset>)>, Offset),
    PreparingCompaction(CS::KS, CompactionMap, Offset, Option<Offset>),
    Compacting(JoinHandle<Result<(), CompactionError>>, Option<Offset>),
}

impl<CS> Debug for State<CS>
where
    CS: CompactionStrategy,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Idle => write!(f, "Idle"),
            Self::PreparingAnalyze(arg0) => f.debug_tuple("PreparingAnalyze").field(arg0).finish(),
            Self::Analyzing(arg0, arg1) => {
                f.debug_tuple("Analyzing").field(arg0).field(arg1).finish()
            }
            Self::PreparingCompaction(arg0, arg1, arg2, arg3) => f
                .debug_tuple("PreparingCompaction")
                .field(arg0)
                .field(arg1)
                .field(arg2)
                .field(arg3)
                .finish(),
            Self::Compacting(arg0, arg1) => {
                f.debug_tuple("Compacting").field(arg0).field(arg1).finish()
            }
        }
    }
}

impl<E, W, CL, CS> Compactor<E, W, CL, CS>
where
    E: Error + Send + 'static,
    W: Write + Send + 'static,
    CL: CommitLog + Clone + Send + 'static,
    CS: CompactionStrategy + Send + 'static,
{
    pub fn new(
        compaction_strategy: CS,
        compaction_threshold: u64,
        scoped_topic_subscriber: ScopedTopicSubscriber<CL>,
        topic_storage_ops: TopicStorageOps<E, W>,
    ) -> Self {
        Self {
            compaction_strategy,
            compaction_threshold,
            scoped_topic_subscriber,
            topic_storage_ops,
            state: State::Idle,
        }
    }

    pub fn is_idle(&self) -> bool {
        matches!(self.state, State::Idle)
    }

    pub async fn step(&mut self, mut active_file_size: u64) {
        loop {
            let mut step_again = false;
            let next_state = match &mut self.state {
                State::Idle if active_file_size < self.compaction_threshold => None,
                State::Idle => {
                    step_again = true;
                    Some(State::PreparingAnalyze(None))
                }
                State::PreparingAnalyze(mut next_start_offset) => {
                    let r = self.topic_storage_ops.age_active();
                    if let Ok(Some(end_offset)) = r {
                        let task_scoped_topic_subscriber = self.scoped_topic_subscriber.clone();
                        let task_init = self.compaction_strategy.init().await;
                        let task_key_state_init = self.compaction_strategy.key_init();
                        let h = tokio::spawn(async move {
                            let mut strategy_state = task_init;
                            let mut key_state = task_key_state_init;
                            let mut records = task_scoped_topic_subscriber.subscribe();
                            let start_offset = next_start_offset;
                            next_start_offset = None;
                            while let Some(record) = records.next().await {
                                let record_offset = record.offset;
                                if record_offset > end_offset {
                                    break;
                                }
                                if Some(record_offset) >= start_offset {
                                    if let Some(key) = CS::key(&mut key_state, &record) {
                                        if matches!(
                                            CS::reduce(&mut strategy_state, key, record),
                                            MaxKeysReached(true)
                                        ) && next_start_offset.is_none()
                                        {
                                            next_start_offset = Some(record_offset);
                                        }
                                    }
                                }
                            }
                            (key_state, strategy_state, next_start_offset)
                        });
                        step_again = true;
                        Some(State::Analyzing(h, end_offset))
                    } else {
                        error!("Could not age the active file/locate end offset. Aborting compaction. {r:?}");
                        Some(State::Idle)
                    }
                }
                State::Analyzing(h, end_offset) => {
                    step_again = active_file_size >= self.compaction_threshold;
                    if step_again || h.is_finished() {
                        let r = h.await;
                        let s = if let Ok((key_state, strategy_state, next_start_offset)) = r {
                            let compaction_map = CS::collect(strategy_state);
                            State::PreparingCompaction(
                                key_state,
                                compaction_map,
                                *end_offset,
                                next_start_offset,
                            )
                        } else {
                            error!("Some error analyzing compaction: {r:?}");
                            State::Idle
                        };
                        Some(s)
                    } else {
                        None
                    }
                }
                State::PreparingCompaction(
                    key_state,
                    compaction_map,
                    end_offset,
                    next_start_offset,
                ) => {
                    let r = self.topic_storage_ops.new_work_writer();
                    if let Ok(mut writer) = r {
                        let mut task_key_state = key_state.clone();
                        let task_compaction_map = compaction_map.clone();
                        let task_end_offset = *end_offset;
                        let task_scoped_topic_subscriber = self.scoped_topic_subscriber.clone();
                        let h = tokio::spawn(async move {
                            let mut records = task_scoped_topic_subscriber.subscribe();
                            while let Some(record) = records.next().await {
                                if record.offset > task_end_offset {
                                    break;
                                }
                                let key = CS::key(&mut task_key_state, &record);
                                let copy = if let Some(key) = key {
                                    task_compaction_map
                                        .get(&key)
                                        .map(|min_offset| record.offset >= *min_offset)
                                        .unwrap_or(true)
                                } else {
                                    false
                                };
                                if copy {
                                    let storable_record = StorableRecord {
                                        version: 0,
                                        headers: record
                                            .headers
                                            .into_iter()
                                            .map(|h| StorableHeader {
                                                key: h.key,
                                                value: h.value,
                                            })
                                            .collect(),
                                        timestamp: record.timestamp,
                                        key: record.key,
                                        value: record.value,
                                        offset: record.offset,
                                    };

                                    let Ok(buf) =
                                        postcard::to_stdvec_crc32(&storable_record, CRC.digest())
                                    else {
                                        return Err(CompactionError::CannotSerialize);
                                    };
                                    writer.write_all(&buf).map_err(CompactionError::IoError)?;
                                }
                            }
                            writer.flush().map_err(CompactionError::IoError)
                        });
                        step_again = true;
                        Some(State::Compacting(h, *next_start_offset))
                    } else {
                        error!("Could not create the new work file. Aborting compaction.");
                        Some(State::Idle)
                    }
                }
                State::Compacting(h, next_start_offset) => {
                    step_again = active_file_size >= self.compaction_threshold;
                    if step_again || h.is_finished() {
                        let r = h.await;
                        let s = if r.is_ok() {
                            let r = self.topic_storage_ops.replace_history_files();
                            if r.is_ok() {
                                if next_start_offset.is_some() {
                                    warn!("Subsequent compaction pass required from offset {next_start_offset:?}");
                                    State::PreparingAnalyze(*next_start_offset)
                                } else {
                                    State::Idle
                                }
                            } else {
                                error!("Some error replacing the history files during compaction: {r:?}");
                                State::Idle
                            }
                        } else {
                            error!("Some error during compaction: {r:?}");
                            State::Idle
                        };
                        Some(s)
                    } else {
                        None
                    }
                }
            };
            if let Some(next_state) = next_state {
                debug!("Compaction moving to {next_state:?}");
                self.state = next_state;
            }
            if !step_again {
                break;
            }
            active_file_size = 0;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        env,
        sync::atomic::{AtomicU32, Ordering},
    };

    use super::*;

    #[tokio::test]
    async fn test_key_based_retention() {
        let topic = Topic::from("my-topic");

        let r0 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 0,
        };

        let r1 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 1,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 1,
        };

        let r2 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 2,
        };

        let mut expected_compactor_result = HashMap::new();
        expected_compactor_result.insert(0, 2);

        let compaction = KeyBasedRetention::new(1);

        let mut key_state = compaction.key_init();
        let mut state = compaction.init().await;

        assert_eq!(
            KeyBasedRetention::reduce(
                &mut state,
                KeyBasedRetention::key(&mut key_state, &r0).unwrap(),
                r0
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            KeyBasedRetention::reduce(
                &mut state,
                KeyBasedRetention::key(&mut key_state, &r1).unwrap(),
                r1
            ),
            MaxKeysReached(true),
        );

        assert_eq!(
            KeyBasedRetention::reduce(
                &mut state,
                KeyBasedRetention::key(&mut key_state, &r2).unwrap(),
                r2
            ),
            MaxKeysReached(false)
        );

        assert_eq!(KeyBasedRetention::collect(state), expected_compactor_result);
    }

    #[tokio::test]
    async fn test_nth_key_based_retention() {
        let topic = Topic::from("my-topic");

        let r0 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 0,
        };

        let r1 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 1,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 1,
        };

        let r2 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 2,
        };

        let r3 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 3,
        };

        let mut expected_compactor_result = HashMap::new();
        expected_compactor_result.insert(0, 2);

        let compaction = NthKeyBasedRetention::new(1, 2);

        let mut key_state = compaction.key_init();
        let mut state = compaction.init().await;

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r0).unwrap(),
                r0
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r1).unwrap(),
                r1
            ),
            MaxKeysReached(true),
        );

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r2).unwrap(),
                r2
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r3).unwrap(),
                r3
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            NthKeyBasedRetention::collect(state),
            expected_compactor_result
        );
    }

    // Test the ability to compose compaction strategies such that the last ten
    // copies of a specific type of key are retained for a topic, while the rest
    // of the keys leverage key based retention.

    // We will have events where the battery level and name change events use
    // key based retention, but we keep ten copies of the temperature sensed
    // events via nth key based retention.

    // We start off with our modelling of events.

    type TemperatureSensorId = u32;

    #[derive(Deserialize, Serialize)]
    enum TemperatureSensorEvent {
        BatteryLevelSensed(TemperatureSensorId, u32),
        NameChanged(TemperatureSensorId, String),
        TemperatureSensed(TemperatureSensorId, u32),
    }

    // Our event keys will occupy the top 12 bits of the key, meaning
    // that we can have 4K types of record. We use the bottom 32
    // bits as the sensor id.
    const EVENT_TYPE_BIT_SHIFT: usize = 52;

    // Convert from events into keys - this is a one-way process.
    impl From<TemperatureSensorEvent> for Key {
        fn from(val: TemperatureSensorEvent) -> Self {
            let event_key = match val {
                TemperatureSensorEvent::BatteryLevelSensed(id, _) => {
                    TemperatureSensorEventKey::BatteryLevelSensed(id)
                }
                TemperatureSensorEvent::NameChanged(id, _) => {
                    TemperatureSensorEventKey::NameChanged(id)
                }
                TemperatureSensorEvent::TemperatureSensed(id, _) => {
                    TemperatureSensorEventKey::TemperatureSensed(id)
                }
            };
            let (event_type, id) = match event_key {
                TemperatureSensorEventKey::BatteryLevelSensed(id) => (0u64, id),
                TemperatureSensorEventKey::NameChanged(id) => (1u64, id),
                TemperatureSensorEventKey::TemperatureSensed(id) => (2u64, id),
            };
            (event_type << EVENT_TYPE_BIT_SHIFT) | (id as u64)
        }
    }

    // Introduce a type that represents just the key components of our
    // event model object. This is so that we can conveniently coerce
    // keys into something readable in the code.

    enum TemperatureSensorEventKey {
        BatteryLevelSensed(TemperatureSensorId),
        NameChanged(TemperatureSensorId),
        TemperatureSensed(TemperatureSensorId),
    }

    struct TemperatureSensorEventKeyParseError;

    impl TryFrom<Key> for TemperatureSensorEventKey {
        type Error = TemperatureSensorEventKeyParseError;

        fn try_from(value: Key) -> Result<Self, Self::Error> {
            let id = (value & 0x0000_0000_FFFF_FFFF) as u32;
            match value >> EVENT_TYPE_BIT_SHIFT {
                0 => Ok(TemperatureSensorEventKey::BatteryLevelSensed(id)),
                1 => Ok(TemperatureSensorEventKey::NameChanged(id)),
                2 => Ok(TemperatureSensorEventKey::TemperatureSensed(id)),
                _ => Err(TemperatureSensorEventKeyParseError),
            }
        }
    }

    // We introduce a type here that captures behavior associated
    // with our specific topic, including the ability to be registered
    // for compaction.

    struct TemperatureSensorTopic;

    impl TemperatureSensorTopic {
        fn name() -> Topic {
            Topic::from("temp-sensor-events")
        }
    }

    // This is the state object that will be used during compaction.
    // We are using a hybrid of retention strategies, and of course,
    // you can have your own.

    #[derive(Debug)]
    struct TemperatureSensorCompactionState {
        temperature_events: NthKeyBasedRetentionState,
        remaining_events: KeyBasedRetentionState,
    }

    const MAX_TEMPERATURE_SENSOR_IDS_PER_COMPACTION: usize = 10;
    const MAX_TEMPERATURE_SENSOR_TEMPS_PER_ID: usize = 10;

    #[async_trait]
    impl CompactionStrategy for TemperatureSensorTopic {
        type S = TemperatureSensorCompactionState;
        type KS = ();

        fn key_init(&self) -> Self::KS {}

        fn key(_key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
            Some(r.key)
        }

        async fn init(&self) -> TemperatureSensorCompactionState {
            TemperatureSensorCompactionState {
                temperature_events: NthKeyBasedRetention::new(
                    MAX_TEMPERATURE_SENSOR_IDS_PER_COMPACTION,
                    MAX_TEMPERATURE_SENSOR_TEMPS_PER_ID,
                )
                .init()
                .await,
                // We only have two types of event that we wish to use
                // with key based retention: battery level and name changes.
                remaining_events: KeyBasedRetention::new(
                    2 * MAX_TEMPERATURE_SENSOR_IDS_PER_COMPACTION,
                )
                .init()
                .await,
            }
        }

        fn reduce(
            state: &mut TemperatureSensorCompactionState,
            key: Key,
            record: ConsumerRecord,
        ) -> MaxKeysReached {
            let Ok(event_type) = TemperatureSensorEventKey::try_from(key) else {
                return MaxKeysReached(false);
            };

            if matches!(event_type, TemperatureSensorEventKey::TemperatureSensed(_)) {
                NthKeyBasedRetention::reduce(&mut state.temperature_events, key, record)
            } else {
                KeyBasedRetention::reduce(&mut state.remaining_events, key, record)
            }
        }

        fn collect(state: TemperatureSensorCompactionState) -> CompactionMap {
            let mut compaction_map = NthKeyBasedRetention::collect(state.temperature_events);
            compaction_map.extend(KeyBasedRetention::collect(state.remaining_events));
            compaction_map
        }
    }

    // Now let's test all of that out!

    #[tokio::test]
    async fn test_both_retention_types() {
        let e0 = TemperatureSensorEvent::BatteryLevelSensed(0, 10);
        let v0 = postcard::to_stdvec(&e0).unwrap();
        let r0 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e0.into(),
            value: v0,
            partition: 0,
            offset: 0,
        };

        let e1 = TemperatureSensorEvent::BatteryLevelSensed(0, 8);
        let v1 = postcard::to_stdvec(&e1).unwrap();
        let r1 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e1.into(),
            value: v1,
            partition: 0,
            offset: 1,
        };

        let e2 = TemperatureSensorEvent::TemperatureSensed(0, 30);
        let v2 = postcard::to_stdvec(&e2).unwrap();
        let r2 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e2.into(),
            value: v2,
            partition: 0,
            offset: 2,
        };

        let e3 = TemperatureSensorEvent::TemperatureSensed(0, 31);
        let v3 = postcard::to_stdvec(&e3).unwrap();
        let r3 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e3.into(),
            value: v3,
            partition: 0,
            offset: 3,
        };

        let mut expected_compactor_result = HashMap::new();
        expected_compactor_result.insert(r1.key, 1);
        expected_compactor_result.insert(r2.key, 2);

        let compaction = TemperatureSensorTopic;

        let mut state = compaction.init().await;

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r0).unwrap(),
                r0
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r1).unwrap(),
                r1
            ),
            MaxKeysReached(false),
        );

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r2).unwrap(),
                r2
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r3).unwrap(),
                r3
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            TemperatureSensorTopic::collect(state),
            expected_compactor_result
        );
    }

    #[tokio::test]
    async fn test_key_state() {
        let compaction = KeyBasedRetention::with_qids(
            |r| std::str::from_utf8(&r.value).map(|s| s.into()).ok(),
            2,
        );

        let mut key_state = compaction.key_init();

        assert_eq!(
            KeyBasedRetention::key(
                &mut key_state,
                &ConsumerRecord {
                    topic: "some-topic".into(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: "some-key".as_bytes().to_vec(),
                    partition: 0,
                    offset: 10,
                },
            ),
            Some(0)
        );

        assert_eq!(
            KeyBasedRetention::key(
                &mut key_state,
                &ConsumerRecord {
                    topic: "some-topic".into(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: "some-key".as_bytes().to_vec(),
                    partition: 0,
                    offset: 10,
                },
            ),
            Some(0)
        );

        assert_eq!(
            KeyBasedRetention::key(
                &mut key_state,
                &ConsumerRecord {
                    topic: "some-topic".into(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: "some-other-key".as_bytes().to_vec(),
                    partition: 0,
                    offset: 10,
                },
            ),
            Some(1)
        );

        assert_eq!(
            key_state.unwrap().0,
            HashMap::from([("some-key".into(), 0), ("some-other-key".into(), 1)])
        );
    }

    #[derive(Clone)]
    struct TestCommitLog;

    #[async_trait]
    impl CommitLog for TestCommitLog {
        async fn offsets(&self, _topic: Topic, _partition: Partition) -> Option<PartitionOffsets> {
            todo!()
        }

        async fn produce(&self, _record: ProducerRecord) -> ProduceReply {
            todo!()
        }

        fn scoped_subscribe<'a>(
            &'a self,
            _consumer_group_name: &str,
            _offsets: Vec<ConsumerOffset>,
            _subscriptions: Vec<Subscription>,
            _idle_timeout: Option<Duration>,
        ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
            Box::pin(stream!({
                yield ConsumerRecord {
                    topic: Topic::from(""),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"".to_vec(),
                    partition: 0,
                    offset: 0,
                };
                yield ConsumerRecord {
                    topic: Topic::from(""),
                    headers: vec![],
                    timestamp: None,
                    key: 1,
                    value: b"".to_vec(),
                    partition: 0,
                    offset: 1,
                };
            }))
        }
    }

    struct TestCompactionStrategy;

    #[async_trait]
    impl CompactionStrategy for TestCompactionStrategy {
        type S = CompactionMap;
        type KS = ();

        fn key_init(&self) -> Self::KS {}

        fn key(_key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
            Some(r.key)
        }

        async fn init(&self) -> Self::S {
            CompactionMap::new()
        }

        fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached {
            if state.is_empty() {
                state.insert(key, record.offset);
                MaxKeysReached(false)
            } else {
                MaxKeysReached(true)
            }
        }

        fn collect(state: Self::S) -> CompactionMap {
            state
        }
    }

    #[tokio::test]
    async fn test_compactor_end_to_end() {
        let topic = Topic::from("my-topic");

        let compaction_dir = env::temp_dir().join("test_compactor_end_to_end");
        let _ = fs::remove_dir_all(&compaction_dir);
        let _ = fs::create_dir_all(&compaction_dir);
        println!("Writing to {compaction_dir:?}");

        let cl = TestCommitLog;
        let cs = TestCompactionStrategy;
        let sts = ScopedTopicSubscriber::new(cl, topic);

        let num_ages = Arc::new(AtomicU32::new(0));
        let tso_num_ages = num_ages.clone();
        let num_new_work_writers = Arc::new(AtomicU32::new(0));
        let tso_num_new_work_writers = num_new_work_writers.clone();
        let num_recover_histories = Arc::new(AtomicU32::new(0));
        let tso_num_recover_histories = num_recover_histories.clone();
        let num_rename_histories = Arc::new(AtomicU32::new(0));
        let tso_num_rename_histories = num_rename_histories.clone();
        let work_file = compaction_dir.join("work_file");
        let tso_work_file = work_file.clone();

        let tso = TopicStorageOps::new(
            move || {
                tso_num_ages.clone().fetch_add(1, Ordering::Relaxed);
                Ok(Some(1))
            },
            move || {
                tso_num_new_work_writers
                    .clone()
                    .fetch_add(1, Ordering::Relaxed);
                File::create(tso_work_file.clone())
            },
            move || {
                tso_num_recover_histories
                    .clone()
                    .fetch_add(1, Ordering::Relaxed);
                Ok(())
            },
            move || {
                tso_num_rename_histories
                    .clone()
                    .fetch_add(1, Ordering::Relaxed);
                Ok(())
            },
        );

        let mut c = Compactor::new(cs, 1, sts, tso);

        let mut steps = 1u32;
        c.step(1).await;
        while steps < 10 && !c.is_idle() {
            c.step(1).await;
            steps = steps.wrapping_add(1);
        }

        assert!(c.is_idle());

        assert_eq!(num_ages.load(Ordering::Relaxed), 2);
        assert_eq!(num_new_work_writers.load(Ordering::Relaxed), 2);
        assert_eq!(num_recover_histories.load(Ordering::Relaxed), 1);
        assert_eq!(num_rename_histories.load(Ordering::Relaxed), 2);

        let mut f = File::open(work_file).unwrap();
        let mut buf = vec![];
        let _ = f.read_to_end(&mut buf).unwrap();

        // Two records should have been written back out.
        assert_eq!(
            buf,
            [0, 0, 0, 0, 0, 0, 138, 124, 42, 87, 0, 0, 0, 1, 0, 1, 247, 109, 0, 0]
        );
    }
}