#![forbid(unsafe_code)]
#![allow(clippy::expect_used)]
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use magnetar_proto::{
ConnectionConfig, FrameError, ReplicatedSubscriptionMarkerKind, SubscribeRequest, decode_one,
encode_command, encode_payload, pb,
};
use magnetar_runtime_moonpool::{Client, MoonpoolEngine};
use moonpool_core::{NetworkProvider, Providers, TaskProvider, TcpListenerTrait, TimeProvider};
use moonpool_sim::{SimContext, SimulationBuilder, SimulationError, SimulationResult, Workload};
use parking_lot::Mutex;
mod common;
use common::sweep_seeds;
/// TCP port the simulated broker binds to and the client workload dials.
const BROKER_PORT: u16 = 6650;
/// Time budget passed to `SimulationBuilder::run_time_budget` by every test in this file.
const SIM_RUN_TIME_BUDGET: Duration = Duration::from_secs(30);
/// Topic name placed in the client's subscribe request.
const TOPIC: &str = "persistent://public/default/replicated-sim";
/// Subscription name placed in the client's subscribe request.
const SUBSCRIPTION: &str = "sub-pip-33-sim";
/// Performs a single `read` on `stream` and appends the bytes that arrived to `buf`.
///
/// At most 64 KiB is pulled per call. The byte count reported by the read is
/// returned as-is, so `Ok(0)` reaches the caller untouched and the caller
/// decides what it means.
///
/// `buf` is only modified after the read has resolved, so dropping this future
/// while the read is still pending leaves `buf` exactly as it was.
async fn read_into<S: AsyncRead + Unpin>(
    stream: &mut S,
    buf: &mut BytesMut,
) -> std::io::Result<usize> {
    const CHUNK_LEN: usize = 64 * 1024;

    // Read into a private scratch buffer first; `buf` is touched only on success.
    let mut scratch = vec![0u8; CHUNK_LEN];
    let received = stream.read(&mut scratch).await?;
    let fresh = &scratch[..received];
    buf.extend_from_slice(fresh);
    Ok(received)
}
/// Flag shared between the client workload and the broker workload.
#[derive(Default)]
struct Coordination {
/// Reset to `false` in the client's `setup` and set to `true` by the client
/// once its subscribe call has returned; broker sessions do not queue the
/// snapshot marker until they observe it as `true`.
parked: AtomicBool,
}
/// Broker-side workload. It holds the coordination flag that each accepted
/// session consults before pushing the snapshot marker.
struct DelayedMarkerBroker {
    coord: Arc<Coordination>,
}

impl DelayedMarkerBroker {
    /// Creates a broker workload that shares `coord` with the client workload.
    fn new(coord: Arc<Coordination>) -> Self {
        DelayedMarkerBroker { coord }
    }
}
#[async_trait]
impl Workload for DelayedMarkerBroker {
    /// Workload name; the client looks this workload up as peer `"broker"`.
    fn name(&self) -> &str {
        "broker"
    }

    /// Binds `<my_ip>:BROKER_PORT` and serves connections until shutdown.
    ///
    /// Every accepted stream is handed to its own spawned `handle_session`
    /// task. The loop ends with `Ok(())` when the shutdown token is cancelled
    /// or when `accept` reports an error; only a failed bind is surfaced as
    /// `SimulationError::InvalidState`.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let net = ctx.network().clone();
        let bind_addr = format!("{}:{BROKER_PORT}", ctx.my_ip());
        let listener = match net.bind(&bind_addr).await {
            Ok(listener) => listener,
            Err(e) => {
                return Err(SimulationError::InvalidState(format!("broker bind: {e}")));
            }
        };

        let shutdown = ctx.shutdown().clone();
        let task = ctx.providers().task().clone();
        let time = ctx.providers().time().clone();

