1use ahash::AHashMap;
2use serde::de::Error as _;
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use sonic_rs::prelude::*;
6use sonic_rs::{Value, json};
7use std::collections::{BTreeMap, HashMap};
8use std::time::Duration;
9
10use crate::protocol_version::ProtocolVersion;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
15#[serde(untagged)]
16pub enum ExtrasValue {
17 String(String),
18 Number(f64),
19 Bool(bool),
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
29#[serde(rename_all = "camelCase")]
30pub struct MessageExtras {
31 #[serde(skip_serializing_if = "Option::is_none")]
34 pub headers: Option<HashMap<String, ExtrasValue>>,
35
36 #[serde(skip_serializing_if = "Option::is_none")]
39 pub ephemeral: Option<bool>,
40
41 #[serde(skip_serializing_if = "Option::is_none")]
44 pub idempotency_key: Option<String>,
45
46 #[serde(skip_serializing_if = "Option::is_none")]
50 pub push: Option<Value>,
51
52 #[serde(skip_serializing_if = "Option::is_none")]
55 pub echo: Option<bool>,
56}
57
58impl MessageExtras {
59 pub fn validate_headers_from_json(raw: &Value) -> Result<(), String> {
64 if let Some(extras) = raw.get("extras")
65 && let Some(headers) = extras.get("headers")
66 && let Some(obj) = headers.as_object()
67 {
68 for (key, val) in obj.iter() {
69 if val.is_object() || val.is_array() {
70 return Err(format!(
71 "extras.headers must be a flat object — nested objects and arrays are not allowed (key: '{key}')"
72 ));
73 }
74 }
75 }
76 Ok(())
77 }
78}
79
80pub fn generate_message_id() -> String {
82 uuid::Uuid::new_v4().to_string()
83}
84
/// Event name string for annotation events.
pub const ANNOTATION_EVENT_NAME: &str = "sockudo_internal:annotation";
/// Event name string for message summary events.
pub const MESSAGE_SUMMARY_EVENT_NAME: &str = "sockudo_internal:message";
/// Mode identifier for annotation subscriptions.
/// NOTE(review): where this mode is checked is not visible in this file.
pub const ANNOTATION_SUBSCRIBE_MODE: &str = "ANNOTATION_SUBSCRIBE";
88
89#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
90pub enum AnnotationEventAction {
91 #[serde(rename = "annotation.create")]
92 Create,
93 #[serde(rename = "annotation.delete")]
94 Delete,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
98#[serde(rename_all = "camelCase")]
99pub struct AnnotationEventData {
100 pub action: AnnotationEventAction,
101 #[serde(skip_serializing_if = "Option::is_none")]
102 pub id: Option<String>,
103 pub serial: String,
104 pub message_serial: String,
105 #[serde(rename = "type")]
106 pub annotation_type: String,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 pub name: Option<String>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub client_id: Option<String>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub count: Option<u64>,
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub data: Option<Value>,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 pub encoding: Option<String>,
117 pub timestamp: i64,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
121pub struct AnnotationSummaryEnvelope {
122 pub summary: BTreeMap<String, Value>,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
126#[serde(rename_all = "camelCase")]
127pub struct MessageSummaryData {
128 pub action: String,
129 pub serial: String,
130 pub annotations: AnnotationSummaryEnvelope,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct PresenceData {
135 pub ids: Vec<String>,
136 pub hash: AHashMap<String, Option<Value>>,
137 pub count: usize,
138}
139
140#[derive(Debug, Clone, Serialize, PartialEq)]
141#[serde(untagged)]
142pub enum MessageData {
143 String(String),
144 Structured {
145 #[serde(skip_serializing_if = "Option::is_none")]
146 channel_data: Option<String>,
147 #[serde(skip_serializing_if = "Option::is_none")]
148 channel: Option<String>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 user_data: Option<String>,
151 #[serde(flatten)]
152 extra: AHashMap<String, Value>,
153 },
154 Json(Value),
155}
156
157impl<'de> Deserialize<'de> for MessageData {
158 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
159 where
160 D: serde::Deserializer<'de>,
161 {
162 let v = JsonValue::deserialize(deserializer)?;
163 if let Some(s) = v.as_str() {
164 return Ok(MessageData::String(s.to_string()));
165 }
166 if let Some(obj) = v.as_object() {
167 let channel_data = obj
170 .get("channel_data")
171 .and_then(|x| x.as_str())
172 .map(ToString::to_string);
173 let channel = obj
174 .get("channel")
175 .and_then(|x| x.as_str())
176 .map(ToString::to_string);
177 let user_data = obj
178 .get("user_data")
179 .and_then(|x| x.as_str())
180 .map(ToString::to_string);
181
182 if channel_data.is_some() || channel.is_some() || user_data.is_some() {
183 let mut extra = AHashMap::new();
184 for (k, val) in obj.iter() {
185 if k != "channel_data" && k != "channel" && k != "user_data" {
186 extra.insert(
187 k.to_string(),
188 serde_json_value_to_sonic(val.clone()).map_err(D::Error::custom)?,
189 );
190 }
191 }
192 return Ok(MessageData::Structured {
193 channel_data,
194 channel,
195 user_data,
196 extra,
197 });
198 }
199 }
200 Ok(MessageData::Json(
201 serde_json_value_to_sonic(v).map_err(D::Error::custom)?,
202 ))
203 }
204}
205
206fn serde_json_value_to_sonic(value: JsonValue) -> Result<Value, String> {
207 let encoded = serde_json::to_string(&value)
208 .map_err(|err| format!("failed to encode json value for MessageData: {err}"))?;
209 sonic_rs::from_str(&encoded)
210 .map_err(|err| format!("failed to decode json value for MessageData: {err}"))
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct ErrorData {
215 pub code: Option<u16>,
216 pub message: String,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
220pub struct PusherMessage {
221 #[serde(skip_serializing_if = "Option::is_none")]
222 pub event: Option<String>,
223 #[serde(skip_serializing_if = "Option::is_none")]
224 pub channel: Option<String>,
225 #[serde(skip_serializing_if = "Option::is_none")]
226 pub data: Option<MessageData>,
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub name: Option<String>,
229 #[serde(skip_serializing_if = "Option::is_none")]
230 pub user_id: Option<String>,
231 #[serde(skip_serializing_if = "Option::is_none")]
234 pub tags: Option<BTreeMap<String, String>>,
235 #[serde(skip_serializing_if = "Option::is_none")]
237 pub sequence: Option<u64>,
238 #[serde(skip_serializing_if = "Option::is_none")]
240 pub conflation_key: Option<String>,
241 #[serde(skip_serializing_if = "Option::is_none")]
243 pub message_id: Option<String>,
244 #[serde(skip_serializing_if = "Option::is_none")]
247 pub stream_id: Option<String>,
248 #[serde(skip_serializing_if = "Option::is_none")]
251 pub serial: Option<u64>,
252 #[serde(skip_serializing_if = "Option::is_none")]
257 pub idempotency_key: Option<String>,
258 #[serde(skip_serializing_if = "Option::is_none")]
262 pub extras: Option<MessageExtras>,
263 #[serde(rename = "__delta_seq", skip_serializing_if = "Option::is_none")]
265 pub delta_sequence: Option<u64>,
266 #[serde(rename = "__conflation_key", skip_serializing_if = "Option::is_none")]
268 pub delta_conflation_key: Option<String>,
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct PusherApiMessage {
273 #[serde(skip_serializing_if = "Option::is_none")]
274 pub name: Option<String>,
275 #[serde(skip_serializing_if = "Option::is_none")]
276 pub data: Option<ApiMessageData>,
277 #[serde(skip_serializing_if = "Option::is_none")]
278 pub channel: Option<String>,
279 #[serde(skip_serializing_if = "Option::is_none")]
280 pub channels: Option<Vec<String>>,
281 #[serde(skip_serializing_if = "Option::is_none")]
282 pub socket_id: Option<String>,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub info: Option<String>,
285 #[serde(skip_serializing_if = "Option::is_none")]
286 pub tags: Option<AHashMap<String, String>>,
287 #[serde(skip_serializing_if = "Option::is_none")]
292 pub delta: Option<bool>,
293 #[serde(skip_serializing_if = "Option::is_none")]
297 pub idempotency_key: Option<String>,
298 #[serde(skip_serializing_if = "Option::is_none")]
300 pub extras: Option<MessageExtras>,
301}
302
303#[derive(Debug, Clone, Serialize, Deserialize)]
304pub struct BatchPusherApiMessage {
305 pub batch: Vec<PusherApiMessage>,
306}
307
308#[derive(Debug, Clone, Serialize, Deserialize)]
309#[serde(untagged)]
310pub enum ApiMessageData {
311 String(String),
312 Json(Value),
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct SentPusherMessage {
317 #[serde(skip_serializing_if = "Option::is_none")]
318 pub channel: Option<String>,
319 #[serde(skip_serializing_if = "Option::is_none")]
320 pub event: Option<String>,
321 #[serde(skip_serializing_if = "Option::is_none")]
322 pub data: Option<MessageData>,
323}
324
325impl MessageData {
327 pub fn as_string(&self) -> Option<&str> {
328 match self {
329 MessageData::String(s) => Some(s),
330 _ => None,
331 }
332 }
333
334 pub fn into_string(self) -> Option<String> {
335 match self {
336 MessageData::String(s) => Some(s),
337 _ => None,
338 }
339 }
340
341 pub fn as_value(&self) -> Option<&Value> {
342 match self {
343 MessageData::Structured { extra, .. } => extra.values().next(),
344 _ => None,
345 }
346 }
347}
348
349impl From<String> for MessageData {
350 fn from(s: String) -> Self {
351 MessageData::String(s)
352 }
353}
354
355impl From<Value> for MessageData {
356 fn from(v: Value) -> Self {
357 MessageData::Json(v)
358 }
359}
360
361impl PusherMessage {
362 pub fn is_protocol_ping_or_pong(&self) -> bool {
363 let Some(event) = self.event.as_deref() else {
364 return false;
365 };
366
367 matches!(
368 ProtocolVersion::parse_any_protocol_event(event),
369 Some(("ping", _)) | Some(("pong", _))
370 )
371 }
372
373 pub fn connection_established(socket_id: String, activity_timeout: u64) -> Self {
374 Self {
375 event: Some("pusher:connection_established".to_string()),
376 data: Some(MessageData::from(
377 json!({
378 "socket_id": socket_id,
379 "activity_timeout": activity_timeout })
381 .to_string(),
382 )),
383 channel: None,
384 name: None,
385 user_id: None,
386 sequence: None,
387 conflation_key: None,
388 tags: None,
389 message_id: None,
390 stream_id: None,
391 serial: None,
392 idempotency_key: None,
393 extras: None,
394 delta_sequence: None,
395 delta_conflation_key: None,
396 }
397 }
398 pub fn subscription_succeeded(channel: String, presence_data: Option<PresenceData>) -> Self {
399 let data_obj = if let Some(data) = presence_data {
400 json!({
401 "presence": {
402 "ids": data.ids,
403 "hash": data.hash,
404 "count": data.count
405 }
406 })
407 } else {
408 json!({})
409 };
410
411 Self {
412 event: Some("pusher_internal:subscription_succeeded".to_string()),
413 channel: Some(channel),
414 data: Some(MessageData::String(data_obj.to_string())),
415 name: None,
416 user_id: None,
417 sequence: None,
418 conflation_key: None,
419 tags: None,
420 message_id: None,
421 stream_id: None,
422 serial: None,
423 idempotency_key: None,
424 extras: None,
425 delta_sequence: None,
426 delta_conflation_key: None,
427 }
428 }
429
430 pub fn error(code: u16, message: String, channel: Option<String>) -> Self {
431 Self {
432 event: Some("pusher:error".to_string()),
433 data: Some(MessageData::Json(json!({
434 "code": code,
435 "message": message
436 }))),
437 channel,
438 name: None,
439 user_id: None,
440 sequence: None,
441 conflation_key: None,
442 tags: None,
443 message_id: None,
444 stream_id: None,
445 serial: None,
446 idempotency_key: None,
447 extras: None,
448 delta_sequence: None,
449 delta_conflation_key: None,
450 }
451 }
452
453 pub fn ping() -> Self {
454 Self {
455 event: Some("pusher:ping".to_string()),
456 data: None,
457 channel: None,
458 name: None,
459 user_id: None,
460 sequence: None,
461 conflation_key: None,
462 tags: None,
463 message_id: None,
464 stream_id: None,
465 serial: None,
466 idempotency_key: None,
467 extras: None,
468 delta_sequence: None,
469 delta_conflation_key: None,
470 }
471 }
472 pub fn channel_event<S: Into<String>>(event: S, channel: S, data: Value) -> Self {
473 Self {
474 event: Some(event.into()),
475 channel: Some(channel.into()),
476 data: Some(MessageData::String(data.to_string())),
477 name: None,
478 user_id: None,
479 sequence: None,
480 conflation_key: None,
481 tags: None,
482 message_id: None,
483 stream_id: None,
484 serial: None,
485 idempotency_key: None,
486 extras: None,
487 delta_sequence: None,
488 delta_conflation_key: None,
489 }
490 }
491
492 pub fn member_added(channel: String, user_id: String, user_info: Option<Value>) -> Self {
493 Self {
494 event: Some("pusher_internal:member_added".to_string()),
495 channel: Some(channel),
496 data: Some(MessageData::String(
498 json!({
499 "user_id": user_id,
500 "user_info": user_info.unwrap_or_else(|| json!({}))
501 })
502 .to_string(),
503 )),
504 name: None,
505 user_id: None,
506 sequence: None,
507 conflation_key: None,
508 tags: None,
509 message_id: None,
510 stream_id: None,
511 serial: None,
512 idempotency_key: None,
513 extras: None,
514 delta_sequence: None,
515 delta_conflation_key: None,
516 }
517 }
518
519 pub fn member_removed(channel: String, user_id: String) -> Self {
520 Self {
521 event: Some("pusher_internal:member_removed".to_string()),
522 channel: Some(channel),
523 data: Some(MessageData::String(
525 json!({
526 "user_id": user_id
527 })
528 .to_string(),
529 )),
530 name: None,
531 user_id: None,
532 sequence: None,
533 conflation_key: None,
534 tags: None,
535 message_id: None,
536 stream_id: None,
537 serial: None,
538 idempotency_key: None,
539 extras: None,
540 delta_sequence: None,
541 delta_conflation_key: None,
542 }
543 }
544
545 pub fn pong() -> Self {
547 Self {
548 event: Some("pusher:pong".to_string()),
549 data: None,
550 channel: None,
551 name: None,
552 user_id: None,
553 sequence: None,
554 conflation_key: None,
555 tags: None,
556 message_id: None,
557 stream_id: None,
558 serial: None,
559 idempotency_key: None,
560 extras: None,
561 delta_sequence: None,
562 delta_conflation_key: None,
563 }
564 }
565
566 pub fn channel_info(
568 occupied: bool,
569 subscription_count: Option<u64>,
570 user_count: Option<u64>,
571 cache_data: Option<(String, Duration)>,
572 ) -> Value {
573 let mut response = json!({
574 "occupied": occupied
575 });
576
577 if let Some(count) = subscription_count {
578 response["subscription_count"] = json!(count);
579 }
580
581 if let Some(count) = user_count {
582 response["user_count"] = json!(count);
583 }
584
585 if let Some((data, ttl)) = cache_data {
586 response["cache"] = json!({
587 "data": data,
588 "ttl": ttl.as_secs()
589 });
590 }
591
592 response
593 }
594
595 pub fn channels_list(channels_info: AHashMap<String, Value>) -> Value {
597 json!({
598 "channels": channels_info
599 })
600 }
601
602 pub fn user_list(user_ids: Vec<String>) -> Value {
604 let users = user_ids
605 .into_iter()
606 .map(|id| json!({ "id": id }))
607 .collect::<Vec<_>>();
608
609 json!({ "users": users })
610 }
611
612 pub fn batch_response(batch_info: Vec<Value>) -> Value {
614 json!({ "batch": batch_info })
615 }
616
617 pub fn success_response() -> Value {
619 json!({ "ok": true })
620 }
621
622 pub fn watchlist_online_event(user_ids: Vec<String>) -> Self {
623 Self {
624 event: Some("online".to_string()),
625 channel: None, name: None,
627 data: Some(MessageData::Json(json!({
628 "user_ids": user_ids
629 }))),
630 user_id: None,
631 sequence: None,
632 conflation_key: None,
633 tags: None,
634 message_id: None,
635 stream_id: None,
636 serial: None,
637 idempotency_key: None,
638 extras: None,
639 delta_sequence: None,
640 delta_conflation_key: None,
641 }
642 }
643
644 pub fn watchlist_offline_event(user_ids: Vec<String>) -> Self {
645 Self {
646 event: Some("offline".to_string()),
647 channel: None,
648 name: None,
649 data: Some(MessageData::Json(json!({
650 "user_ids": user_ids
651 }))),
652 user_id: None,
653 sequence: None,
654 conflation_key: None,
655 tags: None,
656 message_id: None,
657 stream_id: None,
658 serial: None,
659 idempotency_key: None,
660 extras: None,
661 delta_sequence: None,
662 delta_conflation_key: None,
663 }
664 }
665
666 pub fn cache_miss_event(channel: String) -> Self {
667 Self {
668 event: Some("pusher:cache_miss".to_string()),
669 channel: Some(channel),
670 data: Some(MessageData::String("{}".to_string())),
671 name: None,
672 user_id: None,
673 sequence: None,
674 conflation_key: None,
675 tags: None,
676 message_id: None,
677 stream_id: None,
678 serial: None,
679 idempotency_key: None,
680 extras: None,
681 delta_sequence: None,
682 delta_conflation_key: None,
683 }
684 }
685
686 pub fn signin_success(user_data: String) -> Self {
687 Self {
688 event: Some("pusher:signin_success".to_string()),
689 data: Some(MessageData::Json(json!({
690 "user_data": user_data
691 }))),
692 channel: None,
693 name: None,
694 user_id: None,
695 sequence: None,
696 conflation_key: None,
697 tags: None,
698 message_id: None,
699 stream_id: None,
700 serial: None,
701 idempotency_key: None,
702 extras: None,
703 delta_sequence: None,
704 delta_conflation_key: None,
705 }
706 }
707
708 pub fn delta_message(
710 channel: String,
711 event: String,
712 delta_base64: String,
713 base_sequence: u32,
714 target_sequence: u32,
715 algorithm: &str,
716 ) -> Self {
717 Self {
718 event: Some("pusher:delta".to_string()),
719 channel: Some(channel.clone()),
720 data: Some(MessageData::String(
721 json!({
722 "channel": channel,
723 "event": event,
724 "delta": delta_base64,
725 "base_seq": base_sequence,
726 "target_seq": target_sequence,
727 "algorithm": algorithm,
728 })
729 .to_string(),
730 )),
731 name: None,
732 user_id: None,
733 sequence: None,
734 conflation_key: None,
735 tags: None,
736 message_id: None,
737 stream_id: None,
738 serial: None,
739 idempotency_key: None,
740 extras: None,
741 delta_sequence: None,
742 delta_conflation_key: None,
743 }
744 }
745
746 pub fn rewrite_prefix(&mut self, version: ProtocolVersion) {
749 if let Some(ref event) = self.event {
750 self.event = Some(version.rewrite_event_prefix(event));
751 }
752 }
753
754 pub fn is_ephemeral(&self) -> bool {
756 self.extras
757 .as_ref()
758 .and_then(|e| e.ephemeral)
759 .unwrap_or(false)
760 }
761
762 pub fn extras_idempotency_key(&self) -> Option<&str> {
764 self.extras
765 .as_ref()
766 .and_then(|e| e.idempotency_key.as_deref())
767 }
768
769 pub fn should_echo(&self, connection_default: bool) -> bool {
772 self.extras
773 .as_ref()
774 .and_then(|e| e.echo)
775 .unwrap_or(connection_default)
776 }
777
778 pub fn filter_headers(&self) -> Option<&HashMap<String, ExtrasValue>> {
780 self.extras.as_ref().and_then(|e| e.headers.as_ref())
781 }
782
783 pub fn should_include_extras(protocol: &ProtocolVersion) -> bool {
785 matches!(protocol, ProtocolVersion::V2)
786 }
787
788 pub fn add_base_sequence(mut self, base_sequence: u32) -> Self {
790 if let Some(MessageData::String(ref data_str)) = self.data
791 && let Ok(mut data_obj) = sonic_rs::from_str::<Value>(data_str)
792 && let Some(obj) = data_obj.as_object_mut()
793 {
794 obj.insert("__delta_base_seq", json!(base_sequence));
795 self.data = Some(MessageData::String(data_obj.to_string()));
796 }
797 self
798 }
799
800 pub fn delta_compression_enabled(default_algorithm: &str) -> Self {
802 Self {
803 event: Some("pusher:delta_compression_enabled".to_string()),
804 data: Some(MessageData::Json(json!({
805 "enabled": true,
806 "default_algorithm": default_algorithm,
807 }))),
808 channel: None,
809 name: None,
810 user_id: None,
811 sequence: None,
812 conflation_key: None,
813 tags: None,
814 message_id: None,
815 stream_id: None,
816 serial: None,
817 idempotency_key: None,
818 extras: None,
819 delta_sequence: None,
820 delta_conflation_key: None,
821 }
822 }
823}
824
/// Helpers for reading a comma-separated `info` attribute list.
pub trait InfoQueryParser {
    /// Returns the listed attribute names, each with surrounding whitespace
    /// removed. An absent value yields an empty list.
    fn parse_info(&self) -> Vec<&str>;
    /// Returns `true` when `user_count` is listed.
    fn wants_user_count(&self) -> bool;
    /// Returns `true` when `subscription_count` is listed.
    fn wants_subscription_count(&self) -> bool;
    /// Returns `true` when `cache` is listed.
    fn wants_cache(&self) -> bool;
}

/// Splits `info` on commas and yields each entry trimmed of whitespace, so
/// `"user_count, cache"` is read the same way as `"user_count,cache"`.
fn info_attributes(info: Option<&String>) -> impl Iterator<Item = &str> {
    info.into_iter()
        .flat_map(|raw| raw.split(','))
        .map(str::trim)
}

impl InfoQueryParser for Option<&String> {
    fn parse_info(&self) -> Vec<&str> {
        info_attributes(*self).collect()
    }

    // The `wants_*` checks scan the entries lazily instead of collecting a
    // temporary vector on every call.
    fn wants_user_count(&self) -> bool {
        info_attributes(*self).any(|attr| attr == "user_count")
    }

    fn wants_subscription_count(&self) -> bool {
        info_attributes(*self).any(|attr| attr == "subscription_count")
    }

    fn wants_cache(&self) -> bool {
        info_attributes(*self).any(|attr| attr == "cache")
    }
}
851
#[cfg(test)]
mod tests {
    use super::{
        AnnotationEventAction, AnnotationEventData, AnnotationSummaryEnvelope, MessageSummaryData,
        PusherMessage,
    };
    use crate::protocol_version::ProtocolVersion;
    use sonic_rs::{JsonValueTrait, json};
    use std::collections::BTreeMap;

    /// Ping and pong must be recognised both with their default prefix and
    /// after being rewritten for protocol V2.
    #[test]
    fn protocol_heartbeat_detection_matches_both_prefix_families() {
        for mut heartbeat in [PusherMessage::ping(), PusherMessage::pong()] {
            assert!(heartbeat.is_protocol_ping_or_pong());

            heartbeat.rewrite_prefix(ProtocolVersion::V2);
            assert!(heartbeat.is_protocol_ping_or_pong());
        }
    }

    /// An ordinary channel event is not a heartbeat.
    #[test]
    fn protocol_heartbeat_detection_ignores_regular_messages() {
        let payload = json!({"text": "hello"});
        let regular = PusherMessage::channel_event("chat.message", "room", payload);

        assert!(!regular.is_protocol_ping_or_pong());
    }

    /// Annotation events use the dotted action name, camelCase keys and the
    /// `type` key for the annotation type.
    #[test]
    fn annotation_create_serializes_camel_case_contract() {
        let event = AnnotationEventData {
            action: AnnotationEventAction::Create,
            id: Some("ann-1".to_string()),
            serial: "ann:1".to_string(),
            message_serial: "msg:1".to_string(),
            annotation_type: "reactions:distinct.v1".to_string(),
            name: Some("thumbsup".to_string()),
            client_id: Some("user-123".to_string()),
            count: Some(1),
            data: Some(json!({"raw": true})),
            encoding: None,
            timestamp: 1_700_000_000_000,
        };

        let encoded = sonic_rs::to_value(&event).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("annotation.create"));
        assert_eq!(encoded["messageSerial"].as_str(), Some("msg:1"));
        assert_eq!(encoded["type"].as_str(), Some("reactions:distinct.v1"));
        assert_eq!(encoded["clientId"].as_str(), Some("user-123"));
    }

    /// Summaries are nested under `annotations.summary`, keyed by the map key.
    #[test]
    fn message_summary_serializes_summary_envelope() {
        let summary = BTreeMap::from([(
            "reactions:distinct.v1".to_string(),
            json!({"thumbsup": {"total": 5, "clientIds": ["a"], "clipped": false}}),
        )]);
        let message = MessageSummaryData {
            action: "message.summary".to_string(),
            serial: "msg:1".to_string(),
            annotations: AnnotationSummaryEnvelope { summary },
        };

        let encoded = sonic_rs::to_value(&message).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("message.summary"));
        assert_eq!(encoded["serial"].as_str(), Some("msg:1"));

        let total = encoded["annotations"]["summary"]["reactions:distinct.v1"]["thumbsup"]["total"]
            .as_u64();
        assert_eq!(total, Some(5));
    }
}