use std::{sync::Arc, time::Duration};
use metrics_util::{
CompositeKey, MetricKind,
debugging::{DebugValue, DebuggingRecorder, Snapshotter},
};
use tokio::time::sleep;
use tsoracle_core::Epoch;
use tsoracle_proto::v1::{GetTsRequest, tso_service_client::TsoServiceClient};
use tsoracle_server::test_fakes::InMemoryDriver;
use tsoracle_server::test_support::{
boot_server, wait_for_grpc_handshake, wait_until, wait_until_serving,
};
use tsoracle_server::{Server, ServingState};
type RecordedMetric = (
CompositeKey,
Option<metrics::Unit>,
Option<metrics::SharedString>,
DebugValue,
);
fn install_recorder() -> Snapshotter {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();
recorder.install().expect("install metrics recorder");
snapshotter
}
fn counter_value(snapshot: &[RecordedMetric], name: &str) -> u64 {
for (composite, _unit, _desc, value) in snapshot {
if composite.kind() == MetricKind::Counter && composite.key().name() == name {
if let DebugValue::Counter(n) = value {
return *n;
}
}
}
0
}
fn histogram_sample_count(snapshot: &[RecordedMetric], name: &str) -> usize {
for (composite, _unit, _desc, value) in snapshot {
if composite.kind() == MetricKind::Histogram && composite.key().name() == name {
if let DebugValue::Histogram(samples) = value {
return samples.len();
}
}
}
0
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn emits_documented_signals_end_to_end() {
let snapshotter = install_recorder();
let driver = Arc::new(InMemoryDriver::new());
let server = Server::builder()
.consensus_driver(driver.clone())
.failover_advance(Duration::from_millis(20))
.window_ahead(Duration::from_millis(50))
.build()
.unwrap();
let mut booted = boot_server(server).await;
driver.become_leader(Epoch(1));
wait_until_serving(&mut booted.state_rx).await;
wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
.await
.expect("tonic never accepted gRPC handshake");
let mut client = TsoServiceClient::connect(format!("http://{}", booted.addr))
.await
.unwrap();
let resp = client
.get_ts(GetTsRequest { count: 5 })
.await
.unwrap()
.into_inner();
assert_eq!(resp.count, 5);
sleep(Duration::from_millis(60)).await;
let resp = client
.get_ts(GetTsRequest { count: 3 })
.await
.unwrap()
.into_inner();
assert_eq!(resp.count, 3);
driver.become_follower(Some("10.9.8.7:50551".into()));
wait_until(&mut booted.state_rx, |s| {
matches!(
s,
ServingState::NotServing {
leader_endpoint: Some(_)
}
)
})
.await;
let err = client.get_ts(GetTsRequest { count: 1 }).await.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
booted.shutdown().await.unwrap();
let snapshot: Vec<RecordedMetric> = snapshotter.snapshot().into_vec();
assert!(
counter_value(&snapshot, "tsoracle.get_ts.total") >= 2,
"tsoracle.get_ts.total did not increment for both GetTs calls"
);
assert!(
counter_value(&snapshot, "tsoracle.get_ts.timestamps_issued") >= 8,
"tsoracle.get_ts.timestamps_issued did not sum counts (expected >= 5 + 3)"
);
assert!(
counter_value(&snapshot, "tsoracle.window.extensions.total") >= 1,
"tsoracle.window.extensions.total never incremented; extend_window may not have run"
);
assert!(
histogram_sample_count(&snapshot, "tsoracle.window.extension_latency") >= 1,
"tsoracle.window.extension_latency recorded no samples"
);
assert!(
counter_value(&snapshot, "tsoracle.leader_transition.total") >= 2,
"tsoracle.leader_transition.total missed at least one of Leader/Follower transitions"
);
assert!(
histogram_sample_count(&snapshot, "tsoracle.leader_transition.fence_latency") >= 1,
"tsoracle.leader_transition.fence_latency recorded no samples"
);
assert!(
counter_value(&snapshot, "tsoracle.not_leader.total") >= 1,
"tsoracle.not_leader.total never incremented despite a NOT_LEADER rejection"
);
}