use std::fmt::Write;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use tokio_stream::StreamExt;
use force::auth::ClientCredentials;
use force::client::ForceClientBuilder;
use force_pubsub::{
BackoffConfig, PubSubConfig, PubSubEvent, PubSubHandler, ReconnectPolicy, ReplayPreset,
};
fn required_env(name: &str) -> anyhow::Result<String> {
std::env::var(name).with_context(|| format!("{name} environment variable not set"))
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client_id = required_env("SF_CLIENT_ID")?;
let client_secret = required_env("SF_CLIENT_SECRET")?;
let topic = std::env::var("SF_TOPIC_NAME")
.unwrap_or_else(|_| "/event/My_Platform_Event__e".to_string());
println!("Authenticating with Salesforce...");
let auth = ClientCredentials::new_production(client_id, client_secret);
let force_client = ForceClientBuilder::new()
.authenticate(auth)
.build()
.await
.context("Failed to build ForceClient")?;
println!("Authentication successful");
let config = PubSubConfig {
reconnect_policy: ReconnectPolicy::Auto {
max_retries: 3,
backoff: BackoffConfig {
initial_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(30),
multiplier: 2.0,
},
},
..PubSubConfig::default()
};
println!("Connecting to Pub/Sub API...");
let session = force_client.session();
let handler = PubSubHandler::connect(Arc::clone(&session), config)
.await
.context("Failed to connect to Pub/Sub API")?;
println!("Connected");
println!("Subscribing to topic: {topic}");
let mut stream = handler
.subscribe(&topic, ReplayPreset::Latest)
.await
.context("Failed to subscribe")?;
drop(handler);
println!("Waiting for events (will stop after 10)...\n");
let mut event_count = 0usize;
while let Some(item) = stream.next().await {
match item {
Ok(PubSubEvent::Event(msg)) => {
event_count += 1;
let replay_hex =
msg.replay_id
.as_bytes()
.iter()
.fold(String::new(), |mut acc, b| {
let _ = write!(acc, "{b:02x}");
acc
});
println!(
"[{event_count:>2}] event_id={} replay_id=0x{replay_hex}",
msg.event_id
);
if event_count >= 10 {
println!("\nReceived 10 events — done.");
break;
}
}
Ok(PubSubEvent::Reconnected { attempt, .. }) => {
println!("Stream reconnected (attempt {attempt})");
}
Ok(PubSubEvent::KeepAlive) => {
print!(".");
}
Err(e) => {
eprintln!("\nStream error: {e}");
break;
}
}
}
Ok(())
}