#![allow(clippy::expect_used, clippy::too_many_lines)]
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use magnetar_proto::producer::OutgoingMessage;
use magnetar_proto::{
ConnectionConfig, CreateProducerRequest, FrameError, decode_one, encode_command, pb,
};
use magnetar_runtime_moonpool::{Client, ClientError, MoonpoolEngine};
use moonpool_core::{NetworkProvider, Providers, TaskProvider, TcpListenerTrait, TimeProvider};
use moonpool_sim::providers::SimProviders;
use moonpool_sim::{SimContext, SimulationBuilder, SimulationError, SimulationResult, Workload};
use parking_lot::Mutex;
/// TCP port the simulated broker binds to and the client workload dials.
const BROKER_PORT: u16 = 6650;
/// Producer send timeout in seconds; the test also uses it as the minimum
/// provider-clock time that must elapse before the send resolves.
const SEND_TIMEOUT_SECS: u64 = 10;
/// Performs a single `read` on `stream` and appends the bytes that arrived to `buf`.
///
/// Returns the number of bytes appended; a zero-length read is passed through as
/// `Ok(0)`. A read error is returned unchanged and nothing is appended to `buf`.
async fn read_into<S: AsyncRead + Unpin>(
    stream: &mut S,
    buf: &mut BytesMut,
) -> std::io::Result<usize> {
    // Scratch space for one read; only the filled prefix is copied into `buf`.
    let mut scratch = vec![0u8; 64 * 1024];
    stream.read(&mut scratch).await.map(|filled| {
        buf.extend_from_slice(&scratch[..filled]);
        filled
    })
}
/// Appends an encoded `Connected` command to `out`.
///
/// The reply advertises protocol version 21, a 5 MiB maximum message size and
/// default feature flags. The result of `encode_command` is discarded.
fn emit_connected(out: &mut BytesMut) {
    let connected = pb::CommandConnected {
        server_version: "magnetar-virtual-clock-driver".to_owned(),
        protocol_version: Some(21),
        max_message_size: Some(5 * 1024 * 1024),
        feature_flags: Some(pb::FeatureFlags::default()),
    };
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::Connected as i32,
        connected: Some(connected),
        ..Default::default()
    };
    let _ = encode_command(out, &reply);
}
/// Appends an encoded `Pong` command to `out`.
///
/// The result of `encode_command` is discarded.
fn emit_pong(out: &mut BytesMut) {
    let kind = pb::base_command::Type::Pong as i32;
    let reply = pb::BaseCommand {
        r#type: kind,
        pong: Some(pb::CommandPong {}),
        ..Default::default()
    };
    let _ = encode_command(out, &reply);
}
/// Appends an encoded `LookupResponse` for `request_id` to `out`.
///
/// The response has lookup type `Connect`, is marked authoritative, carries no
/// broker URLs, error or message, and sets `proxy_through_service_url` to false.
/// The result of `encode_command` is discarded.
fn emit_lookup_response(out: &mut BytesMut, request_id: u64) {
    let connect = pb::command_lookup_topic_response::LookupType::Connect as i32;
    let response = pb::CommandLookupTopicResponse {
        request_id,
        response: Some(connect),
        authoritative: Some(true),
        proxy_through_service_url: Some(false),
        broker_service_url: None,
        broker_service_url_tls: None,
        error: None,
        message: None,
    };
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::LookupResponse as i32,
        lookup_topic_response: Some(response),
        ..Default::default()
    };
    let _ = encode_command(out, &reply);
}
/// Appends an encoded `ProducerSuccess` for `request_id` to `out`.
///
/// The reply names the producer `magnetar-virtual-clock-driver`, reports
/// `last_sequence_id` -1 and `topic_epoch` 0, and marks the producer as ready.
/// The result of `encode_command` is discarded.
fn emit_producer_success(out: &mut BytesMut, request_id: u64) {
    let success = pb::CommandProducerSuccess {
        request_id,
        producer_name: "magnetar-virtual-clock-driver".to_owned(),
        producer_ready: Some(true),
        last_sequence_id: Some(-1),
        topic_epoch: Some(0),
        schema_version: None,
    };
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::ProducerSuccess as i32,
        producer_success: Some(success),
        ..Default::default()
    };
    let _ = encode_command(out, &reply);
}
/// Serves one accepted connection until the peer goes away or sends an undecodable frame.
///
/// Each pass decodes every complete frame currently buffered, queues a reply for
/// `Connect`, `Ping`, `Lookup` and `Producer` commands, writes the queued replies,
/// and then waits for more bytes. Every other command is consumed without a reply.
/// All exit paths return `Ok(())`.
async fn handle_session<S>(mut stream: S) -> SimulationResult<()>
where
    S: AsyncRead + AsyncWrite + Unpin + Send,
{
    let mut inbound = BytesMut::with_capacity(64 * 1024);
    let mut outbound = BytesMut::with_capacity(64 * 1024);
    loop {
        // Drain every complete frame that is already buffered.
        loop {
            // Decode from a frozen copy so an incomplete frame leaves `inbound` untouched.
            let mut snapshot = inbound.clone().freeze();
            let available = snapshot.len();
            let frame = match decode_one(&mut snapshot) {
                Err(FrameError::Incomplete { .. }) => break,
                Err(_) => return Ok(()),
                Ok(decoded) => decoded,
            };
            // Discard exactly the bytes the decoder consumed from the copy.
            let _ = inbound.split_to(available - snapshot.len());
            let command = &frame.command;
            match pb::base_command::Type::try_from(command.r#type) {
                Ok(pb::base_command::Type::Connect) => emit_connected(&mut outbound),
                Ok(pb::base_command::Type::Ping) => emit_pong(&mut outbound),
                Ok(pb::base_command::Type::Lookup) => {
                    if let Some(lookup) = &command.lookup_topic {
                        emit_lookup_response(&mut outbound, lookup.request_id);
                    }
                }
                Ok(pb::base_command::Type::Producer) => {
                    if let Some(producer) = &command.producer {
                        emit_producer_success(&mut outbound, producer.request_id);
                    }
                }
                // Unknown discriminants and all other commands get no reply.
                _ => {}
            }
        }
        if !outbound.is_empty() {
            // Flush is only attempted when the write succeeded.
            let sent = stream.write_all(&outbound).await.is_ok() && stream.flush().await.is_ok();
            if !sent {
                return Ok(());
            }
            outbound.clear();
        }
        // A zero-byte read or a read error ends the session.
        let Ok(received) = read_into(&mut stream, &mut inbound).await else {
            return Ok(());
        };
        if received == 0 {
            return Ok(());
        }
    }
}
/// Simulation workload that plays the broker side of the send-timeout test.
struct SendTimeoutBroker {
/// Number of connections accepted so far; shared so the test can read it
/// after the simulation has consumed the workload.
sessions_handled: Arc<Mutex<u32>>,
}
impl SendTimeoutBroker {
    /// Creates a broker workload whose accepted-session counter starts at zero.
    fn new() -> Self {
        let sessions_handled = Arc::new(Mutex::new(0));
        Self { sessions_handled }
    }
}
#[async_trait]
impl Workload for SendTimeoutBroker {
    /// Workload name; the client workload looks this peer up as `"broker"`.
    fn name(&self) -> &str {
        "broker"
    }

