use bevy::diagnostic::{DiagnosticsStore, FrameTimeDiagnosticsPlugin};
use bevy::prelude::*;
use bevy_fleet::{
FleetConfig, FleetEvent, FleetEventBuffer, FleetPlugin, forward_serialized_events,
};
use bevy_fleet_test_server::{ReceivedTelemetry, TelemetryReceiver, start_mock_server};
use crossbeam_channel::RecvTimeoutError;
use serde::Serialize;
use std::time::{Duration, Instant};
/// Checks that adding an enabled `FleetPlugin` registers its resources.
///
/// The app is never updated here: the assertions run immediately after the
/// plugins are added, so the resources must already exist at that point.
#[test]
fn test_plugin_initialization() {
    let config = FleetConfig {
        app_id: "test-app".to_string(),
        aggregation_url: "http://localhost:8080/test".to_string(),
        enabled: true,
        publish_interval_secs: 60,
        ..Default::default()
    };

    let mut app = App::new();
    app.add_plugins(MinimalPlugins);
    app.add_plugins(FrameTimeDiagnosticsPlugin::default());
    app.add_plugins(FleetPlugin { config });

    // Borrow the world once and probe each expected resource in turn.
    let world = app.world();
    assert!(world.get_resource::<FleetConfig>().is_some());
    assert!(world.get_resource::<Messages<FleetEvent>>().is_some());
    assert!(world.get_resource::<FleetEventBuffer>().is_some());
    assert!(world.get_resource::<DiagnosticsStore>().is_some());
}
/// Checks that a `FleetPlugin` configured with `enabled: false` does not
/// register the `Messages<FleetEvent>` resource.
#[test]
fn test_plugin_disabled() {
    let config = FleetConfig {
        app_id: "test-app".to_string(),
        aggregation_url: "http://localhost:8080/test".to_string(),
        enabled: false,
        publish_interval_secs: 60,
        ..Default::default()
    };

    let mut app = App::new();
    app.add_plugins(MinimalPlugins);
    app.add_plugins(FleetPlugin { config });

    let fleet_messages = app.world().get_resource::<Messages<FleetEvent>>();
    assert!(fleet_messages.is_none());
}
/// End-to-end check that a `FleetEvent` written into the app shows up in a
/// telemetry payload received by the mock server.
///
/// Flow: start the mock server, run one update and wait for any payload, then
/// write `integration_event`, run another update, and wait for a payload that
/// contains it.
#[test]
fn test_telemetry_payload_reaches_test_server() {
    let timeout = Duration::from_secs(5);
    // Shared by the wait predicate and the final assertion.
    let has_integration_event = |payload: &ReceivedTelemetry| {
        payload
            .events
            .iter()
            .any(|event| event.event_type == "integration_event")
    };

    let (server_handle, receiver) =
        start_mock_server(Some(0)).expect("failed to start test server");

    let config = FleetConfig {
        app_id: "integration-app".to_string(),
        aggregation_url: server_handle.telemetry_url(),
        publish_interval_secs: 0,
        ..Default::default()
    };

    let mut app = App::new();
    app.add_plugins(MinimalPlugins);
    app.add_plugins(FrameTimeDiagnosticsPlugin::default());
    app.add_plugins(FleetPlugin { config });

    // The first payload is awaited and discarded before the event is written.
    // NOTE(review): presumably this confirms the publish path is live — confirm.
    app.update();
    let _ = wait_for_payload(&receiver, timeout);

    let integration_event =
        FleetEvent::new("integration_event").with_data("source", "integration_test");
    app.world_mut().write_message(integration_event);
    app.update();

    let payload = wait_for_payload_matching(&receiver, timeout, has_integration_event);

    assert_eq!(payload.app_id, "integration-app");
    assert!(
        has_integration_event(&payload),
        "expected integration_event to be exported"
    );

    server_handle.shutdown();
}
/// Test-only message type passed through `forward_serialized_events` in
/// `test_forwarded_events_are_serialized`.
#[derive(Clone, Event, Serialize, Message)]
struct ForwardedTestEvent {
/// String payload; the test expects it to appear under the `"value"` key of
/// the exported event's data.
value: String,
}
/// End-to-end check that a `ForwardedTestEvent` routed through
/// `forward_serialized_events` is exported to the mock server.
///
/// The exported event is expected to use the Rust type name of
/// `ForwardedTestEvent` as its `event_type` and to carry the struct's `value`
/// field under the `"value"` data key.
#[test]
fn test_forwarded_events_are_serialized() {
    let timeout = Duration::from_secs(5);
    // The exported event is identified by the forwarded type's full name.
    let forwarded_type = std::any::type_name::<ForwardedTestEvent>();

    let (server_handle, receiver) =
        start_mock_server(Some(0)).expect("failed to start test server");

    let config = FleetConfig {
        app_id: "integration-app".to_string(),
        aggregation_url: server_handle.telemetry_url(),
        publish_interval_secs: 0,
        ..Default::default()
    };

    let mut app = App::new();
    app.add_plugins(MinimalPlugins);
    app.add_plugins(FrameTimeDiagnosticsPlugin::default());
    app.add_plugins(FleetPlugin { config });
    app.add_message::<ForwardedTestEvent>();
    app.add_systems(Update, forward_serialized_events::<ForwardedTestEvent>);

    let outgoing = ForwardedTestEvent {
        value: "hello".to_string(),
    };
    app.world_mut().write_message(outgoing);

    // Two frames are run before waiting.
    // NOTE(review): presumably one to forward and one to publish — confirm.
    for _ in 0..2 {
        app.update();
    }

    let payload = wait_for_payload_matching(&receiver, timeout, |candidate| {
        candidate
            .events
            .iter()
            .any(|event| event.event_type == forwarded_type)
    });
    assert_eq!(payload.app_id, "integration-app");

    let forwarded = payload
        .events
        .iter()
        .find(|event| event.event_type == forwarded_type)
        .expect("expected forwarded event to be exported");
    let exported_value = forwarded.data.get("value").map(String::as_str);
    assert_eq!(exported_value, Some("hello"));

    server_handle.shutdown();
}
/// Blocks until the next telemetry payload arrives on `receiver` and returns it.
///
/// Delegates to `wait_for_payload_matching` with a predicate that accepts
/// every payload.
///
/// # Panics
///
/// Panics if `timeout` elapses first or the channel is disconnected.
fn wait_for_payload(receiver: &TelemetryReceiver, timeout: Duration) -> ReceivedTelemetry {
    let accept_any = |_: &ReceivedTelemetry| true;
    wait_for_payload_matching(receiver, timeout, accept_any)
}
/// Blocks until a payload satisfying `predicate` arrives on `receiver`.
///
/// Payloads that do not satisfy `predicate` are dropped and waiting continues.
/// The channel is polled in slices of at most 250 ms so the overall deadline
/// is re-checked between receives.
///
/// # Panics
///
/// Panics if `timeout` elapses before a matching payload arrives, or if the
/// channel disconnects.
fn wait_for_payload_matching<F>(
    receiver: &TelemetryReceiver,
    timeout: Duration,
    mut predicate: F,
) -> ReceivedTelemetry
where
    F: FnMut(&ReceivedTelemetry) -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        // `None` means the deadline has passed; a zero duration means it was
        // reached exactly. Both count as a timeout.
        let remaining = match deadline.checked_duration_since(Instant::now()) {
            Some(left) if !left.is_zero() => left,
            _ => panic!("timed out waiting for telemetry payload"),
        };
        let wait = Duration::from_millis(250).min(remaining);

        match receiver.recv_timeout(wait) {
            Ok(payload) if predicate(&payload) => return payload,
            // Non-matching payload or an empty poll slice: keep waiting.
            Ok(_) | Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
                panic!("telemetry channel closed before matching payload arrived")
            }
        }
    }
}