Skip to main content

Connection

Struct Connection 

Source
pub struct Connection { /* private fields */ }
Expand description

High-level connection object that manages a single TCP/STOMP connection.

The Connection spawns a background task that maintains the TCP transport, sends/receives STOMP frames using StompCodec, negotiates heartbeats, and performs simple reconnect logic with exponential backoff.

Implementations§

Source§

impl Connection

Source

pub const NO_HEARTBEAT: &'static str = "0,0"

Heartbeat value that disables heartbeats entirely.

Use this when you don’t want the client or server to send heartbeats. Note that some brokers may still require heartbeats for long-lived connections.

§Example
let conn = Connection::connect(
    "localhost:61613",
    "guest",
    "guest",
    Connection::NO_HEARTBEAT,
).await?;
Source

pub const DEFAULT_HEARTBEAT: &'static str = "10000,10000"

Default heartbeat value: 10 seconds for both send and receive.

This is a reasonable default for most applications. The actual heartbeat interval will be negotiated with the server (taking the maximum of client and server preferences).

§Example
let conn = Connection::connect(
    "localhost:61613",
    "guest",
    "guest",
    Connection::DEFAULT_HEARTBEAT,
).await?;
Source

pub async fn connect( addr: &str, login: &str, passcode: &str, client_hb: &str, ) -> Result<Self, ConnError>

Establish a connection to the STOMP server at addr with the given credentials and heartbeat header string (e.g. “10000,10000”).

This is a convenience wrapper around connect_with_options() that uses default options (STOMP 1.2, host=“/”, no client-id).

Parameters

  • addr: TCP address (host:port) of the STOMP server.
  • login: login username for STOMP CONNECT.
  • passcode: passcode for STOMP CONNECT.
  • client_hb: client’s heart-beat header value (“cx,cy” in milliseconds) that will be sent in the CONNECT frame.

Returns a Connection which provides send_frame, next_frame, and close helpers. The detailed connection handling (I/O, heartbeats, reconnects) runs on a background task spawned by this method.

Source

pub async fn connect_with_options( addr: &str, login: &str, passcode: &str, client_hb: &str, options: ConnectOptions, ) -> Result<Self, ConnError>

Establish a connection to the STOMP server with custom options.

Use this method when you need to set a custom client-id (for durable subscriptions), specify a virtual host, negotiate different STOMP versions, or add custom CONNECT headers.

Parameters

  • addr: TCP address (host:port) of the STOMP server.
  • login: login username for STOMP CONNECT.
  • passcode: passcode for STOMP CONNECT.
  • client_hb: client’s heart-beat header value (“cx,cy” in milliseconds) that will be sent in the CONNECT frame.
  • options: custom connection options (version, host, client-id, etc.).
§Errors

Returns an error if:

  • The TCP connection cannot be established (ConnError::Io)
  • The server rejects the connection, e.g., due to invalid credentials (ConnError::ServerRejected)
  • The server closes the connection without responding (ConnError::Protocol)
§Example
use iridium_stomp::{Connection, ConnectOptions};

// Connect with a client-id for durable subscriptions
let options = ConnectOptions::default()
    .client_id("my-app-instance-1");

let conn = Connection::connect_with_options(
    "localhost:61613",
    "guest",
    "guest",
    "10000,10000",
    options,
).await?;
Source

pub async fn send_frame(&self, frame: Frame) -> Result<(), ConnError>

Source

pub async fn send_frame_with_receipt( &self, frame: Frame, ) -> Result<String, ConnError>

Send a frame with a receipt request and return the receipt ID.

This method adds a unique receipt header to the frame and registers the receipt ID for tracking. Use wait_for_receipt() to wait for the server’s RECEIPT response.

§Parameters
  • frame: the frame to send. A receipt header will be added.
§Returns

The generated receipt ID that can be used with wait_for_receipt().

§Example
let receipt_id = conn.send_frame_with_receipt(frame).await?;
conn.wait_for_receipt(&receipt_id, Duration::from_secs(5)).await?;
Source

pub async fn wait_for_receipt( &self, receipt_id: &str, timeout: Duration, ) -> Result<(), ConnError>

Wait for a receipt confirmation from the server.

This method blocks until the server sends a RECEIPT frame with the matching receipt-id, or until the timeout expires.

§Parameters
  • receipt_id: the receipt ID returned by send_frame_with_receipt().
  • timeout: maximum time to wait for the receipt.
§Returns

Ok(()) if the receipt was received, or Err(ConnError::ReceiptTimeout) if the timeout expired.

§Example
let receipt_id = conn.send_frame_with_receipt(frame).await?;
conn.wait_for_receipt(&receipt_id, Duration::from_secs(5)).await?;
println!("Message confirmed!");
Source

pub async fn send_frame_confirmed( &self, frame: Frame, timeout: Duration, ) -> Result<(), ConnError>

Send a frame and wait for server confirmation via RECEIPT.

This is a convenience method that combines send_frame_with_receipt() and wait_for_receipt(). Use this when you want to ensure a frame was processed by the server before continuing.

§Parameters
  • frame: the frame to send.
  • timeout: maximum time to wait for the receipt.
