#![allow(clippy::expect_used)]
#![forbid(unsafe_code)]
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::BytesMut;
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use magnetar_proto::{
ConnectionConfig, CreateProducerRequest, FrameError, decode_one, encode_command, pb,
};
use magnetar_runtime_moonpool::{Client, MoonpoolEngine};
use moonpool_core::{NetworkProvider, Providers, TaskProvider, TcpListenerTrait};
use moonpool_sim::{SimContext, SimulationBuilder, SimulationError, SimulationResult, Workload};
use parking_lot::Mutex;
mod common;
use common::sweep_seeds;
/// Port the mock proxy/broker binds inside the simulation (and the client dials).
const BROKER_PORT: u16 = 6650;
/// Broker service URL the mock proxy returns in every LOOKUP response.
const ADVERTISED_BROKER_URL: &str = "pulsar://broker-pool-lifecycle.proxy.internal:6650";
/// [`ADVERTISED_BROKER_URL`] with the `pulsar://` scheme stripped; the pinned pool
/// CONNECT is expected to carry exactly this as `proxy_to_broker_url`.
const ADVERTISED_BROKER_HOST_PORT: &str = "broker-pool-lifecycle.proxy.internal:6650";
/// Budget handed to `SimulationBuilder::run_time_budget` (two minutes).
const RUN_TIME_BUDGET: Duration = Duration::from_secs(2 * 60);
/// Deliberately short per-attempt connect timeout for the client.
const TIGHT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
/// Deliberately long operation timeout so producer open is not cut short.
const GENEROUS_OPERATION_TIMEOUT: Duration = Duration::from_secs(60);
/// Connect retry ceiling paired with [`TIGHT_CONNECT_TIMEOUT`].
const HIGH_CONNECT_MAX_RETRIES: u32 = 64;
/// What the mock proxy observed on one accepted connection.
#[derive(Debug, Default, Clone)]
struct SessionRecord {
    /// `proxy_to_broker_url` from the session's CONNECT frame (`None` if absent).
    connect_proxy_to_broker_url: Option<String>,
    /// Raw `BaseCommand` type codes of every non-CONNECT frame, in arrival order.
    frames: Vec<i32>,
}
/// Terminal result recorded once per client iteration.
#[derive(Debug, Clone)]
enum LifecycleOutcome {
    /// The producer opened. Carries a snapshot of the proxy's session log taken
    /// after the open succeeded and before the client was closed.
    PooledThenClean { sessions: Vec<SessionRecord> },
    /// Supervised connect or producer open failed; carries the formatted error.
    BoundedError(String),
}
/// Mock proxy/broker workload that logs every accepted connection.
struct ProxyWorkload {
    /// One [`SessionRecord`] per accepted connection, in accept order. The same log
    /// is shared with the client workload, which inspects and clears it.
    sessions: Arc<Mutex<Vec<SessionRecord>>>,
}
#[async_trait]
impl Workload for ProxyWorkload {
fn name(&self) -> &str {
"broker"
}
async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
let network = ctx.network().clone();
let bind_addr = format!("{}:{BROKER_PORT}", ctx.my_ip());
let listener = network
.bind(&bind_addr)
.await
.map_err(|e| SimulationError::InvalidState(format!("proxy bind: {e}")))?;
let shutdown = ctx.shutdown().clone();
let sessions = self.sessions.clone();
let task = ctx.providers().task().clone();
loop {
tokio::select! {
() = shutdown.cancelled() => return Ok(()),
accepted = listener.accept() => {
match accepted {
Ok((stream, _peer)) => {
let session_idx = {
let mut s = sessions.lock();
s.push(SessionRecord::default());
s.len() - 1
};
let sessions_for_task = sessions.clone();
let _handle = task.spawn_task("proxy-session", async move {
let _ = handle_session(stream, sessions_for_task, session_idx).await;
});
}
Err(_) => return Ok(()),
}
}
}
}
}
}
/// Serves one accepted proxy connection until the peer disconnects or sends an
/// undecodable frame.
///
/// Every complete frame in the read buffer is decoded and logged into the shared
/// session log at `session_idx`:
/// * for CONNECT, its `proxy_to_broker_url` is copied into the record;
/// * for every other command, its raw type is appended to `frames`.
///
/// Each frame is then answered via [`handle_frame`]. All replies produced from one
/// read are written and flushed together before the next socket read.
///
/// # Errors
///
/// Never returns `Err`. A read/write/flush failure, EOF, or any non-`Incomplete`
/// decode error ends the session with `Ok(())`.
async fn handle_session<S>(
    mut stream: S,
    sessions: Arc<Mutex<Vec<SessionRecord>>>,
    session_idx: usize,
) -> SimulationResult<()>
where
    S: AsyncRead + AsyncWrite + Unpin + Send,
{
    let mut read_buf = BytesMut::with_capacity(64 * 1024);
    let mut out_buf = BytesMut::with_capacity(64 * 1024);
    // Reuse one scratch buffer for every socket read rather than allocating and
    // zeroing a fresh 64 KiB vector on each pass through the loop.
    let mut tmp = vec![0u8; 64 * 1024];
    loop {
        // Drain every complete frame currently buffered.
        loop {
            // Decode from a copy so `read_buf` stays untouched when the frame is
            // still incomplete. Only the consumed prefix is dropped on success.
            let mut framed = read_buf.clone().freeze();
            let before = framed.len();
            let frame = match decode_one(&mut framed) {
                Ok(f) => f,
                Err(FrameError::Incomplete { .. }) => break,
                Err(_) => return Ok(()),
            };
            let consumed = before - framed.len();
            let _ = read_buf.split_to(consumed);
            let kind = frame.command.r#type;
            {
                let mut log = sessions.lock();
                // `get_mut` rather than indexing: `ClientWorkload::check` clears this
                // shared log, and a session task that is still receiving frames at
                // that point must not panic on an out-of-range index.
                if let Some(record) = log.get_mut(session_idx) {
                    if matches!(
                        pb::base_command::Type::try_from(kind).ok(),
                        Some(pb::base_command::Type::Connect)
                    ) {
                        if let Some(c) = &frame.command.connect {
                            record
                                .connect_proxy_to_broker_url
                                .clone_from(&c.proxy_to_broker_url);
                        }
                    } else {
                        record.frames.push(kind);
                    }
                }
            }
            handle_frame(&frame, &mut out_buf, session_idx);
        }
        if !out_buf.is_empty() {
            if stream.write_all(&out_buf).await.is_err() {
                return Ok(());
            }
            if stream.flush().await.is_err() {
                return Ok(());
            }
            out_buf.clear();
        }
        match stream.read(&mut tmp).await {
            Ok(0) | Err(_) => return Ok(()),
            Ok(n) => read_buf.extend_from_slice(&tmp[..n]),
        }
    }
}
/// Appends the mock proxy's canned reply to `frame` onto `out`, if there is one.
///
/// * CONNECT → CONNECTED (server version `magnetar-pool-lifecycle`, protocol 21).
/// * PING → PONG.
/// * LOOKUP → LOOKUP_RESPONSE of type `Connect` pointing at `ADVERTISED_BROKER_URL`.
///   `proxy_through_service_url` is `true` only on session index 0. The test expects
///   that flag to make the client open a second, pinned connection.
/// * PRODUCER → PRODUCER_SUCCESS named `pool-lifecycle`.
///
/// No reply is written for:
/// * unknown command types;
/// * any other command;
/// * LOOKUP or PRODUCER frames whose payload is missing.
///
/// Encoding failures are ignored.
fn handle_frame(frame: &magnetar_proto::Frame, out: &mut BytesMut, session_idx: usize) {
    use magnetar_proto::pb::base_command::Type as CommandType;

    let command = &frame.command;
    let reply = match CommandType::try_from(command.r#type) {
        Ok(CommandType::Connect) => Some(pb::BaseCommand {
            r#type: CommandType::Connected as i32,
            connected: Some(pb::CommandConnected {
                server_version: "magnetar-pool-lifecycle".to_owned(),
                protocol_version: Some(21),
                max_message_size: Some(5 * 1024 * 1024),
                feature_flags: Some(pb::FeatureFlags::default()),
            }),
            ..Default::default()
        }),
        Ok(CommandType::Ping) => Some(pb::BaseCommand {
            r#type: CommandType::Pong as i32,
            pong: Some(pb::CommandPong {}),
            ..Default::default()
        }),
        Ok(CommandType::Lookup) => command.lookup_topic.as_ref().map(|lookup| pb::BaseCommand {
            r#type: CommandType::LookupResponse as i32,
            lookup_topic_response: Some(pb::CommandLookupTopicResponse {
                broker_service_url: Some(ADVERTISED_BROKER_URL.to_owned()),
                broker_service_url_tls: None,
                response: Some(pb::command_lookup_topic_response::LookupType::Connect as i32),
                request_id: lookup.request_id,
                authoritative: Some(true),
                error: None,
                message: None,
                proxy_through_service_url: Some(session_idx == 0),
            }),
            ..Default::default()
        }),
        Ok(CommandType::Producer) => command.producer.as_ref().map(|producer| pb::BaseCommand {
            r#type: CommandType::ProducerSuccess as i32,
            producer_success: Some(pb::CommandProducerSuccess {
                request_id: producer.request_id,
                producer_name: "pool-lifecycle".to_owned(),
                last_sequence_id: Some(-1),
                schema_version: None,
                topic_epoch: Some(0),
                producer_ready: Some(true),
            }),
            ..Default::default()
        }),
        _ => None,
    };
    if let Some(reply) = reply {
        let _ = encode_command(out, &reply);
    }
}
/// Client workload: connects through the mock proxy, opens one producer, and records
/// one [`LifecycleOutcome`] per run.
struct ClientWorkload {
    /// Session log shared with the proxy. It is snapshotted after a successful
    /// producer open and cleared in `check`.
    sessions: Arc<Mutex<Vec<SessionRecord>>>,
    /// Sink receiving exactly one outcome per `run`.
    outcomes: Arc<Mutex<Vec<LifecycleOutcome>>>,
}

