Skip to main content

sockudo_protocol/
messages.rs

1use ahash::AHashMap;
2use serde::de::Error as _;
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use sonic_rs::prelude::*;
6use sonic_rs::{Value, json};
7use std::collections::{BTreeMap, HashMap};
8use std::time::Duration;
9
10use crate::protocol_version::ProtocolVersion;
11
/// Allowed value types for `extras.headers`.
///
/// Flat only — no Object or Array variant so nesting is structurally impossible.
/// The enum is `#[serde(untagged)]`, so a value is written to and read from
/// JSON as the bare scalar with no variant tag.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum ExtrasValue {
    /// A string header value.
    String(String),
    /// A numeric header value, stored as `f64`.
    Number(f64),
    /// A boolean header value.
    Bool(bool),
}
21
/// Structured metadata envelope for V2-specific message features.
///
/// Present on the wire for V2 connections only. V1 connections receive messages
/// with extras stripped entirely. Pusher SDKs ignore unknown fields so the
/// field is safe to carry through internal pipelines even when the publisher
/// is V1.
///
/// Field names are serialized in camelCase (for example `idempotencyKey`), and
/// every field that is `None` is omitted from the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MessageExtras {
    /// Flat metadata for server-side event name filtering.
    /// Must be a flat object — no nested objects, no arrays.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<HashMap<String, ExtrasValue>>,

    /// If true: skip connection recovery buffer and webhook forwarding.
    /// Deliver to currently connected V2 subscribers only.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ephemeral: Option<bool>,

    /// Server-side deduplication key. If the same key arrives again within
    /// the app's idempotency TTL window, the message is silently dropped.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,

    /// V2-only push payload. HTTP publish with `extras.push` creates an
    /// asynchronous channel push event for push subscribers and is stripped from
    /// V1/Pusher-compatible WebSocket deliveries with the rest of `extras`.
    /// Held as an arbitrary JSON value; its shape is not constrained here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub push: Option<Value>,

    /// Per-message echo control. Overrides the connection-level echo setting
    /// when explicitly set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub echo: Option<bool>,
}
57
58impl MessageExtras {
59    /// Validate that headers (if present) contain only flat scalar values.
60    /// This is structurally guaranteed by `ExtrasValue` having no Object/Array
61    /// variants, but this method provides an explicit check with a clear error
62    /// when validating raw JSON before deserialization.
63    pub fn validate_headers_from_json(raw: &Value) -> Result<(), String> {
64        if let Some(extras) = raw.get("extras")
65            && let Some(headers) = extras.get("headers")
66            && let Some(obj) = headers.as_object()
67        {
68            for (key, val) in obj.iter() {
69                if val.is_object() || val.is_array() {
70                    return Err(format!(
71                        "extras.headers must be a flat object — nested objects and arrays are not allowed (key: '{key}')"
72                    ));
73                }
74            }
75        }
76        Ok(())
77    }
78}
79
80/// Generate a unique message ID (UUIDv4) for client-side deduplication.
81pub fn generate_message_id() -> String {
82    uuid::Uuid::new_v4().to_string()
83}
84
/// Event name literal `sockudo_internal:annotation`.
/// NOTE(review): presumably the event carrying `AnnotationEventData` — usage is outside this file, confirm.
pub const ANNOTATION_EVENT_NAME: &str = "sockudo_internal:annotation";
/// Event name literal `sockudo_internal:message`.
/// NOTE(review): presumably the event carrying `MessageSummaryData` — usage is outside this file, confirm.
pub const MESSAGE_SUMMARY_EVENT_NAME: &str = "sockudo_internal:message";
/// Mode identifier literal `ANNOTATION_SUBSCRIBE`.
/// NOTE(review): presumably a subscription mode that opts in to annotation events — confirm against callers.
pub const ANNOTATION_SUBSCRIBE_MODE: &str = "ANNOTATION_SUBSCRIBE";
88
/// Action carried by an annotation event.
///
/// Each variant is serialized as the dotted string given in its
/// `#[serde(rename)]` attribute.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AnnotationEventAction {
    /// Serialized as `"annotation.create"`.
    #[serde(rename = "annotation.create")]
    Create,
    /// Serialized as `"annotation.delete"`.
    #[serde(rename = "annotation.delete")]
    Delete,
}
96
/// Payload describing a single annotation event.
///
/// Keys are serialized in camelCase (`messageSerial`, `clientId`, ...), and
/// optional fields that are `None` are omitted from the output.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct AnnotationEventData {
    /// Whether the annotation was created or deleted.
    pub action: AnnotationEventAction,
    /// Optional annotation identifier.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Serial of the annotation itself.
    pub serial: String,
    /// Serial of the message the annotation refers to (`messageSerial` on the wire).
    pub message_serial: String,
    /// Annotation type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub annotation_type: String,
    /// Optional annotation name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Optional client identifier (`clientId` on the wire).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    /// Optional count associated with the annotation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub count: Option<u64>,
    /// Optional arbitrary JSON payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<Value>,
    /// Optional encoding label for `data`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub encoding: Option<String>,
    /// Event timestamp. NOTE(review): unit looks like Unix milliseconds — confirm with the producer.
    pub timestamp: i64,
}
119
/// Wrapper that nests annotation summaries under a `summary` key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AnnotationSummaryEnvelope {
    /// Summary values by string key; `BTreeMap` keeps the keys in sorted order
    /// when serialized.
    pub summary: BTreeMap<String, Value>,
}
124
/// Payload describing the aggregated annotation summary of one message.
///
/// Keys are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MessageSummaryData {
    /// Action label as a free-form string.
    pub action: String,
    /// Serial of the summarized message.
    pub serial: String,
    /// Summaries, wrapped so they appear under `annotations.summary`.
    pub annotations: AnnotationSummaryEnvelope,
}
132
/// Presence information for a channel.
///
/// `PusherMessage::subscription_succeeded` embeds these three fields under a
/// `presence` key in its payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresenceData {
    /// Member identifiers.
    pub ids: Vec<String>,
    /// Per-key optional JSON value. NOTE(review): presumably member id -> user info — confirm against callers.
    pub hash: AHashMap<String, Option<Value>>,
    /// Member count.
    pub count: usize,
}
139
/// Payload of a message's `data` field.
///
/// Serialization is derived and untagged, so each variant is written as its
/// bare content. Deserialization is implemented by hand (see the
/// `Deserialize` impl for this type) rather than derived.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(untagged)]
pub enum MessageData {
    /// A plain string payload (often a JSON document encoded as a string).
    String(String),
    /// An object payload with recognised string fields split out.
    Structured {
        /// Value of the `channel_data` key, when present.
        #[serde(skip_serializing_if = "Option::is_none")]
        channel_data: Option<String>,
        /// Value of the `channel` key, when present.
        #[serde(skip_serializing_if = "Option::is_none")]
        channel: Option<String>,
        /// Value of the `user_data` key, when present.
        #[serde(skip_serializing_if = "Option::is_none")]
        user_data: Option<String>,
        /// Remaining keys; flattened into the same JSON object on output.
        #[serde(flatten)]
        extra: AHashMap<String, Value>,
    },
    /// Any other JSON value.
    Json(Value),
}
156
157impl<'de> Deserialize<'de> for MessageData {
158    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
159    where
160        D: serde::Deserializer<'de>,
161    {
162        let v = JsonValue::deserialize(deserializer)?;
163        if let Some(s) = v.as_str() {
164            return Ok(MessageData::String(s.to_string()));
165        }
166        if let Some(obj) = v.as_object() {
167            // Flatten workaround for sonic-rs issue #114:
168            // manually split known structured keys and keep remaining keys in `extra`.
169            let channel_data = obj
170                .get("channel_data")
171                .and_then(|x| x.as_str())
172                .map(ToString::to_string);
173            let channel = obj
174                .get("channel")
175                .and_then(|x| x.as_str())
176                .map(ToString::to_string);
177            let user_data = obj
178                .get("user_data")
179                .and_then(|x| x.as_str())
180                .map(ToString::to_string);
181
182            if channel_data.is_some() || channel.is_some() || user_data.is_some() {
183                let mut extra = AHashMap::new();
184                for (k, val) in obj.iter() {
185                    if k != "channel_data" && k != "channel" && k != "user_data" {
186                        extra.insert(
187                            k.to_string(),
188                            serde_json_value_to_sonic(val.clone()).map_err(D::Error::custom)?,
189                        );
190                    }
191                }
192                return Ok(MessageData::Structured {
193                    channel_data,
194                    channel,
195                    user_data,
196                    extra,
197                });
198            }
199        }
200        Ok(MessageData::Json(
201            serde_json_value_to_sonic(v).map_err(D::Error::custom)?,
202        ))
203    }
204}
205
206fn serde_json_value_to_sonic(value: JsonValue) -> Result<Value, String> {
207    let encoded = serde_json::to_string(&value)
208        .map_err(|err| format!("failed to encode json value for MessageData: {err}"))?;
209    sonic_rs::from_str(&encoded)
210        .map_err(|err| format!("failed to decode json value for MessageData: {err}"))
211}
212
/// Error payload consisting of an optional numeric code and a message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorData {
    /// Optional numeric error code; serialized as `null` when absent
    /// (there is no `skip_serializing_if` on this field).
    pub code: Option<u16>,
    /// Human-readable error description.
    pub message: String,
}
218
/// A protocol message as exchanged with WebSocket clients and passed through
/// the broadcast pipeline.
///
/// Every field is optional and is omitted from the serialized output when
/// `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PusherMessage {
    /// Event name, e.g. `pusher:ping`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// Channel the message belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Message payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<MessageData>,
    /// Optional `name` field. NOTE(review): purpose is not visible in this file — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Optional user identifier attached to the message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
    /// Tags for filtering - uses BTreeMap for deterministic serialization order
    /// which is required for delta compression to work correctly
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<BTreeMap<String, String>>,
    /// Delta compression sequence number for full messages
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sequence: Option<u64>,
    /// Delta compression conflation key for message grouping
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conflation_key: Option<String>,
    /// Unique message ID for client-side deduplication
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_id: Option<String>,
    /// Opaque per-channel continuity token for durable history and recovery.
    /// Changes only when the server can no longer prove continuity for the channel stream.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_id: Option<String>,
    /// Monotonically increasing serial for connection recovery.
    /// Assigned per-channel at broadcast time when connection recovery is enabled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub serial: Option<u64>,
    /// Idempotency key for cross-region deduplication.
    /// Threaded from the HTTP publish request through the broadcast pipeline
    /// so that receiving nodes can register it in their local cache.
    /// Never sent to WebSocket clients.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// V2 message extras envelope. Carries ephemeral flag, per-message echo
    /// control, header-based filtering metadata, and extras-level idempotency.
    /// Stripped from V1 deliveries; included in V2 wire format.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extras: Option<MessageExtras>,
    /// Delta sequence marker for full messages in V2 delta streams.
    /// Serialized under the key `__delta_seq`.
    #[serde(rename = "__delta_seq", skip_serializing_if = "Option::is_none")]
    pub delta_sequence: Option<u64>,
    /// Delta conflation key marker for full messages in V2 delta streams.
    /// Serialized under the key `__conflation_key`.
    #[serde(rename = "__conflation_key", skip_serializing_if = "Option::is_none")]
    pub delta_conflation_key: Option<String>,
}
270
/// A publish request body as accepted by the HTTP API.
///
/// Every field is optional and is omitted from the serialized output when
/// `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PusherApiMessage {
    /// Event name to publish.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Event payload, either a string or arbitrary JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<ApiMessageData>,
    /// Single target channel.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Multiple target channels.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channels: Option<Vec<String>>,
    /// Socket ID supplied with the request.
    /// NOTE(review): presumably the publishing socket to exclude from delivery — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub socket_id: Option<String>,
    /// Requested info attributes as a single string.
    /// NOTE(review): presumably comma-separated, matching `InfoQueryParser` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub info: Option<String>,
    /// Tags attached to the message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<AHashMap<String, String>>,
    /// Per-publish delta compression control.
    /// - `Some(true)`: Force delta compression for this message (if client supports it)
    /// - `Some(false)`: Force full message (skip delta compression)
    /// - `None`: Use default behavior based on channel/global configuration
    #[serde(skip_serializing_if = "Option::is_none")]
    pub delta: Option<bool>,
    /// Idempotency key for deduplicating publish requests.
    /// If the same key is seen within the TTL window, the server returns the
    /// cached response without re-broadcasting.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// V2 extras envelope. Passed through to PusherMessage for V2 delivery.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extras: Option<MessageExtras>,
}
302
/// A batch publish request: a list of API messages under the `batch` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchPusherApiMessage {
    /// The messages contained in the batch.
    pub batch: Vec<PusherApiMessage>,
}
307
/// Payload of an API publish request.
///
/// Untagged: the variants are tried in declaration order on deserialization,
/// so a JSON string becomes `String` and any other value becomes `Json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ApiMessageData {
    /// A string payload.
    String(String),
    /// Any other JSON value.
    Json(Value),
}
314
/// Reduced message shape carrying only `channel`, `event` and `data`.
///
/// Fields that are `None` are omitted from the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SentPusherMessage {
    /// Channel the message belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Event name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// Message payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<MessageData>,
}
324
325// Helper implementations
326impl MessageData {
327    pub fn as_string(&self) -> Option<&str> {
328        match self {
329            MessageData::String(s) => Some(s),
330            _ => None,
331        }
332    }
333
334    pub fn into_string(self) -> Option<String> {
335        match self {
336            MessageData::String(s) => Some(s),
337            _ => None,
338        }
339    }
340
341    pub fn as_value(&self) -> Option<&Value> {
342        match self {
343            MessageData::Structured { extra, .. } => extra.values().next(),
344            _ => None,
345        }
346    }
347}
348
349impl From<String> for MessageData {
350    fn from(s: String) -> Self {
351        MessageData::String(s)
352    }
353}
354
355impl From<Value> for MessageData {
356    fn from(v: Value) -> Self {
357        MessageData::Json(v)
358    }
359}
360
361impl PusherMessage {
362    pub fn is_protocol_ping_or_pong(&self) -> bool {
363        let Some(event) = self.event.as_deref() else {
364            return false;
365        };
366
367        matches!(
368            ProtocolVersion::parse_any_protocol_event(event),
369            Some(("ping", _)) | Some(("pong", _))
370        )
371    }
372
373    pub fn connection_established(socket_id: String, activity_timeout: u64) -> Self {
374        Self {
375            event: Some("pusher:connection_established".to_string()),
376            data: Some(MessageData::from(
377                json!({
378                    "socket_id": socket_id,
379                    "activity_timeout": activity_timeout  // Now configurable
380                })
381                .to_string(),
382            )),
383            channel: None,
384            name: None,
385            user_id: None,
386            sequence: None,
387            conflation_key: None,
388            tags: None,
389            message_id: None,
390            stream_id: None,
391            serial: None,
392            idempotency_key: None,
393            extras: None,
394            delta_sequence: None,
395            delta_conflation_key: None,
396        }
397    }
398    pub fn subscription_succeeded(channel: String, presence_data: Option<PresenceData>) -> Self {
399        let data_obj = if let Some(data) = presence_data {
400            json!({
401                "presence": {
402                    "ids": data.ids,
403                    "hash": data.hash,
404                    "count": data.count
405                }
406            })
407        } else {
408            json!({})
409        };
410
411        Self {
412            event: Some("pusher_internal:subscription_succeeded".to_string()),
413            channel: Some(channel),
414            data: Some(MessageData::String(data_obj.to_string())),
415            name: None,
416            user_id: None,
417            sequence: None,
418            conflation_key: None,
419            tags: None,
420            message_id: None,
421            stream_id: None,
422            serial: None,
423            idempotency_key: None,
424            extras: None,
425            delta_sequence: None,
426            delta_conflation_key: None,
427        }
428    }
429
430    pub fn error(code: u16, message: String, channel: Option<String>) -> Self {
431        Self {
432            event: Some("pusher:error".to_string()),
433            data: Some(MessageData::Json(json!({
434                "code": code,
435                "message": message
436            }))),
437            channel,
438            name: None,
439            user_id: None,
440            sequence: None,
441            conflation_key: None,
442            tags: None,
443            message_id: None,
444            stream_id: None,
445            serial: None,
446            idempotency_key: None,
447            extras: None,
448            delta_sequence: None,
449            delta_conflation_key: None,
450        }
451    }
452
453    pub fn ping() -> Self {
454        Self {
455            event: Some("pusher:ping".to_string()),
456            data: None,
457            channel: None,
458            name: None,
459            user_id: None,
460            sequence: None,
461            conflation_key: None,
462            tags: None,
463            message_id: None,
464            stream_id: None,
465            serial: None,
466            idempotency_key: None,
467            extras: None,
468            delta_sequence: None,
469            delta_conflation_key: None,
470        }
471    }
472    pub fn channel_event<S: Into<String>>(event: S, channel: S, data: Value) -> Self {
473        Self {
474            event: Some(event.into()),
475            channel: Some(channel.into()),
476            data: Some(MessageData::String(data.to_string())),
477            name: None,
478            user_id: None,
479            sequence: None,
480            conflation_key: None,
481            tags: None,
482            message_id: None,
483            stream_id: None,
484            serial: None,
485            idempotency_key: None,
486            extras: None,
487            delta_sequence: None,
488            delta_conflation_key: None,
489        }
490    }
491
492    pub fn member_added(channel: String, user_id: String, user_info: Option<Value>) -> Self {
493        Self {
494            event: Some("pusher_internal:member_added".to_string()),
495            channel: Some(channel),
496            // FIX: Use MessageData::String with JSON-encoded string instead of MessageData::Json
497            data: Some(MessageData::String(
498                json!({
499                    "user_id": user_id,
500                    "user_info": user_info.unwrap_or_else(|| json!({}))
501                })
502                .to_string(),
503            )),
504            name: None,
505            user_id: None,
506            sequence: None,
507            conflation_key: None,
508            tags: None,
509            message_id: None,
510            stream_id: None,
511            serial: None,
512            idempotency_key: None,
513            extras: None,
514            delta_sequence: None,
515            delta_conflation_key: None,
516        }
517    }
518
519    pub fn member_removed(channel: String, user_id: String) -> Self {
520        Self {
521            event: Some("pusher_internal:member_removed".to_string()),
522            channel: Some(channel),
523            // FIX: Also apply same fix to member_removed for consistency
524            data: Some(MessageData::String(
525                json!({
526                    "user_id": user_id
527                })
528                .to_string(),
529            )),
530            name: None,
531            user_id: None,
532            sequence: None,
533            conflation_key: None,
534            tags: None,
535            message_id: None,
536            stream_id: None,
537            serial: None,
538            idempotency_key: None,
539            extras: None,
540            delta_sequence: None,
541            delta_conflation_key: None,
542        }
543    }
544
545    // New helper method for pong response
546    pub fn pong() -> Self {
547        Self {
548            event: Some("pusher:pong".to_string()),
549            data: None,
550            channel: None,
551            name: None,
552            user_id: None,
553            sequence: None,
554            conflation_key: None,
555            tags: None,
556            message_id: None,
557            stream_id: None,
558            serial: None,
559            idempotency_key: None,
560            extras: None,
561            delta_sequence: None,
562            delta_conflation_key: None,
563        }
564    }
565
566    // Helper for creating channel info response
567    pub fn channel_info(
568        occupied: bool,
569        subscription_count: Option<u64>,
570        user_count: Option<u64>,
571        cache_data: Option<(String, Duration)>,
572    ) -> Value {
573        let mut response = json!({
574            "occupied": occupied
575        });
576
577        if let Some(count) = subscription_count {
578            response["subscription_count"] = json!(count);
579        }
580
581        if let Some(count) = user_count {
582            response["user_count"] = json!(count);
583        }
584
585        if let Some((data, ttl)) = cache_data {
586            response["cache"] = json!({
587                "data": data,
588                "ttl": ttl.as_secs()
589            });
590        }
591
592        response
593    }
594
595    // Helper for creating channels list response
596    pub fn channels_list(channels_info: AHashMap<String, Value>) -> Value {
597        json!({
598            "channels": channels_info
599        })
600    }
601
602    // Helper for creating user list response
603    pub fn user_list(user_ids: Vec<String>) -> Value {
604        let users = user_ids
605            .into_iter()
606            .map(|id| json!({ "id": id }))
607            .collect::<Vec<_>>();
608
609        json!({ "users": users })
610    }
611
612    // Helper for batch events response
613    pub fn batch_response(batch_info: Vec<Value>) -> Value {
614        json!({ "batch": batch_info })
615    }
616
617    // Helper for simple success response
618    pub fn success_response() -> Value {
619        json!({ "ok": true })
620    }
621
622    pub fn watchlist_online_event(user_ids: Vec<String>) -> Self {
623        Self {
624            event: Some("online".to_string()),
625            channel: None, // Watchlist events don't use channels
626            name: None,
627            data: Some(MessageData::Json(json!({
628                "user_ids": user_ids
629            }))),
630            user_id: None,
631            sequence: None,
632            conflation_key: None,
633            tags: None,
634            message_id: None,
635            stream_id: None,
636            serial: None,
637            idempotency_key: None,
638            extras: None,
639            delta_sequence: None,
640            delta_conflation_key: None,
641        }
642    }
643
644    pub fn watchlist_offline_event(user_ids: Vec<String>) -> Self {
645        Self {
646            event: Some("offline".to_string()),
647            channel: None,
648            name: None,
649            data: Some(MessageData::Json(json!({
650                "user_ids": user_ids
651            }))),
652            user_id: None,
653            sequence: None,
654            conflation_key: None,
655            tags: None,
656            message_id: None,
657            stream_id: None,
658            serial: None,
659            idempotency_key: None,
660            extras: None,
661            delta_sequence: None,
662            delta_conflation_key: None,
663        }
664    }
665
666    pub fn cache_miss_event(channel: String) -> Self {
667        Self {
668            event: Some("pusher:cache_miss".to_string()),
669            channel: Some(channel),
670            data: Some(MessageData::String("{}".to_string())),
671            name: None,
672            user_id: None,
673            sequence: None,
674            conflation_key: None,
675            tags: None,
676            message_id: None,
677            stream_id: None,
678            serial: None,
679            idempotency_key: None,
680            extras: None,
681            delta_sequence: None,
682            delta_conflation_key: None,
683        }
684    }
685
686    pub fn signin_success(user_data: String) -> Self {
687        Self {
688            event: Some("pusher:signin_success".to_string()),
689            data: Some(MessageData::Json(json!({
690                "user_data": user_data
691            }))),
692            channel: None,
693            name: None,
694            user_id: None,
695            sequence: None,
696            conflation_key: None,
697            tags: None,
698            message_id: None,
699            stream_id: None,
700            serial: None,
701            idempotency_key: None,
702            extras: None,
703            delta_sequence: None,
704            delta_conflation_key: None,
705        }
706    }
707
708    /// Create a delta-compressed message
709    pub fn delta_message(
710        channel: String,
711        event: String,
712        delta_base64: String,
713        base_sequence: u32,
714        target_sequence: u32,
715        algorithm: &str,
716    ) -> Self {
717        Self {
718            event: Some("pusher:delta".to_string()),
719            channel: Some(channel.clone()),
720            data: Some(MessageData::String(
721                json!({
722                    "channel": channel,
723                    "event": event,
724                    "delta": delta_base64,
725                    "base_seq": base_sequence,
726                    "target_seq": target_sequence,
727                    "algorithm": algorithm,
728                })
729                .to_string(),
730            )),
731            name: None,
732            user_id: None,
733            sequence: None,
734            conflation_key: None,
735            tags: None,
736            message_id: None,
737            stream_id: None,
738            serial: None,
739            idempotency_key: None,
740            extras: None,
741            delta_sequence: None,
742            delta_conflation_key: None,
743        }
744    }
745
746    /// Rewrite the event name prefix to match the given protocol version.
747    /// This is the single translation point between V1 (`pusher:`) and V2 (`sockudo:`) wire formats.
748    pub fn rewrite_prefix(&mut self, version: ProtocolVersion) {
749        if let Some(ref event) = self.event {
750            self.event = Some(version.rewrite_event_prefix(event));
751        }
752    }
753
754    /// Returns true if this message is ephemeral (skip recovery buffer and webhooks).
755    pub fn is_ephemeral(&self) -> bool {
756        self.extras
757            .as_ref()
758            .and_then(|e| e.ephemeral)
759            .unwrap_or(false)
760    }
761
762    /// Returns the extras-level idempotency key, if set.
763    pub fn extras_idempotency_key(&self) -> Option<&str> {
764        self.extras
765            .as_ref()
766            .and_then(|e| e.idempotency_key.as_deref())
767    }
768
769    /// Resolve whether this message should be echoed back to the publishing socket.
770    /// Message-level `extras.echo` takes precedence over the connection default.
771    pub fn should_echo(&self, connection_default: bool) -> bool {
772        self.extras
773            .as_ref()
774            .and_then(|e| e.echo)
775            .unwrap_or(connection_default)
776    }
777
778    /// Returns the extras headers for server-side filtering, if present.
779    pub fn filter_headers(&self) -> Option<&HashMap<String, ExtrasValue>> {
780        self.extras.as_ref().and_then(|e| e.headers.as_ref())
781    }
782
783    /// Returns true if the given protocol version should receive extras in delivered messages.
784    pub fn should_include_extras(protocol: &ProtocolVersion) -> bool {
785        matches!(protocol, ProtocolVersion::V2)
786    }
787
788    /// Add base sequence marker to a full message for delta tracking
789    pub fn add_base_sequence(mut self, base_sequence: u32) -> Self {
790        if let Some(MessageData::String(ref data_str)) = self.data
791            && let Ok(mut data_obj) = sonic_rs::from_str::<Value>(data_str)
792            && let Some(obj) = data_obj.as_object_mut()
793        {
794            obj.insert("__delta_base_seq", json!(base_sequence));
795            self.data = Some(MessageData::String(data_obj.to_string()));
796        }
797        self
798    }
799
800    /// Create delta compression enabled confirmation
801    pub fn delta_compression_enabled(default_algorithm: &str) -> Self {
802        Self {
803            event: Some("pusher:delta_compression_enabled".to_string()),
804            data: Some(MessageData::Json(json!({
805                "enabled": true,
806                "default_algorithm": default_algorithm,
807            }))),
808            channel: None,
809            name: None,
810            user_id: None,
811            sequence: None,
812            conflation_key: None,
813            tags: None,
814            message_id: None,
815            stream_id: None,
816            serial: None,
817            idempotency_key: None,
818            extras: None,
819            delta_sequence: None,
820            delta_conflation_key: None,
821        }
822    }
823}
824
/// Extension trait for reading a comma-separated `info` query parameter.
pub trait InfoQueryParser {
    /// Return the requested attribute names, trimmed, with empty segments removed.
    fn parse_info(&self) -> Vec<&str>;
    /// Whether `user_count` was requested.
    fn wants_user_count(&self) -> bool;
    /// Whether `subscription_count` was requested.
    fn wants_subscription_count(&self) -> bool;
    /// Whether `cache` was requested.
    fn wants_cache(&self) -> bool;
}

