#![allow(clippy::unwrap_used)]
#![allow(clippy::expect_used)]
mod common;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::time::Duration;
use async_trait::async_trait;
use common::mock_server::{
AlwaysErrorService, EventStreamService, ReconnectingStreamService, make_consumer_event,
make_fetch_response, make_keepalive_response, start_always_error_server,
start_event_stream_server, start_reconnecting_stream_server, start_userinfo_mock,
};
use force::auth::{AccessToken, Authenticator, TokenResponse};
use force::client::builder;
use force::error::Result as ForceResult;
use force_pubsub::{
BackoffConfig, PubSubConfig, PubSubError, PubSubEvent, PubSubHandler, ReconnectPolicy,
ReplayPreset,
};
use futures::StreamExt;
use wiremock::MockServer;
#[derive(Debug, Clone)]
struct TestAuth {
token: String,
instance_url: String,
}
impl TestAuth {
fn new(token: &str, instance_url: &str) -> Self {
Self {
token: token.to_string(),
instance_url: instance_url.to_string(),
}
}
}
#[async_trait]
impl Authenticator for TestAuth {
async fn authenticate(&self) -> ForceResult<AccessToken> {
Ok(AccessToken::from_response(TokenResponse {
access_token: self.token.clone(),
instance_url: self.instance_url.clone(),
token_type: "Bearer".to_string(),
issued_at: "1704067200000".to_string(),
signature: "test_sig".to_string(),
expires_in: Some(7200),
refresh_token: None,
}))
}
async fn refresh(&self) -> ForceResult<AccessToken> {
self.authenticate().await
}
}
async fn make_handler(
endpoint: String,
reconnect_policy: ReconnectPolicy,
) -> (PubSubHandler<TestAuth>, MockServer) {
let (userinfo_mock, instance_url) = start_userinfo_mock("00Dxx0000001gEREAY").await;
let auth = TestAuth::new("test-token", &instance_url);
let client = builder().authenticate(auth).build().await.unwrap();
let config = PubSubConfig {
endpoint,
reconnect_policy,
batch_size: 10,
};
let handler = PubSubHandler::connect(client.session(), config)
.await
.unwrap();
(handler, userinfo_mock)
}
const TEST_SCHEMA_ID: &str = "schema-events-test-001";
const TEST_SCHEMA_JSON: &str =
r#"{"type":"record","name":"TestEvent","fields":[{"name":"field","type":"string"}]}"#;
fn encode_field_value(value: &str) -> Vec<u8> {
let schema = apache_avro::Schema::parse_str(TEST_SCHEMA_JSON).unwrap();
force_pubsub::encode_avro(&schema, &serde_json::json!({"field": value})).unwrap()
}
#[tokio::test]
async fn test_subscribe_yields_events() {
let replay_id_bytes = vec![0xAA, 0xBB, 0xCC];
let payload = encode_field_value("hello");
let event = make_consumer_event(TEST_SCHEMA_ID, replay_id_bytes.clone(), payload);
let response = make_fetch_response(
"/event/Test__e",
vec![event],
replay_id_bytes.clone(), );
let service = EventStreamService {
topic_schema_id: TEST_SCHEMA_ID.to_string(),
schema_json: TEST_SCHEMA_JSON.to_string(),
responses: vec![response],
};
let url = start_event_stream_server(service).await;
let (handler, _userinfo) = make_handler(url, ReconnectPolicy::None).await;
let mut stream = handler
.subscribe("/event/Test__e", ReplayPreset::Latest)
.await
.unwrap();
let first = stream
.next()
.await
.expect("stream must yield at least one item");
let event_msg = match first.expect("item must be Ok") {
PubSubEvent::Event(msg) => msg,
other => panic!("expected PubSubEvent::Event, got {other:?}"),
};
assert_eq!(event_msg.schema_id, TEST_SCHEMA_ID);
assert_eq!(event_msg.replay_id.as_bytes(), replay_id_bytes.as_slice());
let field = event_msg.payload["field"]
.as_str()
.expect("payload.field must be a string");
assert_eq!(field, "hello");
}
#[tokio::test]
async fn test_subscribe_yields_keepalive() {
let keepalive_replay_id = vec![0x01, 0x02];
let response = make_keepalive_response("/event/Test__e", keepalive_replay_id);
let service = EventStreamService {
topic_schema_id: TEST_SCHEMA_ID.to_string(),
schema_json: TEST_SCHEMA_JSON.to_string(),
responses: vec![response],
};
let url = start_event_stream_server(service).await;
let (handler, _userinfo) = make_handler(url, ReconnectPolicy::None).await;
let mut stream = handler
.subscribe("/event/Test__e", ReplayPreset::Latest)
.await
.unwrap();
let first = stream
.next()
.await
.expect("stream must yield at least one item");
let event = first.expect("item must be Ok");
assert!(
matches!(event, PubSubEvent::KeepAlive),
"expected KeepAlive, got {event:?}"
);
}
#[tokio::test]
async fn test_subscribe_reconnects_on_disconnect() {
let replay_a = vec![0x01];
let replay_b = vec![0x02];
let replay_c = vec![0x03];
let event_a = make_consumer_event(TEST_SCHEMA_ID, replay_a.clone(), encode_field_value("a"));
let event_b = make_consumer_event(TEST_SCHEMA_ID, replay_b.clone(), encode_field_value("b"));
let resp1 = make_fetch_response(
"/event/Test__e",
vec![event_a, event_b],
replay_b.clone(), );
let event_c = make_consumer_event(TEST_SCHEMA_ID, replay_c.clone(), encode_field_value("c"));
let resp2 = make_fetch_response("/event/Test__e", vec![event_c], replay_c.clone());
let connection_count = Arc::new(AtomicU32::new(0));
let service = ReconnectingStreamService {
topic_schema_id: TEST_SCHEMA_ID.to_string(),
schema_json: TEST_SCHEMA_JSON.to_string(),
first_responses: vec![resp1],
second_responses: vec![resp2],
connection_count: Arc::clone(&connection_count),
};
let url = start_reconnecting_stream_server(service).await;
let (handler, _userinfo) = make_handler(
url,
ReconnectPolicy::Auto {
max_retries: 3,
backoff: BackoffConfig {
initial_delay: Duration::from_millis(10),
max_delay: Duration::from_millis(50),
multiplier: 1.0,
},
},
)
.await;
let mut stream = handler
.subscribe("/event/Test__e", ReplayPreset::Latest)
.await
.unwrap();
let mut items: Vec<_> = Vec::new();
loop {
match tokio::time::timeout(Duration::from_secs(5), stream.next()).await {
Ok(Some(item)) => {
let is_last = match &item {
Ok(PubSubEvent::Event(msg)) => msg.replay_id.as_bytes() == replay_c.as_slice(),
Err(_) => true,
_ => false,
};
items.push(item);
if is_last || items.len() >= 10 {
break;
}
}
Ok(None) => break,
Err(e) => panic!("stream timed out waiting for items: {e}"),
}
}
assert!(
items.len() >= 4,
"expected at least 4 items, got {}",
items.len()
);
let msg_a = match items[0].as_ref().expect("item 0 must be Ok") {
PubSubEvent::Event(m) => m,
other => panic!("item 0: expected Event, got {other:?}"),
};
assert_eq!(msg_a.payload["field"].as_str().unwrap(), "a");
let msg_b = match items[1].as_ref().expect("item 1 must be Ok") {
PubSubEvent::Event(m) => m,
other => panic!("item 1: expected Event, got {other:?}"),
};
assert_eq!(msg_b.payload["field"].as_str().unwrap(), "b");
match items[2].as_ref().expect("item 2 must be Ok") {
PubSubEvent::Reconnected { replay_id, attempt } => {
assert_eq!(
replay_id.as_bytes(),
replay_b.as_slice(),
"reconnect replay_id must be the last seen replay_id"
);
assert_eq!(*attempt, 1, "first reconnect attempt must be 1");
}
other => panic!("item 2: expected Reconnected, got {other:?}"),
}
let msg_c = match items[3].as_ref().expect("item 3 must be Ok") {
PubSubEvent::Event(m) => m,
other => panic!("item 3: expected Event, got {other:?}"),
};
assert_eq!(msg_c.payload["field"].as_str().unwrap(), "c");
assert_eq!(msg_c.replay_id.as_bytes(), replay_c.as_slice());
}
#[tokio::test]
async fn test_subscribe_exhausts_retries_returns_error() {
let service = AlwaysErrorService::default();
let url = start_always_error_server(service).await;
let (handler, _userinfo) = make_handler(
url,
ReconnectPolicy::Auto {
max_retries: 2,
backoff: BackoffConfig {
initial_delay: Duration::from_millis(10),
max_delay: Duration::from_millis(50),
multiplier: 1.0,
},
},
)
.await;
let mut stream = handler
.subscribe("/event/Test__e", ReplayPreset::Latest)
.await
.unwrap();
let mut reconnect_failed_seen = false;
let mut count = 0usize;
loop {
match tokio::time::timeout(Duration::from_secs(5), stream.next()).await {
Ok(Some(item)) => {
count += 1;
if let Err(PubSubError::ReconnectFailed { attempts, .. }) = &item {
assert!(*attempts > 0, "attempts must be > 0");
reconnect_failed_seen = true;
break;
}
assert!(count <= 20, "stream did not terminate after 20 items");
}
Ok(None) => break, Err(e) => panic!("stream timed out — possible infinite reconnect loop: {e}"),
}
}
assert!(
reconnect_failed_seen,
"expected PubSubError::ReconnectFailed to be yielded"
);
}