#![allow(clippy::expect_used)]
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::BytesMut;
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use magnetar_proto::{
AntiThrashThreshold, ConnectionConfig, CreateProducerRequest, FrameError, SupervisorConfig,
decode_one, encode_command, pb,
};
use magnetar_runtime_moonpool::{Client, MoonpoolEngine};
use moonpool_core::{NetworkProvider, Providers, TaskProvider, TcpListenerTrait, TimeProvider};
use moonpool_sim::providers::SimProviders;
use moonpool_sim::{SimContext, SimulationBuilder, SimulationError, SimulationResult, Workload};
use parking_lot::Mutex;
/// Port the simulated broker binds on its own sim IP and the client dials.
const BROKER_PORT: u16 = 6_650;
/// Performs exactly one `read` on `stream` and appends the received bytes to `buf`.
///
/// At most 64 KiB are read per call. Returns the number of bytes appended;
/// `Ok(0)` means the peer reached EOF. I/O errors are propagated unchanged.
async fn read_into<S: AsyncRead + Unpin>(
    stream: &mut S,
    buf: &mut BytesMut,
) -> std::io::Result<usize> {
    const CHUNK_LEN: usize = 64 * 1024;
    let mut chunk = vec![0u8; CHUNK_LEN];
    let read_len = stream.read(&mut chunk[..]).await?;
    buf.extend_from_slice(&chunk[..read_len]);
    Ok(read_len)
}
/// Appends an encoded `Connected` reply (the answer to a client `Connect`) to `out`.
///
/// Advertises protocol version 21, a 5 MiB maximum message size and default
/// feature flags. Any encoding error is discarded.
fn emit_connected(out: &mut BytesMut) {
    let connected = pb::CommandConnected {
        server_version: "magnetar-supervised-redial-sim".to_owned(),
        protocol_version: Some(21),
        max_message_size: Some(5 * 1024 * 1024),
        feature_flags: Some(pb::FeatureFlags::default()),
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::Connected as i32,
        connected: Some(connected),
        ..Default::default()
    };
    let _ = encode_command(out, &command);
}
/// Appends an encoded `Pong` reply (the answer to a client `Ping`) to `out`.
/// Any encoding error is discarded.
fn emit_pong(out: &mut BytesMut) {
    let pong_command = pb::BaseCommand {
        pong: Some(pb::CommandPong {}),
        r#type: pb::base_command::Type::Pong as i32,
        ..Default::default()
    };
    let _ = encode_command(out, &pong_command);
}
/// Appends an encoded `LookupResponse` answering lookup `request_id` to `out`.
///
/// The reply is `Connect`-typed, authoritative, not proxied and carries no
/// broker URL. Presumably this tells the client to stay on the current
/// connection (TODO confirm against the client's lookup handling). Any encoding
/// error is discarded.
fn emit_lookup_response(out: &mut BytesMut, request_id: u64) {
    let lookup_reply = pb::CommandLookupTopicResponse {
        request_id,
        response: Some(pb::command_lookup_topic_response::LookupType::Connect as i32),
        authoritative: Some(true),
        proxy_through_service_url: Some(false),
        broker_service_url: None,
        broker_service_url_tls: None,
        error: None,
        message: None,
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::LookupResponse as i32,
        lookup_topic_response: Some(lookup_reply),
        ..Default::default()
    };
    let _ = encode_command(out, &command);
}
/// Appends an encoded `ProducerSuccess` for producer request `request_id` to `out`.
///
/// Reports a ready producer named `supervised-redial-sim`, with
/// `last_sequence_id` -1 and topic epoch 0. Any encoding error is discarded.
fn emit_producer_success(out: &mut BytesMut, request_id: u64) {
    let success = pb::CommandProducerSuccess {
        request_id,
        producer_name: String::from("supervised-redial-sim"),
        last_sequence_id: Some(-1),
        schema_version: None,
        topic_epoch: Some(0),
        producer_ready: Some(true),
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::ProducerSuccess as i32,
        producer_success: Some(success),
        ..Default::default()
    };
    let _ = encode_command(out, &command);
}
/// Serves one broker session and drops it shortly after a producer is created.
///
/// Answers `Connect`, `Ping`, `Lookup` and `Producer` commands read from
/// `stream`; all other commands are ignored. Once a `ProducerSuccess` has been
/// flushed, it waits `delay` on the simulated clock (`time`), increments
/// `drops_performed`, and returns. Returning drops `stream`, which closes the
/// connection under the client.
///
/// A write/flush/read error, EOF, or an undecodable frame also ends the session.
/// The function always returns `Ok(())`.
async fn handle_drop_after_create_session<S, T>(
    mut stream: S,
    delay: Duration,
    time: T,
    drops_performed: Arc<Mutex<u32>>,
) -> SimulationResult<()>
where
    S: AsyncRead + AsyncWrite + Unpin + Send,
    T: TimeProvider,
{
    let mut read_buf = BytesMut::with_capacity(64 * 1024);
    let mut out_buf = BytesMut::with_capacity(64 * 1024);
    let mut sent_producer_success = false;
    loop {
        // Copy the buffered bytes once per read. Each decode attempt then works on
        // an O(1) `slice` view at `offset` rather than re-copying the whole buffer
        // for every frame, which was quadratic in the number of pipelined frames.
        let snapshot = read_buf.clone().freeze();
        let mut offset = 0;
        loop {
            let mut framed = snapshot.slice(offset..);
            let before = framed.len();
            let frame = match decode_one(&mut framed) {
                Ok(f) => f,
                // Partial frame: leave the tail buffered and wait for more bytes.
                Err(FrameError::Incomplete { .. }) => break,
                Err(_) => return Ok(()),
            };
            offset += before - framed.len();
            let Ok(kind) = pb::base_command::Type::try_from(frame.command.r#type) else {
                continue;
            };
            match kind {
                pb::base_command::Type::Connect => emit_connected(&mut out_buf),
                pb::base_command::Type::Ping => emit_pong(&mut out_buf),
                pb::base_command::Type::Lookup => {
                    if let Some(l) = &frame.command.lookup_topic {
                        emit_lookup_response(&mut out_buf, l.request_id);
                    }
                }
                pb::base_command::Type::Producer => {
                    if let Some(p) = &frame.command.producer {
                        emit_producer_success(&mut out_buf, p.request_id);
                        sent_producer_success = true;
                    }
                }
                _ => {}
            }
        }
        // Discard every fully decoded frame in one step.
        let _ = read_buf.split_to(offset);
        if !out_buf.is_empty() {
            if stream.write_all(&out_buf).await.is_err() {
                return Ok(());
            }
            if stream.flush().await.is_err() {
                return Ok(());
            }
            out_buf.clear();
        }
        // The replies above are flushed first, so the client has seen
        // ProducerSuccess before the connection is dropped.
        if sent_producer_success {
            let _ = time.sleep(delay).await;
            *drops_performed.lock() += 1;
            return Ok(());
        }
        match read_into(&mut stream, &mut read_buf).await {
            Ok(0) | Err(_) => return Ok(()),
            Ok(_) => {}
        }
    }
}
/// Broker workload that accepts every connection and drops each session a short,
/// fixed delay after answering its `Producer` command.
///
/// The counters are `Arc`-shared so the test can read them once the simulation
/// has run.
struct DropAcceptCycleBroker {
    /// Milliseconds to wait after sending `ProducerSuccess` before dropping the session.
    delay_ms: u64,
    /// Number of inbound connections accepted.
    sessions_accepted: Arc<Mutex<u32>>,
    /// Number of sessions dropped after producer creation.
    drops_performed: Arc<Mutex<u32>>,
}

