//! Tests for the `force_pubsub` publish-stream sink, exercised against the
//! echo stream server provided by `common::mock_server`.

// The tests below call `unwrap()` / `expect()` directly; a panic is how a
// failing test reports itself, so these lints are silenced for the file.
#![allow(clippy::unwrap_used)]
#![allow(clippy::expect_used)]
// NOTE(review): presumably silenced because handler/sink values are kept alive
// for the whole test body on purpose — confirm.
#![allow(clippy::significant_drop_tightening)]
mod common;
use async_trait::async_trait;
use common::mock_server::{
EchoPublishStreamService, start_echo_stream_server, start_userinfo_mock,
};
use force::auth::{AccessToken, Authenticator, TokenResponse};
use force::client::builder;
use force::error::Result as ForceResult;
use force_pubsub::{PubSubConfig, PubSubHandler, ReconnectPolicy};
use futures::StreamExt as _;
use serde::{Deserialize, Serialize};
use wiremock::MockServer;
/// Schema identifier passed to every `sink.send` call in these tests and used
/// as the lookup key when inspecting the handler's schema cache.
const SCHEMA_ID: &str = "schema-test-001";
/// Event payload type published by the tests in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TestEvent {
    /// Free-form label that distinguishes one test event from another.
    id: String,
}
/// Authenticator stand-in that carries a fixed token and instance URL.
#[derive(Debug, Clone)]
struct TestAuth {
    /// Value reported as the access token.
    token: String,
    /// Value reported as the instance URL.
    instance_url: String,
}
impl TestAuth {
    /// Builds a `TestAuth` holding owned copies of `token` and `instance_url`.
    fn new(token: &str, instance_url: &str) -> Self {
        let token = token.to_owned();
        let instance_url = instance_url.to_owned();
        Self {
            token,
            instance_url,
        }
    }
}
#[async_trait]
impl Authenticator for TestAuth {
    /// Returns an access token built from a fixed `TokenResponse`.
    ///
    /// The access token and instance URL are cloned from `self`; all other
    /// fields are constants. This implementation always returns `Ok`.
    async fn authenticate(&self) -> ForceResult<AccessToken> {
        let response = TokenResponse {
            access_token: self.token.clone(),
            instance_url: self.instance_url.clone(),
            token_type: "Bearer".to_string(),
            issued_at: "1704067200000".to_string(),
            signature: "test_sig".to_string(),
            expires_in: Some(7200),
            refresh_token: None,
        };
        let token = AccessToken::from_response(response);
        Ok(token)
    }

