rlstatsapi 0.1.1

Rocket League Stats API TCP client, parser, and optional Python bindings
Documentation
use std::path::PathBuf;

use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyModule;
use serde_json::json;
use tokio::runtime::Runtime;

use crate::{
    ClientOptions, EventFilter, EventKind, MatchSignal, RocketLeagueStatsClient,
    parse_stats_event, stats_event_name, stats_event_to_value, to_match_signal,
    winner_team_num,
};

#[pyclass(name = "RocketLeagueStatsClient", unsendable)]
pub struct PyRocketLeagueStatsClient {
    options: ClientOptions,
    runtime: Runtime,
    client: Option<RocketLeagueStatsClient>,
}

#[pymethods]
impl PyRocketLeagueStatsClient {
    #[new]
    #[pyo3(signature = (
        host = "127.0.0.1".to_string(),
        port = 49123,
        ini_path = None,
        auto_enable_packet_rate = true,
        packet_send_rate = 60.0,
        set_packet_rate_only_when_zero = true
    ))]
    fn new(
        host: String,
        port: u16,
        ini_path: Option<String>,
        auto_enable_packet_rate: bool,
        packet_send_rate: f32,
        set_packet_rate_only_when_zero: bool,
    ) -> PyResult<Self> {
        let mut options = ClientOptions::default();
        options.host = host;
        options.port_override = Some(port);
        options.auto_enable_packet_rate = auto_enable_packet_rate;
        options.packet_send_rate = packet_send_rate;
        options.set_packet_rate_only_when_zero = set_packet_rate_only_when_zero;
        options.stats_api_ini_path = ini_path.map(PathBuf::from);

        let runtime = Runtime::new().map_err(to_runtime_err)?;

        Ok(Self {
            options,
            runtime,
            client: None,
        })
    }

    fn connect(&mut self) -> PyResult<()> {
        let client = self
            .runtime
            .block_on(RocketLeagueStatsClient::connect(self.options.clone()))
            .map_err(to_runtime_err)?;

        self.client = Some(client);
        Ok(())
    }

    fn reconnect(&mut self) -> PyResult<()> {
        if self.client.is_none() {
            return self.connect();
        }

        let client = self
            .client
            .as_mut()
            .ok_or_else(|| PyRuntimeError::new_err("client not connected"))?;
        self.runtime
            .block_on(client.reconnect())
            .map_err(to_runtime_err)
    }

    fn next_event_json(&mut self) -> PyResult<Option<String>> {
        self.ensure_connected()?;

        let client = self
            .client
            .as_mut()
            .ok_or_else(|| PyRuntimeError::new_err("client not connected"))?;

        let event = self
            .runtime
            .block_on(client.next_event())
            .map_err(to_runtime_err)?;

        match event {
            Some(event) => {
                let value = stats_event_to_value(&event).map_err(to_runtime_err)?;
                let serialized =
                    serde_json::to_string(&value).map_err(to_runtime_err)?;
                Ok(Some(serialized))
            }
            None => Ok(None),
        }
    }

    #[pyo3(signature = (
        event_types = None,
        player_name = None,
        player_primary_id = None,
        team_num = None,
        match_guid = None
    ))]
    fn next_filtered_event_json(
        &mut self,
        event_types: Option<Vec<String>>,
        player_name: Option<String>,
        player_primary_id: Option<String>,
        team_num: Option<i64>,
        match_guid: Option<String>,
    ) -> PyResult<Option<String>> {
        self.ensure_connected()?;

        let filter = build_filter(
            event_types,
            player_name,
            player_primary_id,
            team_num,
            match_guid,
        )?;

        let client = self
            .client
            .as_mut()
            .ok_or_else(|| PyRuntimeError::new_err("client not connected"))?;

        let event = self
            .runtime
            .block_on(client.next_filtered_event(&filter))
            .map_err(to_runtime_err)?;

        match event {
            Some(event) => {
                let value = stats_event_to_value(&event).map_err(to_runtime_err)?;
                let serialized =
                    serde_json::to_string(&value).map_err(to_runtime_err)?;
                Ok(Some(serialized))
            }
            None => Ok(None),
        }
    }

    fn close(&mut self) -> PyResult<()> {
        if let Some(client) = self.client.take() {
            self.runtime
                .block_on(client.close())
                .map_err(to_runtime_err)?;
        }

        Ok(())
    }

    fn socket_address(&self) -> String {
        let host = &self.options.host;
        let port = self.options.port_override.unwrap_or(49123);
        format!("{host}:{port}")
    }
}

