1use ahash::AHashMap;
2use serde::de::Error as _;
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use sonic_rs::prelude::*;
6use sonic_rs::{Value, json};
7use std::collections::{BTreeMap, HashMap};
8use std::time::Duration;
9
10use crate::protocol_version::ProtocolVersion;
11
/// A scalar value stored under a key in [`MessageExtras::headers`].
///
/// The enum is `#[serde(untagged)]`, so a value is (de)serialized as the bare
/// JSON string, number, or boolean with no variant tag. There is no variant
/// for `null`, arrays, or objects.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum ExtrasValue {
    /// A JSON string.
    String(String),
    /// A JSON number, held as an `f64`.
    Number(f64),
    /// A JSON boolean.
    Bool(bool),
}
21
/// Optional per-message metadata carried in the `extras` field of
/// [`PusherMessage`] and [`PusherApiMessage`].
///
/// Field names are serialized in camelCase (`idempotency_key` becomes
/// `idempotencyKey`), and any field that is `None` is left out of the
/// serialized output entirely.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MessageExtras {
    /// Map of header name to scalar value; exposed to callers through
    /// [`PusherMessage::filter_headers`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<HashMap<String, ExtrasValue>>,

    /// Ephemeral flag; [`PusherMessage::is_ephemeral`] treats `None` as `false`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ephemeral: Option<bool>,

    /// Idempotency key supplied in the extras; read through
    /// [`PusherMessage::extras_idempotency_key`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,

    /// Per-message echo setting; [`PusherMessage::should_echo`] falls back to
    /// the connection default when this is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub echo: Option<bool>,
}
51
52impl MessageExtras {
53 pub fn validate_headers_from_json(raw: &Value) -> Result<(), String> {
58 if let Some(extras) = raw.get("extras")
59 && let Some(headers) = extras.get("headers")
60 && let Some(obj) = headers.as_object()
61 {
62 for (key, val) in obj.iter() {
63 if val.is_object() || val.is_array() {
64 return Err(format!(
65 "extras.headers must be a flat object — nested objects and arrays are not allowed (key: '{key}')"
66 ));
67 }
68 }
69 }
70 Ok(())
71 }
72}
73
74pub fn generate_message_id() -> String {
76 uuid::Uuid::new_v4().to_string()
77}
78
/// Presence payload embedded under the `presence` key by
/// [`PusherMessage::subscription_succeeded`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresenceData {
    /// List of ids (presumably the member user ids; verify against callers).
    pub ids: Vec<String>,
    /// Map from a string key to an optional JSON value (presumably user id to
    /// user info; verify against callers).
    pub hash: AHashMap<String, Option<Value>>,
    /// Count reported alongside `ids` and `hash`.
    pub count: usize,
}
85
/// Payload of a message's `data` field.
///
/// Serialization is derived and `#[serde(untagged)]`, so the active variant is
/// written without a tag. Deserialization is NOT derived: it is implemented by
/// hand in the `Deserialize` impl for this type.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(untagged)]
pub enum MessageData {
    /// A plain string payload (serialized as a JSON string).
    String(String),
    /// An object payload with up to three recognized string fields plus any
    /// remaining keys.
    Structured {
        /// Value of the `channel_data` key; omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        channel_data: Option<String>,
        /// Value of the `channel` key; omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        channel: Option<String>,
        /// Value of the `user_data` key; omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        user_data: Option<String>,
        /// All other keys; flattened into the same JSON object on output.
        #[serde(flatten)]
        extra: AHashMap<String, Value>,
    },
    /// Any other JSON value, kept as-is.
    Json(Value),
}
102
103impl<'de> Deserialize<'de> for MessageData {
104 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
105 where
106 D: serde::Deserializer<'de>,
107 {
108 let v = JsonValue::deserialize(deserializer)?;
109 if let Some(s) = v.as_str() {
110 return Ok(MessageData::String(s.to_string()));
111 }
112 if let Some(obj) = v.as_object() {
113 let channel_data = obj
116 .get("channel_data")
117 .and_then(|x| x.as_str())
118 .map(ToString::to_string);
119 let channel = obj
120 .get("channel")
121 .and_then(|x| x.as_str())
122 .map(ToString::to_string);
123 let user_data = obj
124 .get("user_data")
125 .and_then(|x| x.as_str())
126 .map(ToString::to_string);
127
128 if channel_data.is_some() || channel.is_some() || user_data.is_some() {
129 let mut extra = AHashMap::new();
130 for (k, val) in obj.iter() {
131 if k != "channel_data" && k != "channel" && k != "user_data" {
132 extra.insert(
133 k.to_string(),
134 serde_json_value_to_sonic(val.clone()).map_err(D::Error::custom)?,
135 );
136 }
137 }
138 return Ok(MessageData::Structured {
139 channel_data,
140 channel,
141 user_data,
142 extra,
143 });
144 }
145 }
146 Ok(MessageData::Json(
147 serde_json_value_to_sonic(v).map_err(D::Error::custom)?,
148 ))
149 }
150}
151
152fn serde_json_value_to_sonic(value: JsonValue) -> Result<Value, String> {
153 let encoded = serde_json::to_string(&value)
154 .map_err(|err| format!("failed to encode json value for MessageData: {err}"))?;
155 sonic_rs::from_str(&encoded)
156 .map_err(|err| format!("failed to decode json value for MessageData: {err}"))
157}
158
/// Error payload consisting of an optional numeric code and a message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorData {
    /// Optional numeric error code.
    pub code: Option<u16>,
    /// Error description.
    pub message: String,
}
164
/// A protocol message as (de)serialized on the wire.
///
/// Every field is optional and is left out of the serialized output when it is
/// `None`. The constructors in `impl PusherMessage` set `event` (and usually
/// `data` / `channel`) and leave the remaining fields as `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PusherMessage {
    /// Event name, e.g. `pusher:ping`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// Channel the message relates to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Message payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<MessageData>,
    /// Optional `name` field.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Optional `user_id` field.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
    /// Optional string-to-string tags, kept in key order (`BTreeMap`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<BTreeMap<String, String>>,
    /// Optional `sequence` number.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sequence: Option<u64>,
    /// Optional `conflation_key`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conflation_key: Option<String>,
    /// Optional `message_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_id: Option<String>,
    /// Optional `stream_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_id: Option<String>,
    /// Optional `serial` number.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub serial: Option<u64>,
    /// Optional top-level `idempotency_key` (distinct from the one inside
    /// `extras`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Optional per-message extras; read by `is_ephemeral`, `should_echo`,
    /// `extras_idempotency_key`, and `filter_headers`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extras: Option<MessageExtras>,
    /// Serialized under the key `__delta_seq`.
    #[serde(rename = "__delta_seq", skip_serializing_if = "Option::is_none")]
    pub delta_sequence: Option<u64>,
    /// Serialized under the key `__conflation_key`.
    #[serde(rename = "__conflation_key", skip_serializing_if = "Option::is_none")]
    pub delta_conflation_key: Option<String>,
}
216
/// A message in the shape used by [`BatchPusherApiMessage`] (presumably the
/// HTTP publish API body; verify against the handlers that use it).
///
/// Every field is optional and is left out of the serialized output when it is
/// `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PusherApiMessage {
    /// Optional `name` field (presumably the event name; verify).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Payload, either a string or arbitrary JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<ApiMessageData>,
    /// A single target channel.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// A list of target channels.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channels: Option<Vec<String>>,
    /// Optional `socket_id` field.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub socket_id: Option<String>,
    /// Optional `info` string (presumably a comma-separated attribute list as
    /// parsed by `InfoQueryParser`; verify).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub info: Option<String>,
    /// Optional string-to-string tags. Note this is an `AHashMap`, whereas
    /// `PusherMessage::tags` is a `BTreeMap`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<AHashMap<String, String>>,
    /// Optional `delta` flag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub delta: Option<bool>,
    /// Optional `idempotency_key`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Optional per-message extras.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extras: Option<MessageExtras>,
}
248
/// A batch of API messages, (de)serialized as `{"batch": [...]}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchPusherApiMessage {
    /// The messages in the batch, in order.
    pub batch: Vec<PusherApiMessage>,
}
253
/// Payload of [`PusherApiMessage::data`].
///
/// `#[serde(untagged)]`: variants are tried in declaration order when
/// deserializing, so a JSON string becomes `String` and every other JSON value
/// becomes `Json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ApiMessageData {
    /// A plain string payload.
    String(String),
    /// Any other JSON value.
    Json(Value),
}
260
/// A reduced message shape holding only `channel`, `event`, and `data`.
///
/// Each field is left out of the serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SentPusherMessage {
    /// Channel the message relates to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Event name, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// Message payload, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<MessageData>,
}
270
271impl MessageData {
273 pub fn as_string(&self) -> Option<&str> {
274 match self {
275 MessageData::String(s) => Some(s),
276 _ => None,
277 }
278 }
279
280 pub fn into_string(self) -> Option<String> {
281 match self {
282 MessageData::String(s) => Some(s),
283 _ => None,
284 }
285 }
286
287 pub fn as_value(&self) -> Option<&Value> {
288 match self {
289 MessageData::Structured { extra, .. } => extra.values().next(),
290 _ => None,
291 }
292 }
293}
294
295impl From<String> for MessageData {
296 fn from(s: String) -> Self {
297 MessageData::String(s)
298 }
299}
300
301impl From<Value> for MessageData {
302 fn from(v: Value) -> Self {
303 MessageData::Json(v)
304 }
305}
306
307impl PusherMessage {
308 pub fn connection_established(socket_id: String, activity_timeout: u64) -> Self {
309 Self {
310 event: Some("pusher:connection_established".to_string()),
311 data: Some(MessageData::from(
312 json!({
313 "socket_id": socket_id,
314 "activity_timeout": activity_timeout })
316 .to_string(),
317 )),
318 channel: None,
319 name: None,
320 user_id: None,
321 sequence: None,
322 conflation_key: None,
323 tags: None,
324 message_id: None,
325 stream_id: None,
326 serial: None,
327 idempotency_key: None,
328 extras: None,
329 delta_sequence: None,
330 delta_conflation_key: None,
331 }
332 }
333 pub fn subscription_succeeded(channel: String, presence_data: Option<PresenceData>) -> Self {
334 let data_obj = if let Some(data) = presence_data {
335 json!({
336 "presence": {
337 "ids": data.ids,
338 "hash": data.hash,
339 "count": data.count
340 }
341 })
342 } else {
343 json!({})
344 };
345
346 Self {
347 event: Some("pusher_internal:subscription_succeeded".to_string()),
348 channel: Some(channel),
349 data: Some(MessageData::String(data_obj.to_string())),
350 name: None,
351 user_id: None,
352 sequence: None,
353 conflation_key: None,
354 tags: None,
355 message_id: None,
356 stream_id: None,
357 serial: None,
358 idempotency_key: None,
359 extras: None,
360 delta_sequence: None,
361 delta_conflation_key: None,
362 }
363 }
364
365 pub fn error(code: u16, message: String, channel: Option<String>) -> Self {
366 Self {
367 event: Some("pusher:error".to_string()),
368 data: Some(MessageData::Json(json!({
369 "code": code,
370 "message": message
371 }))),
372 channel,
373 name: None,
374 user_id: None,
375 sequence: None,
376 conflation_key: None,
377 tags: None,
378 message_id: None,
379 stream_id: None,
380 serial: None,
381 idempotency_key: None,
382 extras: None,
383 delta_sequence: None,
384 delta_conflation_key: None,
385 }
386 }
387
388 pub fn ping() -> Self {
389 Self {
390 event: Some("pusher:ping".to_string()),
391 data: None,
392 channel: None,
393 name: None,
394 user_id: None,
395 sequence: None,
396 conflation_key: None,
397 tags: None,
398 message_id: None,
399 stream_id: None,
400 serial: None,
401 idempotency_key: None,
402 extras: None,
403 delta_sequence: None,
404 delta_conflation_key: None,
405 }
406 }
407 pub fn channel_event<S: Into<String>>(event: S, channel: S, data: Value) -> Self {
408 Self {
409 event: Some(event.into()),
410 channel: Some(channel.into()),
411 data: Some(MessageData::String(data.to_string())),
412 name: None,
413 user_id: None,
414 sequence: None,
415 conflation_key: None,
416 tags: None,
417 message_id: None,
418 stream_id: None,
419 serial: None,
420 idempotency_key: None,
421 extras: None,
422 delta_sequence: None,
423 delta_conflation_key: None,
424 }
425 }
426
427 pub fn member_added(channel: String, user_id: String, user_info: Option<Value>) -> Self {
428 Self {
429 event: Some("pusher_internal:member_added".to_string()),
430 channel: Some(channel),
431 data: Some(MessageData::String(
433 json!({
434 "user_id": user_id,
435 "user_info": user_info.unwrap_or_else(|| json!({}))
436 })
437 .to_string(),
438 )),
439 name: None,
440 user_id: None,
441 sequence: None,
442 conflation_key: None,
443 tags: None,
444 message_id: None,
445 stream_id: None,
446 serial: None,
447 idempotency_key: None,
448 extras: None,
449 delta_sequence: None,
450 delta_conflation_key: None,
451 }
452 }
453
454 pub fn member_removed(channel: String, user_id: String) -> Self {
455 Self {
456 event: Some("pusher_internal:member_removed".to_string()),
457 channel: Some(channel),
458 data: Some(MessageData::String(
460 json!({
461 "user_id": user_id
462 })
463 .to_string(),
464 )),
465 name: None,
466 user_id: None,
467 sequence: None,
468 conflation_key: None,
469 tags: None,
470 message_id: None,
471 stream_id: None,
472 serial: None,
473 idempotency_key: None,
474 extras: None,
475 delta_sequence: None,
476 delta_conflation_key: None,
477 }
478 }
479
480 pub fn pong() -> Self {
482 Self {
483 event: Some("pusher:pong".to_string()),
484 data: None,
485 channel: None,
486 name: None,
487 user_id: None,
488 sequence: None,
489 conflation_key: None,
490 tags: None,
491 message_id: None,
492 stream_id: None,
493 serial: None,
494 idempotency_key: None,
495 extras: None,
496 delta_sequence: None,
497 delta_conflation_key: None,
498 }
499 }
500
501 pub fn channel_info(
503 occupied: bool,
504 subscription_count: Option<u64>,
505 user_count: Option<u64>,
506 cache_data: Option<(String, Duration)>,
507 ) -> Value {
508 let mut response = json!({
509 "occupied": occupied
510 });
511
512 if let Some(count) = subscription_count {
513 response["subscription_count"] = json!(count);
514 }
515
516 if let Some(count) = user_count {
517 response["user_count"] = json!(count);
518 }
519
520 if let Some((data, ttl)) = cache_data {
521 response["cache"] = json!({
522 "data": data,
523 "ttl": ttl.as_secs()
524 });
525 }
526
527 response
528 }
529
530 pub fn channels_list(channels_info: AHashMap<String, Value>) -> Value {
532 json!({
533 "channels": channels_info
534 })
535 }
536
537 pub fn user_list(user_ids: Vec<String>) -> Value {
539 let users = user_ids
540 .into_iter()
541 .map(|id| json!({ "id": id }))
542 .collect::<Vec<_>>();
543
544 json!({ "users": users })
545 }
546
547 pub fn batch_response(batch_info: Vec<Value>) -> Value {
549 json!({ "batch": batch_info })
550 }
551
552 pub fn success_response() -> Value {
554 json!({ "ok": true })
555 }
556
557 pub fn watchlist_online_event(user_ids: Vec<String>) -> Self {
558 Self {
559 event: Some("online".to_string()),
560 channel: None, name: None,
562 data: Some(MessageData::Json(json!({
563 "user_ids": user_ids
564 }))),
565 user_id: None,
566 sequence: None,
567 conflation_key: None,
568 tags: None,
569 message_id: None,
570 stream_id: None,
571 serial: None,
572 idempotency_key: None,
573 extras: None,
574 delta_sequence: None,
575 delta_conflation_key: None,
576 }
577 }
578
579 pub fn watchlist_offline_event(user_ids: Vec<String>) -> Self {
580 Self {
581 event: Some("offline".to_string()),
582 channel: None,
583 name: None,
584 data: Some(MessageData::Json(json!({
585 "user_ids": user_ids
586 }))),
587 user_id: None,
588 sequence: None,
589 conflation_key: None,
590 tags: None,
591 message_id: None,
592 stream_id: None,
593 serial: None,
594 idempotency_key: None,
595 extras: None,
596 delta_sequence: None,
597 delta_conflation_key: None,
598 }
599 }
600
601 pub fn cache_miss_event(channel: String) -> Self {
602 Self {
603 event: Some("pusher:cache_miss".to_string()),
604 channel: Some(channel),
605 data: Some(MessageData::String("{}".to_string())),
606 name: None,
607 user_id: None,
608 sequence: None,
609 conflation_key: None,
610 tags: None,
611 message_id: None,
612 stream_id: None,
613 serial: None,
614 idempotency_key: None,
615 extras: None,
616 delta_sequence: None,
617 delta_conflation_key: None,
618 }
619 }
620
621 pub fn signin_success(user_data: String) -> Self {
622 Self {
623 event: Some("pusher:signin_success".to_string()),
624 data: Some(MessageData::Json(json!({
625 "user_data": user_data
626 }))),
627 channel: None,
628 name: None,
629 user_id: None,
630 sequence: None,
631 conflation_key: None,
632 tags: None,
633 message_id: None,
634 stream_id: None,
635 serial: None,
636 idempotency_key: None,
637 extras: None,
638 delta_sequence: None,
639 delta_conflation_key: None,
640 }
641 }
642
643 pub fn delta_message(
645 channel: String,
646 event: String,
647 delta_base64: String,
648 base_sequence: u32,
649 target_sequence: u32,
650 algorithm: &str,
651 ) -> Self {
652 Self {
653 event: Some("pusher:delta".to_string()),
654 channel: Some(channel.clone()),
655 data: Some(MessageData::String(
656 json!({
657 "channel": channel,
658 "event": event,
659 "delta": delta_base64,
660 "base_seq": base_sequence,
661 "target_seq": target_sequence,
662 "algorithm": algorithm,
663 })
664 .to_string(),
665 )),
666 name: None,
667 user_id: None,
668 sequence: None,
669 conflation_key: None,
670 tags: None,
671 message_id: None,
672 stream_id: None,
673 serial: None,
674 idempotency_key: None,
675 extras: None,
676 delta_sequence: None,
677 delta_conflation_key: None,
678 }
679 }
680
681 pub fn rewrite_prefix(&mut self, version: ProtocolVersion) {
684 if let Some(ref event) = self.event {
685 self.event = Some(version.rewrite_event_prefix(event));
686 }
687 }
688
689 pub fn is_ephemeral(&self) -> bool {
691 self.extras
692 .as_ref()
693 .and_then(|e| e.ephemeral)
694 .unwrap_or(false)
695 }
696
697 pub fn extras_idempotency_key(&self) -> Option<&str> {
699 self.extras
700 .as_ref()
701 .and_then(|e| e.idempotency_key.as_deref())
702 }
703
704 pub fn should_echo(&self, connection_default: bool) -> bool {
707 self.extras
708 .as_ref()
709 .and_then(|e| e.echo)
710 .unwrap_or(connection_default)
711 }
712
713 pub fn filter_headers(&self) -> Option<&HashMap<String, ExtrasValue>> {
715 self.extras.as_ref().and_then(|e| e.headers.as_ref())
716 }
717
718 pub fn should_include_extras(protocol: &ProtocolVersion) -> bool {
720 matches!(protocol, ProtocolVersion::V2)
721 }
722
723 pub fn add_base_sequence(mut self, base_sequence: u32) -> Self {
725 if let Some(MessageData::String(ref data_str)) = self.data
726 && let Ok(mut data_obj) = sonic_rs::from_str::<Value>(data_str)
727 && let Some(obj) = data_obj.as_object_mut()
728 {
729 obj.insert("__delta_base_seq", json!(base_sequence));
730 self.data = Some(MessageData::String(data_obj.to_string()));
731 }
732 self
733 }
734
735 pub fn delta_compression_enabled(default_algorithm: &str) -> Self {
737 Self {
738 event: Some("pusher:delta_compression_enabled".to_string()),
739 data: Some(MessageData::Json(json!({
740 "enabled": true,
741 "default_algorithm": default_algorithm,
742 }))),
743 channel: None,
744 name: None,
745 user_id: None,
746 sequence: None,
747 conflation_key: None,
748 tags: None,
749 message_id: None,
750 stream_id: None,
751 serial: None,
752 idempotency_key: None,
753 extras: None,
754 delta_sequence: None,
755 delta_conflation_key: None,
756 }
757 }
758}
759
/// Helpers for reading a comma-separated `info` attribute list such as
/// `"user_count,subscription_count"`.
pub trait InfoQueryParser {
    /// Returns the requested attributes in order. Surrounding whitespace is
    /// trimmed and empty entries are skipped.
    fn parse_info(&self) -> Vec<&str>;
    /// Whether the list contains `user_count`.
    fn wants_user_count(&self) -> bool;
    /// Whether the list contains `subscription_count`.
    fn wants_subscription_count(&self) -> bool;
    /// Whether the list contains `cache`.
    fn wants_cache(&self) -> bool;
}

/// Returns `true` if `attribute` is one of the comma-separated entries of
/// `list`, ignoring whitespace around each entry. Does not allocate.
fn info_list_contains(list: &str, attribute: &str) -> bool {
    list.split(',').any(|entry| entry.trim() == attribute)
}

impl InfoQueryParser for Option<&String> {
    /// `None` and blank input both yield an empty list.
    fn parse_info(&self) -> Vec<&str> {
        self.map(|s| {
            s.split(',')
                .map(str::trim)
                .filter(|entry| !entry.is_empty())
                .collect::<Vec<_>>()
        })
        .unwrap_or_default()
    }

    fn wants_user_count(&self) -> bool {
        self.is_some_and(|s| info_list_contains(s, "user_count"))
    }

    fn wants_subscription_count(&self) -> bool {
        self.is_some_and(|s| info_list_contains(s, "subscription_count"))
    }

    fn wants_cache(&self) -> bool {
        self.is_some_and(|s| info_list_contains(s, "cache"))
    }
}