Skip to main content

sockudo_protocol/
messages.rs

1use ahash::AHashMap;
2use serde::de::Error as _;
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use sonic_rs::prelude::*;
6use sonic_rs::{Value, json};
7use std::collections::{BTreeMap, HashMap};
8use std::time::Duration;
9
10use crate::protocol_version::ProtocolVersion;
11
/// Allowed value types for extras.headers.
/// Flat only — no Object or Array variant so nesting is structurally impossible.
///
/// `#[serde(untagged)]` means a value is written as a bare JSON string, number
/// or boolean (no variant tag), and on input the variants are tried in
/// declaration order. Numbers are held as `f64`, so integers above 2^53 cannot
/// be represented exactly.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum ExtrasValue {
    /// A JSON string.
    String(String),
    /// Any JSON number, stored as a double.
    Number(f64),
    /// A JSON boolean.
    Bool(bool),
}
21
/// Structured metadata envelope for V2-specific message features.
///
/// Present on the wire for V2 connections only. V1 connections receive messages
/// with extras stripped entirely. Pusher SDKs ignore unknown fields so the
/// field is safe to carry through internal pipelines even when the publisher
/// is V1.
///
/// Field names are camelCase on the wire (`idempotency_key` is read and written
/// as `idempotencyKey`), and every field is omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MessageExtras {
    /// Flat metadata for server-side event name filtering.
    /// Must be a flat object — no nested objects, no arrays.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<HashMap<String, ExtrasValue>>,

    /// If true: skip connection recovery buffer and webhook forwarding.
    /// Deliver to currently connected V2 subscribers only.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ephemeral: Option<bool>,

    /// Server-side deduplication key. If the same key arrives again within
    /// the app's idempotency TTL window, the message is silently dropped.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,

    /// Per-message echo control. Overrides the connection-level echo setting
    /// when explicitly set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub echo: Option<bool>,
}
51
52impl MessageExtras {
53    /// Validate that headers (if present) contain only flat scalar values.
54    /// This is structurally guaranteed by `ExtrasValue` having no Object/Array
55    /// variants, but this method provides an explicit check with a clear error
56    /// when validating raw JSON before deserialization.
57    pub fn validate_headers_from_json(raw: &Value) -> Result<(), String> {
58        if let Some(extras) = raw.get("extras")
59            && let Some(headers) = extras.get("headers")
60            && let Some(obj) = headers.as_object()
61        {
62            for (key, val) in obj.iter() {
63                if val.is_object() || val.is_array() {
64                    return Err(format!(
65                        "extras.headers must be a flat object — nested objects and arrays are not allowed (key: '{key}')"
66                    ));
67                }
68            }
69        }
70        Ok(())
71    }
72}
73
74/// Generate a unique message ID (UUIDv4) for client-side deduplication.
75pub fn generate_message_id() -> String {
76    uuid::Uuid::new_v4().to_string()
77}
78
/// Internal event name for annotation events.
/// NOTE(review): presumably paired with `AnnotationEventData` payloads — confirm at the call sites.
pub const ANNOTATION_EVENT_NAME: &str = "sockudo_internal:annotation";

/// Internal event name for message summary events.
/// NOTE(review): presumably paired with `MessageSummaryData` payloads — confirm at the call sites.
pub const MESSAGE_SUMMARY_EVENT_NAME: &str = "sockudo_internal:message";

