1#![allow(clippy::type_complexity)]
2#![allow(clippy::too_many_arguments)]
3
4pub mod channel;
5pub mod client;
6pub mod message;
7pub mod presence;
8
9use std::{thread::sleep, time::Duration};
10
11use bevy::prelude::*;
12use bevy_crossbeam_event::{CrossbeamEventApp, CrossbeamEventSender};
13use channel::{
14 BroadcastCallbackEvent, ChannelBuilder, ChannelManager, ChannelStateCallbackEvent,
15 PostgresChangesCallbackEvent, PresenceStateCallbackEvent,
16};
17use client::{
18 ChannelCallbackEvent, ClientBuilder, ClientManager, ConnectResultCallbackEvent,
19 ConnectionState, NextMessageError,
20};
21use presence::PresenceCallbackEvent;
22
23use crate::presence::{presence_untrack, update_presence_track};
24
25#[derive(Resource, Deref)]
26pub struct Client(pub ClientManager);
27
28#[derive(Component, Deref, DerefMut)]
29pub struct BevyChannelBuilder(pub ChannelBuilder);
30
31#[derive(Component, Deref, DerefMut)]
32pub struct Channel(pub ChannelManager);
33
34#[derive(Component)]
35pub struct BuildChannel;
36
37fn build_channels(
38 mut commands: Commands,
39 mut q: Query<(Entity, &mut BevyChannelBuilder), With<BuildChannel>>,
40 client: Res<Client>,
41 presence_state_callback_event_sender: Res<CrossbeamEventSender<PresenceStateCallbackEvent>>,
42 channel_state_callback_event_sender: Res<CrossbeamEventSender<ChannelStateCallbackEvent>>,
43 broadcast_callback_event_sender: Res<CrossbeamEventSender<BroadcastCallbackEvent>>,
44 presence_callback_event_sender: Res<CrossbeamEventSender<PresenceCallbackEvent>>,
45 postgres_changes_callback_event_sender: Res<CrossbeamEventSender<PostgresChangesCallbackEvent>>,
46) {
47 for (e, c) in q.iter_mut() {
48 commands.entity(e).remove::<BevyChannelBuilder>();
49
50 let channel = c.build(
51 &client.0,
52 presence_state_callback_event_sender.clone(),
53 channel_state_callback_event_sender.clone(),
54 broadcast_callback_event_sender.clone(),
55 presence_callback_event_sender.clone(),
56 postgres_changes_callback_event_sender.clone(),
57 );
58
59 channel.subscribe().unwrap();
60 commands.entity(e).insert(Channel(channel));
61 }
62}
63
/// Bevy plugin that wires a realtime client into an [`App`].
///
/// Holds the two values handed to `ClientBuilder::new` when the plugin is built.
pub struct RealtimePlugin {
    /// Endpoint passed to the client builder.
    endpoint: String,
    /// API key passed to the client builder.
    apikey: String,
}

impl RealtimePlugin {
    /// Creates a plugin that will build its client from `endpoint` and `apikey`.
    pub fn new(endpoint: String, apikey: String) -> Self {
        RealtimePlugin { endpoint, apikey }
    }
}
74
75impl Plugin for RealtimePlugin {
76 fn build(&self, app: &mut App) {
77 app.add_crossbeam_event::<ConnectionState>()
78 .add_crossbeam_event::<ChannelCallbackEvent>()
79 .add_crossbeam_event::<PresenceStateCallbackEvent>()
80 .add_crossbeam_event::<ChannelStateCallbackEvent>()
81 .add_crossbeam_event::<BroadcastCallbackEvent>()
82 .add_crossbeam_event::<PresenceCallbackEvent>()
83 .add_crossbeam_event::<PostgresChangesCallbackEvent>()
84 .add_crossbeam_event::<ConnectResultCallbackEvent>()
85 .add_systems(
86 Update,
87 (
88 ((
89 update_presence_track,
91 presence_untrack,
92 build_channels,
93 )
94 .chain()
95 .run_if(client_ready),),
96 run_callbacks,
97 )
98 .chain(),
99 );
100
101 let mut client = ClientBuilder::new(self.endpoint.clone(), self.apikey.clone());
104 client.reconnect_max_attempts(3);
105 let mut client = client.build(
106 app.world_mut()
107 .resource::<CrossbeamEventSender<ChannelCallbackEvent>>()
108 .clone(),
109 app.world_mut()
110 .resource::<CrossbeamEventSender<ConnectResultCallbackEvent>>()
111 .clone(),
112 );
113
114 app.insert_resource(Client(ClientManager::new(&client)));
115
116 let _thread = std::thread::spawn(move || {
118 loop {
119 match client.step() {
120 Err(NextMessageError::WouldBlock) => {}
121 Ok(_) => {}
122 Err(_e) => {} }
124
125 sleep(Duration::from_secs_f32(f32::MIN_POSITIVE));
127 }
128 });
129 }
130}
131
132fn run_callbacks(
133 mut commands: Commands,
134 mut channel_evr: EventReader<ChannelCallbackEvent>,
135 mut presence_state_evr: EventReader<PresenceStateCallbackEvent>,
136 mut channel_state_evr: EventReader<ChannelStateCallbackEvent>,
137 mut broadcast_evr: EventReader<BroadcastCallbackEvent>,
138 mut presence_evr: EventReader<PresenceCallbackEvent>,
139 mut postgres_evr: EventReader<PostgresChangesCallbackEvent>,
140 mut connect_evr: EventReader<ConnectResultCallbackEvent>,
141) {
142 for ev in channel_evr.read() {
144 let (callback, input) = ev.0.clone();
145 commands.run_system_with_input(callback, input);
146 }
147
148 for ev in presence_state_evr.read() {
149 let (callback, input) = ev.0.clone();
150 commands.run_system_with_input(callback, input);
151 }
152
153 for ev in channel_state_evr.read() {
154 let (callback, input) = ev.0;
155 commands.run_system_with_input(callback, input);
156 }
157
158 for ev in broadcast_evr.read() {
159 let (callback, input) = ev.0.clone();
160 commands.run_system_with_input(callback, input);
161 }
162
163 for ev in presence_evr.read() {
164 let (callback, input) = ev.0.clone();
165 commands.run_system_with_input(callback, input);
166 }
167
168 for ev in postgres_evr.read() {
169 let (callback, input) = ev.0.clone();
170 commands.run_system_with_input(callback, input);
171 }
172
173 for ev in connect_evr.read() {
174 let (callback, input) = ev.0.clone();
175 commands.run_system_with_input(callback, input);
176 }
177}
178
179pub fn client_ready(
180 mut evr: EventReader<ConnectionState>,
181 mut last_state: Local<ConnectionState>,
182 client: Res<Client>,
183 sender: Res<CrossbeamEventSender<ConnectionState>>,
184) -> bool {
185 client.connection_state(sender.clone()).unwrap_or(());
186
187 for ev in evr.read() {
188 *last_state = *ev;
189 }
190
191 *last_state == ConnectionState::Open
192}