impl PyRocketLeagueStatsClient {
    fn ensure_connected(&mut self) -> PyResult<()> {
        if self.client.is_none() {
            self.connect()?;
        }

        Ok(())
    }
}

#[pyfunction]
fn parse_event_json(raw: &str) -> PyResult<String> {
    let event = parse_stats_event(raw).map_err(to_value_err)?;
    let value = stats_event_to_value(&event).map_err(to_value_err)?;
    serde_json::to_string(&value).map_err(to_value_err)
}

#[pyfunction]
fn event_name(raw: &str) -> PyResult<String> {
    let event = parse_stats_event(raw).map_err(to_value_err)?;
    Ok(stats_event_name(&event).to_string())
}

#[pyfunction]
fn list_event_kinds() -> Vec<String> {
    [
        "update_state",
        "ball_hit",
        "clock_updated_seconds",
        "countdown_begin",
        "crossbar_hit",
        "goal_replay_end",
        "goal_replay_start",
        "goal_replay_will_end",
        "goal_scored",
        "match_created",
        "match_initialized",
        "match_destroyed",
        "match_ended",
        "match_paused",
        "match_unpaused",
        "podium_start",
        "replay_created",
        "round_started",
        "statfeed_event",
        "unknown",
    ]
    .iter()
    .map(|value| (*value).to_string())
    .collect()
}

#[pyfunction]
#[pyo3(signature = (
    raw,
    event_types = None,
    player_name = None,
    player_primary_id = None,
    team_num = None,
    match_guid = None
))]
fn event_matches(
    raw: &str,
    event_types: Option<Vec<String>>,
    player_name: Option<String>,
    player_primary_id: Option<String>,
    team_num: Option<i64>,
    match_guid: Option<String>,
) -> PyResult<bool> {
    let event = parse_stats_event(raw).map_err(to_value_err)?;
    let filter = build_filter(
        event_types,
        player_name,
        player_primary_id,
        team_num,
        match_guid,
    )?;
    Ok(filter.matches(&event))
}

#[pyfunction]
#[pyo3(signature = (
    raw,
    event_types = None,
    player_name = None,
    player_primary_id = None,
    team_num = None,
    match_guid = None
))]
fn filter_event_json(
    raw: &str,
    event_types: Option<Vec<String>>,
    player_name: Option<String>,
    player_primary_id: Option<String>,
    team_num: Option<i64>,
    match_guid: Option<String>,
) -> PyResult<Option<String>> {
    let event = parse_stats_event(raw).map_err(to_value_err)?;
    let filter = build_filter(
        event_types,
        player_name,
        player_primary_id,
        team_num,
        match_guid,
    )?;

    if !filter.matches(&event) {
        return Ok(None);
    }

    let value = stats_event_to_value(&event).map_err(to_value_err)?;
    let serialized = serde_json::to_string(&value).map_err(to_value_err)?;
    Ok(Some(serialized))
}

#[pyfunction]
fn winner_team(raw: &str) -> PyResult<Option<i64>> {
    let event = parse_stats_event(raw).map_err(to_value_err)?;
    Ok(winner_team_num(&event))
}

#[pyfunction]
fn match_signal_json(raw: &str) -> PyResult<Option<String>> {
    let event = parse_stats_event(raw).map_err(to_value_err)?;

    let signal = match to_match_signal(&event) {
        Some(MatchSignal::GoalScored(data)) => {
            json!({"signal": "goal_scored", "data": data})
        }
        Some(MatchSignal::MatchConcluded(data)) => {
            json!({"signal": "match_concluded", "data": data})
        }
        None => return Ok(None),
    };

    let serialized = serde_json::to_string(&signal).map_err(to_value_err)?;
    Ok(Some(serialized))
}

