bevy_fleet 0.1.0

bevy swarm diagnostic, event, metric, and telemetry client
Documentation
use bevy::diagnostic::{DiagnosticsStore, FrameTimeDiagnosticsPlugin};
use bevy::prelude::*;
use bevy_fleet::{
    FleetConfig, FleetEvent, FleetEventBuffer, FleetPlugin, forward_serialized_events,
};
use bevy_fleet_test_server::{ReceivedTelemetry, TelemetryReceiver, start_mock_server};
use crossbeam_channel::RecvTimeoutError;
use serde::Serialize;
use std::time::{Duration, Instant};

#[test]
fn test_plugin_initialization() {
    let mut app = App::new();
    app.add_plugins(MinimalPlugins)
        .add_plugins(FrameTimeDiagnosticsPlugin::default())
        .add_plugins(FleetPlugin {
            config: FleetConfig {
                app_id: "test-app".to_string(),
                aggregation_url: "http://localhost:8080/test".to_string(),
                enabled: true,
                publish_interval_secs: 60,
                ..Default::default()
            },
        });

    assert!(app.world().get_resource::<FleetConfig>().is_some());
    assert!(app.world().get_resource::<Messages<FleetEvent>>().is_some());
    assert!(app.world().get_resource::<FleetEventBuffer>().is_some());
    assert!(app.world().get_resource::<DiagnosticsStore>().is_some());
}

#[test]
fn test_plugin_disabled() {
    let mut app = App::new();
    app.add_plugins(MinimalPlugins).add_plugins(FleetPlugin {
        config: FleetConfig {
            app_id: "test-app".to_string(),
            aggregation_url: "http://localhost:8080/test".to_string(),
            enabled: false,
            publish_interval_secs: 60,
            ..Default::default()
        },
    });

    assert!(app.world().get_resource::<Messages<FleetEvent>>().is_none());
}

#[test]
fn test_telemetry_payload_reaches_test_server() {
    let (server_handle, receiver) =
        start_mock_server(Some(0)).expect("failed to start test server");
    let mut app = App::new();
    app.add_plugins(MinimalPlugins)
        .add_plugins(FrameTimeDiagnosticsPlugin::default())
        .add_plugins(FleetPlugin {
            config: FleetConfig {
                app_id: "integration-app".to_string(),
                aggregation_url: server_handle.telemetry_url(),
                publish_interval_secs: 0,
                ..FleetConfig::default()
            },
        });

    app.update();
    let _ = wait_for_payload(&receiver, Duration::from_secs(5));

    app.world_mut().write_message(
        FleetEvent::new("integration_event").with_data("source", "integration_test"),
    );

    app.update();

    let payload = wait_for_payload_matching(&receiver, Duration::from_secs(5), |payload| {
        payload
            .events
            .iter()
            .any(|event| event.event_type == "integration_event")
    });

    assert_eq!(payload.app_id, "integration-app");
    assert!(
        payload
            .events
            .iter()
            .any(|event| event.event_type == "integration_event"),
        "expected integration_event to be exported"
    );

    server_handle.shutdown();
}

#[derive(Clone, Event, Serialize, Message)]
struct ForwardedTestEvent {
    value: String,
}

#[test]
fn test_forwarded_events_are_serialized() {
    let (server_handle, receiver) =
        start_mock_server(Some(0)).expect("failed to start test server");

    let mut app = App::new();
    app.add_plugins(MinimalPlugins)
        .add_plugins(FrameTimeDiagnosticsPlugin::default())
        .add_plugins(FleetPlugin {
            config: FleetConfig {
                app_id: "integration-app".to_string(),
                aggregation_url: server_handle.telemetry_url(),
                publish_interval_secs: 0,
                ..FleetConfig::default()
            },
        })
        .add_message::<ForwardedTestEvent>()
        .add_systems(Update, forward_serialized_events::<ForwardedTestEvent>);

    app.world_mut().write_message(ForwardedTestEvent {
        value: "hello".to_string(),
    });

    app.update();
    app.update();

    let payload = wait_for_payload_matching(&receiver, Duration::from_secs(5), |payload| {
        payload
            .events
            .iter()
            .any(|event| event.event_type == std::any::type_name::<ForwardedTestEvent>())
    });
    assert_eq!(payload.app_id, "integration-app");
    let event = payload
        .events
        .iter()
        .find(|event| event.event_type == std::any::type_name::<ForwardedTestEvent>())
        .expect("expected forwarded event to be exported");
    assert_eq!(event.data.get("value").map(String::as_str), Some("hello"));

    server_handle.shutdown();
}

fn wait_for_payload(receiver: &TelemetryReceiver, timeout: Duration) -> ReceivedTelemetry {
    wait_for_payload_matching(receiver, timeout, |_| true)
}

fn wait_for_payload_matching<F>(
    receiver: &TelemetryReceiver,
    timeout: Duration,
    mut predicate: F,
) -> ReceivedTelemetry
where
    F: FnMut(&ReceivedTelemetry) -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        let now = Instant::now();
        if now >= deadline {
            panic!("timed out waiting for telemetry payload");
        }

        let remaining = deadline - now;
        let slice = remaining.min(Duration::from_millis(250));
        match receiver.recv_timeout(slice) {
            Ok(payload) => {
                if predicate(&payload) {
                    return payload;
                }
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => {
                panic!("telemetry channel closed before matching payload arrived")
            }
        }
    }
}