        loop {
            tokio::select! {
                () = shutdown.cancelled() => return Ok(()),
                incoming = listener.accept() => {
                    // An accept failure ends the broker quietly rather than failing the run.
                    let Ok((stream, _peer)) = incoming else {
                        return Ok(());
                    };
                    let session_coord = Arc::clone(&self.coord);
                    let session_time = time.clone();
                    let _handle = task.spawn_task("delayed-marker-session", async move {
                        // The session's result is intentionally discarded.
                        let _ = handle_session(stream, session_coord, session_time).await;
                    });
                }
            }
        }
    }
}
/// Per-connection state kept by `handle_session`.
#[derive(Default)]
struct SessionState {
/// Consumers registered on this connection: consumer id mapped to the topic
/// taken from the subscribe command.
consumers: HashMap<u64, String>,
/// Set once the snapshot marker has been queued for this connection, so it
/// is queued at most once.
marker_pushed: bool,
}
/// Serves one accepted broker connection until the peer goes away.
///
/// Each pass of the outer loop:
/// 1. decodes every complete frame currently buffered and feeds it to
///    `handle_frame`, which queues replies in the outbound buffer;
/// 2. once `coord.parked` is set and at least one consumer is registered,
///    queues one snapshot marker per consumer (only once per session);
/// 3. writes and flushes any queued output;
/// 4. waits for either more inbound bytes or a 2 ms tick, so that a change of
///    `coord.parked` is noticed even when the peer is silent.
///
/// Markers are queued in ascending consumer-id order. `HashMap` iteration
/// order is unspecified and varies between processes, which would otherwise
/// make the byte stream of a multi-consumer session differ between runs.
///
/// # Parameters
/// * `stream` - the accepted connection.
/// * `coord` - flag shared with the client workload that gates the marker.
/// * `time` - time provider used for the 2 ms poll tick.
///
/// # Returns
/// Always `Ok(())`: a frame decode error (other than "incomplete"), a write or
/// flush error, a read error and end-of-stream all end the session quietly.
async fn handle_session<S, T>(
    mut stream: S,
    coord: Arc<Coordination>,
    time: T,
) -> SimulationResult<()>
where
    S: AsyncRead + AsyncWrite + Unpin + Send,
    T: TimeProvider,
{
    let mut session = SessionState::default();
    let mut read_buf = BytesMut::with_capacity(64 * 1024);
    let mut out_buf = BytesMut::with_capacity(64 * 1024);
    loop {
        // Drain every complete frame that is already buffered.
        loop {
            // Decode from a frozen copy; the real buffer is advanced only once
            // a whole frame has been decoded successfully.
            let mut framed = read_buf.clone().freeze();
            let before = framed.len();
            let frame = match decode_one(&mut framed) {
                Ok(f) => f,
                // Need more bytes before the next frame can be decoded.
                Err(FrameError::Incomplete { .. }) => break,
                Err(_) => return Ok(()),
            };
            let consumed = before - framed.len();
            let _ = read_buf.split_to(consumed);
            handle_frame(&mut session, &frame, &mut out_buf);
        }

        if !session.marker_pushed
            && coord.parked.load(Ordering::SeqCst)
            && !session.consumers.is_empty()
        {
            // Sort the ids so the emitted frame order does not depend on the
            // hash map's per-process iteration order.
            let mut consumer_ids: Vec<u64> = session.consumers.keys().copied().collect();
            consumer_ids.sort_unstable();
            for consumer_id in consumer_ids {
                emit_snapshot_marker(&mut out_buf, consumer_id);
            }
            session.marker_pushed = true;
        }

        if !out_buf.is_empty() {
            if stream.write_all(&out_buf).await.is_err() {
                return Ok(());
            }
            if stream.flush().await.is_err() {
                return Ok(());
            }
            out_buf.clear();
        }

        let tick = time.sleep(Duration::from_millis(2));
        tokio::pin!(tick);
        tokio::select! {
            // Poll the read branch first so available input wins over the tick.
            biased;
            read = read_into(&mut stream, &mut read_buf) => {
                match read {
                    Ok(0) | Err(_) => return Ok(()),
                    Ok(_) => {}
                }
            }
            _ = &mut tick => {}
        }
    }
}
/// Reacts to one decoded frame by queueing the matching reply in `out`.
///
/// * `Connect` -> connected reply
/// * `Ping` -> pong
/// * `Lookup` -> lookup response echoing the request id
/// * `Subscribe` -> records `consumer_id -> topic` in `session` and replies
///   with success for the request id
///
/// Frames whose type value is not a known command type, command types not
/// listed above, and `Lookup`/`Subscribe` frames missing their body are ignored.
fn handle_frame(session: &mut SessionState, frame: &magnetar_proto::Frame, out: &mut BytesMut) {
    let command = &frame.command;
    let kind = match pb::base_command::Type::try_from(command.r#type) {
        Ok(kind) => kind,
        Err(_) => return,
    };

    match kind {
        pb::base_command::Type::Connect => emit_connected(out),
        pb::base_command::Type::Ping => emit_pong(out),
        pb::base_command::Type::Lookup => {
            let Some(lookup) = command.lookup_topic.as_ref() else {
                return;
            };
            emit_lookup_response(out, lookup.request_id);
        }
        pb::base_command::Type::Subscribe => {
            let Some(subscribe) = command.subscribe.as_ref() else {
                return;
            };
            session
                .consumers
                .insert(subscribe.consumer_id, subscribe.topic.clone());
            emit_success(out, subscribe.request_id);
        }
        _ => {}
    }
}
/// Queues a `Connected` reply in `out`, advertising protocol version 21, a
/// 5 MiB maximum message size and default feature flags.
fn emit_connected(out: &mut BytesMut) {
    let connected = pb::CommandConnected {
        server_version: "magnetar-sim-marker-broker".to_owned(),
        protocol_version: Some(21),
        max_message_size: Some(5 * 1024 * 1024),
        feature_flags: Some(pb::FeatureFlags::default()),
    };
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::Connected as i32,
        connected: Some(connected),
        ..Default::default()
    };
    // The encoder's result is deliberately discarded.
    let _ = encode_command(out, &reply);
}
/// Queues a `Pong` reply in `out`.
fn emit_pong(out: &mut BytesMut) {
    let body = pb::CommandPong {};
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::Pong as i32,
        pong: Some(body),
        ..Default::default()
    };
    // The encoder's result is deliberately discarded.
    let _ = encode_command(out, &reply);
}
/// Queues a lookup response for `request_id` in `out`.
///
/// The response has lookup type `Connect`, is marked authoritative, carries no
/// broker URLs and no error, and sets `proxy_through_service_url` to `false`.
fn emit_lookup_response(out: &mut BytesMut, request_id: u64) {
    let lookup_type = pb::command_lookup_topic_response::LookupType::Connect as i32;
    let response = pb::CommandLookupTopicResponse {
        broker_service_url: None,
        broker_service_url_tls: None,
        response: Some(lookup_type),
        request_id,
        authoritative: Some(true),
        error: None,
        message: None,
        proxy_through_service_url: Some(false),
    };
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::LookupResponse as i32,
        lookup_topic_response: Some(response),
        ..Default::default()
    };
    // The encoder's result is deliberately discarded.
    let _ = encode_command(out, &reply);
}
/// Queues a schema-less `Success` reply for `request_id` in `out`.
fn emit_success(out: &mut BytesMut, request_id: u64) {
    let success = pb::CommandSuccess {
        request_id,
        schema: None,
    };
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::Success as i32,
        success: Some(success),
        ..Default::default()
    };
    // The encoder's result is deliberately discarded.
    let _ = encode_command(out, &reply);
}
/// Queues a replicated-subscription snapshot marker for `consumer_id` in `out`.
///
/// The frame is a `Message` command addressed to `consumer_id` at ledger 1 /
/// entry 1. Its metadata carries the `Snapshot` marker type, and its payload is
/// a prost-encoded `ReplicatedSubscriptionsSnapshot` with id `"sim-snap"` and a
/// single remote cluster, `"cluster-b"`.
///
/// # Panics
/// Panics with `"encode snapshot"` if prost reports an error while encoding the
/// snapshot into the payload vector.
fn emit_snapshot_marker(out: &mut BytesMut, consumer_id: u64) {
    // Every position inside the synthetic snapshot points at ledger 1 / entry 1.
    let marker_position = || pb::MarkersMessageIdData {
        ledger_id: 1,
        entry_id: 1,
    };

    // Payload: the serialized snapshot itself.
    let snapshot = pb::ReplicatedSubscriptionsSnapshot {
        snapshot_id: "sim-snap".to_owned(),
        local_message_id: Some(marker_position()),
        clusters: vec![pb::ClusterMessageId {
            cluster: "cluster-b".to_owned(),
            message_id: marker_position(),
        }],
    };
    let mut body = Vec::new();
    prost::Message::encode(&snapshot, &mut body).expect("encode snapshot");

    // Metadata: flags the message as a snapshot marker.
    let metadata = pb::MessageMetadata {
        producer_name: "broker-marker".to_owned(),
        sequence_id: 0,
        publish_time: 0,
        marker_type: Some(ReplicatedSubscriptionMarkerKind::Snapshot.marker_type()),
        ..Default::default()
    };

    // Command envelope delivering the message to the consumer.
    let delivery = pb::CommandMessage {
        consumer_id,
        message_id: pb::MessageIdData {
            ledger_id: 1,
            entry_id: 1,
            partition: None,
            batch_index: None,
            ack_set: Vec::new(),
            batch_size: None,
            first_chunk_message_id: None,
        },
        redelivery_count: Some(0),
        ack_set: Vec::new(),
        consumer_epoch: None,
    };
    let envelope = pb::BaseCommand {
        r#type: pb::base_command::Type::Message as i32,
        message: Some(delivery),
        ..Default::default()
    };

    // The encoder's result is deliberately discarded.
    let _ = encode_payload(out, &envelope, &metadata, &Bytes::from(body));
}
/// Client-side workload: connects, subscribes, raises the coordination flag
/// and then waits for the snapshot marker.
struct MarkerClientWorkload {
    /// Flag shared with the broker workload.
    coord: Arc<Coordination>,
    /// Result of the most recent `run`, consumed by `check`; `None` until a
    /// run has recorded one.
    outcome: Arc<Mutex<Option<Result<(), String>>>>,
}

