use talos_suffix::SuffixItem;
use time::OffsetDateTime;
use crate::{
core::StatemapItem,
events::{EventTimingsMap, ReplicatorCandidateEvent},
suffix::ReplicatorSuffixItemTrait,
};
pub fn get_filtered_batch<'a, T: ReplicatorSuffixItemTrait + 'a>(messages: impl Iterator<Item = &'a SuffixItem<T>>) -> impl Iterator<Item = &'a SuffixItem<T>> {
messages.into_iter().take_while(|&m| m.is_decided)
}
pub fn get_statemap_from_suffix_items<'a, T: ReplicatorSuffixItemTrait + 'a>(
messages: impl Iterator<Item = &'a SuffixItem<T>>,
) -> Vec<(u64, Vec<StatemapItem>, EventTimingsMap)> {
messages.into_iter().fold(vec![], |mut acc, m| {
let mut event_timings = m.item.get_all_timings();
event_timings.insert(
ReplicatorCandidateEvent::ReplicatorStatemapPicked,
OffsetDateTime::now_utc().unix_timestamp_nanos(),
);
if m.item.get_safepoint().is_none() {
acc.push((m.item_ver, vec![], event_timings));
return acc;
}
match m.item.get_statemap().as_ref() {
Some(sm_items) => {
let state_maps_to_append = sm_items.iter().map(|sm| {
let key = sm.keys().next().unwrap().to_string();
let payload = sm.get(&key).unwrap().clone();
StatemapItem {
action: key,
payload,
version: m.item_ver,
safepoint: *m.item.get_safepoint(),
}
});
acc.push((m.item_ver, state_maps_to_append.collect::<Vec<StatemapItem>>(), event_timings));
acc
}
None => {
acc.push((m.item_ver, vec![], event_timings));
acc
}
}
})
}