bevy_realtime/message/
payload.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5
6use crate::presence::{PresenceEvent, RawPresenceDiff, RawPresenceState};
7
8/// Message payload, enum allows each payload type to be contained in
9/// [crate::message::realtime_message::RealtimeMessage] without
10/// needing a seperate struct per message type.
11#[derive(Serialize, Deserialize, Debug, Clone)]
12#[serde(untagged)]
13pub enum Payload {
14    Join(JoinPayload),
15    Response(JoinResponsePayload),
16    System(SystemPayload),
17    AccessToken(AccessTokenPayload),
18    PostgresChanges(PostgresChangesPayload),
19    Broadcast(BroadcastPayload),
20    PresenceState(RawPresenceState),
21    PresenceDiff(RawPresenceDiff),
22    Reply(ReplyPayload),                 // think is only used for heartbeat?
23    PresenceTrack(PresenceTrackPayload), // TODO matches greedily
24    Empty {}, // TODO implement custom deser cos this bad. typechecking: this matches
25              // everything that can't deser elsewhere. not good.
26}
27
28impl Default for Payload {
29    fn default() -> Self {
30        Payload::Broadcast(BroadcastPayload::default())
31    }
32}
33
34#[derive(Serialize, Deserialize, Debug, Clone)]
35pub struct ReplyPayload {
36    pub response: Value,
37    pub status: String,
38}
39
40/// Data to track with presence
41///
42/// Use [crate::sync::realtime_channel::RealtimeChannel::track()]
43#[derive(Serialize, Deserialize, Debug, Clone)]
44pub struct PresenceTrackPayload {
45    pub event: PresenceEvent, // TODO custom serialize to remove required event here
46    pub payload: HashMap<String, Value>,
47}
48
49impl Default for PresenceTrackPayload {
50    fn default() -> Self {
51        Self {
52            event: PresenceEvent::Track,
53            payload: HashMap::new(),
54        }
55    }
56}
57
58impl From<HashMap<String, Value>> for PresenceTrackPayload {
59    fn from(value: HashMap<String, Value>) -> Self {
60        PresenceTrackPayload {
61            payload: value,
62            ..Default::default()
63        }
64    }
65}
66
67/// Payload for broadcast messages
68/// ```
69/// # use realtime_rs::message::*;  
70/// # use realtime_rs::sync::*;    
71/// # use realtime_rs::*;          
72/// # use realtime_rs::message::payload::*;  
73/// # use std::{collections::HashMap, env};
74/// #
75/// # fn main() -> Result<(), ()> {
76/// #     let url = "http://127.0.0.1:54321";
77/// #     let anon_key = env::var("LOCAL_ANON_KEY").expect("No anon key!");
78/// #
79/// #     let mut client = RealtimeClient::builder(url, anon_key).build();
80/// #
81/// #     let _ = client.connect();
82/// #
83///       // Create channels
84///       let channel_a = client.channel("topic").build(&mut client);
85///       let channel_b = client.channel("topic").build(&mut client);
86///   
87///       let _ = client.block_until_subscribed(channel_a).unwrap();
88///       let _ = client.block_until_subscribed(channel_b).unwrap();
89///   
90///       // Create message
91///       let mut payload = HashMap::new();
92///       payload.insert("message".into(), "hello, multicast!".into());
93///   
94///       let payload = BroadcastPayload::new("target_event", payload);
95///   
96///       // Send message on both channels
97///       let _ = client
98///           .get_channel_mut(channel_a)
99///           .unwrap()
100///           .broadcast(payload.clone());
101///   
102///       let _ = client
103///           .get_channel_mut(channel_b)
104///           .unwrap()
105///           .broadcast(payload);
106/// #
107/// #     match client.next_message() {
108/// #         Ok(_) => Ok(()),
109/// #         Err(NextMessageError::WouldBlock) => Ok(()),
110/// #         Err(_) => Err(()),
111/// #     }
112/// # }
113#[derive(Serialize, Deserialize, Debug, Clone)]
114pub struct BroadcastPayload {
115    pub event: String,
116    pub payload: HashMap<String, Value>,
117    #[serde(rename = "type")]
118    pub broadcast_type: String, // TODO this is always 'broadcast', impl custom serde ;_;
119}
120
121// TODO impl From<HashMap<String, Value>>
122
123impl BroadcastPayload {
124    pub fn new(event: impl Into<String>, payload: HashMap<String, Value>) -> Self {
125        BroadcastPayload {
126            event: event.into(),
127            payload,
128            broadcast_type: "broadcast".into(),
129        }
130    }
131}
132
133impl Default for BroadcastPayload {
134    fn default() -> Self {
135        BroadcastPayload {
136            event: "event_missing".into(),
137            payload: HashMap::new(),
138            broadcast_type: "broadcast".into(),
139        }
140    }
141}
142
143/// Payload wrapper for postgres changes
144#[derive(Serialize, Deserialize, Debug, Clone)]
145pub struct PostgresChangesPayload {
146    pub data: PostgresChangeData,
147    pub ids: Vec<usize>,
148}
149
150/// Recieved data regarding a postgres change
151#[derive(Serialize, Deserialize, Debug, Clone)]
152pub struct PostgresChangeData {
153    pub columns: Vec<PostgresColumn>,
154    pub commit_timestamp: String,
155    pub errors: Option<String>,
156    pub old_record: Option<PostgresOldDataRef>,
157    pub record: Option<HashMap<String, Value>>,
158    #[serde(rename = "type")]
159    pub change_type: PostgresChangesEvent,
160    pub schema: String,
161    pub table: String,
162}
163
164#[derive(Serialize, Deserialize, Debug, Clone)]
165pub struct PostgresColumn {
166    pub name: String,
167    #[serde(rename = "type")]
168    pub column_type: String,
169}
170
171#[derive(Serialize, Deserialize, Debug, Clone)]
172pub struct PostgresOldDataRef {
173    pub id: isize,
174}
175
176#[derive(Serialize, Deserialize, Debug, Clone)]
177pub struct AccessTokenPayload {
178    pub access_token: String,
179}
180
181/// Subscription result payload
182#[derive(Serialize, Deserialize, Debug, Clone)]
183pub struct SystemPayload {
184    pub channel: String,
185    pub extension: String,
186    pub message: String,
187    pub status: PayloadStatus,
188}
189
190/// Subscription configuration payload wrapper
191#[derive(Serialize, Deserialize, Debug, Clone, Default)]
192pub struct JoinPayload {
193    pub config: JoinConfig,
194    pub access_token: String,
195}
196
197/// Subscription configuration data
198#[derive(Serialize, Deserialize, Debug, Clone, Default)]
199pub struct JoinConfig {
200    pub broadcast: BroadcastConfig,
201    pub presence: PresenceConfig,
202    pub postgres_changes: Vec<PostgresChange>,
203}
204
205/// Channel broadcast options
206#[derive(Serialize, Deserialize, Debug, Clone, Default)]
207pub struct BroadcastConfig {
208    #[serde(rename = "self")]
209    pub broadcast_self: bool,
210    pub ack: bool,
211}
212
213/// Channel presence options
214#[derive(Serialize, Deserialize, Debug, Clone, Default)]
215pub struct PresenceConfig {
216    pub key: Option<String>,
217}
218
219#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Default, Clone, Hash)]
220pub enum PostgresChangesEvent {
221    #[serde(rename = "*")]
222    #[default]
223    All,
224    #[serde(rename = "INSERT")]
225    Insert,
226    #[serde(rename = "UPDATE")]
227    Update,
228    #[serde(rename = "DELETE")]
229    Delete,
230}
231
232#[derive(Serialize, Deserialize, Debug, Default, Clone)]
233pub struct PostgresChange {
234    pub event: PostgresChangesEvent,
235    pub schema: String,
236    pub table: String,
237    #[serde(skip_serializing_if = "Option::is_none")]
238    pub filter: Option<String>, // TODO structured filters
239}
240
241#[derive(Serialize, Deserialize, Debug, Clone)]
242pub struct JoinResponsePayload {
243    pub response: PostgresChangesList,
244    pub status: PayloadStatus,
245}
246
247#[derive(Serialize, Deserialize, Debug, Clone)]
248pub struct PostgresChangesList {
249    pub postgres_changes: Vec<PostgresChange>,
250}
251
252#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
253pub enum PayloadStatus {
254    #[serde(rename = "ok")]
255    Ok,
256    #[serde(rename = "error")]
257    Error,
258}