/// Mode string for annotation subscriptions.
/// NOTE(review): where this mode is checked is not visible in this file.
pub const ANNOTATION_SUBSCRIBE_MODE: &str = "ANNOTATION_SUBSCRIBE";
82
/// Kind of change an annotation event reports.
///
/// Serialized as the dotted strings `"annotation.create"` and
/// `"annotation.delete"` rather than the Rust variant names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AnnotationEventAction {
    /// Wire value `annotation.create`.
    #[serde(rename = "annotation.create")]
    Create,
    /// Wire value `annotation.delete`.
    #[serde(rename = "annotation.delete")]
    Delete,
}
90
/// Payload of a single annotation event.
///
/// Field names are camelCase on the wire (`message_serial` → `messageSerial`,
/// `client_id` → `clientId`); `annotation_type` is written as `type`.
/// Optional fields are left out of the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct AnnotationEventData {
    /// Whether the annotation was created or deleted.
    pub action: AnnotationEventAction,
    /// Optional annotation identifier.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Serial of the annotation itself.
    pub serial: String,
    /// Serial of the message the annotation belongs to (`messageSerial` on the wire).
    pub message_serial: String,
    /// Annotation type, serialized under the key `type`
    /// (the tests in this file use `reactions:distinct.v1`).
    #[serde(rename = "type")]
    pub annotation_type: String,
    /// Optional annotation name (the tests in this file use `thumbsup`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Optional client identifier (`clientId` on the wire).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    /// Optional count attached to the annotation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub count: Option<u64>,
    /// Optional free-form JSON payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<Value>,
    /// Optional encoding label for `data`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub encoding: Option<String>,
    /// Event timestamp.
    /// NOTE(review): looks like milliseconds since the Unix epoch, judging by the test fixture — confirm.
    pub timestamp: i64,
}
113
/// Wrapper that nests annotation summaries under a `summary` key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AnnotationSummaryEnvelope {
    /// Summary JSON per key. A `BTreeMap` keeps the keys sorted, so the
    /// serialized order is deterministic.
    /// NOTE(review): keys appear to be annotation types (e.g. `reactions:distinct.v1`) — confirm.
    pub summary: BTreeMap<String, Value>,
}
118
/// Payload of a message summary event: an action label, the message serial and
/// the annotation summaries for that message.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MessageSummaryData {
    /// Action label (the tests in this file use `message.summary`).
    pub action: String,
    /// Serial of the summarized message.
    pub serial: String,
    /// Summaries, serialized as `{"summary": {...}}`.
    pub annotations: AnnotationSummaryEnvelope,
}
126
/// Presence snapshot embedded in `subscription_succeeded` under the `presence` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresenceData {
    /// Member identifiers.
    pub ids: Vec<String>,
    /// Optional JSON value per key.
    /// NOTE(review): presumably member id → user info — confirm against the caller.
    pub hash: AHashMap<String, Option<Value>>,
    /// Member count as supplied by the caller; this type does not derive it from `ids`.
    pub count: usize,
}
133
/// The `data` payload of a message.
///
/// Serialization is untagged: each variant is written as its bare content.
/// Deserialization is implemented by hand (see the `Deserialize` impl for this
/// type) rather than derived.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(untagged)]
pub enum MessageData {
    /// Payload carried as a string (the constructors in this file put JSON-encoded text here).
    String(String),
    /// An object with at least one of the well-known keys split out; all other
    /// keys live in `extra` and are flattened back into the same object on output.
    Structured {
        #[serde(skip_serializing_if = "Option::is_none")]
        channel_data: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        channel: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        user_data: Option<String>,
        #[serde(flatten)]
        extra: AHashMap<String, Value>,
    },
    /// Any other JSON value, kept as-is.
    Json(Value),
}
150
151impl<'de> Deserialize<'de> for MessageData {
152    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
153    where
154        D: serde::Deserializer<'de>,
155    {
156        let v = JsonValue::deserialize(deserializer)?;
157        if let Some(s) = v.as_str() {
158            return Ok(MessageData::String(s.to_string()));
159        }
160        if let Some(obj) = v.as_object() {
161            // Flatten workaround for sonic-rs issue #114:
162            // manually split known structured keys and keep remaining keys in `extra`.
163            let channel_data = obj
164                .get("channel_data")
165                .and_then(|x| x.as_str())
166                .map(ToString::to_string);
167            let channel = obj
168                .get("channel")
169                .and_then(|x| x.as_str())
170                .map(ToString::to_string);
171            let user_data = obj
172                .get("user_data")
173                .and_then(|x| x.as_str())
174                .map(ToString::to_string);
175
176            if channel_data.is_some() || channel.is_some() || user_data.is_some() {
177                let mut extra = AHashMap::new();
178                for (k, val) in obj.iter() {
179                    if k != "channel_data" && k != "channel" && k != "user_data" {
180                        extra.insert(
181                            k.to_string(),
182                            serde_json_value_to_sonic(val.clone()).map_err(D::Error::custom)?,
183                        );
184                    }
185                }
186                return Ok(MessageData::Structured {
187                    channel_data,
188                    channel,
189                    user_data,
190                    extra,
191                });
192            }
193        }
194        Ok(MessageData::Json(
195            serde_json_value_to_sonic(v).map_err(D::Error::custom)?,
196        ))
197    }
198}
199
200fn serde_json_value_to_sonic(value: JsonValue) -> Result<Value, String> {
201    let encoded = serde_json::to_string(&value)
202        .map_err(|err| format!("failed to encode json value for MessageData: {err}"))?;
203    sonic_rs::from_str(&encoded)
204        .map_err(|err| format!("failed to decode json value for MessageData: {err}"))
205}
206
/// Error payload: an optional numeric code plus a human-readable message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorData {
    /// Optional error code. There is no `skip_serializing_if` here, so `None`
    /// is written as `null` rather than omitted.
    pub code: Option<u16>,
    /// Error description.
    pub message: String,
}
212
/// A protocol message as exchanged with clients and passed through the
/// broadcast pipeline.
///
/// Every field is optional and is omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PusherMessage {
    /// Event name, e.g. `pusher:ping` or an application event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// Channel the message relates to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Message payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<MessageData>,
    /// NOTE(review): not set by any constructor in this file; purpose not visible here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// NOTE(review): not set by any constructor in this file; purpose not visible here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
    /// Tags for filtering - uses BTreeMap for deterministic serialization order
    /// which is required for delta compression to work correctly
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<BTreeMap<String, String>>,
    /// Delta compression sequence number for full messages
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sequence: Option<u64>,
    /// Delta compression conflation key for message grouping
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conflation_key: Option<String>,
    /// Unique message ID for client-side deduplication
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_id: Option<String>,
    /// Opaque per-channel continuity token for durable history and recovery.
    /// Changes only when the server can no longer prove continuity for the channel stream.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_id: Option<String>,
    /// Monotonically increasing serial for connection recovery.
    /// Assigned per-channel at broadcast time when connection recovery is enabled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub serial: Option<u64>,
    /// Idempotency key for cross-region deduplication.
    /// Threaded from the HTTP publish request through the broadcast pipeline
    /// so that receiving nodes can register it in their local cache.
    /// Never sent to WebSocket clients.
    ///
    /// NOTE(review): the serde attribute below only omits the field when it is
    /// `None`; a `Some` value IS serialized. Keeping it away from clients
    /// therefore depends on code outside this struct clearing it first — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// V2 message extras envelope. Carries ephemeral flag, per-message echo
    /// control, header-based filtering metadata, and extras-level idempotency.
    /// Stripped from V1 deliveries; included in V2 wire format.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extras: Option<MessageExtras>,
    /// Delta sequence marker for full messages in V2 delta streams.
    /// Serialized under the key `__delta_seq`.
    #[serde(rename = "__delta_seq", skip_serializing_if = "Option::is_none")]
    pub delta_sequence: Option<u64>,
    /// Delta conflation key marker for full messages in V2 delta streams.
    /// Serialized under the key `__conflation_key`.
    #[serde(rename = "__conflation_key", skip_serializing_if = "Option::is_none")]
    pub delta_conflation_key: Option<String>,
}
264
/// A publish request as accepted by the API.
///
/// Every field is optional and is omitted from the serialized form when `None`.
/// NOTE(review): which combinations are required (e.g. `channel` vs `channels`)
/// is enforced elsewhere, not by this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PusherApiMessage {
    /// Name of the published event (presumably; verify against the handler).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Payload, either a string or arbitrary JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<ApiMessageData>,
    /// Single target channel.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Multiple target channels.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channels: Option<Vec<String>>,
    /// Socket id supplied with the request.
    /// NOTE(review): presumably the socket to exclude from delivery — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub socket_id: Option<String>,
    /// Raw `info` string.
    /// NOTE(review): presumably the comma-separated attribute list handled by `InfoQueryParser` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub info: Option<String>,
    /// Tags attached to the publish. Note this is an unordered `AHashMap`,
    /// whereas `PusherMessage::tags` is a `BTreeMap`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<AHashMap<String, String>>,
    /// Per-publish delta compression control.
    /// - `Some(true)`: Force delta compression for this message (if client supports it)
    /// - `Some(false)`: Force full message (skip delta compression)
    /// - `None`: Use default behavior based on channel/global configuration
    #[serde(skip_serializing_if = "Option::is_none")]
    pub delta: Option<bool>,
    /// Idempotency key for deduplicating publish requests.
    /// If the same key is seen within the TTL window, the server returns the
    /// cached response without re-broadcasting.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// V2 extras envelope. Passed through to PusherMessage for V2 delivery.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extras: Option<MessageExtras>,
}
296
/// A batch publish request: a list of individual publish requests under `batch`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchPusherApiMessage {
    /// The individual messages, in request order.
    pub batch: Vec<PusherApiMessage>,
}
301
/// Payload of an API publish request.
///
/// Untagged: on input a JSON string is tried first and becomes `String`;
/// any other JSON value becomes `Json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ApiMessageData {
    /// Payload supplied as a JSON string.
    String(String),
    /// Payload supplied as any other JSON value.
    Json(Value),
}
308
/// Reduced message shape holding only channel, event and data.
///
/// Every field is omitted from the serialized form when `None`.
/// NOTE(review): the name suggests the outbound wire form — confirm where it is used.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SentPusherMessage {
    /// Channel the message relates to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Event name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    /// Message payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<MessageData>,
}
318
319// Helper implementations
320impl MessageData {
321    pub fn as_string(&self) -> Option<&str> {
322        match self {
323            MessageData::String(s) => Some(s),
324            _ => None,
325        }
326    }
327
328    pub fn into_string(self) -> Option<String> {
329        match self {
330            MessageData::String(s) => Some(s),
331            _ => None,
332        }
333    }
334
335    pub fn as_value(&self) -> Option<&Value> {
336        match self {
337            MessageData::Structured { extra, .. } => extra.values().next(),
338            _ => None,
339        }
340    }
341}
342
343impl From<String> for MessageData {
344    fn from(s: String) -> Self {
345        MessageData::String(s)
346    }
347}
348
349impl From<Value> for MessageData {
350    fn from(v: Value) -> Self {
351        MessageData::Json(v)
352    }
353}
354
355impl PusherMessage {
356    pub fn is_protocol_ping_or_pong(&self) -> bool {
357        let Some(event) = self.event.as_deref() else {
358            return false;
359        };
360
361        matches!(
362            ProtocolVersion::parse_any_protocol_event(event),
363            Some(("ping", _)) | Some(("pong", _))
364        )
365    }
366
367    pub fn connection_established(socket_id: String, activity_timeout: u64) -> Self {
368        Self {
369            event: Some("pusher:connection_established".to_string()),
370            data: Some(MessageData::from(
371                json!({
372                    "socket_id": socket_id,
373                    "activity_timeout": activity_timeout  // Now configurable
374                })
375                .to_string(),
376            )),
377            channel: None,
378            name: None,
379            user_id: None,
380            sequence: None,
381            conflation_key: None,
382            tags: None,
383            message_id: None,
384            stream_id: None,
385            serial: None,
386            idempotency_key: None,
387            extras: None,
388            delta_sequence: None,
389            delta_conflation_key: None,
390        }
391    }
392    pub fn subscription_succeeded(channel: String, presence_data: Option<PresenceData>) -> Self {
393        let data_obj = if let Some(data) = presence_data {
394            json!({
395                "presence": {
396                    "ids": data.ids,
397                    "hash": data.hash,
398                    "count": data.count
399                }
400            })
401        } else {
402            json!({})
403        };
404
405        Self {
406            event: Some("pusher_internal:subscription_succeeded".to_string()),
407            channel: Some(channel),
408            data: Some(MessageData::String(data_obj.to_string())),
409            name: None,
410            user_id: None,
411            sequence: None,
412            conflation_key: None,
413            tags: None,
414            message_id: None,
415            stream_id: None,
416            serial: None,
417            idempotency_key: None,
418            extras: None,
419            delta_sequence: None,
420            delta_conflation_key: None,
421        }
422    }
423
424    pub fn error(code: u16, message: String, channel: Option<String>) -> Self {
425        Self {
426            event: Some("pusher:error".to_string()),
427            data: Some(MessageData::Json(json!({
428                "code": code,
429                "message": message
430            }))),
431            channel,
432            name: None,
433            user_id: None,
434            sequence: None,
435            conflation_key: None,
436            tags: None,
437            message_id: None,
438            stream_id: None,
439            serial: None,
440            idempotency_key: None,
441            extras: None,
442            delta_sequence: None,
443            delta_conflation_key: None,
444        }
445    }
446
447    pub fn ping() -> Self {
448        Self {
449            event: Some("pusher:ping".to_string()),
450            data: None,
451            channel: None,
452            name: None,
453            user_id: None,
454            sequence: None,
455            conflation_key: None,
456            tags: None,
457            message_id: None,
458            stream_id: None,
459            serial: None,
460            idempotency_key: None,
461            extras: None,
462            delta_sequence: None,
463            delta_conflation_key: None,
464        }
465    }
466    pub fn channel_event<S: Into<String>>(event: S, channel: S, data: Value) -> Self {
467        Self {
468            event: Some(event.into()),
469            channel: Some(channel.into()),
470            data: Some(MessageData::String(data.to_string())),
471            name: None,
472            user_id: None,
473            sequence: None,
474            conflation_key: None,
475            tags: None,
476            message_id: None,
477            stream_id: None,
478            serial: None,
479            idempotency_key: None,
480            extras: None,
481            delta_sequence: None,
482            delta_conflation_key: None,
483        }
484    }
485
486    pub fn member_added(channel: String, user_id: String, user_info: Option<Value>) -> Self {
487        Self {
488            event: Some("pusher_internal:member_added".to_string()),
489            channel: Some(channel),
490            // FIX: Use MessageData::String with JSON-encoded string instead of MessageData::Json
491            data: Some(MessageData::String(
492                json!({
493                    "user_id": user_id,
494                    "user_info": user_info.unwrap_or_else(|| json!({}))
495                })
496                .to_string(),
497            )),
498            name: None,
499            user_id: None,
500            sequence: None,
501            conflation_key: None,
502            tags: None,
503            message_id: None,
504            stream_id: None,
505            serial: None,
506            idempotency_key: None,
507            extras: None,
508            delta_sequence: None,
509            delta_conflation_key: None,
510        }
511    }
512
513    pub fn member_removed(channel: String, user_id: String) -> Self {
514        Self {
515            event: Some("pusher_internal:member_removed".to_string()),
516            channel: Some(channel),
517            // FIX: Also apply same fix to member_removed for consistency
518            data: Some(MessageData::String(
519                json!({
520                    "user_id": user_id
521                })
522                .to_string(),
523            )),
524            name: None,
525            user_id: None,
526            sequence: None,
527            conflation_key: None,
528            tags: None,
529            message_id: None,
530            stream_id: None,
531            serial: None,
532            idempotency_key: None,
533            extras: None,
534            delta_sequence: None,
535            delta_conflation_key: None,
536        }
537    }
538
539    // New helper method for pong response
540    pub fn pong() -> Self {
541        Self {
542            event: Some("pusher:pong".to_string()),
543            data: None,
544            channel: None,
545            name: None,
546            user_id: None,
547            sequence: None,
548            conflation_key: None,
549            tags: None,
550            message_id: None,
551            stream_id: None,
552            serial: None,
553            idempotency_key: None,
554            extras: None,
555            delta_sequence: None,
556            delta_conflation_key: None,
557        }
558    }
559
560    // Helper for creating channel info response
561    pub fn channel_info(
562        occupied: bool,
563        subscription_count: Option<u64>,
564        user_count: Option<u64>,
565        cache_data: Option<(String, Duration)>,
566    ) -> Value {
567        let mut response = json!({
568            "occupied": occupied
569        });
570
571        if let Some(count) = subscription_count {
572            response["subscription_count"] = json!(count);
573        }
574
575        if let Some(count) = user_count {
576            response["user_count"] = json!(count);
577        }
578
579        if let Some((data, ttl)) = cache_data {
580            response["cache"] = json!({
581                "data": data,
582                "ttl": ttl.as_secs()
583            });
584        }
585
586        response
587    }
588
589    // Helper for creating channels list response
590    pub fn channels_list(channels_info: AHashMap<String, Value>) -> Value {
591        json!({
592            "channels": channels_info
593        })
594    }
595
596    // Helper for creating user list response
597    pub fn user_list(user_ids: Vec<String>) -> Value {
598        let users = user_ids
599            .into_iter()
600            .map(|id| json!({ "id": id }))
601            .collect::<Vec<_>>();
602
603        json!({ "users": users })
604    }
605
606    // Helper for batch events response
607    pub fn batch_response(batch_info: Vec<Value>) -> Value {
608        json!({ "batch": batch_info })
609    }
610
611    // Helper for simple success response
612    pub fn success_response() -> Value {
613        json!({ "ok": true })
614    }
615
616    pub fn watchlist_online_event(user_ids: Vec<String>) -> Self {
617        Self {
618            event: Some("online".to_string()),
619            channel: None, // Watchlist events don't use channels
620            name: None,
621            data: Some(MessageData::Json(json!({
622                "user_ids": user_ids
623            }))),
624            user_id: None,
625            sequence: None,
626            conflation_key: None,
627            tags: None,
628            message_id: None,
629            stream_id: None,
630            serial: None,
631            idempotency_key: None,
632            extras: None,
633            delta_sequence: None,
634            delta_conflation_key: None,
635        }
636    }
637
638    pub fn watchlist_offline_event(user_ids: Vec<String>) -> Self {
639        Self {
640            event: Some("offline".to_string()),
641            channel: None,
642            name: None,
643            data: Some(MessageData::Json(json!({
644                "user_ids": user_ids
645            }))),
646            user_id: None,
647            sequence: None,
648            conflation_key: None,
649            tags: None,
650            message_id: None,
651            stream_id: None,
652            serial: None,
653            idempotency_key: None,
654            extras: None,
655            delta_sequence: None,
656            delta_conflation_key: None,
657        }
658    }
659
660    pub fn cache_miss_event(channel: String) -> Self {
661        Self {
662            event: Some("pusher:cache_miss".to_string()),
663            channel: Some(channel),
664            data: Some(MessageData::String("{}".to_string())),
665            name: None,
666            user_id: None,
667            sequence: None,
668            conflation_key: None,
669            tags: None,
670            message_id: None,
671            stream_id: None,
672            serial: None,
673            idempotency_key: None,
674            extras: None,
675            delta_sequence: None,
676            delta_conflation_key: None,
677        }
678    }
679
680    pub fn signin_success(user_data: String) -> Self {
681        Self {
682            event: Some("pusher:signin_success".to_string()),
683            data: Some(MessageData::Json(json!({
684                "user_data": user_data
685            }))),
686            channel: None,
687            name: None,
688            user_id: None,
689            sequence: None,
690            conflation_key: None,
691            tags: None,
692            message_id: None,
693            stream_id: None,
694            serial: None,
695            idempotency_key: None,
696            extras: None,
697            delta_sequence: None,
698            delta_conflation_key: None,
699        }
700    }
701
702    /// Create a delta-compressed message
703    pub fn delta_message(
704        channel: String,
705        event: String,
706        delta_base64: String,
707        base_sequence: u32,
708        target_sequence: u32,
709        algorithm: &str,
710    ) -> Self {
711        Self {
712            event: Some("pusher:delta".to_string()),
713            channel: Some(channel.clone()),
714            data: Some(MessageData::String(
715                json!({
716                    "channel": channel,
717                    "event": event,
718                    "delta": delta_base64,
719                    "base_seq": base_sequence,
720                    "target_seq": target_sequence,
721                    "algorithm": algorithm,
722                })
723                .to_string(),
724            )),
725            name: None,
726            user_id: None,
727            sequence: None,
728            conflation_key: None,
729            tags: None,
730            message_id: None,
731            stream_id: None,
732            serial: None,
733            idempotency_key: None,
734            extras: None,
735            delta_sequence: None,
736            delta_conflation_key: None,
737        }
738    }
739
740    /// Rewrite the event name prefix to match the given protocol version.
741    /// This is the single translation point between V1 (`pusher:`) and V2 (`sockudo:`) wire formats.
742    pub fn rewrite_prefix(&mut self, version: ProtocolVersion) {
743        if let Some(ref event) = self.event {
744            self.event = Some(version.rewrite_event_prefix(event));
745        }
746    }
747
748    /// Returns true if this message is ephemeral (skip recovery buffer and webhooks).
749    pub fn is_ephemeral(&self) -> bool {
750        self.extras
751            .as_ref()
752            .and_then(|e| e.ephemeral)
753            .unwrap_or(false)
754    }
755
756    /// Returns the extras-level idempotency key, if set.
757    pub fn extras_idempotency_key(&self) -> Option<&str> {
758        self.extras
759            .as_ref()
760            .and_then(|e| e.idempotency_key.as_deref())
761    }
762
763    /// Resolve whether this message should be echoed back to the publishing socket.
764    /// Message-level `extras.echo` takes precedence over the connection default.
765    pub fn should_echo(&self, connection_default: bool) -> bool {
766        self.extras
767            .as_ref()
768            .and_then(|e| e.echo)
769            .unwrap_or(connection_default)
770    }
771
772    /// Returns the extras headers for server-side filtering, if present.
773    pub fn filter_headers(&self) -> Option<&HashMap<String, ExtrasValue>> {
774        self.extras.as_ref().and_then(|e| e.headers.as_ref())
775    }
776
777    /// Returns true if the given protocol version should receive extras in delivered messages.
778    pub fn should_include_extras(protocol: &ProtocolVersion) -> bool {
779        matches!(protocol, ProtocolVersion::V2)
780    }
781
782    /// Add base sequence marker to a full message for delta tracking
783    pub fn add_base_sequence(mut self, base_sequence: u32) -> Self {
784        if let Some(MessageData::String(ref data_str)) = self.data
785            && let Ok(mut data_obj) = sonic_rs::from_str::<Value>(data_str)
786            && let Some(obj) = data_obj.as_object_mut()
787        {
788            obj.insert("__delta_base_seq", json!(base_sequence));
789            self.data = Some(MessageData::String(data_obj.to_string()));
790        }
791        self
792    }
793
794    /// Create delta compression enabled confirmation
795    pub fn delta_compression_enabled(default_algorithm: &str) -> Self {
796        Self {
797            event: Some("pusher:delta_compression_enabled".to_string()),
798            data: Some(MessageData::Json(json!({
799                "enabled": true,
800                "default_algorithm": default_algorithm,
801            }))),
802            channel: None,
803            name: None,
804            user_id: None,
805            sequence: None,
806            conflation_key: None,
807            tags: None,
808            message_id: None,
809            stream_id: None,
810            serial: None,
811            idempotency_key: None,
812            extras: None,
813            delta_sequence: None,
814            delta_conflation_key: None,
815        }
816    }
817}
818
/// Extension trait for reading a comma-separated `info` parameter.
pub trait InfoQueryParser {
    /// Splits the parameter into its individual attribute names.
    fn parse_info(&self) -> Vec<&str>;
    /// Whether `user_count` was requested.
    fn wants_user_count(&self) -> bool;
    /// Whether `subscription_count` was requested.
    fn wants_subscription_count(&self) -> bool;
    /// Whether `cache` was requested.
    fn wants_cache(&self) -> bool;
}