§Returns

Ok(()) if the frame was sent and receipt confirmed, or an error if sending failed or the receipt timed out.

§Example
let frame = Frame::new("SEND")
    .header("destination", "/queue/orders")
    .set_body(b"order data".to_vec());

conn.send_frame_confirmed(frame, Duration::from_secs(5)).await?;
println!("Order sent and confirmed!");
Source

pub async fn subscribe_with_headers( &self, destination: &str, ack: AckMode, extra_headers: Vec<(String, String)>, ) -> Result<Subscription, ConnError>

Subscribe to a destination.

Parameters

  • destination: the STOMP destination to subscribe to (e.g. “/queue/foo”).
  • ack: acknowledgement mode to request from the server.

Returns a tuple (subscription_id, receiver) where subscription_id is the opaque id assigned locally for this subscription and receiver is a mpsc::Receiver<Frame> which will yield incoming MESSAGE frames for the destination. The caller should read from the receiver to handle messages. Subscribe to a destination using optional extra headers.

This variant accepts additional headers which are stored locally and re-sent on reconnect. Use subscribe as a convenience wrapper when no extra headers are needed.

Source

pub async fn subscribe( &self, destination: &str, ack: AckMode, ) -> Result<Subscription, ConnError>

Convenience wrapper without extra headers.

Source

pub async fn subscribe_with_options( &self, destination: &str, ack: AckMode, options: SubscriptionOptions, ) -> Result<Subscription, ConnError>

Subscribe with a typed SubscriptionOptions structure.

SubscriptionOptions.headers are forwarded to the broker and persisted for automatic resubscribe after reconnect. If durable_queue is set, it will be used as the actual destination instead of destination.

Source

pub async fn unsubscribe(&self, subscription_id: &str) -> Result<(), ConnError>

Unsubscribe a previously created subscription by its local subscription id.

Source

pub async fn ack( &self, subscription_id: &str, message_id: &str, ) -> Result<(), ConnError>

Acknowledge a message previously received in client or client-individual ack modes.

STOMP ack semantics:

  • auto: server considers message delivered immediately; the client should not ack.
  • client: cumulative acknowledgements. ACKing message M for subscription S acknowledges all messages delivered to S up to and including M.
  • client-individual: only the named message is acknowledged.

Parameters

  • subscription_id: the local subscription id returned by Connection::subscribe. This disambiguates which subscription’s pending queue to advance for cumulative ACKs.
  • message_id: the message-id header value from the received MESSAGE frame to acknowledge.

Behavior

  • The pending queue for subscription_id is searched for message_id. If the subscription used client ack mode, all pending messages up to and including the matched message are removed. If the subscription used client-individual, only the matched message is removed.
  • An ACK frame is sent to the server with id=<message_id> and subscription=<subscription_id> headers.
Source

pub async fn nack( &self, subscription_id: &str, message_id: &str, ) -> Result<(), ConnError>

Negative-acknowledge a message (NACK).

Parameters

  • subscription_id: the local subscription id the message was delivered under.
  • message_id: the message-id header value from the received MESSAGE.

Behavior

  • Removes the message from the local pending queue (cumulatively if the subscription used client ack mode, otherwise only the single message). Sends a NACK frame to the server with id and subscription headers.
Source

pub async fn begin(&self, transaction_id: &str) -> Result<(), ConnError>

Begin a transaction.

Parameters

  • transaction_id: unique identifier for the transaction. The caller is responsible for ensuring uniqueness within the connection.

Behavior

  • Sends a BEGIN frame to the server with transaction:<transaction_id> header. Subsequent SEND, ACK, and NACK frames may include this transaction id to group them into the transaction. The transaction must be finalized with either commit or abort.
Source

pub async fn commit(&self, transaction_id: &str) -> Result<(), ConnError>

Commit a transaction.

Parameters

  • transaction_id: the transaction identifier previously passed to begin.

Behavior

  • Sends a COMMIT frame to the server with transaction:<transaction_id> header. All operations within the transaction are applied atomically.
Source

pub async fn abort(&self, transaction_id: &str) -> Result<(), ConnError>

Abort a transaction.

Parameters

  • transaction_id: the transaction identifier previously passed to begin.

Behavior

  • Sends an ABORT frame to the server with transaction:<transaction_id> header. All operations within the transaction are discarded.
Source

pub async fn next_frame(&self) -> Option<ReceivedFrame>

Receive the next frame from the server.

Returns Some(ReceivedFrame::Frame(..)) for normal frames (MESSAGE, etc.), Some(ReceivedFrame::Error(..)) for ERROR frames, or None if the connection has been closed.

§Example
use iridium_stomp::ReceivedFrame;

while let Some(received) = conn.next_frame().await {
    match received {
        ReceivedFrame::Frame(frame) => {
            println!("Got {}: {:?}", frame.command, frame.body);
        }
        ReceivedFrame::Error(err) => {
            eprintln!("Server error: {}", err);
            break;
        }
    }
}
Source

pub async fn close(self)

Trait Implementations§

Source§

impl Clone for Connection

Source§

fn clone(&self) -> Connection

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.