use std::ops::ControlFlow;
use ahash::RandomState;
use indexmap::IndexMap;
use tracing::{debug, info};
use crate::{
core::{StatemapInstallState, StatemapInstallerHashmap},
events::{ReplicatorCandidateEvent, ReplicatorCandidateEventTimingsTrait},
utils::installer_utils::{is_queue_item_above_version, is_queue_item_serializable, is_queue_item_state_match},
};
/// Debug summary of one filtering step applied while selecting queue items.
///
/// `T` is whatever item representation the caller collects; within this file
/// it is instantiated with `&StatemapInstallerHashmap`.
#[derive(Debug, Default)]
// Only populated by the deprecated debug helper in this file, hence dead_code.
#[allow(dead_code)]
pub struct DbgQueueFilterSummary<T> {
/// Number of items that were fed into this filter step.
pub filter_enter_count: usize,
/// Number of items that passed this filter step.
pub filter_exit_count: usize,
/// Items recorded as rejected by this step. May be left empty by steps that
/// do not track their rejects.
pub filter_reject_items: Vec<T>,
}
/// Debug report describing which queue items are installable and how each
/// filtering step narrowed the candidates down.
#[derive(Debug, Default)]
// Only populated by the deprecated debug helper in this file, hence dead_code.
#[allow(dead_code)]
pub struct DbgQueueInstallItemsSummary<T> {
/// Items that passed every filter step.
pub installable_items: Vec<T>,
/// One summary per filter step, in the order the steps were applied.
pub filter_steps_insights: Vec<DbgQueueFilterSummary<T>>,
}
/// Queue of statemap installer items together with the snapshot version used
/// when deciding which items can be installed or pruned.
#[derive(Debug, Default)]
pub struct StatemapInstallerQueue {
/// Installer items keyed by their version (see `insert_queue_item`).
/// `IndexMap` keeps entries in insertion order, which the index based
/// operations of this type rely on.
pub queue: IndexMap<u64, StatemapInstallerHashmap, RandomState>,
/// Latest snapshot version recorded through `update_snapshot`; that method
/// only ever moves it forward. Defaults to 0.
pub snapshot_version: u64,
}
impl StatemapInstallerQueue {
/// Moves the tracked snapshot version forward.
///
/// The stored value is replaced only when `snapshot_version` is strictly
/// greater than the current one; otherwise the call is a no-op and nothing is
/// logged.
pub fn update_snapshot(&mut self, snapshot_version: u64) {
    // Never move the snapshot backwards (or re-log an unchanged value).
    if snapshot_version <= self.snapshot_version {
        return;
    }

    debug!("Updating snapshot_version to {}", snapshot_version);
    self.snapshot_version = snapshot_version;
}
/// Stores `installer_item` in the queue under the key `*version`.
///
/// Any value previously stored under the same key is replaced and discarded
/// (per `IndexMap::insert`, an existing key keeps its position in the order).
pub fn insert_queue_item(&mut self, version: &u64, installer_item: StatemapInstallerHashmap) {
    let key = *version;
    // The previous value, if any, is intentionally dropped.
    let _replaced = self.queue.insert(key, installer_item);
}
/// Sets the install state of the queue item stored under `version`.
///
/// Returns `None` when no item exists for `version` (nothing is modified in
/// that case). Otherwise the state is overwritten and the result of looking up
/// the item's `QueueStatemapReceived` event timestamp is returned.
pub fn update_queue_item_state(&mut self, version: &u64, state: StatemapInstallState) -> Option<i128> {
    self.queue.get_mut(version).and_then(|entry| {
        entry.state = state;
        entry
            .events
            .get_event_timestamp(ReplicatorCandidateEvent::QueueStatemapReceived)
    })
}
/// Prunes every queue entry up to and including the one keyed by `version`.
///
/// Returns `None` when `version` is not present in the queue; otherwise
/// `Some(count)` with the number of entries removed by `prune_till_index`.
pub fn prune_till_version(&mut self, version: u64) -> Option<u64> {
    self.queue
        .get_index_of(&version)
        .map(|position| self.prune_till_index(position))
}
/// Removes the queue entries at positions `0..=index` and returns how many
/// were removed.
///
/// An `index` at or beyond the current queue length removes nothing and
/// returns 0. When entries were removed, an info-level log line reports the
/// count, the length before and after, and the current snapshot version.
pub fn prune_till_index(&mut self, index: usize) -> u64 {
    let queue_length = self.queue.len();

    // Out-of-range index: nothing to prune.
    if index >= queue_length {
        return 0;
    }

    // The drain iterator is dropped at the end of this statement, which is
    // what actually removes the drained range from the map.
    let count = self.queue.drain(..=index).len() as u64;

    if count > 0 {
        info!(
            "Pruned {count} items in statemap queue. Length changed from {queue_length} to {} | snapshot_version = {} ",
            self.queue.len(),
            self.snapshot_version
        );
    }

    count
}
/// Returns a lazy iterator over the queue's items, in queue order, keeping
/// only those accepted by the predicate `is_queue_item_state_match(state)`.
pub fn filter_items_by_state(&self, state: StatemapInstallState) -> impl Iterator<Item = &StatemapInstallerHashmap> {
    let has_requested_state = is_queue_item_state_match(state);
    self.queue.values().filter(has_requested_state)
}
/// Debug variant of `get_versions_to_install` that also reports what each
/// filtering step let through and rejected.
///
/// The steps, in order:
/// 1. keep items in the `Awaiting` state (rejects are not recorded);
/// 2. walk the awaiting items in order and stop at the first one rejected by
///    the `is_queue_item_above_version` predicate, recording only that one
///    item as rejected (items after it are not examined);
/// 3. split the survivors with the `is_queue_item_serializable` predicate.
#[deprecated]
#[allow(dead_code)]
pub(crate) fn dbg_get_versions_to_install(&self) -> DbgQueueInstallItemsSummary<&StatemapInstallerHashmap> {
    // Step 1: awaiting items only.
    let awaiting: Vec<&StatemapInstallerHashmap> = self
        .queue
        .values()
        .filter(is_queue_item_state_match(StatemapInstallState::Awaiting))
        .collect();
    let awaiting_step = DbgQueueFilterSummary::<&StatemapInstallerHashmap> {
        filter_enter_count: self.queue.len(),
        filter_exit_count: awaiting.len(),
        filter_reject_items: Vec::new(),
    };

    // Step 2: emulate `take_while`, but keep hold of the item that ended the walk.
    let mut is_above_snapshot = is_queue_item_above_version(&self.snapshot_version);
    let walk: ControlFlow<_, _> =
        awaiting
            .iter()
            .try_fold((Vec::new(), Vec::new()), |(mut passed, mut failed), candidate| {
                if is_above_snapshot(candidate) {
                    passed.push(*candidate);
                    ControlFlow::Continue((passed, failed))
                } else {
                    failed.push(*candidate);
                    ControlFlow::Break((passed, failed))
                }
            });
    // Both outcomes carry the same pair of buckets.
    let (above_snapshot, stopped_at) = match walk {
        ControlFlow::Continue(buckets) | ControlFlow::Break(buckets) => buckets,
    };
    let snapshot_step = DbgQueueFilterSummary::<&StatemapInstallerHashmap> {
        filter_enter_count: awaiting.len(),
        filter_exit_count: above_snapshot.len(),
        filter_reject_items: stopped_at,
    };

    // Step 3: serializability check against the whole queue.
    let (installable, non_serializable): (Vec<&StatemapInstallerHashmap>, Vec<&StatemapInstallerHashmap>) =
        above_snapshot
            .into_iter()
            .partition(is_queue_item_serializable(&self.queue));
    let serialization_step = DbgQueueFilterSummary::<&StatemapInstallerHashmap> {
        filter_enter_count: snapshot_step.filter_exit_count,
        filter_exit_count: installable.len(),
        filter_reject_items: non_serializable,
    };

    DbgQueueInstallItemsSummary {
        installable_items: installable,
        filter_steps_insights: vec![awaiting_step, snapshot_step, serialization_step],
    }
}
/// Collects the versions of the queue items that can be installed now.
///
/// Walking the queue in order, only `Awaiting` items are considered; the walk
/// stops at the first awaiting item rejected by the predicate built from
/// `snapshot_version` (`is_queue_item_above_version`), and the remaining
/// candidates are kept only if `is_queue_item_serializable` accepts them.
pub fn get_versions_to_install(&self) -> Vec<u64> {
    let is_awaiting = is_queue_item_state_match(StatemapInstallState::Awaiting);
    let is_above_snapshot = is_queue_item_above_version(&self.snapshot_version);
    let is_serializable = is_queue_item_serializable(&self.queue);

    self.queue
        .values()
        .filter(is_awaiting)
        // `take_while`, not `filter`: the first failure ends the walk.
        .take_while(is_above_snapshot)
        .filter(is_serializable)
        .map(|item| item.version)
        .collect()
}
/// Finds the last version of the unbroken run of `Installed` items.
///
/// The run starts at the position of the entry keyed by `snapshot_version`,
/// or at the front of the queue when no such entry exists. Returns `None`
/// when the queue is empty or when the very first item examined is not
/// `Installed`.
pub fn get_last_contiguous_installed_version(&self) -> Option<u64> {
    if self.queue.is_empty() {
        return None;
    }

    // Fall back to the front of the queue when the snapshot version is not a key.
    let start = self.queue.get_index_of(&self.snapshot_version).unwrap_or(0);
    let tail = self.queue.get_range(start..)?;

    tail.iter()
        .take_while(|(_, item)| item.state == StatemapInstallState::Installed)
        .last()
        .map(|(version, _)| *version)
}
}
#[cfg(test)]
mod tests {
use crate::{core::StatemapInstallerHashmap, events::StatemapEvents};
use super::StatemapInstallerQueue;
/// Test fixture: builds an installer item for `version` with the given
/// `safepoint`, in the `Awaiting` state, with no statemaps and default events.
fn create_initial_test_installer_data(version: &u64, safepoint: Option<u64>) -> StatemapInstallerHashmap {
    let version = *version;
    StatemapInstallerHashmap {
        version,
        safepoint,
        state: crate::core::StatemapInstallState::Awaiting,
        statemaps: Vec::new(),
        events: StatemapEvents::default(),
    }
}
// Covers insertion order, snapshot update and pruning up to the snapshot version.
#[test]
fn test_installer_queue() {
    use crate::core::StatemapInstallState;

    let mut installer_queue = StatemapInstallerQueue::default();
    assert_eq!(installer_queue.snapshot_version, 0);

    // Insert out of numeric order on purpose: 5 first, then 3.
    for version in [5_u64, 3] {
        let install_item = create_initial_test_installer_data(&version, None);
        installer_queue.insert_queue_item(&version, install_item);
    }

    assert_eq!(installer_queue.queue.len(), 2);
    // The queue keeps insertion order, so the last entry is version 3.
    assert_eq!(installer_queue.queue.last().unwrap().0, &3);

    installer_queue.update_snapshot(5);
    assert_eq!(installer_queue.snapshot_version, 5);

    // Version 5 sits at the front, so exactly one entry is pruned.
    let snapshot_version = installer_queue.snapshot_version;
    let count = installer_queue.prune_till_version(snapshot_version);
    assert_eq!(count, Some(1));

    // Version 5 is already pruned at this point, so the first update finds no item.
    installer_queue.update_queue_item_state(&5, StatemapInstallState::Installed);
    installer_queue.update_queue_item_state(&3, StatemapInstallState::Installed);

    assert_eq!(installer_queue.queue.len(), 1);
}
// With no safepoints, every awaiting item is picked; items that are no longer
// awaiting drop out of the selection.
#[test]
fn test_installer_queue_items_pick_all() {
    use crate::core::StatemapInstallState;

    let mut installer_queue = StatemapInstallerQueue::default();
    assert_eq!(installer_queue.snapshot_version, 0);

    for version in [2_u64, 3, 5] {
        let install_item = create_initial_test_installer_data(&version, None);
        installer_queue.insert_queue_item(&version, install_item);
    }

    let versions_to_install = installer_queue.get_versions_to_install();
    assert_eq!(versions_to_install.len(), 3);

    // Move two of the three items out of the awaiting state.
    installer_queue.update_queue_item_state(&2, StatemapInstallState::Inflight);
    installer_queue.update_queue_item_state(&3, StatemapInstallState::Installed);

    let versions_to_install = installer_queue.get_versions_to_install();
    assert_eq!(versions_to_install.len(), 1);
}
// With the snapshot at 6, only two of the four queued items are selected.
#[test]
fn test_installer_queue_items_snapshot_less_than_safepoint() {
    let mut installer_queue = StatemapInstallerQueue::default();
    assert_eq!(installer_queue.snapshot_version, 0);

    // (version, safepoint) pairs, inserted in this order.
    let fixtures: [(u64, Option<u64>); 4] = [(5, None), (7, Some(3)), (9, Some(5)), (12, Some(8))];
    for (version, safepoint) in fixtures {
        let install_item = create_initial_test_installer_data(&version, safepoint);
        installer_queue.insert_queue_item(&version, install_item);
    }

    installer_queue.update_snapshot(6);

    let versions_to_install = installer_queue.get_versions_to_install();
    assert_eq!(versions_to_install.len(), 2);
}
// Checks which versions are selected before and after version 9 is installed:
// 12 is excluded at first, and becomes selectable once 9 has been installed.
#[test]
fn test_installer_queue_items_snapshot_version_present_in_queue() {
    use crate::core::StatemapInstallState;

    let mut installer_queue = StatemapInstallerQueue::default();
    assert_eq!(installer_queue.snapshot_version, 0);

    // (version, safepoint) pairs, inserted in this order.
    let fixtures: [(u64, Option<u64>); 5] = [
        (5, None),
        (7, Some(3)),
        (9, Some(6)),
        (12, Some(9)),
        (18, Some(13)),
    ];
    for (version, safepoint) in fixtures {
        let install_item = create_initial_test_installer_data(&version, safepoint);
        installer_queue.insert_queue_item(&version, install_item);
    }

    installer_queue.update_snapshot(14);
    assert_eq!(installer_queue.snapshot_version, 14);

    let versions_to_install = installer_queue.get_versions_to_install();
    assert_eq!(versions_to_install.len(), 4);
    assert!(!versions_to_install.contains(&12));

    installer_queue.update_queue_item_state(&9, StatemapInstallState::Installed);

    let versions_to_install = installer_queue.get_versions_to_install();
    assert_eq!(versions_to_install.len(), 4);
    assert!(!versions_to_install.contains(&9));
}
}