impl DropAcceptCycleBroker {
    /// Builds a broker with the given drop delay and both counters at zero.
    fn new(delay_ms: u64) -> Self {
        let zeroed = || Arc::new(Mutex::new(0_u32));
        Self {
            delay_ms,
            sessions_accepted: zeroed(),
            drops_performed: zeroed(),
        }
    }
}
#[async_trait]
impl Workload for DropAcceptCycleBroker {
    fn name(&self) -> &str {
        "broker"
    }

    /// Binds `BROKER_PORT` on this node's IP and accepts connections until shutdown.
    ///
    /// Each accepted stream increments `sessions_accepted` and is served by a
    /// detached task running `handle_drop_after_create_session`. An accept error
    /// ends the workload with `Ok(())`.
    ///
    /// # Errors
    /// Returns `SimulationError::InvalidState` if binding the listener fails.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let network = ctx.network().clone();
        let bind_addr = format!("{}:{BROKER_PORT}", ctx.my_ip());
        let listener = network
            .bind(&bind_addr)
            .await
            .map_err(|e| SimulationError::InvalidState(format!("broker bind: {e}")))?;
        let shutdown = ctx.shutdown().clone();
        let delay = Duration::from_millis(self.delay_ms);
        let drops = self.drops_performed.clone();
        let accepted = self.sessions_accepted.clone();
        let providers = ctx.providers().clone();
        let task = ctx.providers().task().clone();
        loop {
            tokio::select! {
                // `biased` polls the branches top-to-bottom instead of in tokio's
                // randomized order, which is not tied to the simulation seed. This
                // keeps runs reproducible per seed and lets a pending shutdown
                // always win over a new connection.
                biased;
                () = shutdown.cancelled() => return Ok(()),
                inbound = listener.accept() => {
                    let Ok((stream, _peer)) = inbound else {
                        return Ok(());
                    };
                    *accepted.lock() += 1;
                    let drops_for_session = drops.clone();
                    let time = providers.time().clone();
                    // `delay` is `Copy`, so the `move` block takes its own copy.
                    let _handle = task.spawn_task("drop-accept-cycle-session", async move {
                        let _ = handle_drop_after_create_session(
                            stream,
                            delay,
                            time,
                            drops_for_session,
                        )
                        .await;
                    });
                }
            }
        }
    }
}
/// Client workload that connects through the supervised redial loop and keeps
/// opening producers while the broker repeatedly drops its sessions.
struct SupervisedRedialClientWorkload;

