Pusher HTTP Rust Client
A Rust client for interacting with the Pusher HTTP API, allowing you to publish events, authorize channels, authenticate users, and handle webhooks from your Rust applications.
Features
- Trigger events on public, private, and presence channels
- Trigger events to specific users (User Authentication)
- Trigger batch events for efficiency
- Support for end-to-end encrypted channels
- Authorize client subscriptions to private, presence, and encrypted channels
- Authenticate users for user-specific Pusher features
- Terminate user connections
- Validate and process incoming Pusher webhooks
- Configurable host, port, scheme (HTTP/HTTPS), and timeout
- Asynchronous API using async/await
- Typed responses and errors
Installation
Add the following to your Cargo.toml:
[dependencies]
pushers = { path = "./" }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
Then run:
cargo build
Usage
1. Initialization
Configure and create a Pusher client:
use pushers::{Config, Pusher, PusherError};

#[tokio::main]
async fn main() -> Result<(), PusherError> {
    // Build the configuration, then hand it to the client.
    let config = Config::builder()
        .app_id("YOUR_APP_ID")
        .key("YOUR_APP_KEY")
        .secret("YOUR_APP_SECRET")
        .cluster("YOUR_CLUSTER")
        .timeout(std::time::Duration::from_secs(5))
        .build()?;
    let pusher = Pusher::new(config)?;
    Ok(())
}
You can also initialize from a Pusher URL:
use pushers::{Pusher, PusherError};
# #[tokio::main]
# async fn main() -> Result<(), PusherError> {
let pusher_from_url = Pusher::from_url(
    "http://YOUR_APP_KEY:YOUR_APP_SECRET@api-YOUR_CLUSTER.pusher.com/apps/YOUR_APP_ID",
    None,
)?;
# Ok(())
# }
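To make the URL layout concrete, here is a minimal sketch of how a URL of the form scheme://KEY:SECRET@HOST/apps/APP_ID breaks into its parts. The struct and the split_pusher_url function are hypothetical names used only for illustration; this is not how Pusher::from_url is implemented, and it uses only the standard library.

```rust
/// Parts of a Pusher URL of the form scheme://KEY:SECRET@HOST/apps/APP_ID.
/// Hypothetical type for illustration; not part of the pushers crate.
#[derive(Debug, PartialEq)]
struct PusherUrlParts {
    scheme: String,
    key: String,
    secret: String,
    host: String, // includes ":port" if the URL carries one
    app_id: String,
}

/// Splits a Pusher URL into its components.
/// Returns None when any expected piece is missing or empty.
fn split_pusher_url(url: &str) -> Option<PusherUrlParts> {
    let (scheme, rest) = url.split_once("://")?;
    let (credentials, rest) = rest.split_once('@')?;
    let (key, secret) = credentials.split_once(':')?;
    let (host, path) = rest.split_once('/')?;
    let app_id = path.strip_prefix("apps/")?;

    let pieces = [scheme, key, secret, host, app_id];
    if pieces.iter().any(|p| p.is_empty()) || app_id.contains('/') {
        return None;
    }

    Some(PusherUrlParts {
        scheme: scheme.to_string(),
        key: key.to_string(),
        secret: secret.to_string(),
        host: host.to_string(),
        app_id: app_id.to_string(),
    })
}
```

Calling split_pusher_url on the placeholder URL above would yield the key, secret, host, and app ID as separate strings, which is a quick way to sanity-check a URL before passing it to the client.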
2. Triggering Events
use pushers::{Pusher, Channel, PusherError};
use serde_json::json;
# async fn doc_trigger_event(pusher: &Pusher) -> Result<(), PusherError> {
let channels = vec![Channel::from_string("my-channel")?];
let event_name = "new-message";
let data = json!({ "text": "Hello from Rust!" });
match pusher.trigger(&channels, event_name, data, None).await {
    Ok(response) => {
        println!("Event triggered! Status: {}", response.status());
    }
    Err(e) => eprintln!("Error triggering event: {:?}", e),
}
# Ok(())
# }
Encrypted channels
If channels contains a single encrypted channel (e.g. "private-encrypted-mychannel") and you’ve set the encryption_master_key in the Config, the library will encrypt data automatically.
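The condition above depends on Pusher's channel-name prefixes. The sketch below classifies a name by prefix and checks for the "single encrypted channel" case, reading that phrase as "exactly one channel, and it is encrypted". ChannelKind, channel_kind, and is_single_encrypted are hypothetical names for illustration and do not come from the pushers crate.

```rust
/// Channel categories implied by Pusher's naming prefixes.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ChannelKind {
    Public,
    Private,
    Presence,
    Encrypted,
}

/// Classifies a channel name by its prefix.
fn channel_kind(name: &str) -> ChannelKind {
    // "private-encrypted-" must be tested before the shorter "private-".
    if name.starts_with("private-encrypted-") {
        ChannelKind::Encrypted
    } else if name.starts_with("private-") {
        ChannelKind::Private
    } else if name.starts_with("presence-") {
        ChannelKind::Presence
    } else {
        ChannelKind::Public
    }
}

/// True when the slice holds exactly one channel and that channel is encrypted.
fn is_single_encrypted(channels: &[&str]) -> bool {
    matches!(channels, [only] if channel_kind(only) == ChannelKind::Encrypted)
}
```

A check like this can be useful before triggering, so that an encrypted channel is not accidentally mixed into a multi-channel trigger call.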
Excluding a recipient
use pushers::{Pusher, Channel, PusherError, events::TriggerParams};
use serde_json::json;
# async fn doc_trigger_event_exclude(pusher: &Pusher) -> Result<(), PusherError> {
let channels = vec![Channel::from_string("my-channel")?];
let event_name = "new-message";
let data = json!({ "text": "Hello from Rust!" });
// The connection with this socket ID will not receive the event.
let params = TriggerParams {
    socket_id: Some("socket_id_to_exclude".to_string()),
    info: None,
};
pusher
    .trigger(&channels, event_name, data, Some(params))
    .await?;
# Ok(())
# }
3. Triggering Batch Events
use pushers::{Pusher, PusherError, events::BatchEvent};
use serde_json::json;
# async fn doc_trigger_batch(pusher: &Pusher) -> Result<(), PusherError> {
// Each event carries its payload as a JSON-encoded string.
let batch = vec![
    BatchEvent {
        name: "event1".to_string(),
        channel: "channel-a".to_string(),
        data: json!({ "value": 1 }).to_string(),
        socket_id: None,
        info: None,
    },
    BatchEvent {
        name: "event2".to_string(),
        channel: "channel-b".to_string(),
        data: json!({ "value": 2 }).to_string(),
        socket_id: None,
        info: None,
    },
];
match pusher.trigger_batch(batch).await {
    Ok(response) => println!("Batch triggered! Status: {}", response.status()),
    Err(e) => eprintln!("Error triggering batch: {:?}", e),
}
# Ok(())
# }
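Pusher's batch endpoint limits how many events a single request may carry (10 on shared clusters, per Pusher's HTTP API reference; check the limit for your own plan). This README does not say whether trigger_batch splits large batches itself, so the sketch below shows one way to pre-split a list before sending. chunk_events is a hypothetical helper, generic over the event type so it stays independent of the crate.

```rust
/// Splits `events` into groups of at most `max_per_request` items,
/// preserving order. Returns an empty Vec when `max_per_request` is 0
/// instead of panicking.
fn chunk_events<T: Clone>(events: &[T], max_per_request: usize) -> Vec<Vec<T>> {
    if max_per_request == 0 {
        return Vec::new();
    }
    events
        .chunks(max_per_request)
        .map(|group| group.to_vec())
        .collect()
}
```

Each resulting group could then be passed to trigger_batch in turn, for example in a loop that stops at the first error.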
4. Authorizing Channels
Typically done in your HTTP handler when a client attempts to subscribe:
use pushers::{Pusher, Channel, PusherError};
use serde_json::json;
# fn doc_authorize_channel(pusher: &Pusher) -> Result<(), PusherError> {
let socket_id = "123.456";
let channel_name_str = "private-mychannel";
let channel = Channel::from_string(channel_name_str)?;
let presence_data = Some(json!({
    "user_id": "unique_user_id",
    "user_info": { "name": "Alice" }
}));
match pusher.authorize_channel(
    socket_id,
    &channel,
    presence_data.as_ref(),
) {
    Ok(auth_signature) => {
        println!("Auth success: {:?}", auth_signature);
    }
    Err(e) => eprintln!("Auth error: {:?}", e),
}
# Ok(())
# }
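For context on what gets signed, the sketch below follows Pusher's documented channel-auth scheme: a socket ID has the shape digits.digits, and the string to sign is socket_id:channel_name, with :channel_data appended for presence channels. The function names are hypothetical, the HMAC-SHA256 step is omitted because it needs a crypto crate, and none of this is taken from the pushers source.

```rust
/// Checks that a socket ID has the shape "<digits>.<digits>", e.g. "123.456".
fn is_valid_socket_id(socket_id: &str) -> bool {
    let all_digits = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    match socket_id.split_once('.') {
        Some((left, right)) => all_digits(left) && all_digits(right),
        None => false,
    }
}

/// Builds the string that is signed for a channel subscription.
/// `channel_data` is the JSON-encoded presence data, if any.
/// The signature itself would be the hex HMAC-SHA256 of this string,
/// keyed with the app secret (not computed here).
fn auth_string_to_sign(
    socket_id: &str,
    channel_name: &str,
    channel_data: Option<&str>,
) -> String {
    match channel_data {
        Some(data) => format!("{}:{}:{}", socket_id, channel_name, data),
        None => format!("{}:{}", socket_id, channel_name),
    }
}
```

Validating the socket ID before building the string keeps stray input such as a value containing extra colons out of the signed payload.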
5. Authenticating Users
For server-to-user events:
use pushers::{Pusher, PusherError};
use serde_json::json;
# fn doc_authenticate_user(pusher: &Pusher) -> Result<(), PusherError> {
let socket_id = "789.012";
let user_data = json!({
    "id": "user-bob",
    "name": "Bob The Builder",
    "email": "bob@example.com"
});
match pusher.authenticate_user(socket_id, &user_data) {
    Ok(user_auth) => {
        println!("User auth success: {:?}", user_auth);
    }
    Err(e) => eprintln!("User auth error: {:?}", e),
}
# Ok(())
# }
6. Sending an Event to a User
use pushers::{Pusher, PusherError};
use serde_json::json;
# async fn doc_send_to_user(pusher: &Pusher) -> Result<(), PusherError> {
let user_id = "user-bob";
let event_name = "personal-notification";
let data = json!({ "alert": "Your report is ready!" });
match pusher.send_to_user(user_id, event_name, data).await {
    Ok(response) => println!("Sent to user! Status: {}", response.status()),
    Err(e) => eprintln!("Error sending to user: {:?}", e),
}
# Ok(())
# }
7. Terminating User Connections
use pushers::{Pusher, PusherError};
# async fn doc_terminate_user(pusher: &Pusher) -> Result<(), PusherError> {
let user_id = "user-charlie";
match pusher.terminate_user_connections(user_id).await {
    Ok(response) => println!("Terminate successful! Status: {}", response.status()),
    Err(e) => eprintln!("Error terminating user: {:?}", e),
}
# Ok(())
# }
8. Handling Webhooks
use pushers::{Pusher, PusherError, webhook::WebhookEvent};
use std::collections::BTreeMap;
# fn doc_handle_webhook(pusher: &Pusher) -> Result<(), PusherError> {
// Headers and body as received from the incoming HTTP request.
let mut headers = BTreeMap::new();
headers.insert("X-Pusher-Key".to_string(), "YOUR_APP_KEY".to_string());
headers.insert("X-Pusher-Signature".to_string(), "RECEIVED_SIGNATURE".to_string());
headers.insert("Content-Type".to_string(), "application/json".to_string());
let body = r#"{
    "time_ms": 1600000000000,
    "events":[{"name":"channel_occupied","channel":"my-channel"}]
}"#;
let webhook = pusher.webhook(&headers, body);
if webhook.is_valid(None) {
    println!("Webhook is valid!");
    match webhook.get_events() {
        Ok(events) => {
            println!("Events: {:?}", events);
            for event in events {
                match event {
                    WebhookEvent::ChannelOccupied { channel } => {
                        println!("Channel occupied: {}", channel);
                    }
                    _ => {}
                }
            }
        }
        Err(e) => eprintln!("Error getting events: {:?}", e),
    }
    match webhook.get_time() {
        Ok(time) => println!("Webhook time: {:?}", time),
        Err(e) => eprintln!("Error getting time: {:?}", e),
    }
} else {
    eprintln!("Invalid webhook!");
}
# Ok(())
# }
9. Example: Integration with Axum
use axum::{
    extract::{Json, State},
    http::{HeaderMap, StatusCode},
    response::IntoResponse,
    routing::post,
    Router,
};
use pushers::{Config, Pusher, PusherError, Channel};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::{collections::BTreeMap, sync::Arc};

// Shared state handed to every handler.
#[derive(Clone)]
struct AppState {
    pusher: Arc<Pusher>,
}

#[tokio::main]
async fn main() {
    let config = Config::builder()
        .app_id("YOUR_APP_ID")
        .key("YOUR_APP_KEY")
        .secret("YOUR_APP_SECRET")
        .cluster("YOUR_CLUSTER")
        .build()
        .expect("Failed to build Pusher config");
    let pusher = Arc::new(Pusher::new(config).expect("Failed to create Pusher client"));
    let app_state = AppState { pusher };
    let app = Router::new()
        .route("/pusher/auth", post(pusher_auth_handler))
        .route("/pusher/webhook", post(pusher_webhook_handler))
        .with_state(app_state);
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    println!("Listening on {}", listener.local_addr().unwrap());
    axum::serve(listener, app).await.unwrap();
}

#[derive(Deserialize)]
struct AuthRequest {
    socket_id: String,
    channel_name: String,
    #[serde(alias = "channel_data")]
    presence_data: Option<Value>,
}

#[derive(Serialize)]
struct AuthResponseError {
    error: String,
}
async fn pusher_auth_handler(
    State(state): State<AppState>,
    Json(payload): Json<AuthRequest>,
) -> impl IntoResponse {
    let channel = match Channel::from_string(&payload.channel_name) {
        Ok(ch) => ch,
        Err(_) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(json!({"error": "Invalid channel_name"})),
            )
                .into_response();
        }
    };
    match state.pusher.authorize_channel(
        &payload.socket_id,
        &channel,
        payload.presence_data.as_ref(),
    ) {
        Ok(auth_response) => (StatusCode::OK, Json(auth_response)).into_response(),
        Err(e) => {
            eprintln!("Auth error: {:?}", e);
            // No trailing semicolon: this arm must evaluate to the response,
            // matching the type of the Ok arm.
            (
                StatusCode::FORBIDDEN,
                Json(json!({ "error": "Forbidden" })),
            )
                .into_response()
        }
    }
}
async fn pusher_webhook_handler(
    State(state): State<AppState>,
    headers: HeaderMap,
    body: String,
) -> impl IntoResponse {
    // Copy the headers that are valid UTF-8 into the map the client expects.
    let mut hdrs_btreemap = BTreeMap::new();
    for (k, v) in headers.iter() {
        if let Ok(s) = v.to_str() {
            hdrs_btreemap.insert(k.as_str().to_string(), s.to_string());
        }
    }
    let webhook = state.pusher.webhook(&hdrs_btreemap, &body);
    if webhook.is_valid(None) {
        println!("Webhook validated successfully.");
        (StatusCode::OK, Json(json!({ "status": "ok" }))).into_response()
    } else {
        eprintln!(
            "Webhook validation failed. Key: {:?}, Signature: {:?}, Body: {}",
            webhook.key(),
            webhook.signature(),
            webhook.body()
        );
        (
            StatusCode::UNAUTHORIZED,
            Json(json!({ "error": "Unauthorized" })),
        )
            .into_response()
    }
}
Configuration Options
The Config struct is used to configure the Pusher client. You can create it using Config::builder():
- app_id(id: impl Into<String>)
- key(key: impl Into<String>)
- secret(secret: impl Into<String>)
- cluster(name: impl AsRef<str>): Sets the cluster (e.g., "eu", "ap1"). This sets the host to api-{cluster}.pusher.com.
- host(host: impl Into<String>): Sets a custom host if not using a standard cluster.
- use_tls(bool): Enables HTTPS (default true). ConfigBuilder sets the scheme to "https" by default; this method can override it.
- port(number: u16): Custom port.
- timeout(duration: Duration): HTTP request timeout.
- encryption_master_key(key: Vec<u8>): Sets the 32-byte encryption master key from raw bytes.
- encryption_master_key_base64(key: impl AsRef<str>): Sets the 32-byte encryption master key from a base64-encoded string.
- pool_max_idle_per_host(max: usize): Sets the maximum number of idle connections per host for the underlying HTTP client.
- enable_retry(enable: bool): Enables or disables retry logic for failed requests (default true).
- max_retries(max: u32): Sets the maximum number of retries for failed requests if retry is enabled (default 3).
Finally, call .build() on the ConfigBuilder to get a Result<Config, PusherError>.
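To illustrate how the cluster, use_tls, and port options relate, here is a minimal sketch that derives a base URL from them. cluster_host follows the api-{cluster}.pusher.com pattern stated above; base_url and its treatment of an unset port are assumptions made for the example, not the crate's actual logic.

```rust
/// Host name for a standard cluster, following api-{cluster}.pusher.com.
fn cluster_host(cluster: &str) -> String {
    format!("api-{}.pusher.com", cluster)
}

/// Hypothetical helper: combines scheme, host, and optional port into a base URL.
/// When `port` is None the scheme's default port is implied and left out.
fn base_url(use_tls: bool, host: &str, port: Option<u16>) -> String {
    let scheme = if use_tls { "https" } else { "http" };
    match port {
        Some(p) => format!("{}://{}:{}", scheme, host, p),
        None => format!("{}://{}", scheme, host),
    }
}
```

For example, a cluster of "eu" with TLS enabled and no custom port would give https://api-eu.pusher.com, while a local test server might use base_url(false, "localhost", Some(8080)).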
Error Handling
All fallible methods in this library return Result<T, PusherError>.
The PusherError enum has the following variants:
- Request(RequestError): Errors related to making HTTP requests (e.g., network issues, non-success status codes). RequestError contains message, url, status, and body.
- Webhook(WebhookError): Errors during webhook processing, such as signature validation failure or invalid body. WebhookError contains message, content_type, body, and signature.
- Config { message: String }: Errors due to invalid client configuration (e.g., missing app ID, invalid encryption key).
- Validation { message: String }: Errors from input validation (e.g., invalid channel name, socket ID, event name too long).
- Encryption { message: String }: Errors related to data encryption or decryption for end-to-end encrypted channels.
- Json(serde_json::Error): Errors during JSON serialization or deserialization.
- Http(reqwest::Error): Underlying errors from the reqwest HTTP client library.
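One way an application might act on these variants is to map each to an HTTP status for its own responses. The sketch below uses a stand-in enum, SketchError, that mirrors only the variant names listed above; the payloads are simplified to a message string, which is an assumption, and the status choices are one possible policy rather than anything the library prescribes.

```rust
/// Stand-in for PusherError with simplified payloads (assumption:
/// the real variants carry richer types, as described in the list above).
#[derive(Debug)]
enum SketchError {
    Request { message: String },
    Webhook { message: String },
    Config { message: String },
    Validation { message: String },
    Encryption { message: String },
    Json(String),
    Http(String),
}

/// Picks an HTTP status code for the application's own response.
fn http_status_for(err: &SketchError) -> u16 {
    match err {
        // The caller supplied input that could not be accepted.
        SketchError::Validation { .. } | SketchError::Json(_) => 400,
        // The webhook could not be verified.
        SketchError::Webhook { .. } => 401,
        // Talking to the upstream API failed.
        SketchError::Request { .. } | SketchError::Http(_) => 502,
        // The server itself is misconfigured or failed internally.
        SketchError::Config { .. } | SketchError::Encryption { .. } => 500,
    }
}
```

Because the match is exhaustive, adding a variant to the enum forces the mapping to be revisited at compile time, which is the main reason to prefer it over a catch-all arm.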
Contributing
Contributions are welcome! Please open issues for bugs or feature requests, or submit pull requests for improvements.
For major changes, please discuss these via an issue first to ensure alignment.
When contributing code, please ensure:
- Code is formatted with cargo fmt.
- Clippy lints are addressed (cargo clippy --all-targets --all-features).
- New functionality is covered by tests.
- Documentation is updated accordingly.
License
This project is licensed under the GNU Affero General Public License v3.0. See the LICENSE.md file for details.