use std::{
collections::HashSet,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use axum::async_trait;
use serde_json::json;
use tokio::sync::mpsc::{self, error::TryRecvError};
use crate::{
callbacks::ReplicatorSnapshotProvider,
core::StatemapInstallState,
events::StatemapEvents,
services::statemap_queue_service::{self, StatemapQueueServiceConfig},
StatemapItem, StatemapQueueChannelMessage,
};
#[derive(Default)]
pub struct MockSnapshotApi {
pub snapshot: AtomicU64,
}
#[async_trait]
impl ReplicatorSnapshotProvider for MockSnapshotApi {
async fn get_snapshot(&self) -> Result<u64, String> {
let snapshot = self.snapshot.load(std::sync::atomic::Ordering::Relaxed);
Ok(snapshot)
}
async fn update_snapshot(&self, version: u64) -> Result<(), String> {
self.snapshot.store(version, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
}
#[tokio::test]
async fn test_queue_service_snapshot_version_above_safepoint_below_snapshot() {
let (statemaps_tx, statemaps_rx) = mpsc::channel(50);
let (_statemap_installation_tx, statemap_installation_rx) = mpsc::channel(50);
let (installation_tx, mut installation_rx) = mpsc::channel(50);
let (replicator_feedback_tx, _) = mpsc::channel(50);
let config = StatemapQueueServiceConfig {
queue_cleanup_frequency_ms: 60_000, enable_stats: false,
};
let snapshot_api = MockSnapshotApi { snapshot: 8.into() };
let mut queue_svc = statemap_queue_service::StatemapQueueService::new(
statemaps_rx,
installation_tx,
statemap_installation_rx,
replicator_feedback_tx,
snapshot_api.into(),
config,
0,
None,
);
tokio::spawn(async move {
queue_svc.run().await.unwrap();
});
let statemap_item = StatemapItem::new("TestAction".to_string(), 10, json!({"version": 10}), Some(7));
statemaps_tx
.send(StatemapQueueChannelMessage::Message((10, vec![statemap_item], StatemapEvents::default())))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
let k = installation_rx.try_recv().unwrap();
assert_eq!(k.0, 10);
}
#[tokio::test]
async fn test_queue_service_version_and_safepoint_below_snapshot() {
let (statemaps_tx, statemaps_rx) = mpsc::channel(50);
let (_, statemap_installation_rx) = mpsc::channel(50);
let (installation_tx, mut installation_rx) = mpsc::channel(50);
let (replicator_feedback_tx, _) = mpsc::channel(50);
let config = StatemapQueueServiceConfig {
queue_cleanup_frequency_ms: 60_000, enable_stats: false,
};
let snapshot_api = MockSnapshotApi { snapshot: 8.into() };
let mut queue_svc = statemap_queue_service::StatemapQueueService::new(
statemaps_rx,
installation_tx,
statemap_installation_rx,
replicator_feedback_tx,
snapshot_api.into(),
config,
0,
None,
);
tokio::spawn(async move {
queue_svc.run().await.unwrap();
});
let statemap_item = StatemapItem::new("TestAction".to_string(), 7, json!({"version": 7}), Some(5));
statemaps_tx
.send(StatemapQueueChannelMessage::Message((7, vec![statemap_item], StatemapEvents::default())))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
let receive_result = installation_rx.try_recv();
assert_eq!(receive_result, Err(TryRecvError::Empty));
}
#[tokio::test]
async fn test_queue_service_insert_install_feedbacks() {
let (statemaps_tx, statemaps_rx) = mpsc::channel(50);
let (installation_feedback_tx, installation_feedback_rx) = mpsc::channel(50);
let (installation_tx, mut installation_rx) = mpsc::channel(50);
let (replicator_feedback_tx, _) = mpsc::channel(50);
let config = StatemapQueueServiceConfig {
queue_cleanup_frequency_ms: 60_000, enable_stats: false,
};
let initial_snapshot_version = 15;
let snapshot_api = MockSnapshotApi {
snapshot: initial_snapshot_version.into(),
};
let mut queue_svc = statemap_queue_service::StatemapQueueService::new(
statemaps_rx,
installation_tx,
installation_feedback_rx,
replicator_feedback_tx,
snapshot_api.into(),
config,
0,
None,
);
queue_svc.statemap_queue.snapshot_version = initial_snapshot_version;
let install_vers = 7;
let safepoint = Some(5);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
let install_vers = 8;
let safepoint = None;
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.run_once().await.unwrap();
let receive_result = installation_rx.try_recv();
assert_eq!(receive_result, Err(TryRecvError::Empty));
let install_vers = 16;
let safepoint = None;
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
let install_vers = 17;
let safepoint = Some(2);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
let install_vers = 18;
let safepoint = Some(2);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
let install_vers = 19;
let safepoint = Some(16);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.snapshot_version, initial_snapshot_version);
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 3);
{
let _ = installation_rx.try_recv().unwrap();
let _ = installation_rx.try_recv().unwrap();
let _ = installation_rx.try_recv().unwrap();
}
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(18))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.snapshot_version, initial_snapshot_version);
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(16))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 1);
assert_eq!(queue_svc.statemap_queue.snapshot_version, 16);
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Error(17, "Error".to_string()))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 2);
let install_vers = 20;
let safepoint = Some(17);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 2);
assert_eq!(queue_svc.statemap_queue.snapshot_version, 16);
let installed_versions = queue_svc
.statemap_queue
.filter_items_by_state(crate::core::StatemapInstallState::Installed)
.map(|x| x.version)
.collect::<HashSet<u64>>();
assert!(installed_versions.contains(&7));
assert!(installed_versions.contains(&8));
assert!(installed_versions.contains(&16));
assert!(installed_versions.contains(&18));
let inflight_versions = queue_svc
.statemap_queue
.filter_items_by_state(crate::core::StatemapInstallState::Inflight)
.map(|x| x.version)
.collect::<HashSet<u64>>();
assert!(inflight_versions.contains(&17));
assert!(inflight_versions.contains(&19));
let awaiting_versions = queue_svc
.statemap_queue
.filter_items_by_state(crate::core::StatemapInstallState::Awaiting)
.map(|x| x.version)
.collect::<HashSet<u64>>();
assert!(awaiting_versions.contains(&20));
}
#[tokio::test]
async fn test_queue_service_effects_of_resetting_snapshot() {
let (statemaps_tx, statemaps_rx) = mpsc::channel(50);
let (installation_feedback_tx, installation_feedback_rx) = mpsc::channel(50);
let (installation_tx, mut installation_rx) = mpsc::channel(50);
let (replicator_feedback_tx, _) = mpsc::channel(50);
let config = StatemapQueueServiceConfig {
queue_cleanup_frequency_ms: 60_000, enable_stats: false,
};
let initial_snapshot_version = 10;
let snapshot_api: Arc<MockSnapshotApi> = Arc::new(MockSnapshotApi {
snapshot: initial_snapshot_version.into(),
});
let mut queue_svc = statemap_queue_service::StatemapQueueService::new(
statemaps_rx,
installation_tx,
installation_feedback_rx,
replicator_feedback_tx,
snapshot_api.clone(),
config,
0,
None,
);
queue_svc.statemap_queue.snapshot_version = initial_snapshot_version;
let install_vers = 12;
let safepoint = Some(2);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
let install_vers = 13;
let safepoint = Some(12);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
let install_vers = 15;
let safepoint = None;
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 1);
assert_eq!(queue_svc.statemap_queue.snapshot_version, initial_snapshot_version);
let res = installation_rx.try_recv().unwrap();
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res.0))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 2);
assert_eq!(queue_svc.statemap_queue.snapshot_version, 12);
let install_vers = 20;
let safepoint = Some(18);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 2);
assert_eq!(queue_svc.statemap_queue.queue.len(), 4);
let _ = snapshot_api.update_snapshot(19).await;
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 19);
statemaps_tx.send(StatemapQueueChannelMessage::UpdateSnapshot).await.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.snapshot_version, 19);
assert_eq!(queue_svc.statemap_queue.queue.len(), 1);
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 3);
let res = installation_rx.try_recv().unwrap();
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res.0))
.await
.unwrap();
assert_eq!(res.0, 13);
let res = installation_rx.try_recv().unwrap();
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res.0))
.await
.unwrap();
assert_eq!(res.0, 15);
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 1);
}
#[tokio::test]
async fn test_snapshot_updated_via_callback() {
let (statemaps_tx, statemaps_rx) = mpsc::channel(50);
let (installation_feedback_tx, installation_feedback_rx) = mpsc::channel(50);
let (installation_tx, mut installation_rx) = mpsc::channel(50);
let (replicator_feedback_tx, _) = mpsc::channel(50);
let config = StatemapQueueServiceConfig {
queue_cleanup_frequency_ms: 30_000, enable_stats: false,
};
let initial_snapshot_version = 10;
let snapshot_api: Arc<MockSnapshotApi> = Arc::new(MockSnapshotApi {
snapshot: initial_snapshot_version.into(),
});
let mut queue_svc = statemap_queue_service::StatemapQueueService::new(
statemaps_rx,
installation_tx,
installation_feedback_rx,
replicator_feedback_tx,
snapshot_api.clone(),
config,
0,
None,
);
assert_eq!(queue_svc.statemap_queue.snapshot_version, 0);
statemaps_tx.send(StatemapQueueChannelMessage::UpdateSnapshot).await.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.snapshot_version, initial_snapshot_version);
assert_eq!(queue_svc.get_last_saved_snapshot(), initial_snapshot_version);
let install_vers = 12;
let safepoint = Some(10);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
let install_vers = 13;
let safepoint = Some(10);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 2);
let res_12 = installation_rx.try_recv().unwrap();
let res_13 = installation_rx.try_recv().unwrap();
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res_13.0))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.get_last_saved_snapshot(), initial_snapshot_version);
queue_svc.handle_interval_arm().await.unwrap();
assert_eq!(queue_svc.get_last_saved_snapshot(), initial_snapshot_version);
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res_12.0))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.snapshot_version, 13);
queue_svc.handle_interval_arm().await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 13);
assert_eq!(queue_svc.get_last_saved_snapshot(), 13);
let install_vers = 19;
let safepoint = Some(13);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
let install_vers = 20;
let safepoint = Some(13);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
let install_vers = 23;
let safepoint = Some(13);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
let install_vers = 25;
let safepoint = Some(13);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.run_once().await.unwrap();
let res_19 = installation_rx.try_recv().unwrap();
assert_eq!(res_19.0, 19);
let res_20 = installation_rx.try_recv().unwrap();
assert_eq!(res_20.0, 20);
let res_23 = installation_rx.try_recv().unwrap();
assert_eq!(res_23.0, 23);
let res_25 = installation_rx.try_recv().unwrap();
assert_eq!(res_25.0, 25);
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res_25.0))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res_20.0))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.handle_interval_arm().await.unwrap();
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 13);
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 13);
assert_eq!(queue_svc.get_last_saved_snapshot(), 13);
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res_19.0))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.handle_interval_arm().await.unwrap();
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 13);
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 20);
assert_eq!(queue_svc.get_last_saved_snapshot(), 20);
installation_feedback_tx
.send(crate::core::StatemapInstallationStatus::Success(res_23.0))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
queue_svc.handle_interval_arm().await.unwrap();
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 20);
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 25);
assert_eq!(queue_svc.get_last_saved_snapshot(), 25);
}
#[tokio::test]
async fn test_queue_service_last_installed_snapshot_against_internal_snapshot() {
let (statemaps_tx, statemaps_rx) = mpsc::channel(50);
let (_installation_feedback_tx, installation_feedback_rx) = mpsc::channel(50);
let (installation_tx, installation_rx) = mpsc::channel(50);
let (replicator_feedback_tx, _) = mpsc::channel(50);
let config = StatemapQueueServiceConfig {
queue_cleanup_frequency_ms: 30_000, enable_stats: false,
};
let initial_snapshot_version = 10;
let snapshot_api: Arc<MockSnapshotApi> = Arc::new(MockSnapshotApi {
snapshot: initial_snapshot_version.into(),
});
let mut queue_svc = statemap_queue_service::StatemapQueueService::new(
statemaps_rx,
installation_tx,
installation_feedback_rx,
replicator_feedback_tx,
snapshot_api.clone(),
config,
0,
None,
);
assert_eq!(queue_svc.statemap_queue.snapshot_version, 0);
statemaps_tx.send(StatemapQueueChannelMessage::UpdateSnapshot).await.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.snapshot_version, initial_snapshot_version);
assert_eq!(queue_svc.get_last_saved_snapshot(), initial_snapshot_version);
assert_eq!(queue_svc.statemap_queue.queue.len(), 0);
let install_vers = 8;
let safepoint = Some(0);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.queue.len(), 1);
assert_eq!(
queue_svc.statemap_queue.queue.get(&install_vers).unwrap().state,
StatemapInstallState::Installed
);
let install_vers = 10;
let safepoint = Some(0);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.queue.len(), 2);
assert_eq!(
queue_svc.statemap_queue.queue.get(&install_vers).unwrap().state,
StatemapInstallState::Installed
);
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 0);
let install_vers = 12;
let safepoint = Some(0);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.queue.get(&install_vers).unwrap().state, StatemapInstallState::Inflight);
assert_eq!(installation_rx.max_capacity() - installation_rx.capacity(), 1);
assert_eq!(queue_svc.statemap_queue.queue.len(), 3);
queue_svc.handle_interval_arm().await.unwrap();
assert_eq!(queue_svc.statemap_queue.queue.len(), 1);
let install_vers = 14;
let safepoint = Some(0);
let statemap_item = StatemapItem::new(
"TestAction".to_string(),
install_vers,
json!({"mockPayload": format!("some mock data {install_vers}")}),
safepoint,
);
statemaps_tx
.send(StatemapQueueChannelMessage::Message((
install_vers,
vec![statemap_item],
StatemapEvents::default(),
)))
.await
.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.queue.len(), 2);
let _ = snapshot_api.update_snapshot(13).await;
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 13);
statemaps_tx.send(StatemapQueueChannelMessage::UpdateSnapshot).await.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.queue.len(), 1);
let _ = snapshot_api.update_snapshot(14).await;
assert_eq!(snapshot_api.get_snapshot().await.unwrap(), 14);
statemaps_tx.send(StatemapQueueChannelMessage::UpdateSnapshot).await.unwrap();
queue_svc.run_once().await.unwrap();
assert_eq!(queue_svc.statemap_queue.queue.len(), 0);
assert_eq!(queue_svc.statemap_queue.snapshot_version, 14)
}