reliably 0.3.4

A fully-featured real-time Rust client for Ably
# Reliably

A Rust client for [Ably](https://www.ably.com) with full REST and Realtime (Pub/Sub) support.

_[Ably](https://ably.com) is the platform that powers synchronized digital experiences in realtime. For more information, see the [Ably documentation](https://ably.com/documentation)._

This is a community-maintained fork of the original [ably-rust](https://github.com/ably/ably-rust) SDK, which only supported the REST API. This fork adds the complete Realtime (WebSocket) layer: persistent connections, channels with attach/detach, publish/subscribe, presence, and connection/channel state machines with automatic recovery.

## Features

### REST API
- Publish messages (string, JSON, binary)
- Retrieve message history with pagination
- Presence: get current members, history
- Token authentication (request tokens, sign token requests)
- Application statistics
- Message encryption (AES-128/AES-256)

### Realtime (Pub/Sub)
- Persistent WebSocket connection with automatic reconnection
- Connection state machine (initialized, connecting, connected, disconnected, suspended, closing, closed, failed)
- Channel state machine (initialized, attaching, attached, detaching, detached, suspended, failed)
- Publish with server ACK
- Subscribe with zero-message-loss delivery (unbounded per-subscriber channels)
- Multiple concurrent subscribers per channel, each with its own independent queue (a slow subscriber does not hold up the others)
- Presence: enter, leave, update, get members, subscribe to presence events
- Presence sync protocol with newness comparison and residual leave detection
- Automatic member re-enter on non-resumed re-attach (RTP17i)
- Connection resume with `connectionKey` and `connectionSerial`
- Connection state freshness check (`connectionStateTtl + maxIdleInterval`)
- Idle/heartbeat timeout detection (`maxIdleInterval` from server + grace period)
- Channel auto-re-attach on reconnection
- Discontinuity detection (RTL18) for zero-message-loss applications
- Ping/pong RTT measurement
- JSON and MessagePack wire protocols
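
The two timing features in the list above (the state freshness check and idle timeout detection) come down to duration comparisons. The following is a minimal sketch of that arithmetic, not the crate's implementation: `ConnectionTimings`, `can_resume`, and `is_idle_timed_out` are hypothetical names, and the grace period is left to the caller because the README does not state its value.

```rust
use std::time::Duration;

/// Hypothetical holder for the timing values the server reports on connect.
/// Field names mirror the protocol names above; this is not a `reliably` type.
#[derive(Debug, Clone, Copy)]
pub struct ConnectionTimings {
    pub connection_state_ttl: Duration,
    pub max_idle_interval: Duration,
}

impl ConnectionTimings {
    /// Freshness check: connection state is assumed resumable only while the
    /// time since the last activity is within
    /// `connectionStateTtl + maxIdleInterval`.
    pub fn can_resume(&self, since_last_activity: Duration) -> bool {
        since_last_activity <= self.connection_state_ttl + self.max_idle_interval
    }

    /// Idle detection: the transport is treated as dead once nothing has been
    /// received for longer than `maxIdleInterval` plus a grace period.
    pub fn is_idle_timed_out(&self, since_last_receive: Duration, grace: Duration) -> bool {
        since_last_receive > self.max_idle_interval + grace
    }
}
```

When `can_resume` returns false, a client following this sketch would start a fresh connection instead of attempting a resume, which is the case where channels re-attach without being resumed.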

### Design Decisions
- **Server-side, API-key auth only.** No token refresh, no `authCallback`/`authUrl` on WebSocket, no browser transports.
- **Zero message loss by default.** All subscriber delivery uses unbounded `mpsc` fan-out. Applications that need guaranteed delivery should also monitor `channel.on_discontinuity()` and backfill from the history API when fired.
- **No polling.** All state tracking uses `tokio::sync::watch` for race-free, immediate reads. State waits use `watch::Receiver::wait_for()`, not sleep loops.
- **Idiomatic async Rust.** Built on `tokio` with `tokio-tungstenite` for WebSocket transport.

## Installation

Add `reliably` and `tokio` to your `Cargo.toml`:

```toml
[dependencies]
reliably = "0.3.0"
tokio = { version = "1", features = ["full"] }
```

## Using the Realtime API

### Connect

```rust
use reliably::{ConnectionState, Realtime};

let client = Realtime::new("your-api-key")?;

// Wait for the connection to be established.
client.connection.wait_for_state(ConnectionState::Connected).await?;

// When done:
client.close().await;
```

Or with manual connect:

```rust
use reliably::{Realtime, ClientOptions, ConnectionState};

let mut opts = ClientOptions::new("your-api-key");
opts.auto_connect = false;

let client = Realtime::from_options(opts)?;
client.connection.connect();
client.connection.wait_for_state(ConnectionState::Connected).await?;
```

### Publish and Subscribe

```rust
use reliably::{Realtime, Data};

let client = Realtime::new("your-api-key")?;

let channel = client.channels.get("my-channel").await;
let mut sub = channel.subscribe().await?; // auto-attaches

// Publish (waits for server ACK).
channel.publish(Some("greeting"), Data::String("hello".into())).await?;

// Receive.
if let Some(msg) = sub.recv().await {
    println!("name={:?} data={:?}", msg.name, msg.data);
}
```

### Multiple Subscribers

Each subscriber gets its own independent unbounded stream. Messages are never dropped because a subscriber is slow; they queue in memory until that subscriber reads them.

```rust
let mut sub1 = channel.subscribe().await?;
let mut sub2 = channel.subscribe().await?;

// Both sub1 and sub2 receive every message independently.
```
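
To make the fan-out idea concrete, here is a minimal sketch that uses the standard library's `std::sync::mpsc` (whose `channel()` is unbounded) in place of the async channels the crate is described as using. `FanOut` and its methods are hypothetical names for illustration, not part of the `reliably` API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: one unbounded queue per subscriber.
pub struct FanOut<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    pub fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Register a subscriber and hand back its private receiving end.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel(); // std's `channel()` has no capacity limit
        self.subscribers.push(tx);
        rx
    }

    /// Clone the message into every subscriber queue. `send` on an unbounded
    /// channel never blocks; it fails only when the receiver was dropped, in
    /// which case that subscriber is removed.
    pub fn publish(&mut self, msg: T) {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
    }

    /// Number of subscribers that are still registered.
    pub fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}
```

The trade-off of this design is memory: because nothing is dropped and the publisher never waits, a subscriber that stops reading accumulates messages until it catches up or is dropped.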

### JSON and Binary Data

```rust
use reliably::Data;
use serde_json::json;

// JSON
channel.publish(Some("event"), Data::JSON(json!({"key": "value"}))).await?;

// Binary
let bytes = serde_bytes::ByteBuf::from(vec![0x01, 0x02, 0x03]);
channel.publish(Some("binary"), Data::Binary(bytes)).await?;
```

### Channel State

```rust
use reliably::ChannelState;

let channel = client.channels.get("my-channel").await;

// Synchronous (non-blocking) state check.
let state = channel.state(); // ChannelState::Initialized

// Explicit attach/detach.
channel.attach().await?;
assert_eq!(channel.state(), ChannelState::Attached);

channel.detach().await?;
assert_eq!(channel.state(), ChannelState::Detached);
```
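
The attach/detach flow above can be pictured as a small transition table. The sketch below is an assumption-laden simplification: `State` is a stand-in for the channel states listed under Features (not `reliably::ChannelState`), and `can_transition` encodes only a plausible subset of transitions rather than the crate's actual rules.

```rust
/// Stand-in for the channel states listed in this README.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Initialized,
    Attaching,
    Attached,
    Detaching,
    Detached,
    Suspended,
    Failed,
}

/// Assumed, simplified transition table for illustration only.
pub fn can_transition(from: State, to: State) -> bool {
    use State::*;
    match (from, to) {
        // Any non-failed state may fail.
        (_, Failed) => from != Failed,
        // An attach can start from rest states, or from Attached on re-attach.
        (Initialized | Detached | Suspended | Failed | Attached, Attaching) => true,
        (Attaching, Attached) => true,
        // Losing the connection for too long suspends the channel.
        (Attaching | Attached, Suspended) => true,
        (Attached, Detaching) => true,
        (Detaching, Detached) => true,
        _ => false,
    }
}
```

Under these assumptions, `Initialized -> Attached` is rejected because an attach must pass through `Attaching` first, which matches the order of states observed in the example above.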

### Connection State

```rust
use reliably::ConnectionState;

// Synchronous (non-blocking).
let state = client.connection.state();

// Wait with timeout.
client.connection.wait_for_state_with_timeout(
    ConnectionState::Connected,
    std::time::Duration::from_secs(10),
).await?;

// Ping.
let rtt = client.connection.ping().await?;
println!("RTT: {:?}", rtt);
```

### Presence

Presence requires a `client_id` set in `ClientOptions`:

```rust
use reliably::{Realtime, ClientOptions, Data};

let opts = ClientOptions::new("your-api-key")
    .client_id("my-user-id")?;
let client = Realtime::from_options(opts)?;

let channel = client.channels.get("chat-room").await;
channel.attach().await?;

// Enter presence.
channel.presence.enter(Some(Data::String("online".into()))).await?;

// Update presence data.
channel.presence.update(Some(Data::String("away".into()))).await?;

// Get all current members.
let members = channel.presence.get().await;
for member in &members {
    println!("{}: {:?}", member.client_id, member.data);
}

// Subscribe to presence events.
let mut presence_sub = channel.presence.subscribe();
if let Some(event) = presence_sub.recv().await {
    println!("{} did {:?}", event.client_id, event.action);
}

// Leave.
channel.presence.leave(None).await?;
```
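
The Features list mentions a presence "newness comparison". As a rough illustration, the sketch below follows the rule as summarized in the Ably client library specification (RTP2b): when a message id has the form `connectionId:msgSerial:index`, compare by `msgSerial` and then `index`; otherwise fall back to timestamps. `PresenceMsg`, `serial_and_index`, and `is_newer` are hypothetical names, and the crate's internal logic may differ.

```rust
/// Hypothetical, pared-down presence message.
#[derive(Debug, Clone)]
pub struct PresenceMsg {
    pub id: String,            // assumed form: "connectionId:msgSerial:index"
    pub connection_id: String,
    pub timestamp: i64,        // milliseconds since the epoch
}

/// Parse the trailing `msgSerial:index` pair, but only when the id starts
/// with the message's own connection id.
fn serial_and_index(m: &PresenceMsg) -> Option<(i64, i64)> {
    let rest = m
        .id
        .strip_prefix(m.connection_id.as_str())?
        .strip_prefix(':')?;
    let (serial, index) = rest.split_once(':')?;
    Some((serial.parse().ok()?, index.parse().ok()?))
}

/// Returns true when `incoming` should replace `existing` in the presence map.
pub fn is_newer(incoming: &PresenceMsg, existing: &PresenceMsg) -> bool {
    match (serial_and_index(incoming), serial_and_index(existing)) {
        // Both ids are well-formed: compare (msgSerial, index), larger is newer.
        (Some(a), Some(b)) => a > b,
        // Otherwise (for example a server-synthesized leave): compare
        // timestamps, with ties going to the incoming message.
        _ => incoming.timestamp >= existing.timestamp,
    }
}
```

A presence map built on this check would ignore any event for which `is_newer` returns false, so late or duplicated events during a sync cannot overwrite fresher state.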

### Discontinuity Detection (Zero Message Loss)

When a channel re-attaches without the server's `RESUMED` flag, messages may have been lost during the gap. Monitor `on_discontinuity()` and backfill from history:

```rust
let channel = client.channels.get("critical-channel").await;
let mut disc_rx = channel.on_discontinuity();

// Spawn a task to monitor for discontinuities.
tokio::spawn(async move {
    while let Some(event) = disc_rx.recv().await {
        eprintln!("Discontinuity detected! resumed={}, reason={:?}", event.resumed, event.reason);
        // Backfill from history API here.
    }
});
```
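
The backfill step left as a comment above usually needs de-duplication, because history may overlap with messages already delivered live. The following is a minimal sketch of that merge, assuming each message carries a unique `id` and that the history page arrives newest-first. `Msg` and `missed_messages` are hypothetical names, not part of the crate.

```rust
use std::collections::HashSet;

/// Hypothetical, pared-down message with a unique id.
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub id: String,
    pub data: String,
}

/// Given the ids already delivered live and one page of history (assumed to be
/// ordered newest-first), return the messages that were missed, oldest first.
/// Every returned id is recorded in `seen` so repeated calls are idempotent.
pub fn missed_messages(seen: &mut HashSet<String>, history_newest_first: &[Msg]) -> Vec<Msg> {
    history_newest_first
        .iter()
        .rev() // replay in the original publish order
        .filter(|m| seen.insert(m.id.clone())) // `insert` is true only for new ids
        .cloned()
        .collect()
}
```

In the discontinuity task, the result of `missed_messages` would be processed before resuming normal handling of live messages, with `seen` updated for each live message as well.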

## Using the REST API

### Initialize a Client

```rust
// With an API key:
let client = reliably::Rest::new("xVLyHw.SmDuMg:************")?;

// With an auth URL:
let auth_url = "https://example.com/auth".parse()?;
let client = reliably::ClientOptions::new()
    .auth_url(auth_url)
    .rest()?;
```

### Publish a Message

```rust
let channel = client.channels().get("test");

// String
channel.publish().string("a string").send().await?;

// JSON (any type that implements `serde::Serialize`)
#[derive(serde::Serialize)]
struct Point { x: i32, y: i32 }
channel.publish().json(Point { x: 3, y: 4 }).send().await?;

// Binary
channel.publish().binary(vec![0x01, 0x02, 0x03]).send().await?;
```

### Retrieve History

```rust
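// Assumption: `pages()` returns a `futures::Stream`, so `next()` needs this trait in scope.
use futures::StreamExt;

let mut pages = channel.history().pages();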
while let Some(Ok(page)) = pages.next().await {
    for msg in page.items().await? {
        println!("message data = {:?}", msg.data);
    }
}
```

### Retrieve Presence

```rust
let mut pages = channel.presence.get().pages();
while let Some(Ok(page)) = pages.next().await {
    for msg in page.items().await? {
        println!("presence data = {:?}", msg.data);
    }
}
```

### Encrypted Messages

```rust
let cipher_key = reliably::crypto::generate_random_key::<reliably::crypto::Key256>();
let params = reliably::rest::CipherParams::from(cipher_key);
let channel = client.channels().name("encrypted").cipher(params).get();

channel.publish().string("sensitive data is encrypted").send().await?;
```

### Request a Token

```rust
let token = client
    .auth()
    .request_token()
    .client_id("test@example.com")
    .capability(r#"{"example":["subscribe"]}"#)
    .send()
    .await?;
```

### Application Statistics

```rust
let mut pages = client.stats().pages();
while let Some(Ok(page)) = pages.next().await {
    for stats in page.items().await? {
        println!("stats = {:?}", stats);
    }
}
```

## Architecture

```
src/
  lib.rs                 # Public API, re-exports, test suite
  options.rs             # ClientOptions (shared by REST and Realtime)
  rest.rs                # REST client, channels, messages, encoding/decoding
  auth.rs                # Token auth, API key signing
  realtime.rs            # Realtime client entry point, router task
  connection.rs          # Connection state machine, ConnectionManager event loop
  realtime_channel.rs    # Channel state machine, pub/sub, discontinuity detection
  realtime_presence.rs   # PresenceMap, sync protocol, enter/leave/update API
  protocol.rs            # Wire format: actions, flags, ProtocolMessage, MessageQueue
  transport.rs           # WebSocket transport (tokio-tungstenite)
  crypto.rs              # AES-CBC encryption/decryption
  http.rs                # HTTP request builder, pagination
  presence.rs            # REST presence API
  stats.rs               # Application statistics types
  error.rs               # Error types and codes
```

## What's Not Implemented

- **Token auth refresh on WebSocket** -- No `authCallback`/`authUrl` re-auth via AUTH protocol messages. API key auth only.
- **Recovery keys** -- No cross-process resume. Process restart triggers discontinuity; backfill from history.
- **Comet/long-polling** -- WebSocket only.
- **Delta compression** -- No vcdiff delta decoding.
- **LiveObjects, Annotations, message interactions** -- Not in scope.
- **Filtered subscriptions** -- Not implemented.
- **Browser-specific concerns** -- Server-side only.

## Testing

Tests run against the Ably sandbox environment:

```sh
cargo test
```

The test suite includes 59 integration tests (40 REST + 19 Realtime) and 13 doctests:

- REST: publish, history, presence, auth, tokens, stats, encryption, fallback hosts
- Realtime: connect, close, ping, channel attach/detach, publish/subscribe (string, JSON, binary), multiple subscribers, high-throughput ordered delivery, two-client cross-connection pub/sub, auto-attach on subscribe, channel state changes, presence enter/leave/update/get with multiple clients, discontinuity detection

## License

Apache-2.0