    /// Refreshing is the same as authenticating again: delegates to
    /// [`Self::authenticate`].
    async fn refresh(&self) -> ForceResult<AccessToken> {
        self.authenticate().await
    }
}
/// Connects a `PubSubHandler` to `endpoint` for use in a test.
///
/// Starts the userinfo mock, builds a client authenticated with a `TestAuth`
/// pointing at that mock, and connects with reconnection disabled
/// (`ReconnectPolicy::None`).
///
/// Returns the handler together with the `MockServer`; callers bind the
/// server to a local (presumably so the mock stays up for the duration of the
/// test — confirm against `start_userinfo_mock`).
///
/// Panics if building the client or connecting the handler fails.
async fn make_handler(endpoint: String) -> (PubSubHandler<TestAuth>, MockServer) {
    let (mock_server, mock_url) = start_userinfo_mock("00Dxx0000001gEREAY").await;

    let client = builder()
        .authenticate(TestAuth::new("test-token", &mock_url))
        .build()
        .await
        .unwrap();

    let config = PubSubConfig {
        endpoint,
        reconnect_policy: ReconnectPolicy::None,
        ..PubSubConfig::default()
    };
    let connected = PubSubHandler::connect(client.session(), config).await;

    (connected.unwrap(), mock_server)
}
/// Sends a one-event batch followed by a two-event batch on a publish stream
/// and then closes the sink; every step must succeed.
#[tokio::test]
async fn test_publish_stream_sends_events_and_receives_responses() {
    let endpoint = start_echo_stream_server(EchoPublishStreamService::default()).await;
    let (handler, _mock_guard) = make_handler(endpoint).await;
    let mut sink = handler
        .publish_stream::<TestEvent>("/event/Test__e")
        .await
        .unwrap();

    // First batch: a single event.
    let single = vec![TestEvent {
        id: "evt-001".to_string(),
    }];
    sink.send(SCHEMA_ID, single).await.unwrap();

    // Second batch: two events in one send.
    let pair = Vec::from(["evt-002", "evt-003"].map(|id| TestEvent { id: id.to_string() }));
    sink.send(SCHEMA_ID, pair).await.unwrap();

    sink.close().await.unwrap();
}
/// After one successful send, `close()` on the sink must return `Ok`.
#[tokio::test]
async fn test_publish_stream_close_drains_responses() {
    let endpoint = start_echo_stream_server(EchoPublishStreamService::default()).await;
    let (handler, _mock_guard) = make_handler(endpoint).await;
    let mut sink = handler
        .publish_stream::<TestEvent>("/event/Test__e")
        .await
        .unwrap();

    let batch = vec![TestEvent {
        id: "close-test".to_string(),
    }];
    sink.send(SCHEMA_ID, batch).await.unwrap();

    assert!(
        sink.close().await.is_ok(),
        "close() should drain responses without error"
    );
}
/// Sends a two-event batch and checks the first response on the stream:
/// it must be `Ok`, carry two results, and report that all succeeded.
#[tokio::test]
async fn test_publish_stream_response_has_results() {
    let endpoint = start_echo_stream_server(EchoPublishStreamService::default()).await;
    let (handler, _mock_guard) = make_handler(endpoint).await;
    let mut sink = handler
        .publish_stream::<TestEvent>("/event/Test__e")
        .await
        .unwrap();

    let batch = Vec::from(["r1", "r2"].map(|id| TestEvent { id: id.to_string() }));
    sink.send(SCHEMA_ID, batch).await.unwrap();

    // The outer `expect` covers "stream ended", the inner one "response was Err".
    let resp = sink
        .responses()
        .next()
        .await
        .expect("should have at least one response")
        .expect("response should be Ok");

    assert_eq!(
        resp.results.len(),
        2,
        "echo server returns one result per event"
    );
    assert!(resp.all_succeeded(), "all results should succeed");
}
/// Checks that the handler's schema cache starts empty and holds an entry for
/// `SCHEMA_ID` once the first batch has been sent.
#[tokio::test]
async fn test_publish_stream_send_encodes_avro_and_populates_cache() {
    let endpoint = start_echo_stream_server(EchoPublishStreamService::default()).await;
    let (handler, _mock_guard) = make_handler(endpoint).await;

    // Precondition: nothing cached before any send.
    assert!(handler.schema_cache.is_empty());

    let mut sink = handler
        .publish_stream::<TestEvent>("/event/Test__e")
        .await
        .unwrap();
    let batch = vec![TestEvent {
        id: "avro-test".to_string(),
    }];
    sink.send(SCHEMA_ID, batch).await.unwrap();

    // The lookup is kept inside the assertion so its result is not held
    // across the `close().await` below.
    assert!(
        handler.schema_cache.get(SCHEMA_ID).is_some(),
        "schema should be cached after first send"
    );

    sink.close().await.unwrap();
}
/// Sending a batch with no events must return `Ok`, and the sink must still
/// close cleanly afterwards.
#[tokio::test]
async fn test_publish_stream_empty_batch_succeeds() {
    let endpoint = start_echo_stream_server(EchoPublishStreamService::default()).await;
    let (handler, _mock_guard) = make_handler(endpoint).await;
    let mut sink = handler
        .publish_stream::<TestEvent>("/event/Test__e")
        .await
        .unwrap();

    let no_events: Vec<TestEvent> = Vec::new();
    let outcome = sink.send(SCHEMA_ID, no_events).await;
    assert!(outcome.is_ok(), "empty batch send should succeed");

    sink.close().await.unwrap();
}
/// Compile-time check that `PublishSink` is nameable from the root of the
/// `force_pubsub` crate; there is nothing to assert at runtime.
#[test]
fn test_publish_sink_is_reexported() {
    /// Accepts any sized type; only used to force the path below to resolve.
    fn assert_nameable<T>() {}

    assert_nameable::<Option<force_pubsub::PublishSink<TestEvent>>>();
}