use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use magnetar_proto::producer::OutgoingMessage;
use magnetar_proto::types::CompressionKind;
use magnetar_proto::{
ConnectionEvent, CreateProducerRequest, MessageId, OpOutcome, PendingOpKey, ProducerHandle,
ProducerStats, SequenceId,
};
use moonpool_core::Providers;
use crate::ConnectionShared;
use crate::client::{Client, ClientError};
use crate::crypto::MessageEncryptor;
pub struct Producer<P: Providers> {
pub(crate) shared: Arc<ConnectionShared>,
pub(crate) handle: ProducerHandle,
pub(crate) slot: Arc<magnetar_proto::ProducerSlot>,
pub(crate) compression: CompressionKind,
pub(crate) encryptor: Option<Arc<dyn MessageEncryptor>>,
pub(crate) close_guard: Arc<ProducerCloseGuard>,
pub(crate) _providers: std::marker::PhantomData<fn() -> P>,
}
#[derive(Debug)]
pub(crate) struct ProducerCloseGuard {
shared: Arc<ConnectionShared>,
handle: ProducerHandle,
slot: Arc<magnetar_proto::ProducerSlot>,
}
impl Drop for ProducerCloseGuard {
fn drop(&mut self) {
let already_closed = self.slot.state.lock().closed;
if already_closed {
return;
}
{
let mut conn = self.shared.inner.lock();
let _ = conn.close_producer_forget(self.handle);
}
self.shared.driver_waker.notify_one();
tracing::debug!(
topic = %self.slot.identity.topic,
handle = ?self.handle,
"producer dropped without explicit close — best-effort CloseProducer enqueued"
);
}
}
impl<P: Providers> Clone for Producer<P> {
fn clone(&self) -> Self {
Self {
shared: self.shared.clone(),
handle: self.handle,
slot: self.slot.clone(),
compression: self.compression,
encryptor: self.encryptor.clone(),
close_guard: self.close_guard.clone(),
_providers: std::marker::PhantomData,
}
}
}
impl<P: Providers> std::fmt::Debug for Producer<P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Producer")
.field("handle", &self.handle)
.field("compression", &self.compression)
.finish_non_exhaustive()
}
}
impl<P: Providers> Producer<P> {
pub(crate) fn assemble(
shared: Arc<ConnectionShared>,
handle: ProducerHandle,
slot: Arc<magnetar_proto::ProducerSlot>,
compression: CompressionKind,
encryptor: Option<Arc<dyn MessageEncryptor>>,
) -> Self {
let close_guard = Arc::new(ProducerCloseGuard {
shared: shared.clone(),
handle,
slot: slot.clone(),
});
Self {
shared,
handle,
slot,
compression,
encryptor,
close_guard,
_providers: std::marker::PhantomData,
}
}
#[must_use]
pub fn handle(&self) -> ProducerHandle {
self.handle
}
#[must_use]
pub fn compression(&self) -> CompressionKind {
self.compression
}
#[must_use]
pub fn topic(&self) -> String {
self.slot.identity.topic.clone()
}
#[must_use]
pub fn name(&self) -> String {
self.slot.state.lock().name.clone().unwrap_or_default()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.slot.state.lock().closed
}
#[must_use]
pub fn is_connected(&self) -> bool {
self.shared.inner.lock().is_connected()
}
#[must_use]
pub fn last_disconnected_timestamp(&self) -> Option<std::time::SystemTime> {
self.shared.inner.lock().last_disconnected_timestamp()
}
#[must_use]
pub fn pending_count(&self) -> usize {
self.slot.state.lock().pending.len()
}
#[must_use]
pub fn last_sequence_id(&self) -> i64 {
self.slot.state.lock().last_sequence_id_pushed
}
#[must_use]
pub fn last_sequence_id_published(&self) -> i64 {
self.slot.state.lock().last_sequence_id_published
}
#[must_use]
pub fn batch_len(&self) -> usize {
self.slot.state.lock().batch.len()
}
#[must_use]
pub fn batch_bytes(&self) -> usize {
self.slot.state.lock().batch.current_size_bytes
}
#[must_use]
pub fn stats(&self) -> ProducerStats {
self.slot.state.lock().stats()
}
pub fn send(&self, mut msg: OutgoingMessage) -> SendFut {
let publish_time_ms = self.shared.now_wall_clock_ms();
if self.compression != CompressionKind::None {
tracing::debug!(
compression = ?self.compression,
"send rejected: compression not yet wired on the moonpool engine"
);
return SendFut {
shared: self.shared.clone(),
handle: self.handle,
state: SendState::Failed {
error: Some(ClientError::Other(format!(
"moonpool engine: compression {:?} not yet wired; \
use CompressionKind::None for now",
self.compression
))),
},
reserved_bytes: 0,
};
}
if let Some(encryptor) = self.encryptor.as_ref() {
match encryptor.encrypt(&msg.payload, &mut msg.metadata) {
Ok(ciphertext) => msg.payload = ciphertext,
Err(err) => {
tracing::debug!(error = %err, "send rejected: encryption failed");
return SendFut {
shared: self.shared.clone(),
handle: self.handle,
state: SendState::Failed {
error: Some(ClientError::Other(format!("encrypt: {err}"))),
},
reserved_bytes: 0,
};
}
}
}
let reserved_bytes = msg.payload.len() as u64;
match self.shared.memory_limit_policy {
magnetar_proto::MemoryLimitPolicy::FailImmediately => {
if let Err(err) = self.shared.try_reserve_memory(reserved_bytes) {
tracing::debug!(
payload_len = reserved_bytes,
"send rejected: memory limit exceeded"
);
return SendFut {
shared: self.shared.clone(),
handle: self.handle,
state: SendState::Failed {
error: Some(ClientError::Engine(err)),
},
reserved_bytes: 0,
};
}
self.queue_send(msg, publish_time_ms, reserved_bytes)
}
magnetar_proto::MemoryLimitPolicy::ProducerBlock => {
if self.shared.try_reserve_memory(reserved_bytes).is_ok() {
return self.queue_send(msg, publish_time_ms, reserved_bytes);
}
SendFut {
shared: self.shared.clone(),
handle: self.handle,
state: SendState::Reserving {
msg: Some(Box::new(msg)),
publish_time_ms,
bytes: reserved_bytes,
slab_key: None,
},
reserved_bytes: 0,
}
}
}
}
pub fn send_with_source_message_id(
&self,
source_msg_id: magnetar_proto::MessageId,
payload: impl Into<bytes::Bytes>,
metadata: magnetar_proto::pb::MessageMetadata,
) -> SendFut {
let payload = payload.into();
let uncompressed_size = u32::try_from(payload.len()).unwrap_or(u32::MAX);
self.send(OutgoingMessage {
payload,
metadata,
uncompressed_size,
num_messages: 1,
txn_id: None,
source_message_id: Some(source_msg_id),
})
}
fn queue_send(
&self,
msg: OutgoingMessage,
publish_time_ms: u64,
reserved_bytes: u64,
) -> SendFut {
debug_assert_eq!(
self.slot.identity.handle, self.handle,
"producer slot/handle mismatch: slot is for {:?} but handle is {:?}",
self.slot.identity.handle, self.handle,
);
let now = self.shared.now_instant();
let result = self.slot.queue_send(msg, publish_time_ms, now);
self.shared.driver_waker.notify_one();
match result {
Ok(seq) => {
tracing::trace!(
sequence_id = seq.0,
payload_len = reserved_bytes,
"send queued"
);
SendFut {
shared: self.shared.clone(),
handle: self.handle,
state: SendState::Pending { sequence_id: seq },
reserved_bytes,
}
}
Err(err) => {
self.shared.release_memory(reserved_bytes);
tracing::debug!(error = %err, "send rejected by producer state machine");
let error = if self
.shared
.no_driver
.load(std::sync::atomic::Ordering::SeqCst)
{
ClientError::PeerClosed
} else {
ClientError::Other(format!("send: {err}"))
};
SendFut {
shared: self.shared.clone(),
handle: self.handle,
state: SendState::Failed { error: Some(error) },
reserved_bytes: 0,
}
}
}
}
pub async fn flush(&self) -> Result<(), ClientError> {
let publish_time_ms = self.shared.now_wall_clock_ms();
{
let now = self.shared.now_instant();
let mut conn = self.shared.inner.lock();
conn.flush_producer(self.handle, publish_time_ms, now);
}
self.shared.driver_waker.notify_one();
loop {
let pending = self.shared.inner.lock().producer_pending_count(self.handle);
if pending == 0 {
return Ok(());
}
let notified = self.shared.driver_waker.notified();
tokio::pin!(notified);
notified.as_mut().enable();
notified.await;
}
}
pub async fn close(self) -> Result<(), ClientError> {
self.shared.fail_if_no_driver()?;
let topic = self.slot.identity.topic.clone();
let producer_name = self.slot.state.lock().name.clone().unwrap_or_default();
let request_id = {
let mut conn = self.shared.inner.lock();
conn.close_producer(self.handle)
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
key: PendingOpKey::Request(request_id),
}
.await;
match outcome {
OpOutcome::Success { .. } => {
tracing::info!(
topic = %topic,
producer_name = %producer_name,
handle = ?self.handle,
access_mode = ?self.slot.identity.access_mode,
"producer closed"
);
Ok(())
}
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected outcome for close request {request_id}: {other:?}"
))),
}
}
pub async fn get_schema(
&self,
version: Option<bytes::Bytes>,
) -> Result<magnetar_proto::pb::Schema, ClientError> {
let topic = self
.shared
.inner
.lock()
.producer_topic(self.handle)
.map(str::to_owned)
.ok_or_else(|| {
ClientError::Other(format!(
"get_schema: producer handle {:?} is no longer registered",
self.handle
))
})?;
tracing::debug!(topic = %topic, "schema lookup");
let request_id = {
let mut conn = self.shared.inner.lock();
conn.get_schema(&topic, version)
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
key: PendingOpKey::Request(request_id),
}
.await;
match outcome {
OpOutcome::GetSchemaResponse { result, .. } => match result {
Ok((schema, _version)) => Ok(schema),
Err((code, message)) => Err(ClientError::Broker { code, message }),
},
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected get_schema outcome: {other:?}"
))),
}
}
}
impl<P: Providers + Send + Sync> Client<P> {
pub async fn open_producer(
&self,
req: CreateProducerRequest,
) -> Result<Producer<P>, ClientError> {
self.open_producer_with(req, None).await
}
pub async fn open_producer_with(
&self,
req: CreateProducerRequest,
encryptor: Option<Arc<dyn MessageEncryptor>>,
) -> Result<Producer<P>, ClientError> {
let compression = req.compression;
let (target, landed_on) = self.lookup_topic_target(&req.topic).await?;
let target_shared = self.resolve_target(&target, &landed_on, &req.topic).await?;
target_shared.fail_if_no_driver()?;
let (handle, slot) = {
let mut conn = target_shared.inner.lock();
let handle = conn.create_producer(req);
let slot = conn
.producer(handle)
.cloned()
.expect("just-created producer slot must exist");
(handle, slot)
};
target_shared.driver_waker.notify_one();
wait_producer_ready(&target_shared, handle).await?;
let producer_name = slot.state.lock().name.clone().unwrap_or_default();
tracing::info!(
topic = %slot.identity.topic,
producer_name = %producer_name,
handle = ?handle,
access_mode = ?slot.identity.access_mode,
"producer created"
);
Ok(Producer::assemble(
target_shared,
handle,
slot,
compression,
encryptor,
))
}
}
#[derive(Debug)]
pub struct SendFut {
shared: Arc<ConnectionShared>,
handle: ProducerHandle,
state: SendState,
reserved_bytes: u64,
}
#[derive(Debug)]
enum SendState {
Pending {
sequence_id: SequenceId,
},
Failed {
error: Option<ClientError>,
},
Reserving {
msg: Option<Box<OutgoingMessage>>,
publish_time_ms: u64,
bytes: u64,
slab_key: Option<usize>,
},
}
impl Drop for SendFut {
fn drop(&mut self) {
if self.reserved_bytes > 0 {
self.shared.release_memory(self.reserved_bytes);
self.reserved_bytes = 0;
}
if let SendState::Reserving {
slab_key: Some(key),
..
} = &self.state
{
self.shared.cancel_memory_waker(*key);
}
}
}
impl Future for SendFut {
type Output = Result<MessageId, ClientError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let handle = self.handle;
let shared = self.shared.clone();
if matches!(self.state, SendState::Reserving { .. }) {
let prev = std::mem::replace(&mut self.state, SendState::Failed { error: None });
let SendState::Reserving {
mut msg,
publish_time_ms,
bytes,
slab_key,
} = prev
else {
unreachable!()
};
match shared.try_reserve_memory_or_register(bytes, cx.waker()) {
Ok(()) => {
if let Some(prior) = slab_key {
shared.cancel_memory_waker(prior);
}
let owned = *msg.take().expect("Reserving polled with no message");
let result = {
let now = shared.now_instant();
let mut conn = shared.inner.lock();
conn.send(handle, owned, publish_time_ms, now)
};
shared.driver_waker.notify_one();
match result {
Ok(seq) => {
self.state = SendState::Pending { sequence_id: seq };
self.reserved_bytes = bytes;
}
Err(err) => {
shared.release_memory(bytes);
return Poll::Ready(Err(ClientError::Other(format!("send: {err}"))));
}
}
}
Err(new_key) => {
if let Some(prior) = slab_key {
shared.cancel_memory_waker(prior);
}
self.state = SendState::Reserving {
msg,
publish_time_ms,
bytes,
slab_key: Some(new_key),
};
return Poll::Pending;
}
}
}
let outcome = match &mut self.state {
SendState::Failed { error } => {
let err = error
.take()
.unwrap_or_else(|| ClientError::Other("send future polled after error".into()));
Poll::Ready(Err(err))
}
SendState::Pending { sequence_id } => {
let key = PendingOpKey::Send(handle, *sequence_id);
let mut conn = shared.inner.lock();
if let Some(outcome) = conn.take_outcome(key) {
drop(conn);
Poll::Ready(translate_send_outcome(outcome))
} else {
conn.register_waker(key, cx.waker().clone());
Poll::Pending
}
}
SendState::Reserving { .. } => unreachable!("Reserving handled above"),
};
if matches!(outcome, Poll::Ready(_)) && self.reserved_bytes > 0 {
self.shared.release_memory(self.reserved_bytes);
self.reserved_bytes = 0;
}
outcome
}
}
fn translate_send_outcome(outcome: OpOutcome) -> Result<MessageId, ClientError> {
match outcome {
OpOutcome::SendReceipt { message_id, .. } => Ok(message_id),
OpOutcome::SendError { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected send outcome: {other:?}"
))),
}
}
struct RequestFut {
shared: Arc<ConnectionShared>,
key: PendingOpKey,
}
impl Future for RequestFut {
type Output = OpOutcome;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut conn = self.shared.inner.lock();
if let Some(outcome) = conn.take_outcome(self.key) {
drop(conn);
return Poll::Ready(outcome);
}
conn.register_waker(self.key, cx.waker().clone());
Poll::Pending
}
}
impl Drop for RequestFut {
fn drop(&mut self) {
self.shared.inner.lock().unregister_waker(self.key);
}
}
struct ProducerReadyFut {
shared: Arc<ConnectionShared>,
handle: ProducerHandle,
helper: Option<tokio::task::JoinHandle<()>>,
}
impl Drop for ProducerReadyFut {
fn drop(&mut self) {
if let Some(h) = self.helper.take() {
h.abort();
}
}
}
impl Future for ProducerReadyFut {
type Output = Result<(), ClientError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut conn = this.shared.inner.lock();
loop {
match conn.poll_event() {
Some(ConnectionEvent::ProducerReady { handle, .. }) => {
if handle == this.handle {
return Poll::Ready(Ok(()));
}
}
Some(ConnectionEvent::ProducerClosedByBroker {
handle,
assigned_broker_service_url,
}) => {
if handle == this.handle {
let topic = conn
.producer(handle)
.map(|s| s.identity.topic.clone())
.unwrap_or_default();
tracing::warn!(
handle = ?handle,
topic = %topic,
assigned_broker_service_url = assigned_broker_service_url
.as_deref()
.map(crate::log_fields::truncate_broker_str),
"broker closed producer while waiting for ProducerReady"
);
return Poll::Ready(Err(ClientError::Closed));
}
}
Some(ConnectionEvent::ProducerOpenFailed {
handle,
code,
message,
}) => {
if handle == this.handle {
return Poll::Ready(Err(ClientError::Broker { code, message }));
}
}
Some(ConnectionEvent::Closed { reason }) => {
tracing::warn!(
reason = reason
.as_deref()
.map(crate::log_fields::truncate_broker_str),
"connection closed while waiting for producer readiness"
);
return Poll::Ready(Err(match reason {
Some(_) => ClientError::PeerClosed,
None => ClientError::Closed,
}));
}
Some(_) => {} None => break,
}
}
drop(conn);
if let Some(prev) = this.helper.take() {
prev.abort();
}
let waker = cx.waker().clone();
let shared = this.shared.clone();
this.helper = Some(tokio::spawn(async move {
shared.driver_waker.notified().await;
waker.wake();
}));
Poll::Pending
}
}
async fn wait_producer_ready(
shared: &Arc<ConnectionShared>,
handle: ProducerHandle,
) -> Result<(), ClientError> {
ProducerReadyFut {
shared: shared.clone(),
handle,
helper: None,
}
.await
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::{Bytes, BytesMut};
use magnetar_proto::producer::OutgoingMessage;
use magnetar_proto::types::{CompressionKind, ProducerHandle};
use magnetar_proto::{ConnectionConfig, CreateProducerRequest, encode_command, pb};
use moonpool_core::TokioProviders;
use super::Producer;
use crate::client::{Client, ClientError};
use crate::{ConnectionShared, MoonpoolEngine};
fn handshake_response_bytes() -> BytesMut {
let cmd = pb::BaseCommand {
r#type: pb::base_command::Type::Connected as i32,
connected: Some(pb::CommandConnected {
server_version: "magnetar-test".to_owned(),
protocol_version: Some(21),
max_message_size: Some(5 * 1024 * 1024),
feature_flags: Some(pb::FeatureFlags::default()),
}),
..Default::default()
};
let mut buf = BytesMut::new();
encode_command(&mut buf, &cmd).expect("encode CommandConnected");
buf
}
fn handshake_complete_shared() -> Arc<ConnectionShared> {
let shared = ConnectionShared::new(ConnectionConfig::default());
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
shared
}
fn slot_for(
shared: &Arc<ConnectionShared>,
handle: ProducerHandle,
) -> Arc<magnetar_proto::ProducerSlot> {
shared
.inner
.lock()
.producer(handle)
.cloned()
.expect("test producer slot must exist")
}
fn stub_slot_for_test(handle: ProducerHandle) -> Arc<magnetar_proto::ProducerSlot> {
magnetar_proto::ProducerSlot::new(
magnetar_proto::ProducerIdentity {
handle,
topic: String::new(),
access_mode: pb::ProducerAccessMode::Shared,
},
magnetar_proto::producer::ProducerState::new(
handle,
String::new(),
CompressionKind::None,
0,
),
)
}
#[derive(Debug, Default)]
struct XorEncryptor {
seen_plaintext: std::sync::Mutex<Option<Vec<u8>>>,
}
const XOR_KEY: u8 = 0x5A;
impl crate::crypto::MessageEncryptor for XorEncryptor {
fn encrypt(
&self,
plaintext: &[u8],
metadata: &mut pb::MessageMetadata,
) -> Result<Bytes, crate::crypto::EncryptError> {
*self.seen_plaintext.lock().unwrap() = Some(plaintext.to_vec());
metadata.encryption_keys.push(pb::EncryptionKeys {
key: "xor-test".to_owned(),
value: Bytes::from_static(b"k"),
metadata: Vec::new(),
});
metadata.encryption_algo = Some("XOR-TEST".to_owned());
metadata.encryption_param = Some(Bytes::from_static(b"iv"));
Ok(Bytes::from(
plaintext.iter().map(|b| b ^ XOR_KEY).collect::<Vec<u8>>(),
))
}
}
#[tokio::test(flavor = "current_thread")]
async fn send_encrypts_payload_and_stamps_metadata() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/encrypt".to_owned(),
..Default::default()
})
};
let encryptor = Arc::new(XorEncryptor::default());
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
Some(encryptor.clone()),
);
let _fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"plain-secret"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 12,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
assert_eq!(
encryptor.seen_plaintext.lock().unwrap().as_deref(),
Some(b"plain-secret".as_slice()),
"encrypt hook must see the pre-encryption payload",
);
assert!(
producer.pending_count() >= 1,
"expected pending encrypted send; got {}",
producer.pending_count()
);
}
#[derive(Debug, Default)]
struct FailingEncryptor;
impl crate::crypto::MessageEncryptor for FailingEncryptor {
fn encrypt(
&self,
_plaintext: &[u8],
_metadata: &mut pb::MessageMetadata,
) -> Result<Bytes, crate::crypto::EncryptError> {
Err(crate::crypto::EncryptError::new(
"forced encrypt failure (test)",
))
}
}
#[tokio::test(flavor = "current_thread")]
async fn send_encrypt_failure_surfaces_error() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/encrypt-fail".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
Some(Arc::new(FailingEncryptor)),
);
let res = producer
.send(OutgoingMessage {
payload: Bytes::from_static(b"plain"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 5,
num_messages: 1,
txn_id: None,
source_message_id: None,
})
.await;
let err = res.expect_err("encrypt failure must surface");
assert!(
format!("{err}").contains("encrypt:"),
"expected encrypt-error message, got {err:?}"
);
assert_eq!(
producer.pending_count(),
0,
"a failed encrypt must not enqueue a send"
);
}
#[tokio::test(flavor = "current_thread")]
async fn fresh_producer_reports_defaults() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/defaults".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
assert_eq!(producer.pending_count(), 0);
assert_eq!(producer.last_sequence_id(), -1);
assert!(!producer.is_closed());
assert_eq!(producer.name(), "");
assert_eq!(producer.topic(), "persistent://public/default/defaults");
assert_eq!(producer.compression(), CompressionKind::None);
let stats = producer.stats();
assert_eq!(stats.total_msgs_sent, 0);
assert_eq!(stats.pending_queue_size, 0);
}
#[tokio::test(flavor = "current_thread")]
async fn send_enqueues_pending_op() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/enqueue".to_owned(),
..Default::default()
})
};
let slot = slot_for(&shared, handle);
let producer: Producer<TokioProviders> =
Producer::assemble(shared, handle, slot, CompressionKind::None, None);
let _fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"hello"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 5,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
assert!(
producer.pending_count() >= 1,
"expected pending send; got {}",
producer.pending_count()
);
}
#[tokio::test(flavor = "current_thread")]
async fn send_with_compression_returns_error() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/zstd".to_owned(),
..Default::default()
})
};
let slot = slot_for(&shared, handle);
let producer: Producer<TokioProviders> =
Producer::assemble(shared, handle, slot, CompressionKind::Zstd, None);
let res = producer
.send(OutgoingMessage {
payload: Bytes::from_static(b"hello"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 5,
num_messages: 1,
txn_id: None,
source_message_id: None,
})
.await;
let err = res.expect_err("expected error for unwired compression");
let s = format!("{err}");
assert!(
s.contains("not yet wired"),
"expected compression-not-wired message, got {s:?}"
);
}
#[tokio::test(flavor = "current_thread")]
async fn flush_quiescent_is_noop() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/flush-ok".to_owned(),
..Default::default()
})
};
let slot = slot_for(&shared, handle);
let producer: Producer<TokioProviders> =
Producer::assemble(shared, handle, slot, CompressionKind::None, None);
assert_eq!(producer.pending_count(), 0);
tokio::time::timeout(Duration::from_secs(1), producer.flush())
.await
.expect("flush should resolve on quiescent producer")
.expect("flush ok");
}
#[test]
fn producer_clones_share_handle() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/clone".to_owned(),
..Default::default()
})
};
let slot = slot_for(&shared, handle);
let producer: Producer<TokioProviders> =
Producer::assemble(shared, handle, slot, CompressionKind::None, None);
let clone = producer.clone();
assert_eq!(producer.handle(), clone.handle());
assert_eq!(producer.compression(), clone.compression());
}
#[allow(dead_code)]
fn _open_producer_bounds<P: moonpool_core::Providers>(
client: &Client<P>,
req: CreateProducerRequest,
) -> impl std::future::Future<Output = Result<super::Producer<P>, super::ClientError>> + '_
{
client.open_producer(req)
}
#[test]
#[allow(clippy::let_underscore_future, clippy::no_effect_underscore_binding)]
fn open_producer_compiles_against_tokio_providers() {
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let _client_fut =
Client::connect_plain(&engine, "127.0.0.1:6650", ConnectionConfig::default());
}
#[tokio::test(flavor = "current_thread")]
async fn send_reserves_and_releases_memory_budget() {
let cfg = ConnectionConfig {
memory_limit_bytes: 1024,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/budget".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
let fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"abcdef"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 6,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
assert_eq!(
shared
.memory_used
.load(std::sync::atomic::Ordering::Acquire),
6,
"payload bytes must be reserved against the budget"
);
drop(fut);
assert_eq!(
shared
.memory_used
.load(std::sync::atomic::Ordering::Acquire),
0,
"dropping the SendFut must release the reservation"
);
}
#[tokio::test(flavor = "current_thread")]
async fn send_fails_when_memory_budget_would_overflow() {
let cfg = ConnectionConfig {
memory_limit_bytes: 4,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/overflow".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
let res = producer
.send(OutgoingMessage {
payload: Bytes::from_static(b"too-big-payload"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 15,
num_messages: 1,
txn_id: None,
source_message_id: None,
})
.await;
assert!(matches!(
res,
Err(super::ClientError::Engine(
super::super::EngineError::MemoryLimitExceeded { .. }
))
));
assert_eq!(
shared
.memory_used
.load(std::sync::atomic::Ordering::Acquire),
0,
"rejected sends must not bump the budget counter"
);
}
#[tokio::test(flavor = "current_thread")]
async fn wait_producer_ready_surfaces_broker_error() {
let shared = handshake_complete_shared();
let (handle, request_id) = {
let mut conn = shared.inner.lock();
let request_id = conn.peek_next_request_id_for_test();
let handle = conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/forbidden".to_owned(),
..Default::default()
});
(handle, request_id)
};
let err = pb::BaseCommand {
r#type: pb::base_command::Type::Error as i32,
error: Some(pb::CommandError {
request_id,
error: pb::ServerError::AuthorizationError as i32,
message: "not authorized".to_owned(),
}),
..Default::default()
};
let mut buf = BytesMut::new();
encode_command(&mut buf, &err).expect("encode CommandError");
{
let mut conn = shared.inner.lock();
conn.handle_bytes(Instant::now(), &buf)
.expect("handle CommandError");
}
let res = tokio::time::timeout(
Duration::from_secs(2),
super::wait_producer_ready(&shared, handle),
)
.await
.expect("producer-ready future must resolve (regression: previously hung)");
match res {
Err(super::ClientError::Broker { code, message }) => {
assert_eq!(code, pb::ServerError::AuthorizationError as i32);
assert_eq!(message, "not authorized");
}
other => panic!("expected ClientError::Broker, got {other:?}"),
}
}
#[tokio::test(flavor = "current_thread")]
async fn producer_block_parks_on_overflow_instead_of_erroring() {
use std::future::Future as _;
use std::pin::Pin;
use std::task::{Context, Poll};
let cfg = ConnectionConfig {
memory_limit_bytes: 4,
memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
shared
.try_reserve_memory(4)
.expect("seeding the budget at the limit");
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/block".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
let mut fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"overflow"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 8,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
let waker = futures_task_waker();
let mut cx = Context::from_waker(&waker);
let poll = Pin::new(&mut fut).poll(&mut cx);
assert!(
matches!(poll, Poll::Pending),
"ProducerBlock must park instead of erroring (got {poll:?})"
);
assert_eq!(
shared.memory_wakers.lock().len(),
1,
"Reserving must register exactly one waker"
);
drop(fut);
assert!(
shared.memory_wakers.lock().is_empty(),
"dropping the SendFut must cancel its registration"
);
}
#[tokio::test(flavor = "current_thread")]
async fn producer_block_release_drains_wakers() {
use std::future::Future as _;
use std::pin::Pin;
use std::task::{Context, Poll};
let cfg = ConnectionConfig {
memory_limit_bytes: 4,
memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
shared
.try_reserve_memory(4)
.expect("seeding the budget at the limit");
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/release".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
let mut fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"AB"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 2,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
let waker = futures_task_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
assert_eq!(shared.memory_wakers.lock().len(), 1);
shared.release_memory(4);
assert!(
shared.memory_wakers.lock().is_empty(),
"release_memory must drain the slab"
);
drop(fut);
}
#[tokio::test(flavor = "current_thread")]
async fn producer_block_completes_when_budget_frees_up() {
use std::future::Future as _;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
let cfg = ConnectionConfig {
memory_limit_bytes: 4,
memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
shared.try_reserve_memory(4).expect("seed budget");
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/free".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
let mut fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"ab"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 2,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
let waker = futures_task_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
shared.release_memory(4);
assert_eq!(shared.memory_used.load(Ordering::Acquire), 0);
let poll = Pin::new(&mut fut).poll(&mut cx);
assert!(
matches!(poll, Poll::Pending),
"still waiting on broker receipt"
);
assert_eq!(
shared.memory_used.load(Ordering::Acquire),
2,
"the released budget must have been re-reserved by the parked send"
);
assert!(
shared.memory_wakers.lock().is_empty(),
"successful reservation must clear the slab slot"
);
drop(fut);
assert_eq!(shared.memory_used.load(Ordering::Acquire), 0);
}
#[tokio::test(flavor = "current_thread")]
async fn producer_block_fast_path_when_budget_available() {
let cfg = ConnectionConfig {
memory_limit_bytes: 1024,
memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/fast".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
let _fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"fast"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 4,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
assert_eq!(
shared
.memory_used
.load(std::sync::atomic::Ordering::Acquire),
4,
"ProducerBlock fast path must reserve synchronously",
);
assert!(
shared.memory_wakers.lock().is_empty(),
"fast path must not register a waker slot",
);
}
#[tokio::test(flavor = "current_thread")]
async fn producer_block_send_error_releases_reservation() {
use std::future::Future as _;
use std::pin::Pin;
use std::task::{Context, Poll};
let cfg = ConnectionConfig {
memory_limit_bytes: 16,
memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
shared.try_reserve_memory(16).expect("seed budget");
let bogus_handle = ProducerHandle(u64::MAX);
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
bogus_handle,
stub_slot_for_test(bogus_handle),
CompressionKind::None,
None,
);
let mut fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"err"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 3,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
let waker = futures_task_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
shared.release_memory(16);
let outcome = Pin::new(&mut fut).poll(&mut cx);
match outcome {
Poll::Ready(Err(ClientError::Other(msg))) => {
assert!(
msg.contains("send:"),
"expected `send:` error prefix, got {msg:?}",
);
}
other => panic!("expected Ready(Err(Other(...))), got {other:?}"),
}
assert_eq!(
shared
.memory_used
.load(std::sync::atomic::Ordering::Acquire),
0,
"Err arm must release the reservation it took",
);
}
#[tokio::test(flavor = "current_thread")]
async fn producer_block_re_park_cancels_prior_waker_slot() {
use std::future::Future as _;
use std::pin::Pin;
use std::task::{Context, Poll};
let cfg = ConnectionConfig {
memory_limit_bytes: 4,
memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
..ConnectionConfig::default()
};
let shared = ConnectionShared::new(cfg);
{
let mut conn = shared.inner.lock();
conn.begin_handshake().expect("handshake");
let frame = handshake_response_bytes();
conn.handle_bytes(Instant::now(), &frame)
.expect("connected");
}
shared.try_reserve_memory(4).expect("seed budget");
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/repark".to_owned(),
..Default::default()
})
};
let producer: Producer<TokioProviders> = Producer::assemble(
shared.clone(),
handle,
slot_for(&shared, handle),
CompressionKind::None,
None,
);
let mut fut = producer.send(OutgoingMessage {
payload: Bytes::from_static(b"hi"),
metadata: pb::MessageMetadata::default(),
uncompressed_size: 2,
num_messages: 1,
txn_id: None,
source_message_id: None,
});
let waker = futures_task_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
assert_eq!(shared.memory_wakers.lock().len(), 1);
assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
assert_eq!(
shared.memory_wakers.lock().len(),
1,
"re-park must cancel the prior waker before inserting a new one",
);
}
fn futures_task_waker() -> std::task::Waker {
std::task::Waker::noop().clone()
}
#[tokio::test(flavor = "current_thread")]
async fn last_sequence_id_published_defaults_to_minus_one() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/last-seq-pub".to_owned(),
..Default::default()
})
};
let slot = slot_for(&shared, handle);
let producer: Producer<TokioProviders> =
Producer::assemble(shared, handle, slot, CompressionKind::None, None);
assert_eq!(
producer.last_sequence_id_published(),
-1,
"no broker ack yet → -1 (parity with tokio engine + Java)"
);
}
#[tokio::test(flavor = "current_thread")]
async fn batch_len_reports_zero_when_batching_disabled() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/batch-len".to_owned(),
..Default::default()
})
};
let slot = slot_for(&shared, handle);
let producer: Producer<TokioProviders> =
Producer::assemble(shared, handle, slot, CompressionKind::None, None);
assert_eq!(
producer.batch_len(),
0,
"batching disabled → batch_len == 0"
);
}
#[tokio::test(flavor = "current_thread")]
async fn batch_bytes_reports_zero_when_batching_disabled() {
let shared = handshake_complete_shared();
let handle = {
let mut conn = shared.inner.lock();
conn.create_producer(CreateProducerRequest {
topic: "persistent://public/default/batch-bytes".to_owned(),
..Default::default()
})
};
let slot = slot_for(&shared, handle);
let producer: Producer<TokioProviders> =
Producer::assemble(shared, handle, slot, CompressionKind::None, None);
assert_eq!(
producer.batch_bytes(),
0,
"batching disabled → batch_bytes == 0"
);
}
}