Reliably
A Rust client for Ably with full REST and Realtime (Pub/Sub) support.
Ably is the platform that powers synchronized digital experiences in realtime. For more information, see the Ably documentation.
This is a community-maintained fork of the original ably-rust SDK, which only supported the REST API. This fork adds the complete Realtime (WebSocket) layer: persistent connections, channels with attach/detach, publish/subscribe, presence, and connection/channel state machines with automatic recovery.
Features
REST API
- Publish messages (string, JSON, binary)
- Retrieve message history with pagination
- Presence: get current members, history
- Token authentication (request tokens, sign token requests)
- Application statistics
- Message encryption (AES-128/AES-256)
Realtime (Pub/Sub)
- Persistent WebSocket connection with automatic reconnection
- Connection state machine (initialized, connecting, connected, disconnected, suspended, closing, closed, failed)
- Channel state machine (initialized, attaching, attached, detaching, detached, suspended, failed)
- Publish with server ACK
- Subscribe with zero-message-loss delivery (unbounded per-subscriber channels)
- Multiple concurrent subscribers per channel, each with independent backpressure
- Presence: enter, leave, update, get members, subscribe to presence events
- Presence sync protocol with newness comparison and residual leave detection
- Automatic member re-enter on non-resumed re-attach (RTP17i)
- Connection resume with
connectionKeyandconnectionSerial - Connection state freshness check (
connectionStateTtl + maxIdleInterval) - Idle/heartbeat timeout detection (
maxIdleIntervalfrom server + grace period) - Channel auto-re-attach on reconnection
- Discontinuity detection (RTL18) for zero-message-loss applications
- Ping/pong RTT measurement
- JSON and MessagePack wire protocols
Design Decisions
- Server-side, API-key auth only. No token refresh, no
authCallback/authUrlon WebSocket, no browser transports. - Zero message loss by default. All subscriber delivery uses unbounded
mpscfan-out. Applications that need guaranteed delivery should also monitorchannel.on_discontinuity()and backfill from the history API when fired. - No polling. All state tracking uses
tokio::sync::watchfor race-free, immediate reads. State waits usewatch::Receiver::wait_for(), not sleep loops. - Idiomatic async Rust. Built on
tokiowithtokio-tungstenitefor WebSocket transport.
Installation
Add reliably and tokio to your Cargo.toml:
[]
= "0.3.0"
= { = "1", = ["full"] }
Using the Realtime API
Connect
use Realtime;
let client = new?;
// Wait for the connection to be established.
client.connection.wait_for_state.await?;
// When done:
client.close.await;
Or with manual connect:
use ;
let mut opts = new;
opts.auto_connect = false;
let client = from_options?;
client.connection.connect;
client.connection.wait_for_state.await?;
Publish and Subscribe
use ;
let client = new?;
let channel = client.channels.get.await;
let mut sub = channel.subscribe.await?; // auto-attaches
// Publish (waits for server ACK).
channel.publish.await?;
// Receive.
if let Some = sub.recv.await
Multiple Subscribers
Each subscriber gets its own independent unbounded stream. No message drops regardless of subscriber speed.
let mut sub1 = channel.subscribe.await?;
let mut sub2 = channel.subscribe.await?;
// Both sub1 and sub2 receive every message independently.
JSON and Binary Data
use json;
// JSON
channel.publish.await?;
// Binary
let bytes = from;
channel.publish.await?;
Channel State
use ChannelState;
let channel = client.channels.get.await;
// Synchronous (non-blocking) state check.
let state = channel.state; // ChannelState::Initialized
// Explicit attach/detach.
channel.attach.await?;
assert_eq!;
channel.detach.await?;
assert_eq!;
Connection State
use ConnectionState;
// Synchronous (non-blocking).
let state = client.connection.state;
// Wait with timeout.
client.connection.wait_for_state_with_timeout.await?;
// Ping.
let rtt = client.connection.ping.await?;
println!;
Presence
Presence requires a client_id set in ClientOptions:
use ;
let opts = new
.client_id?;
let client = from_options?;
let channel = client.channels.get.await;
channel.attach.await?;
// Enter presence.
channel.presence.enter.await?;
// Update presence data.
channel.presence.update.await?;
// Get all current members.
let members = channel.presence.get.await;
for member in &members
// Subscribe to presence events.
let mut presence_sub = channel.presence.subscribe;
if let Some = presence_sub.recv.await
// Leave.
channel.presence.leave.await?;
Discontinuity Detection (Zero Message Loss)
When a channel re-attaches without the server's RESUMED flag, messages may have been lost during the gap. Monitor on_discontinuity() and backfill from history:
let channel = client.channels.get.await;
let mut disc_rx = channel.on_discontinuity;
// Spawn a task to monitor for discontinuities.
spawn;
Using the REST API
Initialize a Client
// With an API key:
let client = new?;
// With an auth URL:
let auth_url = "https://example.com/auth".parse?;
let client = new
.auth_url
.rest?;
Publish a Message
let channel = client.channels.get;
// String
channel.publish.string.send.await?;
// JSON
channel.publish.json.send.await?;
// Binary
channel.publish.binary.send.await?;
Retrieve History
let mut pages = channel.history.pages;
while let Some = pages.next.await
Retrieve Presence
let mut pages = channel.presence.get.pages;
while let Some = pages.next.await
Encrypted Messages
let cipher_key = ;
let params = from;
let channel = client.channels.name.cipher.get;
channel.publish.string.send.await?;
Request a Token
let token = client
.auth
.request_token
.client_id
.capability
.send
.await?;
Application Statistics
let mut pages = client.stats.pages;
while let Some = pages.next.await
Architecture
src/
lib.rs # Public API, re-exports, test suite
options.rs # ClientOptions (shared by REST and Realtime)
rest.rs # REST client, channels, messages, encoding/decoding
auth.rs # Token auth, API key signing
realtime.rs # Realtime client entry point, router task
connection.rs # Connection state machine, ConnectionManager event loop
realtime_channel.rs # Channel state machine, pub/sub, discontinuity detection
realtime_presence.rs # PresenceMap, sync protocol, enter/leave/update API
protocol.rs # Wire format: actions, flags, ProtocolMessage, MessageQueue
transport.rs # WebSocket transport (tokio-tungstenite)
crypto.rs # AES-CBC encryption/decryption
http.rs # HTTP request builder, pagination
presence.rs # REST presence API
stats.rs # Application statistics types
error.rs # Error types and codes
What's Not Implemented
- Token auth refresh on WebSocket -- No
authCallback/authUrlre-auth via AUTH protocol messages. API key auth only. - Recovery keys -- No cross-process resume. Process restart triggers discontinuity; backfill from history.
- Comet/long-polling -- WebSocket only.
- Delta compression -- No vcdiff delta decoding.
- LiveObjects, Annotations, message interactions -- Not in scope.
- Filtered subscriptions -- Not implemented.
- Browser-specific concerns -- Server-side only.
Testing
Tests run against the Ably sandbox environment:
The test suite includes 59 integration tests (40 REST + 19 Realtime) and 13 doctests:
- REST: publish, history, presence, auth, tokens, stats, encryption, fallback hosts
- Realtime: connect, close, ping, channel attach/detach, publish/subscribe (string, JSON, binary), multiple subscribers, high-throughput ordered delivery, two-client cross-connection pub/sub, auto-attach on subscribe, channel state changes, presence enter/leave/update/get with multiple clients, discontinuity detection
License
Apache-2.0