#[pymodule]
fn rlstatsapi(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyRocketLeagueStatsClient>()?;
    m.add_function(wrap_pyfunction!(parse_event_json, m)?)?;
    m.add_function(wrap_pyfunction!(event_name, m)?)?;
    m.add_function(wrap_pyfunction!(list_event_kinds, m)?)?;
    m.add_function(wrap_pyfunction!(event_matches, m)?)?;
    m.add_function(wrap_pyfunction!(filter_event_json, m)?)?;
    m.add_function(wrap_pyfunction!(winner_team, m)?)?;
    m.add_function(wrap_pyfunction!(match_signal_json, m)?)?;
    Ok(())
}

fn build_filter(
    event_types: Option<Vec<String>>,
    player_name: Option<String>,
    player_primary_id: Option<String>,
    team_num: Option<i64>,
    match_guid: Option<String>,
) -> PyResult<EventFilter> {
    let mut filter = EventFilter::new();

    if let Some(tokens) = event_types {
        let mut kinds = Vec::new();
        for token in tokens {
            for part in token.split(',') {
                let normalized = part.trim().to_ascii_lowercase();
                if normalized.is_empty() || normalized == "all" {
                    continue;
                }

                let kind = parse_event_kind(&normalized).ok_or_else(|| {
                    PyValueError::new_err(format!(
                        "unknown event type '{normalized}'"
                    ))
                })?;
                kinds.push(kind);
            }
        }

        if !kinds.is_empty() {
            filter = filter.include_kinds(kinds);
        }
    }

    if let Some(name) = player_name {
        filter = filter.with_player_name(name);
    }
    if let Some(primary_id) = player_primary_id {
        filter = filter.with_player_primary_id(primary_id);
    }
    if let Some(team_num) = team_num {
        filter = filter.with_team_num(team_num);
    }
    if let Some(match_guid) = match_guid {
        filter = filter.with_match_guid(match_guid);
    }

    Ok(filter)
}

fn parse_event_kind(token: &str) -> Option<EventKind> {
    match token {
        "update_state" | "update" | "state" | "tick" => Some(EventKind::UpdateState),
        "ball_hit" | "ballhit" => Some(EventKind::BallHit),
        "clock_updated_seconds" | "clock" => Some(EventKind::ClockUpdatedSeconds),
        "countdown_begin" | "countdown" => Some(EventKind::CountdownBegin),
        "crossbar_hit" | "crossbar" => Some(EventKind::CrossbarHit),
        "goal_replay_end" => Some(EventKind::GoalReplayEnd),
        "goal_replay_start" => Some(EventKind::GoalReplayStart),
        "goal_replay_will_end" => Some(EventKind::GoalReplayWillEnd),
        "goal_scored" | "goal" => Some(EventKind::GoalScored),
        "match_created" => Some(EventKind::MatchCreated),
        "match_initialized" => Some(EventKind::MatchInitialized),
        "match_destroyed" => Some(EventKind::MatchDestroyed),
        "match_ended" | "ended" => Some(EventKind::MatchEnded),
        "match_paused" | "paused" | "pause" => Some(EventKind::MatchPaused),
        "match_unpaused" | "unpaused" | "unpause" => Some(EventKind::MatchUnpaused),
        "podium_start" => Some(EventKind::PodiumStart),
        "replay_created" => Some(EventKind::ReplayCreated),
        "round_started" | "round" => Some(EventKind::RoundStarted),
        "statfeed_event" | "statfeed" => Some(EventKind::StatfeedEvent),
        "unknown" => Some(EventKind::Unknown),
        _ => None,
    }
}

fn to_runtime_err<E: std::fmt::Display>(error: E) -> PyErr {
    PyRuntimeError::new_err(error.to_string())
}

fn to_value_err<E: std::fmt::Display>(error: E) -> PyErr {
    PyValueError::new_err(error.to_string())
}