    /// Binds `BROKER_PORT` on this workload's IP and serves connections until shutdown.
    ///
    /// Every accepted connection increments `sessions_handled` and is handed to
    /// `handle_session` on a spawned task. Returns `Ok(())` when shutdown is
    /// signalled or `accept` fails; a bind failure is reported as
    /// `SimulationError::InvalidState`.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
        let network = ctx.network().clone();
        let listen_on = format!("{}:{BROKER_PORT}", ctx.my_ip());
        let bind_result = network.bind(&listen_on).await;
        let listener = bind_result
            .map_err(|e| SimulationError::InvalidState(format!("broker bind: {e}")))?;

        let shutdown = ctx.shutdown().clone();
        let session_counter = self.sessions_handled.clone();
        let spawner = ctx.providers().task().clone();
        loop {
            tokio::select! {
                () = shutdown.cancelled() => return Ok(()),
                accepted = listener.accept() => {
                    let Ok((stream, _peer)) = accepted else {
                        return Ok(());
                    };
                    *session_counter.lock() += 1;
                    // The session's own result is ignored; it always ends with `Ok(())`.
                    let _h = spawner.spawn_task(
                        "virtual-clock-driver-broker-session",
                        async move {
                            let _ = handle_session(stream).await;
                        },
                    );
                }
            }
        }
    }
}
/// What the client workload observed during one run; read by the test after
/// the simulation finishes.
#[derive(Debug, Default)]
struct ClientObservation {
/// `Some(true)` when the send failed with a broker error whose code is `-1`
/// and whose message contains "timeout" (case-insensitive); `Some(false)` for
/// any other resolution; `None` when the send never resolved or was never issued.
timed_out: Option<bool>,
/// Description of whatever went wrong, if anything.
last_error: Option<String>,
/// Provider-clock time between issuing the send and the end of the wait.
virtual_elapsed: Option<Duration>,
}
/// Simulation workload that plays the client side of the send-timeout test.
struct SendTimeoutClient {
/// Outcome of the run; shared so the test can inspect it after the
/// simulation has consumed the workload.
obs: Arc<Mutex<ClientObservation>>,
}
impl SendTimeoutClient {
    /// Creates a client workload with an empty (all-`None`) observation record.
    fn new() -> Self {
        let obs = Arc::new(Mutex::new(ClientObservation::default()));
        Self { obs }
    }
}
#[async_trait]
impl Workload for SendTimeoutClient {
/// Name of this workload.
fn name(&self) -> &str {
"client"
}
/// Connects to the broker, opens a producer with a `SEND_TIMEOUT_SECS` send
/// timeout, issues one send, and records how that send resolved in `self.obs`.
///
/// Returns `Err(SimulationError::InvalidState)` only when the `"broker"` peer
/// cannot be resolved. Every other failure (connect, open-producer, unexpected
/// send outcome) is written to `obs.last_error` and the method returns `Ok(())`.
async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
let broker_ip = ctx
.peer("broker")
.ok_or_else(|| SimulationError::InvalidState("broker peer missing".into()))?;
let addr = format!("{broker_ip}:{BROKER_PORT}");
let engine = MoonpoolEngine::new(ctx.providers().clone());
// Provider clock used for both the elapsed-time measurement and the sleeps below.
let time = ctx.providers().time().clone();
// Connection attempt, bounded by a 20 s `tokio::time::timeout`.
let connect = tokio::time::timeout(
Duration::from_secs(20),
Client::connect_plain(&engine, &addr, ConnectionConfig::default()),
)
.await;
// Either the timeout elapsing or a connect error lands in the `else` branch.
let Ok(Ok(client)) = connect else {
self.obs.lock().last_error = Some(format!("connect_plain failed: {connect:?}"));
return Ok(());
};
// Producer creation, likewise bounded by a 20 s `tokio::time::timeout`.
let producer = match tokio::time::timeout(
Duration::from_secs(20),
client.open_producer(CreateProducerRequest {
topic: "persistent://public/default/virtual-clock-driver".to_owned(),
send_timeout: Some(Duration::from_secs(SEND_TIMEOUT_SECS)),
..Default::default()
}),
)
.await
{
Ok(Ok(p)) => p,
other => {
self.obs.lock().last_error = Some(format!("open_producer failed: {other:?}"));
return Ok(());
}
};
// Provider-clock timestamp taken just before the send is created.
let t_before = time.now();
let payload = Bytes::from_static(b"will-time-out");
// Falls back to `u32::MAX` if the length does not fit in a `u32`.
let payload_len = u32::try_from(payload.len()).unwrap_or(u32::MAX);
let send_fut = producer.send(OutgoingMessage {
payload,
metadata: pb::MessageMetadata::default(),
uncompressed_size: payload_len,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
// Drive the send while advancing the provider clock one second at a time.
// `None` means the send was still pending when the pump gave up, or the
// outer 30 s `tokio::time::timeout` elapsed first.
let resolved: Option<Result<magnetar_proto::MessageId, ClientError>> =
tokio::time::timeout(Duration::from_secs(30), async {
tokio::pin!(send_fut);
// NOTE(review): presumably these yields let other tasks make progress
// before the first sleep — confirm against the driver implementation.
for _ in 0..32 {
tokio::task::yield_now().await;
}
// Up to 12 one-second sleeps; `biased` polls the send before the sleep
// on every pass, so a resolved send returns immediately.
for _ in 0..12 {
tokio::select! {
biased;
result = &mut send_fut => return Some(result),
slept = time.sleep(Duration::from_secs(1)) => {
// A failed sleep stops the pump early.
if slept.is_err() {
break;
}
for _ in 0..32 {
tokio::task::yield_now().await;
}
}
}
}
// Final check: take the result if the send is ready, otherwise yield
// `None` once `yield_now` completes.
tokio::select! {
biased;
result = &mut send_fut => Some(result),
() = tokio::task::yield_now() => None,
}
})
.await
.unwrap_or(None);
let t_after = time.now();
// The lock is taken only after the last `.await` in this method.
let mut obs = self.obs.lock();
obs.virtual_elapsed = Some(t_after.saturating_sub(t_before));
match resolved {
// Broker error: counts as the expected timeout only with code -1 and a
// message containing "timeout" (case-insensitive).
Some(Err(ClientError::Broker { code, message })) => {
let timed_out = code == -1 && message.to_lowercase().contains("timeout");
obs.timed_out = Some(timed_out);
if !timed_out {
obs.last_error = Some(format!(
"send resolved with non-timeout broker error code={code} msg={message}"
));
}
}
// A successful send is recorded as a failure of the scenario.
Some(Ok(_msg_id)) => {
obs.timed_out = Some(false);
obs.last_error = Some(
"send returned Ok — broker never replies; the deadline must have fired"
.to_owned(),
);
}
// Any other client error variant.
Some(Err(other)) => {
obs.timed_out = Some(false);
obs.last_error = Some(format!("send resolved with non-broker error: {other:?}"));
}
// The send never resolved within the pump.
None => {
obs.timed_out = None;
obs.last_error =
Some("driver pump exited before the send future resolved".to_owned());
}
}
Ok(())
}
}
/// Runs the broker and client workloads for one seeded iteration and checks that
/// the broker accepted a connection, that the client's send resolved with the
/// timeout error, and that at least `SEND_TIMEOUT_SECS` of provider time elapsed.
#[test]
fn driver_loop_send_timeout_fires_against_virtual_clock() {
    let broker = SendTimeoutBroker::new();
    let client = SendTimeoutClient::new();
    // Keep handles to the shared state before the builder takes the workloads.
    let session_counter = Arc::clone(&broker.sessions_handled);
    let observation = Arc::clone(&client.obs);

    let _report = SimulationBuilder::new()
        .workload(broker)
        .workload(client)
        .set_debug_seeds(vec![1_234_567_890_u64])
        .set_iterations(1)
        .run();

    let handled = *session_counter.lock();
    assert!(
        handled >= 1,
        "broker must have accepted the client's CONNECT (sessions_handled={handled})"
    );

    let obs = observation.lock();
    assert_eq!(
        obs.timed_out,
        Some(true),
        "send must resolve with the synthetic timeout sentinel under virtual time \
         (obs={obs:?}); a `None` or `Some(false)` means the driver loop is still \
         reading host time somewhere on the timeout path (HIGH-5, ADR-0011)."
    );

    // A missing measurement counts as zero elapsed time and fails the check below.
    let elapsed = obs.virtual_elapsed.unwrap_or(Duration::ZERO);
    assert!(
        elapsed >= Duration::from_secs(SEND_TIMEOUT_SECS),
        "virtual time must have advanced past the deadline before resolution \
         (elapsed={elapsed:?}, obs={obs:?})"
    );
}
/// Type-check only: verifies that `MoonpoolEngine::new` accepts `SimProviders`.
/// Marked `dead_code` because nothing in the visible part of this file calls it.
#[allow(dead_code)]
fn _engine_sim_providers_compiles(providers: SimProviders) {
let _engine: MoonpoolEngine<SimProviders> = MoonpoolEngine::new(providers);
}