impl ClientWorkload {
    /// Builds a client workload over the shared session log and outcome sink.
    fn new(
        sessions: Arc<Mutex<Vec<SessionRecord>>>,
        outcomes: Arc<Mutex<Vec<LifecycleOutcome>>>,
    ) -> Self {
        Self { outcomes, sessions }
    }
}
#[async_trait]
impl Workload for ClientWorkload {
    fn name(&self) -> &str {
        "client"
    }

    /// Connects to the `broker` peer with a supervised client, opens one producer,
    /// closes everything, and records exactly one [`LifecycleOutcome`].
    ///
    /// Connect and producer-open failures are recorded as
    /// [`LifecycleOutcome::BoundedError`] and the run still returns `Ok(())`. Only a
    /// missing `broker` peer is returned as an error.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let Some(broker_ip) = ctx.peer("broker") else {
            return Err(SimulationError::InvalidState("broker peer missing".into()));
        };
        let addr = format!("{broker_ip}:{BROKER_PORT}");
        let engine = MoonpoolEngine::new(ctx.providers().clone());
        // Fast supervisor backoff with a bounded attempt count and a hard stop.
        let supervisor = magnetar_proto::SupervisorConfig {
            initial_backoff: Duration::from_millis(10),
            max_backoff: Duration::from_millis(200),
            mandatory_stop: Duration::from_secs(90),
            max_attempts: Some(64),
            ..magnetar_proto::SupervisorConfig::default()
        };
        let cfg = ConnectionConfig {
            connect_timeout: TIGHT_CONNECT_TIMEOUT,
            operation_timeout: GENEROUS_OPERATION_TIMEOUT,
            connect_max_retries: HIGH_CONNECT_MAX_RETRIES,
            supervisor: Some(supervisor),
            ..ConnectionConfig::default()
        };
        let client = match Client::connect_plain_supervised(&engine, &addr, cfg, None, None).await {
            Ok(client) => client,
            Err(err) => {
                let reason = format!("supervised connect failed: {err:?}");
                self.outcomes
                    .lock()
                    .push(LifecycleOutcome::BoundedError(reason));
                return Ok(());
            }
        };
        let request = CreateProducerRequest {
            topic: "persistent://public/default/pool-lifecycle-producer".to_owned(),
            ..Default::default()
        };
        let opened = client.open_producer(request).await;
        let outcome = match opened {
            Err(err) => {
                client.close().await;
                LifecycleOutcome::BoundedError(format!("open_producer failed: {err:?}"))
            }
            Ok(producer) => {
                // Snapshot the proxy's view while the producer and client are still open.
                let sessions = self.sessions.lock().clone();
                drop(producer);
                client.close().await;
                LifecycleOutcome::PooledThenClean { sessions }
            }
        };
        self.outcomes.lock().push(outcome);
        Ok(())
    }

    /// Empties the shared session log, presumably so the next iteration's session
    /// indices start at 0 again (confirm against moonpool_sim's `check` timing).
    async fn check(&mut self, _ctx: &SimContext) -> SimulationResult<()> {
        self.sessions.lock().clear();
        Ok(())
    }
}
/// Asserts that exactly `expected_iterations` outcomes were recorded and that each
/// is a clean pooled lifecycle:
///
/// * at least two proxy sessions were accepted (bootstrap + pinned);
/// * the bootstrap (index 0) CONNECT carried no `proxy_to_broker_url`;
/// * the pinned (index 1) CONNECT carried `ADVERTISED_BROKER_HOST_PORT` (no scheme);
/// * the PRODUCER command arrived on the pinned session.
///
/// # Panics
///
/// Panics in any of these cases:
/// * the outcome count does not match `expected_iterations`;
/// * any outcome is a [`LifecycleOutcome::BoundedError`];
/// * any of the per-session conditions above fails.
fn assert_every_iteration_pooled_then_clean(
    outcomes: &Arc<Mutex<Vec<LifecycleOutcome>>>,
    expected_iterations: usize,
) {
    // Snapshot the outcomes; the lock guard is released at the end of this statement.
    let recorded = outcomes.lock().clone();
    let count = recorded.len();
    assert_eq!(
        count,
        expected_iterations,
        "every iteration must record exactly one terminating outcome; got {count} for {expected_iterations} iteration(s): {recorded:?}",
    );
    for (i, outcome) in recorded.iter().enumerate() {
        let sessions = match outcome {
            LifecycleOutcome::PooledThenClean { sessions } => sessions,
            LifecycleOutcome::BoundedError(reason) => panic!(
                "iter {i}: pooled proxy connection did not establish + tear down cleanly within \
                 the dual cap: {reason}"
            ),
        };
        let [bootstrap, pinned, ..] = sessions.as_slice() else {
            panic!(
                "iter {i}: proxy_through lookup must open a SECOND pooled connection; \
                 saw {} session(s): {sessions:?}",
                sessions.len(),
            );
        };
        assert!(
            bootstrap.connect_proxy_to_broker_url.is_none(),
            "iter {i}: bootstrap CONNECT must NOT carry proxy_to_broker_url, got {:?}",
            bootstrap.connect_proxy_to_broker_url,
        );
        assert_eq!(
            pinned.connect_proxy_to_broker_url.as_deref(),
            Some(ADVERTISED_BROKER_HOST_PORT),
            "iter {i}: pinned pool CONNECT must carry proxy_to_broker_url = host:port \
             (no scheme), got {:?}",
            pinned.connect_proxy_to_broker_url,
        );
        let producer_code = pb::base_command::Type::Producer as i32;
        assert!(
            pinned.frames.contains(&producer_code),
            "iter {i}: pooled producer open must ride the pinned connection; \
             pinned frames {:?}",
            pinned.frames,
        );
    }
}
/// Asserts only that exactly `expected_iterations` outcomes were recorded, meaning
/// every iteration terminated with some [`LifecycleOutcome`]. The outcome kinds are
/// not inspected.
///
/// # Panics
///
/// If the number of recorded outcomes differs from `expected_iterations`.
fn assert_every_iteration_terminated(
    outcomes: &Arc<Mutex<Vec<LifecycleOutcome>>>,
    expected_iterations: usize,
) {
    let recorded: Vec<LifecycleOutcome> = outcomes.lock().iter().cloned().collect();
    let count = recorded.len();
    assert_eq!(
        count,
        expected_iterations,
        "every iteration must record exactly one terminating outcome; got {count} for {expected_iterations} iteration(s): {recorded:?}",
    );
}
/// Single-iteration smoke run. The one recorded outcome must be a clean pooled
/// lifecycle: a bootstrap session plus a pinned session, with the producer opened
/// on the pinned one.
#[test]
fn moonpool_pooled_proxy_connection_opens_and_tears_down_clean_smoke() {
    let session_log: Arc<Mutex<Vec<SessionRecord>>> = Arc::new(Mutex::new(Vec::new()));
    let outcomes: Arc<Mutex<Vec<LifecycleOutcome>>> = Arc::new(Mutex::new(Vec::new()));
    let proxy = ProxyWorkload {
        sessions: Arc::clone(&session_log),
    };
    let client = ClientWorkload::new(session_log, Arc::clone(&outcomes));

    let report = SimulationBuilder::new()
        .run_time_budget(RUN_TIME_BUDGET)
        .workload(proxy)
        .workload(client)
        .set_iterations(1)
        .run();

    assert_eq!(
        report.iterations, 1,
        "expected exactly one iteration to be dispatched and terminate: {report:?}",
    );
    assert_every_iteration_pooled_then_clean(&outcomes, 1);
}
/// Eight-iteration run over the seeds from `common::sweep_seeds(8)`.
///
/// Only termination is asserted: every iteration must be dispatched and record one
/// outcome. The outcome kind is not checked.
#[test]
fn moonpool_pooled_proxy_connection_opens_and_tears_down_clean_sweep_8_seeds() {
    let session_log: Arc<Mutex<Vec<SessionRecord>>> = Arc::new(Mutex::new(Vec::new()));
    let outcomes: Arc<Mutex<Vec<LifecycleOutcome>>> = Arc::new(Mutex::new(Vec::new()));
    let proxy = ProxyWorkload {
        sessions: Arc::clone(&session_log),
    };
    let client = ClientWorkload::new(session_log, Arc::clone(&outcomes));

    let report = SimulationBuilder::new()
        .run_time_budget(RUN_TIME_BUDGET)
        .workload(proxy)
        .workload(client)
        .set_debug_seeds(sweep_seeds(8))
        .set_iterations(8)
        .run();

    assert_eq!(
        report.iterations, 8,
        "every seed must be dispatched and terminate (no silent hang): {report:?}",
    );
    assert_every_iteration_terminated(&outcomes, 8);
}