impl MarkerClientWorkload {
    /// Creates a client workload sharing `coord` with the broker, with no
    /// outcome recorded yet.
    fn new(coord: Arc<Coordination>) -> Self {
        let outcome = Arc::new(Mutex::new(None));
        MarkerClientWorkload { coord, outcome }
    }
}
#[async_trait]
impl Workload for MarkerClientWorkload {
    /// Workload name.
    fn name(&self) -> &str {
        "client"
    }

    /// Resets the shared state for a run: clears the coordination flag and
    /// forgets any previously recorded outcome.
    async fn setup(&mut self, _ctx: &SimContext) -> SimulationResult<()> {
        self.coord.parked.store(false, Ordering::SeqCst);
        let mut slot = self.outcome.lock();
        *slot = None;
        Ok(())
    }

    /// Drives the client scenario against the `"broker"` peer.
    ///
    /// The scenario's result is stored for `check`, and a failure is also
    /// returned right away as `SimulationError::InvalidState`.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let Some(broker_ip) = ctx.peer("broker") else {
            return Err(SimulationError::InvalidState("broker peer missing".into()));
        };
        let addr = format!("{broker_ip}:{BROKER_PORT}");
        let engine = MoonpoolEngine::new(ctx.providers().clone());

        let result = self.drive(&engine, &addr).await;
        let gate = if let Err(reason) = &result {
            Err(SimulationError::InvalidState(format!(
                "marker accessor did not resolve: {reason}"
            )))
        } else {
            Ok(())
        };
        *self.outcome.lock() = Some(result);
        gate
    }

    /// Takes the outcome recorded by `run` and turns it into the final verdict.
    ///
    /// Fails if the scenario failed, and also if no outcome was recorded.
    async fn check(&mut self, _ctx: &SimContext) -> SimulationResult<()> {
        let recorded = self.outcome.lock().take();
        let Some(outcome) = recorded else {
            return Err(SimulationError::InvalidState(
                "client workload did not record an outcome".into(),
            ));
        };
        outcome.map_err(|reason| {
            SimulationError::InvalidState(format!("marker accessor did not resolve: {reason}"))
        })
    }
}
/// Connection settings used by the client workload: a 250 ms connect timeout,
/// `connect_max_retries` of 64 and a 20 s operation timeout. Every other field
/// keeps its default.
fn sim_connect_config() -> ConnectionConfig {
    let connect_timeout = Duration::from_millis(250);
    let operation_timeout = Duration::from_secs(20);
    ConnectionConfig {
        connect_timeout,
        connect_max_retries: 64,
        operation_timeout,
        ..Default::default()
    }
}
impl MarkerClientWorkload {
    /// Runs the client side of the scenario against the broker at `addr`.
    ///
    /// Steps, each guarded by its own timeout:
    /// 1. connect (20 s);
    /// 2. subscribe to `TOPIC` / `SUBSCRIPTION` with subscription-state
    ///    replication requested (5 s);
    /// 3. set `coord.parked`, which lets the broker push the snapshot marker;
    /// 4. wait for the next replicated-subscription marker (15 s) and require
    ///    it to be a `Snapshot`.
    ///
    /// The client is closed only on the success path.
    ///
    /// # Errors
    /// Returns a description of the first step that timed out or failed, or of
    /// an unexpected marker kind.
    async fn drive<P>(&self, engine: &MoonpoolEngine<P>, addr: &str) -> Result<(), String>
    where
        P: Providers,
    {
        let connect_attempt = tokio::time::timeout(
            Duration::from_secs(20),
            Client::connect_plain(engine, addr, sim_connect_config()),
        )
        .await;
        let Ok(connected) = connect_attempt else {
            return Err("connect timed out".to_owned());
        };
        let client = connected.map_err(|e| format!("connect: {e:?}"))?;

        let request = SubscribeRequest {
            topic: TOPIC.to_owned(),
            subscription: SUBSCRIPTION.to_owned(),
            receiver_queue_size: 32,
            durable: true,
            replicate_subscription_state: Some(true),
            ..Default::default()
        };
        let subscribe_attempt =
            tokio::time::timeout(Duration::from_secs(5), client.subscribe(request)).await;
        let Ok(subscribed) = subscribe_attempt else {
            return Err("subscribe timed out".to_owned());
        };
        // Held (not used) until the end of the function.
        let _consumer = subscribed.map_err(|e| format!("subscribe: {e:?}"))?;

        // Raise the flag before asking for the marker: the broker only pushes
        // the marker after it sees this.
        self.coord.parked.store(true, Ordering::SeqCst);

        let marker_attempt = tokio::time::timeout(
            Duration::from_secs(15),
            client.next_replicated_subscription_marker(),
        )
        .await;
        let Ok(delivered) = marker_attempt else {
            return Err("next_replicated_subscription_marker hung (lost wakeup)".to_owned());
        };
        let Some(observed) = delivered else {
            return Err("connection closed before marker arrived".to_owned());
        };

        if observed.marker.kind != ReplicatedSubscriptionMarkerKind::Snapshot {
            return Err(format!(
                "expected Snapshot marker, got {:?}",
                observed.marker.kind
            ));
        }

        client.close().await;
        Ok(())
    }
}
/// Single-iteration run: the client must observe the delayed snapshot marker
/// and no run may fail.
#[test]
fn sim_delayed_marker_is_observed() {
    let shared = Arc::new(Coordination::default());
    let broker = DelayedMarkerBroker::new(Arc::clone(&shared));
    let client = MarkerClientWorkload::new(shared);

    let report = SimulationBuilder::new()
        .run_time_budget(SIM_RUN_TIME_BUDGET)
        .workload(broker)
        .workload(client)
        .set_iterations(1)
        .run();

    assert!(report.successful_runs >= 1, "report: {report:?}");
    assert_eq!(report.failed_runs, 0, "report: {report:?}");
}
/// Same scenario as the single-iteration test, run for 16 iterations over the
/// seeds produced by `sweep_seeds(16)`; no run may fail.
#[test]
fn sim_delayed_marker_is_observed_sweep_16_seeds() {
    let shared = Arc::new(Coordination::default());
    let broker = DelayedMarkerBroker::new(Arc::clone(&shared));
    let client = MarkerClientWorkload::new(shared);

    let report = SimulationBuilder::new()
        .run_time_budget(SIM_RUN_TIME_BUDGET)
        .workload(broker)
        .workload(client)
        .set_debug_seeds(sweep_seeds(16))
        .set_iterations(16)
        .run();

    assert_eq!(report.failed_runs, 0, "report: {report:?}");
    assert!(report.successful_runs >= 1, "report: {report:?}");
}