impl InfoQueryParser for Option<&String> {
    /// Splits on `,`, trims surrounding whitespace from each item and drops
    /// empty items, so `"user_count, cache,"` yields `["user_count", "cache"]`.
    ///
    /// Returns an empty list when the parameter is absent or blank.
    fn parse_info(&self) -> Vec<&str> {
        self.map(|raw| {
            raw.split(',')
                .map(str::trim)
                .filter(|item| !item.is_empty())
                .collect()
        })
        .unwrap_or_default()
    }

    fn wants_user_count(&self) -> bool {
        self.parse_info().contains(&"user_count")
    }

    fn wants_subscription_count(&self) -> bool {
        self.parse_info().contains(&"subscription_count")
    }

    fn wants_cache(&self) -> bool {
        self.parse_info().contains(&"cache")
    }
}
845
#[cfg(test)]
mod tests {
    use super::{
        AnnotationEventAction, AnnotationEventData, AnnotationSummaryEnvelope, MessageSummaryData,
        PusherMessage,
    };
    use crate::protocol_version::ProtocolVersion;
    use sonic_rs::JsonValueTrait;
    use std::collections::BTreeMap;

    /// Ping and pong must be detected both with their original prefix and
    /// after being rewritten for V2.
    #[test]
    fn protocol_heartbeat_detection_matches_both_prefix_families() {
        for mut heartbeat in [PusherMessage::ping(), PusherMessage::pong()] {
            assert!(heartbeat.is_protocol_ping_or_pong());

            heartbeat.rewrite_prefix(ProtocolVersion::V2);
            assert!(heartbeat.is_protocol_ping_or_pong());
        }
    }

