1use std::collections::HashMap;
2use std::fmt;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct PhoenixMessage {
13 pub event: String,
14 pub topic: String,
15 pub payload: Value,
16 #[serde(rename = "ref")]
17 pub msg_ref: Option<String>,
18 pub join_ref: Option<String>,
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ChannelState {
26 Closed,
27 Joining,
28 Joined,
29 Leaving,
30 Errored,
31}
32
33impl fmt::Display for ChannelState {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 match self {
36 Self::Closed => write!(f, "closed"),
37 Self::Joining => write!(f, "joining"),
38 Self::Joined => write!(f, "joined"),
39 Self::Leaving => write!(f, "leaving"),
40 Self::Errored => write!(f, "errored"),
41 }
42 }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum SubscriptionStatus {
51 Subscribed,
52 TimedOut,
53 Closed,
54 ChannelError,
55}
56
57impl fmt::Display for SubscriptionStatus {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 match self {
60 Self::Subscribed => write!(f, "SUBSCRIBED"),
61 Self::TimedOut => write!(f, "TIMED_OUT"),
62 Self::Closed => write!(f, "CLOSED"),
63 Self::ChannelError => write!(f, "CHANNEL_ERROR"),
64 }
65 }
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
72pub enum PostgresChangesEvent {
73 #[serde(rename = "*")]
74 All,
75 #[serde(rename = "INSERT")]
76 Insert,
77 #[serde(rename = "UPDATE")]
78 Update,
79 #[serde(rename = "DELETE")]
80 Delete,
81}
82
83impl fmt::Display for PostgresChangesEvent {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 match self {
86 Self::All => write!(f, "*"),
87 Self::Insert => write!(f, "INSERT"),
88 Self::Update => write!(f, "UPDATE"),
89 Self::Delete => write!(f, "DELETE"),
90 }
91 }
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct PostgresChangesFilter {
97 pub event: String,
98 pub schema: String,
99 #[serde(skip_serializing_if = "Option::is_none")]
100 pub table: Option<String>,
101 #[serde(skip_serializing_if = "Option::is_none")]
102 pub filter: Option<String>,
103}
104
105impl PostgresChangesFilter {
106 pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
108 Self {
109 event: "*".to_string(),
110 schema: schema.into(),
111 table: Some(table.into()),
112 filter: None,
113 }
114 }
115
116 pub fn schema_only(schema: impl Into<String>) -> Self {
118 Self {
119 event: "*".to_string(),
120 schema: schema.into(),
121 table: None,
122 filter: None,
123 }
124 }
125
126 pub fn event(mut self, event: PostgresChangesEvent) -> Self {
128 self.event = event.to_string();
129 self
130 }
131
132 pub fn with_filter(mut self, filter: impl Into<String>) -> Self {
134 self.filter = Some(filter.into());
135 self
136 }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct PostgresChangePayload {
142 pub schema: String,
143 pub table: String,
144 #[serde(rename = "type")]
145 pub change_type: String,
146 #[serde(default)]
147 pub commit_timestamp: Option<String>,
148 #[serde(default)]
149 pub columns: Vec<ColumnInfo>,
150 #[serde(default)]
151 pub record: Option<Value>,
152 #[serde(default)]
153 pub old_record: Option<Value>,
154 #[serde(default)]
155 pub errors: Option<Value>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct ColumnInfo {
161 pub name: String,
162 #[serde(rename = "type")]
163 pub column_type: String,
164}
165
166pub type PresenceState = HashMap<String, Vec<PresenceMeta>>;
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct PresenceMeta {
174 #[serde(default)]
175 pub phx_ref: Option<String>,
176 #[serde(default)]
177 pub phx_ref_prev: Option<String>,
178 #[serde(flatten)]
179 pub data: Value,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct PresenceDiff {
185 pub joins: HashMap<String, PresenceEntry>,
186 pub leaves: HashMap<String, PresenceEntry>,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct PresenceEntry {
192 pub metas: Vec<PresenceMeta>,
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct JoinPayload {
200 pub config: JoinConfig,
201 #[serde(skip_serializing_if = "Option::is_none")]
202 pub access_token: Option<String>,
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct JoinConfig {
208 pub broadcast: BroadcastConfig,
209 pub presence: PresenceConfig,
210 pub postgres_changes: Vec<PostgresChangesFilter>,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct BroadcastConfig {
216 pub ack: bool,
217 #[serde(rename = "self")]
218 pub self_send: bool,
219}
220
221impl Default for BroadcastConfig {
222 fn default() -> Self {
223 Self {
224 ack: false,
225 self_send: false,
226 }
227 }
228}
229
230#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct PresenceConfig {
233 pub key: String,
234}
235
236impl Default for PresenceConfig {
237 fn default() -> Self {
238 Self {
239 key: String::new(),
240 }
241 }
242}
243
244#[derive(Debug, Clone)]
248pub struct RealtimeConfig {
249 pub url: String,
251 pub api_key: String,
253 pub heartbeat_interval: Duration,
255 pub subscribe_timeout: Duration,
257 pub reconnect: ReconnectConfig,
259 pub headers: HashMap<String, String>,
261}
262
263impl RealtimeConfig {
264 pub fn new(url: impl Into<String>, api_key: impl Into<String>) -> Self {
265 Self {
266 url: url.into(),
267 api_key: api_key.into(),
268 heartbeat_interval: Duration::from_secs(25),
269 subscribe_timeout: Duration::from_secs(10),
270 reconnect: ReconnectConfig::default(),
271 headers: HashMap::new(),
272 }
273 }
274
275 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
277 self.headers = headers;
278 self
279 }
280}
281
282#[derive(Debug, Clone)]
284pub struct ReconnectConfig {
285 pub intervals: Vec<Duration>,
287 pub fallback: Duration,
289}
290
291impl Default for ReconnectConfig {
292 fn default() -> Self {
293 Self {
294 intervals: vec![
295 Duration::from_secs(1),
296 Duration::from_secs(2),
297 Duration::from_secs(5),
298 Duration::from_secs(10),
299 ],
300 fallback: Duration::from_secs(10),
301 }
302 }
303}