bevy_realtime/
lib.rs

1#![allow(clippy::type_complexity)]
2#![allow(clippy::too_many_arguments)]
3
4pub mod channel;
5pub mod client;
6pub mod message;
7pub mod presence;
8
9use std::{thread::sleep, time::Duration};
10
11use bevy::prelude::*;
12use bevy_crossbeam_event::{CrossbeamEventApp, CrossbeamEventSender};
13use channel::{
14    BroadcastCallbackEvent, ChannelBuilder, ChannelManager, ChannelStateCallbackEvent,
15    PostgresChangesCallbackEvent, PresenceStateCallbackEvent,
16};
17use client::{
18    ChannelCallbackEvent, ClientBuilder, ClientManager, ConnectResultCallbackEvent,
19    ConnectionState, NextMessageError,
20};
21use presence::PresenceCallbackEvent;
22
23use crate::presence::{presence_untrack, update_presence_track};
24
/// Bevy resource wrapping the [`ClientManager`] handle for the realtime client.
///
/// Inserted by [`RealtimePlugin`] during `build`; dereferences to the inner manager.
#[derive(Resource, Deref)]
pub struct Client(pub ClientManager);
27
/// Component wrapping a [`ChannelBuilder`] so a channel can be configured on an entity.
///
/// `build_channels` consumes this component (on entities that also carry [`BuildChannel`])
/// and replaces it with a [`Channel`].
#[derive(Component, Deref, DerefMut)]
pub struct BevyChannelBuilder(pub ChannelBuilder);
30
/// Component wrapping the [`ChannelManager`] of a channel that has been built and subscribed.
///
/// Inserted by `build_channels` on the entity that previously held the
/// [`BevyChannelBuilder`].
#[derive(Component, Deref, DerefMut)]
pub struct Channel(pub ChannelManager);
33
/// Marker component: only entities carrying it have their [`BevyChannelBuilder`] built by
/// `build_channels`.
#[derive(Component)]
pub struct BuildChannel;
36
37fn build_channels(
38    mut commands: Commands,
39    mut q: Query<(Entity, &mut BevyChannelBuilder), With<BuildChannel>>,
40    client: Res<Client>,
41    presence_state_callback_event_sender: Res<CrossbeamEventSender<PresenceStateCallbackEvent>>,
42    channel_state_callback_event_sender: Res<CrossbeamEventSender<ChannelStateCallbackEvent>>,
43    broadcast_callback_event_sender: Res<CrossbeamEventSender<BroadcastCallbackEvent>>,
44    presence_callback_event_sender: Res<CrossbeamEventSender<PresenceCallbackEvent>>,
45    postgres_changes_callback_event_sender: Res<CrossbeamEventSender<PostgresChangesCallbackEvent>>,
46) {
47    for (e, c) in q.iter_mut() {
48        commands.entity(e).remove::<BevyChannelBuilder>();
49
50        let channel = c.build(
51            &client.0,
52            presence_state_callback_event_sender.clone(),
53            channel_state_callback_event_sender.clone(),
54            broadcast_callback_event_sender.clone(),
55            presence_callback_event_sender.clone(),
56            postgres_changes_callback_event_sender.clone(),
57        );
58
59        channel.subscribe().unwrap();
60        commands.entity(e).insert(Channel(channel));
61    }
62}
63
/// Bevy plugin that connects the app to a realtime server.
///
/// Holds the connection settings handed to the client builder when the plugin is built.
pub struct RealtimePlugin {
    /// Endpoint passed to `ClientBuilder::new`.
    endpoint: String,
    /// API key passed to `ClientBuilder::new`.
    apikey: String,
}

