pub struct Connection { /* private fields */ }Expand description
High-level connection object that manages a single TCP/STOMP connection.
The Connection spawns a background task that maintains the TCP transport,
sends/receives STOMP frames using StompCodec, negotiates heartbeats, and
performs simple reconnect logic with exponential backoff.
Implementations§
Source§impl Connection
impl Connection
Sourcepub const NO_HEARTBEAT: &'static str = "0,0"
pub const NO_HEARTBEAT: &'static str = "0,0"
Heartbeat value that disables heartbeats entirely.
Use this when you don’t want the client or server to send heartbeats. Note that some brokers may still require heartbeats for long-lived connections.
§Example
let conn = Connection::connect(
"localhost:61613",
"guest",
"guest",
Connection::NO_HEARTBEAT,
).await?;Sourcepub const DEFAULT_HEARTBEAT: &'static str = "10000,10000"
pub const DEFAULT_HEARTBEAT: &'static str = "10000,10000"
Default heartbeat value: 10 seconds for both send and receive.
This is a reasonable default for most applications. The actual heartbeat interval will be negotiated with the server (taking the maximum of client and server preferences).
§Example
let conn = Connection::connect(
"localhost:61613",
"guest",
"guest",
Connection::DEFAULT_HEARTBEAT,
).await?;Sourcepub async fn connect(
addr: &str,
login: &str,
passcode: &str,
client_hb: &str,
) -> Result<Self, ConnError>
pub async fn connect( addr: &str, login: &str, passcode: &str, client_hb: &str, ) -> Result<Self, ConnError>
Establish a connection to the STOMP server at addr with the given
credentials and heartbeat header string (e.g. “10000,10000”).
This is a convenience wrapper around connect_with_options() that uses
default options (STOMP 1.2, host=“/”, no client-id).
Parameters
addr: TCP address (host:port) of the STOMP server.login: login username for STOMPCONNECT.passcode: passcode for STOMPCONNECT.client_hb: client’sheart-beatheader value (“cx,cy” in milliseconds) that will be sent in theCONNECTframe.
Returns a Connection which provides send_frame, next_frame, and
close helpers. The detailed connection handling (I/O, heartbeats,
reconnects) runs on a background task spawned by this method.
Sourcepub async fn connect_with_options(
addr: &str,
login: &str,
passcode: &str,
client_hb: &str,
options: ConnectOptions,
) -> Result<Self, ConnError>
pub async fn connect_with_options( addr: &str, login: &str, passcode: &str, client_hb: &str, options: ConnectOptions, ) -> Result<Self, ConnError>
Establish a connection to the STOMP server with custom options.
Use this method when you need to set a custom client-id (for durable
subscriptions), specify a virtual host, negotiate different STOMP
versions, or add custom CONNECT headers.
Parameters
addr: TCP address (host:port) of the STOMP server.login: login username for STOMPCONNECT.passcode: passcode for STOMPCONNECT.client_hb: client’sheart-beatheader value (“cx,cy” in milliseconds) that will be sent in theCONNECTframe.options: custom connection options (version, host, client-id, etc.).
§Errors
Returns an error if:
- The TCP connection cannot be established (
ConnError::Io) - The server rejects the connection, e.g., due to invalid credentials
(
ConnError::ServerRejected) - The server closes the connection without responding (
ConnError::Protocol)
§Example
use iridium_stomp::{Connection, ConnectOptions};
// Connect with a client-id for durable subscriptions
let options = ConnectOptions::default()
.client_id("my-app-instance-1");
let conn = Connection::connect_with_options(
"localhost:61613",
"guest",
"guest",
"10000,10000",
options,
).await?;pub async fn send_frame(&self, frame: Frame) -> Result<(), ConnError>
Sourcepub async fn send_frame_with_receipt(
&self,
frame: Frame,
) -> Result<String, ConnError>
pub async fn send_frame_with_receipt( &self, frame: Frame, ) -> Result<String, ConnError>
Send a frame with a receipt request and return the receipt ID.
This method adds a unique receipt header to the frame and registers
the receipt ID for tracking. Use wait_for_receipt() to wait for the
server’s RECEIPT response.
§Parameters
frame: the frame to send. Areceiptheader will be added.
§Returns
The generated receipt ID that can be used with wait_for_receipt().
§Example
let receipt_id = conn.send_frame_with_receipt(frame).await?;
conn.wait_for_receipt(&receipt_id, Duration::from_secs(5)).await?;Sourcepub async fn wait_for_receipt(
&self,
receipt_id: &str,
timeout: Duration,
) -> Result<(), ConnError>
pub async fn wait_for_receipt( &self, receipt_id: &str, timeout: Duration, ) -> Result<(), ConnError>
Wait for a receipt confirmation from the server.
This method blocks until the server sends a RECEIPT frame with the matching receipt-id, or until the timeout expires.
§Parameters
receipt_id: the receipt ID returned bysend_frame_with_receipt().timeout: maximum time to wait for the receipt.
§Returns
Ok(()) if the receipt was received, or Err(ConnError::ReceiptTimeout)
if the timeout expired.
§Example
let receipt_id = conn.send_frame_with_receipt(frame).await?;
conn.wait_for_receipt(&receipt_id, Duration::from_secs(5)).await?;
println!("Message confirmed!");Sourcepub async fn send_frame_confirmed(
&self,
frame: Frame,
timeout: Duration,
) -> Result<(), ConnError>
pub async fn send_frame_confirmed( &self, frame: Frame, timeout: Duration, ) -> Result<(), ConnError>
Send a frame and wait for server confirmation via RECEIPT.
This is a convenience method that combines send_frame_with_receipt()
and wait_for_receipt(). Use this when you want to ensure a frame
was processed by the server before continuing.
§Parameters
frame: the frame to send.timeout: maximum time to wait for the receipt.
§Returns
Ok(()) if the frame was sent and receipt confirmed, or an error if
sending failed or the receipt timed out.
§Example
let frame = Frame::new("SEND")
.header("destination", "/queue/orders")
.set_body(b"order data".to_vec());
conn.send_frame_confirmed(frame, Duration::from_secs(5)).await?;
println!("Order sent and confirmed!");Sourcepub async fn subscribe_with_headers(
&self,
destination: &str,
ack: AckMode,
extra_headers: Vec<(String, String)>,
) -> Result<Subscription, ConnError>
pub async fn subscribe_with_headers( &self, destination: &str, ack: AckMode, extra_headers: Vec<(String, String)>, ) -> Result<Subscription, ConnError>
Subscribe to a destination.
Parameters
destination: the STOMP destination to subscribe to (e.g. “/queue/foo”).ack: acknowledgement mode to request from the server.
Returns a tuple (subscription_id, receiver) where subscription_id is
the opaque id assigned locally for this subscription and receiver is a
mpsc::Receiver<Frame> which will yield incoming MESSAGE frames for the
destination. The caller should read from the receiver to handle messages.
Subscribe to a destination using optional extra headers.
This variant accepts additional headers which are stored locally and
re-sent on reconnect. Use subscribe as a convenience wrapper when no
extra headers are needed.
Sourcepub async fn subscribe(
&self,
destination: &str,
ack: AckMode,
) -> Result<Subscription, ConnError>
pub async fn subscribe( &self, destination: &str, ack: AckMode, ) -> Result<Subscription, ConnError>
Convenience wrapper without extra headers.
Sourcepub async fn subscribe_with_options(
&self,
destination: &str,
ack: AckMode,
options: SubscriptionOptions,
) -> Result<Subscription, ConnError>
pub async fn subscribe_with_options( &self, destination: &str, ack: AckMode, options: SubscriptionOptions, ) -> Result<Subscription, ConnError>
Subscribe with a typed SubscriptionOptions structure.
SubscriptionOptions.headers are forwarded to the broker and persisted
for automatic resubscribe after reconnect. If durable_queue is set,
it will be used as the actual destination instead of destination.
Sourcepub async fn unsubscribe(&self, subscription_id: &str) -> Result<(), ConnError>
pub async fn unsubscribe(&self, subscription_id: &str) -> Result<(), ConnError>
Unsubscribe a previously created subscription by its local subscription id.
Sourcepub async fn ack(
&self,
subscription_id: &str,
message_id: &str,
) -> Result<(), ConnError>
pub async fn ack( &self, subscription_id: &str, message_id: &str, ) -> Result<(), ConnError>
Acknowledge a message previously received in client or
client-individual ack modes.
STOMP ack semantics:
auto: server considers message delivered immediately; the client should not ack.client: cumulative acknowledgements. ACKing messageMfor subscriptionSacknowledges all messages delivered toSup to and includingM.client-individual: only the named message is acknowledged.
Parameters
subscription_id: the local subscription id returned byConnection::subscribe. This disambiguates which subscription’s pending queue to advance for cumulative ACKs.message_id: themessage-idheader value from the received MESSAGE frame to acknowledge.
Behavior
- The pending queue for
subscription_idis searched formessage_id. If the subscription usedclientack mode, all pending messages up to and including the matched message are removed. If the subscription usedclient-individual, only the matched message is removed. - An
ACKframe is sent to the server withid=<message_id>andsubscription=<subscription_id>headers.
Sourcepub async fn nack(
&self,
subscription_id: &str,
message_id: &str,
) -> Result<(), ConnError>
pub async fn nack( &self, subscription_id: &str, message_id: &str, ) -> Result<(), ConnError>
Negative-acknowledge a message (NACK).
Parameters
subscription_id: the local subscription id the message was delivered under.message_id: themessage-idheader value from the received MESSAGE.
Behavior
- Removes the message from the local pending queue (cumulatively if the
subscription used
clientack mode, otherwise only the single message). Sends aNACKframe to the server withidandsubscriptionheaders.
Sourcepub async fn begin(&self, transaction_id: &str) -> Result<(), ConnError>
pub async fn begin(&self, transaction_id: &str) -> Result<(), ConnError>
Begin a transaction.
Parameters
transaction_id: unique identifier for the transaction. The caller is responsible for ensuring uniqueness within the connection.
Behavior
- Sends a
BEGINframe to the server withtransaction:<transaction_id>header. SubsequentSEND,ACK, andNACKframes may include this transaction id to group them into the transaction. The transaction must be finalized with eithercommitorabort.
Sourcepub async fn commit(&self, transaction_id: &str) -> Result<(), ConnError>
pub async fn commit(&self, transaction_id: &str) -> Result<(), ConnError>
Commit a transaction.
Parameters
transaction_id: the transaction identifier previously passed tobegin.
Behavior
- Sends a
COMMITframe to the server withtransaction:<transaction_id>header. All operations within the transaction are applied atomically.
Sourcepub async fn abort(&self, transaction_id: &str) -> Result<(), ConnError>
pub async fn abort(&self, transaction_id: &str) -> Result<(), ConnError>
Abort a transaction.
Parameters
transaction_id: the transaction identifier previously passed tobegin.
Behavior
- Sends an
ABORTframe to the server withtransaction:<transaction_id>header. All operations within the transaction are discarded.
Sourcepub async fn next_frame(&self) -> Option<ReceivedFrame>
pub async fn next_frame(&self) -> Option<ReceivedFrame>
Receive the next frame from the server.
Returns Some(ReceivedFrame::Frame(..)) for normal frames (MESSAGE, etc.),
Some(ReceivedFrame::Error(..)) for ERROR frames, or None if the
connection has been closed.
§Example
use iridium_stomp::ReceivedFrame;
while let Some(received) = conn.next_frame().await {
match received {
ReceivedFrame::Frame(frame) => {
println!("Got {}: {:?}", frame.command, frame.body);
}
ReceivedFrame::Error(err) => {
eprintln!("Server error: {}", err);
break;
}
}
}pub async fn close(self)
Trait Implementations§
Source§impl Clone for Connection
impl Clone for Connection
Source§fn clone(&self) -> Connection
fn clone(&self) -> Connection
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more