/// Split a raw `info` value on commas, trimming surrounding whitespace from
/// each segment and skipping segments that end up empty.
fn info_attributes(raw: &str) -> impl Iterator<Item = &str> {
    raw.split(',')
        .map(str::trim)
        .filter(|attribute| !attribute.is_empty())
}

/// Check whether `wanted` is one of the attributes listed in `info`,
/// without building an intermediate vector.
fn info_requests(info: Option<&String>, wanted: &str) -> bool {
    info.is_some_and(|raw| info_attributes(raw).any(|attribute| attribute == wanted))
}

impl InfoQueryParser for Option<&String> {
    /// A missing parameter yields an empty list. Whitespace around names is
    /// ignored so that `"user_count, cache"` requests both attributes.
    fn parse_info(&self) -> Vec<&str> {
        self.map(|raw| info_attributes(raw).collect::<Vec<_>>())
            .unwrap_or_default()
    }

    fn wants_user_count(&self) -> bool {
        info_requests(*self, "user_count")
    }

    fn wants_subscription_count(&self) -> bool {
        info_requests(*self, "subscription_count")
    }

    fn wants_cache(&self) -> bool {
        info_requests(*self, "cache")
    }
}
851
#[cfg(test)]
mod tests {
    use super::{
        AnnotationEventAction, AnnotationEventData, AnnotationSummaryEnvelope, MessageSummaryData,
        PusherMessage,
    };
    use crate::protocol_version::ProtocolVersion;
    use sonic_rs::{JsonValueTrait, json};
    use std::collections::BTreeMap;