impl RealtimePlugin {
    /// Creates a plugin that will connect to `endpoint` using `apikey`.
    pub fn new(endpoint: String, apikey: String) -> RealtimePlugin {
        RealtimePlugin { apikey, endpoint }
    }
}
74
75impl Plugin for RealtimePlugin {
76    fn build(&self, app: &mut App) {
77        app.add_crossbeam_event::<ConnectionState>()
78            .add_crossbeam_event::<ChannelCallbackEvent>()
79            .add_crossbeam_event::<PresenceStateCallbackEvent>()
80            .add_crossbeam_event::<ChannelStateCallbackEvent>()
81            .add_crossbeam_event::<BroadcastCallbackEvent>()
82            .add_crossbeam_event::<PresenceCallbackEvent>()
83            .add_crossbeam_event::<PostgresChangesCallbackEvent>()
84            .add_crossbeam_event::<ConnectResultCallbackEvent>()
85            .add_systems(
86                Update,
87                (
88                    ((
89                        //
90                        update_presence_track,
91                        presence_untrack,
92                        build_channels,
93                    )
94                        .chain()
95                        .run_if(client_ready),),
96                    run_callbacks,
97                )
98                    .chain(),
99            );
100
101        // TODO: Allow this to fail and be retried later at user request
102
103        let mut client = ClientBuilder::new(self.endpoint.clone(), self.apikey.clone());
104        client.reconnect_max_attempts(3);
105        let mut client = client.build(
106            app.world_mut()
107                .resource::<CrossbeamEventSender<ChannelCallbackEvent>>()
108                .clone(),
109            app.world_mut()
110                .resource::<CrossbeamEventSender<ConnectResultCallbackEvent>>()
111                .clone(),
112        );
113
114        app.insert_resource(Client(ClientManager::new(&client)));
115
116        // Start off thread client
117        let _thread = std::thread::spawn(move || {
118            loop {
119                match client.step() {
120                    Err(NextMessageError::WouldBlock) => {}
121                    Ok(_) => {}
122                    Err(_e) => {} //error!("{}", _e),
123                }
124
125                // TODO find a sane sleep value
126                sleep(Duration::from_secs_f32(f32::MIN_POSITIVE));
127            }
128        });
129    }
130}
131
132fn run_callbacks(
133    mut commands: Commands,
134    mut channel_evr: EventReader<ChannelCallbackEvent>,
135    mut presence_state_evr: EventReader<PresenceStateCallbackEvent>,
136    mut channel_state_evr: EventReader<ChannelStateCallbackEvent>,
137    mut broadcast_evr: EventReader<BroadcastCallbackEvent>,
138    mut presence_evr: EventReader<PresenceCallbackEvent>,
139    mut postgres_evr: EventReader<PostgresChangesCallbackEvent>,
140    mut connect_evr: EventReader<ConnectResultCallbackEvent>,
141) {
142    // TODO this is crying out for a macro lol
143    for ev in channel_evr.read() {
144        let (callback, input) = ev.0.clone();
145        commands.run_system_with_input(callback, input);
146    }
147
148    for ev in presence_state_evr.read() {
149        let (callback, input) = ev.0.clone();
150        commands.run_system_with_input(callback, input);
151    }
152
153    for ev in channel_state_evr.read() {
154        let (callback, input) = ev.0;
155        commands.run_system_with_input(callback, input);
156    }
157
158    for ev in broadcast_evr.read() {
159        let (callback, input) = ev.0.clone();
160        commands.run_system_with_input(callback, input);
161    }
162
163    for ev in presence_evr.read() {
164        let (callback, input) = ev.0.clone();
165        commands.run_system_with_input(callback, input);
166    }
167
168    for ev in postgres_evr.read() {
169        let (callback, input) = ev.0.clone();
170        commands.run_system_with_input(callback, input);
171    }
172
173    for ev in connect_evr.read() {
174        let (callback, input) = ev.0.clone();
175        commands.run_system_with_input(callback, input);
176    }
177}
178
179pub fn client_ready(
180    mut evr: EventReader<ConnectionState>,
181    mut last_state: Local<ConnectionState>,
182    client: Res<Client>,
183    sender: Res<CrossbeamEventSender<ConnectionState>>,
184) -> bool {
185    client.connection_state(sender.clone()).unwrap_or(());
186
187    for ev in evr.read() {
188        *last_state = *ev;
189    }
190
191    *last_state == ConnectionState::Open
192}