use std::{error::Error, fmt::Debug};

use super::*;

use log::{debug, error, warn};
use smol_str::SmolStr;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;

const ACTIVE_FILE_CONSUMER_IDLE_TIMEOUT: Duration = Duration::from_millis(10);

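/// Maps each compaction key to the earliest offset that must be retained for
/// that key. During the copy phase, records at or beyond this offset are kept
/// and earlier ones are dropped.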
pub type CompactionMap = HashMap<Key, Offset>;

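/// Returned by [`CompactionStrategy::reduce`] to signal whether the strategy
/// has run out of room for new keys in this pass. When `true`, the compactor
/// schedules a follow-up pass starting from the first offset it could not
/// accommodate.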
#[derive(Debug, PartialEq, Eq)]
pub struct MaxKeysReached(pub bool);

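/// A strategy for deciding which record offsets survive a compaction pass.
///
/// A pass runs in two phases. During analysis, each record is mapped to a
/// compaction [`Key`] via `key` and folded into the strategy state with
/// `reduce`; `collect` then turns that state into a [`CompactionMap`]. During
/// the subsequent copy phase, `key` is applied again and only records whose
/// offsets meet the map's retention threshold are rewritten.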
#[async_trait]
pub trait CompactionStrategy {
    type S: Debug + Send;

    type KS: Debug + Send + Clone;

    fn key_init(&self) -> Self::KS;

    fn key(key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key>;

    async fn init(&self) -> Self::S;

    fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached;

    fn collect(state: Self::S) -> CompactionMap;
}

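/// An alternative identifier that can be derived from a record's content (for
/// example, from its value) and used in place of the record's numeric key when
/// assigning compaction keys; see [`KeyBasedRetention::with_qids`].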
pub type Qid = SmolStr;

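/// Retains only the most recent record for each key, for up to
/// `max_compaction_keys` distinct keys per pass. Keys beyond that limit are
/// left uncompacted and picked up by a follow-up pass.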
pub struct KeyBasedRetention {
    max_compaction_keys: usize,
    qid_from_record: Option<fn(&ConsumerRecord) -> Option<Qid>>,
}

impl KeyBasedRetention {
    pub fn new(max_compaction_keys: usize) -> Self {
        Self {
            qid_from_record: None,
            max_compaction_keys,
        }
    }

    pub fn with_qids(
        qid_from_record: fn(&ConsumerRecord) -> Option<Qid>,
        max_compaction_keys: usize,
    ) -> Self {
        Self {
            qid_from_record: Some(qid_from_record),
            max_compaction_keys,
        }
    }
}

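/// The working state for [`KeyBasedRetention`]: the compaction map built so
/// far, plus the maximum number of keys it may hold.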
pub type KeyBasedRetentionState = (CompactionMap, usize);

#[async_trait]
impl CompactionStrategy for KeyBasedRetention {
    type S = KeyBasedRetentionState;
    type KS = Option<(HashMap<Qid, Key>, fn(&ConsumerRecord) -> Option<Qid>)>;

    fn key_init(&self) -> Self::KS {
        self.qid_from_record.map(|id_from_record| {
            (
                HashMap::with_capacity(self.max_compaction_keys),
                id_from_record,
            )
        })
    }

    fn key(key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
        if let Some((key_map, id_from_record)) = key_state {
            let id = (id_from_record)(r)?;
            if let Some(key) = key_map.get(id.as_str()) {
                Some(*key)
            } else {
                let key = key_map.len() as Key;
                let _ = key_map.insert(id, key);
                Some(key)
            }
        } else {
            Some(r.key)
        }
    }

    async fn init(&self) -> Self::S {
        (
            CompactionMap::with_capacity(self.max_compaction_keys),
            self.max_compaction_keys,
        )
    }

    fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached {
        let (compaction_map, max_keys) = state;

        let l = compaction_map.len();
        match compaction_map.entry(key) {
            Entry::Occupied(mut e) => {
                *e.get_mut() = record.offset;
                MaxKeysReached(false)
            }
            Entry::Vacant(e) if l < *max_keys => {
                e.insert(record.offset);
                MaxKeysReached(false)
            }
            Entry::Vacant(_) => MaxKeysReached(true),
        }
    }

    fn collect(state: Self::S) -> CompactionMap {
        let (compaction_map, _) = state;
        compaction_map
    }
}

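/// Retains the most recent `max_records_per_key` records for each key, for up
/// to `max_compaction_keys` distinct keys per pass. The resulting compaction
/// map points at the oldest offset within each key's retained window.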
pub struct NthKeyBasedRetention {
    max_compaction_keys: usize,
    max_records_per_key: usize,
    qid_from_record: Option<fn(&ConsumerRecord) -> Option<Qid>>,
}

impl NthKeyBasedRetention {
    pub fn new(max_compaction_keys: usize, max_records_per_key: usize) -> Self {
        Self {
            max_compaction_keys,
            max_records_per_key,
            qid_from_record: None,
        }
    }

    pub fn with_qids(
        max_compaction_keys: usize,
        max_records_per_key: usize,
        qid_from_record: fn(&ConsumerRecord) -> Option<Qid>,
    ) -> Self {
        Self {
            qid_from_record: Some(qid_from_record),
            max_compaction_keys,
            max_records_per_key,
        }
    }
}

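/// The working state for [`NthKeyBasedRetention`]: the retained offsets per
/// key, the maximum number of keys, and the maximum number of records per key.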
pub type NthKeyBasedRetentionState = (HashMap<Key, VecDeque<Offset>>, usize, usize);

#[async_trait]
impl CompactionStrategy for NthKeyBasedRetention {
    type S = NthKeyBasedRetentionState;
    type KS = Option<(HashMap<Qid, Key>, fn(&ConsumerRecord) -> Option<Qid>)>;

    fn key_init(&self) -> Self::KS {
        self.qid_from_record.map(|id_from_record| {
            (
                HashMap::with_capacity(self.max_compaction_keys),
                id_from_record,
            )
        })
    }

    fn key(key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
        if let Some((key_map, id_from_record)) = key_state {
            let id = (id_from_record)(r)?;
            if let Some(key) = key_map.get(id.as_str()) {
                Some(*key)
            } else {
                let key = key_map.len() as Key;
                let _ = key_map.insert(id, key);
                Some(key)
            }
        } else {
            Some(r.key)
        }
    }

    async fn init(&self) -> Self::S {
        (
            HashMap::with_capacity(self.max_compaction_keys),
            self.max_compaction_keys,
            self.max_records_per_key,
        )
    }

    fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached {
        let (compaction_map, max_keys, max_records_per_key) = state;

        let l = compaction_map.len();
        match compaction_map.entry(key) {
            Entry::Occupied(mut e) => {
                let offsets = e.get_mut();
                if offsets.len() == *max_records_per_key {
                    offsets.pop_front();
                }
                offsets.push_back(record.offset);
                MaxKeysReached(false)
            }
            Entry::Vacant(e) if l < *max_keys => {
                let mut offsets = VecDeque::with_capacity(*max_records_per_key);
                offsets.push_back(record.offset);
                e.insert(offsets);
                MaxKeysReached(false)
            }
            Entry::Vacant(_) => MaxKeysReached(true),
        }
    }

    fn collect(state: Self::S) -> CompactionMap {
        let (compaction_map, _, _) = state;
        compaction_map
            .into_iter()
            .flat_map(|(k, mut v)| v.pop_front().map(|v| (k, v)))
            .collect::<CompactionMap>()
    }
}

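/// A clonable handle that subscribes to a single topic of a commit log on
/// behalf of the compactor, using the "compactor" consumer group and a short
/// idle timeout on the underlying subscription.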
#[derive(Clone)]
pub(crate) struct ScopedTopicSubscriber<CL>
where
    CL: CommitLog,
{
    commit_log: CL,
    subscriptions: Vec<Subscription>,
}

impl<CL> ScopedTopicSubscriber<CL>
where
    CL: CommitLog,
{
    pub fn new(commit_log: CL, topic: Topic) -> Self {
        Self {
            commit_log,
            subscriptions: vec![Subscription { topic }],
        }
    }

    pub fn subscribe<'a>(&'a self) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
        self.commit_log.scoped_subscribe(
            "compactor",
            vec![],
            self.subscriptions.clone(),
            Some(ACTIVE_FILE_CONSUMER_IDLE_TIMEOUT),
        )
    }
}

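/// The storage-level operations the compactor needs for a topic: ageing the
/// active file (reporting the end offset to compact up to), producing a writer
/// for a fresh work file, and replacing the history files with the compacted
/// result. The history recovery function passed to `new` is invoked once up
/// front.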
pub(crate) struct TopicStorageOps<E, W>
where
    E: Error,
    W: Write,
{
    age_active: Box<dyn FnMut() -> Result<Option<Offset>, E> + Send>,
    new_work_writer: Box<dyn FnMut() -> Result<W, E> + Send>,
    replace_history_files: Box<dyn FnMut() -> Result<(), E> + Send>,
}

impl<E, W> TopicStorageOps<E, W>
where
    E: Error,
    W: Write,
{
    pub fn new<AA, NWF, RCHF, RPHF>(
        age_active: AA,
        new_work_file: NWF,
        mut recover_history_files: RCHF,
        replace_history_files: RPHF,
    ) -> Self
    where
        AA: FnMut() -> Result<Option<Offset>, E> + Send + 'static,
        NWF: FnMut() -> Result<W, E> + Send + 'static,
        RCHF: FnMut() -> Result<(), E> + Send + 'static,
        RPHF: FnMut() -> Result<(), E> + Send + 'static,
    {
        let _ = recover_history_files();

        Self {
            age_active: Box::new(age_active),
            new_work_writer: Box::new(new_work_file),
            replace_history_files: Box::new(replace_history_files),
        }
    }

    pub fn age_active(&mut self) -> Result<Option<Offset>, E> {
        (self.age_active)()
    }

    pub fn new_work_writer(&mut self) -> Result<W, E> {
        (self.new_work_writer)()
    }

    pub fn replace_history_files(&mut self) -> Result<(), E> {
        (self.replace_history_files)()
    }
}

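/// Drives compaction of a single topic as a state machine. The owner calls
/// [`Compactor::step`] with the current size of the active file; once that
/// size reaches `compaction_threshold`, the compactor ages the active file,
/// analyses it with the configured [`CompactionStrategy`], rewrites the
/// surviving records to a work file, and finally replaces the history files.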
pub(crate) struct Compactor<E, W, CL, CS>
where
    E: Error,
    W: Write,
    CL: CommitLog,
    CS: CompactionStrategy + Send + 'static,
{
    compaction_strategy: CS,
    compaction_threshold: u64,
    scoped_topic_subscriber: ScopedTopicSubscriber<CL>,
    topic_storage_ops: TopicStorageOps<E, W>,

    state: State<CS>,
}

#[derive(Debug)]
enum CompactionError {
    CannotSerialize,
    #[allow(dead_code)]
    IoError(io::Error),
}

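/// The phases of a compaction pass: `Idle` until the threshold is reached,
/// `PreparingAnalyze`/`Analyzing` while the strategy builds its compaction
/// map, then `PreparingCompaction`/`Compacting` while the surviving records
/// are rewritten. The carried `Option<Offset>` records where a follow-up pass
/// must resume if the strategy ran out of keys.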
enum State<CS>
where
    CS: CompactionStrategy,
{
    Idle,
    PreparingAnalyze(Option<Offset>),
    Analyzing(JoinHandle<(CS::KS, CS::S, Option<Offset>)>, Offset),
    PreparingCompaction(CS::KS, CompactionMap, Offset, Option<Offset>),
    Compacting(JoinHandle<Result<(), CompactionError>>, Option<Offset>),
}

impl<CS> Debug for State<CS>
where
    CS: CompactionStrategy,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Idle => write!(f, "Idle"),
            Self::PreparingAnalyze(arg0) => f.debug_tuple("PreparingAnalyze").field(arg0).finish(),
            Self::Analyzing(arg0, arg1) => {
                f.debug_tuple("Analyzing").field(arg0).field(arg1).finish()
            }
            Self::PreparingCompaction(arg0, arg1, arg2, arg3) => f
                .debug_tuple("PreparingCompaction")
                .field(arg0)
                .field(arg1)
                .field(arg2)
                .field(arg3)
                .finish(),
            Self::Compacting(arg0, arg1) => {
                f.debug_tuple("Compacting").field(arg0).field(arg1).finish()
            }
        }
    }
}

impl<E, W, CL, CS> Compactor<E, W, CL, CS>
where
    E: Error + Send + 'static,
    W: Write + Send + 'static,
    CL: CommitLog + Clone + Send + 'static,
    CS: CompactionStrategy + Send + 'static,
{
    pub fn new(
        compaction_strategy: CS,
        compaction_threshold: u64,
        scoped_topic_subscriber: ScopedTopicSubscriber<CL>,
        topic_storage_ops: TopicStorageOps<E, W>,
    ) -> Self {
        Self {
            compaction_strategy,
            compaction_threshold,
            scoped_topic_subscriber,
            topic_storage_ops,
            state: State::Idle,
        }
    }

    pub fn is_idle(&self) -> bool {
        matches!(self.state, State::Idle)
    }

    pub async fn step(&mut self, mut active_file_size: u64) {
        loop {
            let mut step_again = false;
            let next_state = match &mut self.state {
                State::Idle if active_file_size < self.compaction_threshold => None,
                State::Idle => {
                    step_again = true;
                    Some(State::PreparingAnalyze(None))
                }
                State::PreparingAnalyze(mut next_start_offset) => {
                    let r = self.topic_storage_ops.age_active();
                    if let Ok(Some(end_offset)) = r {
                        let task_scoped_topic_subscriber = self.scoped_topic_subscriber.clone();
                        let task_init = self.compaction_strategy.init().await;
                        let task_key_state_init = self.compaction_strategy.key_init();
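                        // The analysis task folds every record up to the aged end offset
                        // into the strategy state, remembering the first offset it could
                        // not accommodate so that a follow-up pass can resume from there.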
                        let h = tokio::spawn(async move {
                            let mut strategy_state = task_init;
                            let mut key_state = task_key_state_init;
                            let mut records = task_scoped_topic_subscriber.subscribe();
                            let start_offset = next_start_offset;
                            next_start_offset = None;
                            while let Some(record) = records.next().await {
                                let record_offset = record.offset;
                                if record_offset > end_offset {
                                    break;
                                }
                                if Some(record_offset) >= start_offset {
                                    if let Some(key) = CS::key(&mut key_state, &record) {
                                        if matches!(
                                            CS::reduce(&mut strategy_state, key, record),
                                            MaxKeysReached(true)
                                        ) && next_start_offset.is_none()
                                        {
                                            next_start_offset = Some(record_offset);
                                        }
                                    }
                                }
                            }
                            (key_state, strategy_state, next_start_offset)
                        });
                        step_again = true;
                        Some(State::Analyzing(h, end_offset))
                    } else {
                        error!("Could not age the active file/locate end offset. Aborting compaction. {r:?}");
                        Some(State::Idle)
                    }
                }
                State::Analyzing(h, end_offset) => {
                    step_again = active_file_size >= self.compaction_threshold;
                    if step_again || h.is_finished() {
                        let r = h.await;
                        let s = if let Ok((key_state, strategy_state, next_start_offset)) = r {
                            let compaction_map = CS::collect(strategy_state);
                            State::PreparingCompaction(
                                key_state,
                                compaction_map,
                                *end_offset,
                                next_start_offset,
                            )
                        } else {
                            error!("Some error analysing compaction: {r:?}");
                            State::Idle
                        };
                        Some(s)
                    } else {
                        None
                    }
                }
                State::PreparingCompaction(
                    key_state,
                    compaction_map,
                    end_offset,
                    next_start_offset,
                ) => {
                    let r = self.topic_storage_ops.new_work_writer();
                    if let Ok(mut writer) = r {
                        let mut task_key_state = key_state.clone();
                        let task_compaction_map = compaction_map.clone();
                        let task_end_offset = *end_offset;
                        let task_scoped_topic_subscriber = self.scoped_topic_subscriber.clone();
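                        // The compaction task rewrites to the work file every record that
                        // either has no entry in the compaction map or whose offset meets
                        // the map's retention threshold; records that yield no key are
                        // dropped.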
                        let h = tokio::spawn(async move {
                            let mut records = task_scoped_topic_subscriber.subscribe();
                            while let Some(record) = records.next().await {
                                if record.offset > task_end_offset {
                                    break;
                                }
                                let key = CS::key(&mut task_key_state, &record);
                                let copy = if let Some(key) = key {
                                    task_compaction_map
                                        .get(&key)
                                        .map(|min_offset| record.offset >= *min_offset)
                                        .unwrap_or(true)
                                } else {
                                    false
                                };
                                if copy {
                                    let storable_record = StorableRecord {
                                        version: 0,
                                        headers: record
                                            .headers
                                            .into_iter()
                                            .map(|h| StorableHeader {
                                                key: h.key,
                                                value: h.value,
                                            })
                                            .collect(),
                                        timestamp: record.timestamp,
                                        key: record.key,
                                        value: record.value,
                                        offset: record.offset,
                                    };

                                    let Ok(buf) =
                                        postcard::to_stdvec_crc32(&storable_record, CRC.digest())
                                    else {
                                        return Err(CompactionError::CannotSerialize);
                                    };
                                    writer.write_all(&buf).map_err(CompactionError::IoError)?;
                                }
                            }
                            writer.flush().map_err(CompactionError::IoError)
                        });
                        step_again = true;
                        Some(State::Compacting(h, *next_start_offset))
                    } else {
                        error!("Could not create the new work file. Aborting compaction.");
                        Some(State::Idle)
                    }
                }
                State::Compacting(h, next_start_offset) => {
                    step_again = active_file_size >= self.compaction_threshold;
                    if step_again || h.is_finished() {
                        let r = h.await;
                        let s = if matches!(r, Ok(Ok(()))) {
                            let r = self.topic_storage_ops.replace_history_files();
                            if r.is_ok() {
                                if next_start_offset.is_some() {
                                    warn!("Subsequent compaction pass required from offset {next_start_offset:?}");
                                    State::PreparingAnalyze(*next_start_offset)
                                } else {
                                    State::Idle
                                }
                            } else {
                                error!(
                                    "Some error replacing the history files during compaction: {r:?}"
                                );
                                State::Idle
                            }
                        } else {
                            error!("Some error during compaction: {r:?}");
                            State::Idle
                        };
                        Some(s)
                    } else {
                        None
                    }
                }
            };
            if let Some(next_state) = next_state {
                debug!("Compaction moving to {next_state:?}");
                self.state = next_state;
            }
            if !step_again {
                break;
            }
            active_file_size = 0;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        env,
        sync::atomic::{AtomicU32, Ordering},
    };

    use super::*;

    #[tokio::test]
    async fn test_key_based_retention() {
        let topic = Topic::from("my-topic");

        let r0 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 0,
        };

        let r1 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 1,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 1,
        };

        let r2 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 2,
        };

        let mut expected_compactor_result = HashMap::new();
        expected_compactor_result.insert(0, 2);

        let compaction = KeyBasedRetention::new(1);

        let mut key_state = compaction.key_init();
        let mut state = compaction.init().await;

        assert_eq!(
            KeyBasedRetention::reduce(
                &mut state,
                KeyBasedRetention::key(&mut key_state, &r0).unwrap(),
                r0
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            KeyBasedRetention::reduce(
                &mut state,
                KeyBasedRetention::key(&mut key_state, &r1).unwrap(),
                r1
            ),
            MaxKeysReached(true),
        );

        assert_eq!(
            KeyBasedRetention::reduce(
                &mut state,
                KeyBasedRetention::key(&mut key_state, &r2).unwrap(),
                r2
            ),
            MaxKeysReached(false)
        );

        assert_eq!(KeyBasedRetention::collect(state), expected_compactor_result);
    }

    #[tokio::test]
    async fn test_nth_key_based_retention() {
        let topic = Topic::from("my-topic");

        let r0 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 0,
        };

        let r1 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 1,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 1,
        };

        let r2 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 2,
        };

        let r3 = ConsumerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
            offset: 3,
        };

        let mut expected_compactor_result = HashMap::new();
        expected_compactor_result.insert(0, 2);

        let compaction = NthKeyBasedRetention::new(1, 2);

        let mut key_state = compaction.key_init();
        let mut state = compaction.init().await;

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r0).unwrap(),
                r0
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r1).unwrap(),
                r1
            ),
            MaxKeysReached(true),
        );

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r2).unwrap(),
                r2
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            NthKeyBasedRetention::reduce(
                &mut state,
                NthKeyBasedRetention::key(&mut key_state, &r3).unwrap(),
                r3
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            NthKeyBasedRetention::collect(state),
            expected_compactor_result
        );
    }

    type TemperatureSensorId = u32;

    #[derive(Deserialize, Serialize)]
    enum TemperatureSensorEvent {
        BatteryLevelSensed(TemperatureSensorId, u32),
        NameChanged(TemperatureSensorId, String),
        TemperatureSensed(TemperatureSensorId, u32),
    }

    const EVENT_TYPE_BIT_SHIFT: usize = 52;

    impl From<TemperatureSensorEvent> for Key {
        fn from(val: TemperatureSensorEvent) -> Self {
            let event_key = match val {
                TemperatureSensorEvent::BatteryLevelSensed(id, _) => {
                    TemperatureSensorEventKey::BatteryLevelSensed(id)
                }
                TemperatureSensorEvent::NameChanged(id, _) => {
                    TemperatureSensorEventKey::NameChanged(id)
                }
                TemperatureSensorEvent::TemperatureSensed(id, _) => {
                    TemperatureSensorEventKey::TemperatureSensed(id)
                }
            };
            let (event_type, id) = match event_key {
                TemperatureSensorEventKey::BatteryLevelSensed(id) => (0u64, id),
                TemperatureSensorEventKey::NameChanged(id) => (1u64, id),
                TemperatureSensorEventKey::TemperatureSensed(id) => (2u64, id),
            };
            (event_type << EVENT_TYPE_BIT_SHIFT) | (id as u64)
        }
    }

    enum TemperatureSensorEventKey {
        BatteryLevelSensed(TemperatureSensorId),
        NameChanged(TemperatureSensorId),
        TemperatureSensed(TemperatureSensorId),
    }

    struct TemperatureSensorEventKeyParseError;

    impl TryFrom<Key> for TemperatureSensorEventKey {
        type Error = TemperatureSensorEventKeyParseError;

        fn try_from(value: Key) -> Result<Self, Self::Error> {
            let id = (value & 0x0000_0000_FFFF_FFFF) as u32;
            match value >> EVENT_TYPE_BIT_SHIFT {
                0 => Ok(TemperatureSensorEventKey::BatteryLevelSensed(id)),
                1 => Ok(TemperatureSensorEventKey::NameChanged(id)),
                2 => Ok(TemperatureSensorEventKey::TemperatureSensed(id)),
                _ => Err(TemperatureSensorEventKeyParseError),
            }
        }
    }

    struct TemperatureSensorTopic;

    impl TemperatureSensorTopic {
        fn name() -> Topic {
            Topic::from("temp-sensor-events")
        }
    }

    #[derive(Debug)]
    struct TemperatureSensorCompactionState {
        temperature_events: NthKeyBasedRetentionState,
        remaining_events: KeyBasedRetentionState,
    }

    const MAX_TEMPERATURE_SENSOR_IDS_PER_COMPACTION: usize = 10;
    const MAX_TEMPERATURE_SENSOR_TEMPS_PER_ID: usize = 10;

    #[async_trait]
    impl CompactionStrategy for TemperatureSensorTopic {
        type S = TemperatureSensorCompactionState;
        type KS = ();

        fn key_init(&self) -> Self::KS {}

        fn key(_key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
            Some(r.key)
        }

        async fn init(&self) -> TemperatureSensorCompactionState {
            TemperatureSensorCompactionState {
                temperature_events: NthKeyBasedRetention::new(
                    MAX_TEMPERATURE_SENSOR_IDS_PER_COMPACTION,
                    MAX_TEMPERATURE_SENSOR_TEMPS_PER_ID,
                )
                .init()
                .await,
                remaining_events: KeyBasedRetention::new(
                    2 * MAX_TEMPERATURE_SENSOR_IDS_PER_COMPACTION,
                )
                .init()
                .await,
            }
        }

        fn reduce(
            state: &mut TemperatureSensorCompactionState,
            key: Key,
            record: ConsumerRecord,
        ) -> MaxKeysReached {
            let Ok(event_type) = TemperatureSensorEventKey::try_from(key) else {
                return MaxKeysReached(false);
            };

            if matches!(event_type, TemperatureSensorEventKey::TemperatureSensed(_)) {
                NthKeyBasedRetention::reduce(&mut state.temperature_events, key, record)
            } else {
                KeyBasedRetention::reduce(&mut state.remaining_events, key, record)
            }
        }

        fn collect(state: TemperatureSensorCompactionState) -> CompactionMap {
            let mut compaction_map = NthKeyBasedRetention::collect(state.temperature_events);
            compaction_map.extend(KeyBasedRetention::collect(state.remaining_events));
            compaction_map
        }
    }

    #[tokio::test]
    async fn test_both_retention_types() {
        let e0 = TemperatureSensorEvent::BatteryLevelSensed(0, 10);
        let v0 = postcard::to_stdvec(&e0).unwrap();
        let r0 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e0.into(),
            value: v0,
            partition: 0,
            offset: 0,
        };

        let e1 = TemperatureSensorEvent::BatteryLevelSensed(0, 8);
        let v1 = postcard::to_stdvec(&e1).unwrap();
        let r1 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e1.into(),
            value: v1,
            partition: 0,
            offset: 1,
        };

        let e2 = TemperatureSensorEvent::TemperatureSensed(0, 30);
        let v2 = postcard::to_stdvec(&e2).unwrap();
        let r2 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e2.into(),
            value: v2,
            partition: 0,
            offset: 2,
        };

        let e3 = TemperatureSensorEvent::TemperatureSensed(0, 31);
        let v3 = postcard::to_stdvec(&e3).unwrap();
        let r3 = ConsumerRecord {
            topic: TemperatureSensorTopic::name(),
            headers: vec![],
            timestamp: None,
            key: e3.into(),
            value: v3,
            partition: 0,
            offset: 3,
        };

        let mut expected_compactor_result = HashMap::new();

        expected_compactor_result.insert(r1.key, 1);

        expected_compactor_result.insert(r2.key, 2);

        let compaction = TemperatureSensorTopic;

        let mut state = compaction.init().await;

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r0).unwrap(),
                r0
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r1).unwrap(),
                r1
            ),
            MaxKeysReached(false),
        );

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r2).unwrap(),
                r2
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            TemperatureSensorTopic::reduce(
                &mut state,
                TemperatureSensorTopic::key(&mut (), &r3).unwrap(),
                r3
            ),
            MaxKeysReached(false)
        );

        assert_eq!(
            TemperatureSensorTopic::collect(state),
            expected_compactor_result
        );
    }

    #[tokio::test]
    async fn test_key_state() {
        let compaction = KeyBasedRetention::with_qids(
            |r| std::str::from_utf8(&r.value).map(|s| s.into()).ok(),
            2,
        );

        let mut key_state = compaction.key_init();

        assert_eq!(
            KeyBasedRetention::key(
                &mut key_state,
                &ConsumerRecord {
                    topic: "some-topic".into(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: "some-key".as_bytes().to_vec(),
                    partition: 0,
                    offset: 10,
                },
            ),
            Some(0)
        );

        assert_eq!(
            KeyBasedRetention::key(
                &mut key_state,
                &ConsumerRecord {
                    topic: "some-topic".into(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: "some-key".as_bytes().to_vec(),
                    partition: 0,
                    offset: 10,
                },
            ),
            Some(0)
        );

        assert_eq!(
            KeyBasedRetention::key(
                &mut key_state,
                &ConsumerRecord {
                    topic: "some-topic".into(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: "some-other-key".as_bytes().to_vec(),
                    partition: 0,
                    offset: 10,
                },
            ),
            Some(1)
        );

        assert_eq!(
            key_state.unwrap().0,
            HashMap::from([("some-key".into(), 0), ("some-other-key".into(), 1)])
        );
    }

    #[derive(Clone)]
    struct TestCommitLog;

    #[async_trait]
    impl CommitLog for TestCommitLog {
        async fn offsets(&self, _topic: Topic, _partition: Partition) -> Option<PartitionOffsets> {
            todo!()
        }

        async fn produce(&self, _record: ProducerRecord) -> ProduceReply {
            todo!()
        }

        fn scoped_subscribe<'a>(
            &'a self,
            _consumer_group_name: &str,
            _offsets: Vec<ConsumerOffset>,
            _subscriptions: Vec<Subscription>,
            _idle_timeout: Option<Duration>,
        ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
            Box::pin(stream!({
                yield ConsumerRecord {
                    topic: Topic::from(""),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"".to_vec(),
                    partition: 0,
                    offset: 0,
                };
                yield ConsumerRecord {
                    topic: Topic::from(""),
                    headers: vec![],
                    timestamp: None,
                    key: 1,
                    value: b"".to_vec(),
                    partition: 0,
                    offset: 1,
                };
            }))
        }
    }

    struct TestCompactionStrategy;

    #[async_trait]
    impl CompactionStrategy for TestCompactionStrategy {
        type S = CompactionMap;
        type KS = ();

        fn key_init(&self) -> Self::KS {}

        fn key(_key_state: &mut Self::KS, r: &ConsumerRecord) -> Option<Key> {
            Some(r.key)
        }

        async fn init(&self) -> Self::S {
            CompactionMap::new()
        }

        fn reduce(state: &mut Self::S, key: Key, record: ConsumerRecord) -> MaxKeysReached {
            if state.is_empty() {
                state.insert(key, record.offset);
                MaxKeysReached(false)
            } else {
                MaxKeysReached(true)
            }
        }

        fn collect(state: Self::S) -> CompactionMap {
            state
        }
    }

    #[tokio::test]
    async fn test_compactor_end_to_end() {
        let topic = Topic::from("my-topic");

        let compaction_dir = env::temp_dir().join("test_compactor_end_to_end");
        let _ = fs::remove_dir_all(&compaction_dir);
        let _ = fs::create_dir_all(&compaction_dir);
        println!("Writing to {compaction_dir:?}");

        let cl = TestCommitLog;
        let cs = TestCompactionStrategy;
        let sts = ScopedTopicSubscriber::new(cl, topic);

        let num_ages = Arc::new(AtomicU32::new(0));
        let tso_num_ages = num_ages.clone();
        let num_new_work_writers = Arc::new(AtomicU32::new(0));
        let tso_num_new_work_writers = num_new_work_writers.clone();
        let num_recover_histories = Arc::new(AtomicU32::new(0));
        let tso_num_recover_histories = num_recover_histories.clone();
        let num_rename_histories = Arc::new(AtomicU32::new(0));
        let tso_num_rename_histories = num_rename_histories.clone();
        let work_file = compaction_dir.join("work_file");
        let tso_work_file = work_file.clone();
        let tso = TopicStorageOps::new(
            move || {
                tso_num_ages.clone().fetch_add(1, Ordering::Relaxed);
                Ok(Some(1))
            },
            move || {
                tso_num_new_work_writers
                    .clone()
                    .fetch_add(1, Ordering::Relaxed);
                File::create(tso_work_file.clone())
            },
            move || {
                tso_num_recover_histories
                    .clone()
                    .fetch_add(1, Ordering::Relaxed);
                Ok(())
            },
            move || {
                tso_num_rename_histories
                    .clone()
                    .fetch_add(1, Ordering::Relaxed);
                Ok(())
            },
        );

        let mut c = Compactor::new(cs, 1, sts, tso);

        let mut steps = 1u32;
        c.step(1).await;
        while steps < 10 && !c.is_idle() {
            c.step(1).await;
            steps = steps.wrapping_add(1);
        }

        assert!(c.is_idle());

        assert_eq!(num_ages.load(Ordering::Relaxed), 2);
        assert_eq!(num_new_work_writers.load(Ordering::Relaxed), 2);
        assert_eq!(num_recover_histories.load(Ordering::Relaxed), 1);
        assert_eq!(num_rename_histories.load(Ordering::Relaxed), 2);

        let mut f = File::open(work_file).unwrap();
        let mut buf = vec![];
        let _ = f.read_to_end(&mut buf).unwrap();

        assert_eq!(
            buf,
            [0, 0, 0, 0, 0, 0, 138, 124, 42, 87, 0, 0, 0, 1, 0, 1, 247, 109, 0, 0]
        );
    }
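
    // A small additional check that the key packing used by
    // `From<TemperatureSensorEvent> for Key` above round-trips through
    // `TryFrom<Key> for TemperatureSensorEventKey`.
    #[test]
    fn test_temperature_sensor_key_roundtrip() {
        let key: Key = TemperatureSensorEvent::TemperatureSensed(42, 21).into();
        assert!(matches!(
            TemperatureSensorEventKey::try_from(key),
            Ok(TemperatureSensorEventKey::TemperatureSensed(42))
        ));
    }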
}