    /// Ping and pong must be recognised both with their original prefix and
    /// after being rewritten for protocol V2.
    #[test]
    fn protocol_heartbeat_detection_matches_both_prefix_families() {
        for mut heartbeat in [PusherMessage::ping(), PusherMessage::pong()] {
            assert!(heartbeat.is_protocol_ping_or_pong());

            heartbeat.rewrite_prefix(ProtocolVersion::V2);
            assert!(heartbeat.is_protocol_ping_or_pong());
        }
    }

    /// An ordinary channel event is not a heartbeat.
    #[test]
    fn protocol_heartbeat_detection_ignores_regular_messages() {
        let regular = PusherMessage::channel_event("chat.message", "room", json!({"text": "hello"}));

        assert!(!regular.is_protocol_ping_or_pong());
    }

    /// Annotation payloads use camelCase keys, `type` for the annotation
    /// type, and the dotted action string.
    #[test]
    fn annotation_create_serializes_camel_case_contract() {
        let annotation = AnnotationEventData {
            action: AnnotationEventAction::Create,
            id: Some("ann-1".to_string()),
            serial: "ann:1".to_string(),
            message_serial: "msg:1".to_string(),
            annotation_type: "reactions:distinct.v1".to_string(),
            name: Some("thumbsup".to_string()),
            client_id: Some("user-123".to_string()),
            count: Some(1),
            data: Some(json!({"raw": true})),
            encoding: None,
            timestamp: 1_700_000_000_000,
        };

        let encoded = sonic_rs::to_value(&annotation).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("annotation.create"));
        assert_eq!(encoded["messageSerial"].as_str(), Some("msg:1"));
        assert_eq!(encoded["type"].as_str(), Some("reactions:distinct.v1"));
        assert_eq!(encoded["clientId"].as_str(), Some("user-123"));
    }

    /// Summaries are nested under `annotations.summary`.
    #[test]
    fn message_summary_serializes_summary_envelope() {
        let summary = BTreeMap::from([(
            "reactions:distinct.v1".to_string(),
            json!({"thumbsup": {"total": 5, "clientIds": ["a"], "clipped": false}}),
        )]);
        let message_summary = MessageSummaryData {
            action: "message.summary".to_string(),
            serial: "msg:1".to_string(),
            annotations: AnnotationSummaryEnvelope { summary },
        };

        let encoded = sonic_rs::to_value(&message_summary).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("message.summary"));
        assert_eq!(encoded["serial"].as_str(), Some("msg:1"));
        assert_eq!(
            encoded["annotations"]["summary"]["reactions:distinct.v1"]["thumbsup"]["total"]
                .as_u64(),
            Some(5)
        );
    }
}