    /// An ordinary channel event is not a heartbeat.
    #[test]
    fn protocol_heartbeat_detection_ignores_regular_messages() {
        let payload = sonic_rs::json!({"text": "hello"});
        let regular = PusherMessage::channel_event("chat.message", "room", payload);

        assert!(!regular.is_protocol_ping_or_pong());
    }

    /// Annotation events serialize with camelCase keys, `type` for the
    /// annotation type and the dotted action name.
    #[test]
    fn annotation_create_serializes_camel_case_contract() {
        let event = AnnotationEventData {
            action: AnnotationEventAction::Create,
            id: Some("ann-1".to_string()),
            serial: "ann:1".to_string(),
            message_serial: "msg:1".to_string(),
            annotation_type: "reactions:distinct.v1".to_string(),
            name: Some("thumbsup".to_string()),
            client_id: Some("user-123".to_string()),
            count: Some(1),
            data: Some(sonic_rs::json!({"raw": true})),
            encoding: None,
            timestamp: 1_700_000_000_000,
        };

        let encoded = sonic_rs::to_value(&event).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("annotation.create"));
        assert_eq!(encoded["messageSerial"].as_str(), Some("msg:1"));
        assert_eq!(encoded["type"].as_str(), Some("reactions:distinct.v1"));
        assert_eq!(encoded["clientId"].as_str(), Some("user-123"));
    }

    /// Message summaries nest their per-type data under `annotations.summary`.
    #[test]
    fn message_summary_serializes_summary_envelope() {
        let summary = BTreeMap::from([(
            "reactions:distinct.v1".to_string(),
            sonic_rs::json!({"thumbsup": {"total": 5, "clientIds": ["a"], "clipped": false}}),
        )]);
        let message = MessageSummaryData {
            action: "message.summary".to_string(),
            serial: "msg:1".to_string(),
            annotations: AnnotationSummaryEnvelope { summary },
        };

        let encoded = sonic_rs::to_value(&message).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("message.summary"));
        assert_eq!(encoded["serial"].as_str(), Some("msg:1"));
        assert_eq!(
            encoded["annotations"]["summary"]["reactions:distinct.v1"]["thumbsup"]["total"]
                .as_u64(),
            Some(5)
        );
    }
}