impl SupervisedRedialClientWorkload {
    /// Creates the stateless client workload.
    fn new() -> Self {
        SupervisedRedialClientWorkload
    }
}
#[async_trait]
impl Workload for SupervisedRedialClientWorkload {
    fn name(&self) -> &str {
        "client"
    }

    /// Connects to the `broker` peer with a supervised connection, then makes 12
    /// attempts to open a producer, pausing 30 ms of simulated time between them.
    ///
    /// If the connect fails, or does not finish within a 20 s `tokio::time::timeout`,
    /// the workload returns `Ok(())` early. Each `open_producer` outcome (with an
    /// 800 ms timeout) is ignored: this workload only drives traffic, and the test
    /// asserts on the broker's counters.
    ///
    /// NOTE(review): the timeouts use `tokio::time`, not the simulation's
    /// `TimeProvider`; confirm this is intended under the simulated clock.
    ///
    /// # Errors
    /// Returns `SimulationError::InvalidState` if no `broker` peer is registered.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let Some(broker_ip) = ctx.peer("broker") else {
            return Err(SimulationError::InvalidState("broker peer missing".into()));
        };
        let addr = format!("{broker_ip}:{BROKER_PORT}");
        let engine = MoonpoolEngine::new(ctx.providers().clone());

        // Short backoffs plus an anti-thrash threshold, so that drops shortly after
        // attach drive the redial path many times within a single run.
        let supervisor = SupervisorConfig {
            initial_backoff: Duration::from_millis(5),
            max_backoff: Duration::from_millis(40),
            mandatory_stop: Duration::from_secs(60),
            max_attempts: Some(64),
            anti_thrash_threshold: Some(AntiThrashThreshold {
                successful_attaches: 3,
                window: Duration::from_secs(5),
                drop_within: Duration::from_millis(200),
            }),
            drop_grace: Duration::from_millis(500),
            max_backoff_after_thrash: Duration::from_millis(120),
        };
        let cfg = ConnectionConfig {
            supervisor: Some(supervisor),
            ..ConnectionConfig::default()
        };

        let connected = tokio::time::timeout(
            Duration::from_secs(20),
            Client::connect_plain_supervised(&engine, &addr, cfg, None, None),
        )
        .await;
        let client = match connected {
            Ok(Ok(client)) => client,
            _ => return Ok(()),
        };

        let topic = "persistent://public/default/sim-supervised-redial";
        for _attempt in 0..12u32 {
            let request = CreateProducerRequest {
                topic: topic.to_owned(),
                ..Default::default()
            };
            let _ = tokio::time::timeout(Duration::from_millis(800), client.open_producer(request))
                .await;
            // Yield repeatedly so other ready tasks can run before the simulated sleep.
            for _ in 0..64 {
                tokio::task::yield_now().await;
            }
            let _ = ctx
                .providers()
                .time()
                .sleep(Duration::from_millis(30))
                .await;
        }
        client.close().await;
        Ok(())
    }
}
/// Runs the broker/client simulation over eight fixed seeds. The broker drops
/// every session 5 ms after producer creation. The test then checks that the
/// supervised client reconnected and that the drop fired more than once.
///
/// NOTE(review): if the builder reuses the same workload instance across
/// iterations, these counters accumulate over all 8 seeds, so `>= 2` is not a
/// per-seed guarantee. Confirm against moonpool's iteration semantics.
#[test]
fn supervised_loop_redials_under_drop_accept_cycle_sweep_8_seeds() {
    let broker = DropAcceptCycleBroker::new(5);
    let (sessions_accepted, drops_performed) = (
        Arc::clone(&broker.sessions_accepted),
        Arc::clone(&broker.drops_performed),
    );
    let seeds = vec![
        4_772_263_927_792_134_539,
        1,
        2,
        3,
        7,
        42,
        12_345,
        9_999_999,
    ];

    let report = SimulationBuilder::new()
        .workload(broker)
        .workload(SupervisedRedialClientWorkload::new())
        .set_debug_seeds(seeds)
        .set_iterations(8)
        .run();

    let accepts = *sessions_accepted.lock();
    let drops = *drops_performed.lock();
    assert!(
        accepts >= 2,
        "supervisor must redial at least once (accepts={accepts}, report={report:?})"
    );
    assert!(
        drops >= 2,
        "multi-cycle drop pattern must fire (drops={drops}, report={report:?})"
    );
}
/// Compile-only check that `MoonpoolEngine` can be instantiated over the
/// simulator's `SimProviders`. It is not called anywhere in this file.
// Kept only for its type-check; hence the dead_code allowance.
#[allow(dead_code)]
fn _engine_sim_providers_compiles(providers: SimProviders) {
    let _engine = MoonpoolEngine::<SimProviders>::new(providers);
}