1use ahash::AHashMap;
2use serde::de::Error as _;
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use sonic_rs::prelude::*;
6use sonic_rs::{Value, json};
7use std::collections::{BTreeMap, HashMap};
8use std::time::Duration;
9
10use crate::protocol_version::ProtocolVersion;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
15#[serde(untagged)]
16pub enum ExtrasValue {
17 String(String),
18 Number(f64),
19 Bool(bool),
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
29#[serde(rename_all = "camelCase")]
30pub struct MessageExtras {
31 #[serde(skip_serializing_if = "Option::is_none")]
34 pub headers: Option<HashMap<String, ExtrasValue>>,
35
36 #[serde(skip_serializing_if = "Option::is_none")]
39 pub ephemeral: Option<bool>,
40
41 #[serde(skip_serializing_if = "Option::is_none")]
44 pub idempotency_key: Option<String>,
45
46 #[serde(skip_serializing_if = "Option::is_none")]
49 pub echo: Option<bool>,
50}
51
52impl MessageExtras {
53 pub fn validate_headers_from_json(raw: &Value) -> Result<(), String> {
58 if let Some(extras) = raw.get("extras")
59 && let Some(headers) = extras.get("headers")
60 && let Some(obj) = headers.as_object()
61 {
62 for (key, val) in obj.iter() {
63 if val.is_object() || val.is_array() {
64 return Err(format!(
65 "extras.headers must be a flat object — nested objects and arrays are not allowed (key: '{key}')"
66 ));
67 }
68 }
69 }
70 Ok(())
71 }
72}
73
74pub fn generate_message_id() -> String {
76 uuid::Uuid::new_v4().to_string()
77}
78
/// Internal event name for annotation events.
// NOTE(review): presumably paired with `AnnotationEventData` payloads — confirm against callers.
pub const ANNOTATION_EVENT_NAME: &str = "sockudo_internal:annotation";

/// Internal event name for message summary events.
// NOTE(review): presumably paired with `MessageSummaryData` payloads — confirm against callers.
pub const MESSAGE_SUMMARY_EVENT_NAME: &str = "sockudo_internal:message";

/// Name of the annotation-subscribe mode.
pub const ANNOTATION_SUBSCRIBE_MODE: &str = "ANNOTATION_SUBSCRIBE";
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
84pub enum AnnotationEventAction {
85 #[serde(rename = "annotation.create")]
86 Create,
87 #[serde(rename = "annotation.delete")]
88 Delete,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
92#[serde(rename_all = "camelCase")]
93pub struct AnnotationEventData {
94 pub action: AnnotationEventAction,
95 #[serde(skip_serializing_if = "Option::is_none")]
96 pub id: Option<String>,
97 pub serial: String,
98 pub message_serial: String,
99 #[serde(rename = "type")]
100 pub annotation_type: String,
101 #[serde(skip_serializing_if = "Option::is_none")]
102 pub name: Option<String>,
103 #[serde(skip_serializing_if = "Option::is_none")]
104 pub client_id: Option<String>,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub count: Option<u64>,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 pub data: Option<Value>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub encoding: Option<String>,
111 pub timestamp: i64,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
115pub struct AnnotationSummaryEnvelope {
116 pub summary: BTreeMap<String, Value>,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
120#[serde(rename_all = "camelCase")]
121pub struct MessageSummaryData {
122 pub action: String,
123 pub serial: String,
124 pub annotations: AnnotationSummaryEnvelope,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct PresenceData {
129 pub ids: Vec<String>,
130 pub hash: AHashMap<String, Option<Value>>,
131 pub count: usize,
132}
133
134#[derive(Debug, Clone, Serialize, PartialEq)]
135#[serde(untagged)]
136pub enum MessageData {
137 String(String),
138 Structured {
139 #[serde(skip_serializing_if = "Option::is_none")]
140 channel_data: Option<String>,
141 #[serde(skip_serializing_if = "Option::is_none")]
142 channel: Option<String>,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 user_data: Option<String>,
145 #[serde(flatten)]
146 extra: AHashMap<String, Value>,
147 },
148 Json(Value),
149}
150
151impl<'de> Deserialize<'de> for MessageData {
152 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
153 where
154 D: serde::Deserializer<'de>,
155 {
156 let v = JsonValue::deserialize(deserializer)?;
157 if let Some(s) = v.as_str() {
158 return Ok(MessageData::String(s.to_string()));
159 }
160 if let Some(obj) = v.as_object() {
161 let channel_data = obj
164 .get("channel_data")
165 .and_then(|x| x.as_str())
166 .map(ToString::to_string);
167 let channel = obj
168 .get("channel")
169 .and_then(|x| x.as_str())
170 .map(ToString::to_string);
171 let user_data = obj
172 .get("user_data")
173 .and_then(|x| x.as_str())
174 .map(ToString::to_string);
175
176 if channel_data.is_some() || channel.is_some() || user_data.is_some() {
177 let mut extra = AHashMap::new();
178 for (k, val) in obj.iter() {
179 if k != "channel_data" && k != "channel" && k != "user_data" {
180 extra.insert(
181 k.to_string(),
182 serde_json_value_to_sonic(val.clone()).map_err(D::Error::custom)?,
183 );
184 }
185 }
186 return Ok(MessageData::Structured {
187 channel_data,
188 channel,
189 user_data,
190 extra,
191 });
192 }
193 }
194 Ok(MessageData::Json(
195 serde_json_value_to_sonic(v).map_err(D::Error::custom)?,
196 ))
197 }
198}
199
200fn serde_json_value_to_sonic(value: JsonValue) -> Result<Value, String> {
201 let encoded = serde_json::to_string(&value)
202 .map_err(|err| format!("failed to encode json value for MessageData: {err}"))?;
203 sonic_rs::from_str(&encoded)
204 .map_err(|err| format!("failed to decode json value for MessageData: {err}"))
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct ErrorData {
209 pub code: Option<u16>,
210 pub message: String,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
214pub struct PusherMessage {
215 #[serde(skip_serializing_if = "Option::is_none")]
216 pub event: Option<String>,
217 #[serde(skip_serializing_if = "Option::is_none")]
218 pub channel: Option<String>,
219 #[serde(skip_serializing_if = "Option::is_none")]
220 pub data: Option<MessageData>,
221 #[serde(skip_serializing_if = "Option::is_none")]
222 pub name: Option<String>,
223 #[serde(skip_serializing_if = "Option::is_none")]
224 pub user_id: Option<String>,
225 #[serde(skip_serializing_if = "Option::is_none")]
228 pub tags: Option<BTreeMap<String, String>>,
229 #[serde(skip_serializing_if = "Option::is_none")]
231 pub sequence: Option<u64>,
232 #[serde(skip_serializing_if = "Option::is_none")]
234 pub conflation_key: Option<String>,
235 #[serde(skip_serializing_if = "Option::is_none")]
237 pub message_id: Option<String>,
238 #[serde(skip_serializing_if = "Option::is_none")]
241 pub stream_id: Option<String>,
242 #[serde(skip_serializing_if = "Option::is_none")]
245 pub serial: Option<u64>,
246 #[serde(skip_serializing_if = "Option::is_none")]
251 pub idempotency_key: Option<String>,
252 #[serde(skip_serializing_if = "Option::is_none")]
256 pub extras: Option<MessageExtras>,
257 #[serde(rename = "__delta_seq", skip_serializing_if = "Option::is_none")]
259 pub delta_sequence: Option<u64>,
260 #[serde(rename = "__conflation_key", skip_serializing_if = "Option::is_none")]
262 pub delta_conflation_key: Option<String>,
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct PusherApiMessage {
267 #[serde(skip_serializing_if = "Option::is_none")]
268 pub name: Option<String>,
269 #[serde(skip_serializing_if = "Option::is_none")]
270 pub data: Option<ApiMessageData>,
271 #[serde(skip_serializing_if = "Option::is_none")]
272 pub channel: Option<String>,
273 #[serde(skip_serializing_if = "Option::is_none")]
274 pub channels: Option<Vec<String>>,
275 #[serde(skip_serializing_if = "Option::is_none")]
276 pub socket_id: Option<String>,
277 #[serde(skip_serializing_if = "Option::is_none")]
278 pub info: Option<String>,
279 #[serde(skip_serializing_if = "Option::is_none")]
280 pub tags: Option<AHashMap<String, String>>,
281 #[serde(skip_serializing_if = "Option::is_none")]
286 pub delta: Option<bool>,
287 #[serde(skip_serializing_if = "Option::is_none")]
291 pub idempotency_key: Option<String>,
292 #[serde(skip_serializing_if = "Option::is_none")]
294 pub extras: Option<MessageExtras>,
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize)]
298pub struct BatchPusherApiMessage {
299 pub batch: Vec<PusherApiMessage>,
300}
301
302#[derive(Debug, Clone, Serialize, Deserialize)]
303#[serde(untagged)]
304pub enum ApiMessageData {
305 String(String),
306 Json(Value),
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct SentPusherMessage {
311 #[serde(skip_serializing_if = "Option::is_none")]
312 pub channel: Option<String>,
313 #[serde(skip_serializing_if = "Option::is_none")]
314 pub event: Option<String>,
315 #[serde(skip_serializing_if = "Option::is_none")]
316 pub data: Option<MessageData>,
317}
318
319impl MessageData {
321 pub fn as_string(&self) -> Option<&str> {
322 match self {
323 MessageData::String(s) => Some(s),
324 _ => None,
325 }
326 }
327
328 pub fn into_string(self) -> Option<String> {
329 match self {
330 MessageData::String(s) => Some(s),
331 _ => None,
332 }
333 }
334
335 pub fn as_value(&self) -> Option<&Value> {
336 match self {
337 MessageData::Structured { extra, .. } => extra.values().next(),
338 _ => None,
339 }
340 }
341}
342
343impl From<String> for MessageData {
344 fn from(s: String) -> Self {
345 MessageData::String(s)
346 }
347}
348
349impl From<Value> for MessageData {
350 fn from(v: Value) -> Self {
351 MessageData::Json(v)
352 }
353}
354
355impl PusherMessage {
356 pub fn is_protocol_ping_or_pong(&self) -> bool {
357 let Some(event) = self.event.as_deref() else {
358 return false;
359 };
360
361 matches!(
362 ProtocolVersion::parse_any_protocol_event(event),
363 Some(("ping", _)) | Some(("pong", _))
364 )
365 }
366
367 pub fn connection_established(socket_id: String, activity_timeout: u64) -> Self {
368 Self {
369 event: Some("pusher:connection_established".to_string()),
370 data: Some(MessageData::from(
371 json!({
372 "socket_id": socket_id,
373 "activity_timeout": activity_timeout })
375 .to_string(),
376 )),
377 channel: None,
378 name: None,
379 user_id: None,
380 sequence: None,
381 conflation_key: None,
382 tags: None,
383 message_id: None,
384 stream_id: None,
385 serial: None,
386 idempotency_key: None,
387 extras: None,
388 delta_sequence: None,
389 delta_conflation_key: None,
390 }
391 }
392 pub fn subscription_succeeded(channel: String, presence_data: Option<PresenceData>) -> Self {
393 let data_obj = if let Some(data) = presence_data {
394 json!({
395 "presence": {
396 "ids": data.ids,
397 "hash": data.hash,
398 "count": data.count
399 }
400 })
401 } else {
402 json!({})
403 };
404
405 Self {
406 event: Some("pusher_internal:subscription_succeeded".to_string()),
407 channel: Some(channel),
408 data: Some(MessageData::String(data_obj.to_string())),
409 name: None,
410 user_id: None,
411 sequence: None,
412 conflation_key: None,
413 tags: None,
414 message_id: None,
415 stream_id: None,
416 serial: None,
417 idempotency_key: None,
418 extras: None,
419 delta_sequence: None,
420 delta_conflation_key: None,
421 }
422 }
423
424 pub fn error(code: u16, message: String, channel: Option<String>) -> Self {
425 Self {
426 event: Some("pusher:error".to_string()),
427 data: Some(MessageData::Json(json!({
428 "code": code,
429 "message": message
430 }))),
431 channel,
432 name: None,
433 user_id: None,
434 sequence: None,
435 conflation_key: None,
436 tags: None,
437 message_id: None,
438 stream_id: None,
439 serial: None,
440 idempotency_key: None,
441 extras: None,
442 delta_sequence: None,
443 delta_conflation_key: None,
444 }
445 }
446
447 pub fn ping() -> Self {
448 Self {
449 event: Some("pusher:ping".to_string()),
450 data: None,
451 channel: None,
452 name: None,
453 user_id: None,
454 sequence: None,
455 conflation_key: None,
456 tags: None,
457 message_id: None,
458 stream_id: None,
459 serial: None,
460 idempotency_key: None,
461 extras: None,
462 delta_sequence: None,
463 delta_conflation_key: None,
464 }
465 }
466 pub fn channel_event<S: Into<String>>(event: S, channel: S, data: Value) -> Self {
467 Self {
468 event: Some(event.into()),
469 channel: Some(channel.into()),
470 data: Some(MessageData::String(data.to_string())),
471 name: None,
472 user_id: None,
473 sequence: None,
474 conflation_key: None,
475 tags: None,
476 message_id: None,
477 stream_id: None,
478 serial: None,
479 idempotency_key: None,
480 extras: None,
481 delta_sequence: None,
482 delta_conflation_key: None,
483 }
484 }
485
486 pub fn member_added(channel: String, user_id: String, user_info: Option<Value>) -> Self {
487 Self {
488 event: Some("pusher_internal:member_added".to_string()),
489 channel: Some(channel),
490 data: Some(MessageData::String(
492 json!({
493 "user_id": user_id,
494 "user_info": user_info.unwrap_or_else(|| json!({}))
495 })
496 .to_string(),
497 )),
498 name: None,
499 user_id: None,
500 sequence: None,
501 conflation_key: None,
502 tags: None,
503 message_id: None,
504 stream_id: None,
505 serial: None,
506 idempotency_key: None,
507 extras: None,
508 delta_sequence: None,
509 delta_conflation_key: None,
510 }
511 }
512
513 pub fn member_removed(channel: String, user_id: String) -> Self {
514 Self {
515 event: Some("pusher_internal:member_removed".to_string()),
516 channel: Some(channel),
517 data: Some(MessageData::String(
519 json!({
520 "user_id": user_id
521 })
522 .to_string(),
523 )),
524 name: None,
525 user_id: None,
526 sequence: None,
527 conflation_key: None,
528 tags: None,
529 message_id: None,
530 stream_id: None,
531 serial: None,
532 idempotency_key: None,
533 extras: None,
534 delta_sequence: None,
535 delta_conflation_key: None,
536 }
537 }
538
539 pub fn pong() -> Self {
541 Self {
542 event: Some("pusher:pong".to_string()),
543 data: None,
544 channel: None,
545 name: None,
546 user_id: None,
547 sequence: None,
548 conflation_key: None,
549 tags: None,
550 message_id: None,
551 stream_id: None,
552 serial: None,
553 idempotency_key: None,
554 extras: None,
555 delta_sequence: None,
556 delta_conflation_key: None,
557 }
558 }
559
560 pub fn channel_info(
562 occupied: bool,
563 subscription_count: Option<u64>,
564 user_count: Option<u64>,
565 cache_data: Option<(String, Duration)>,
566 ) -> Value {
567 let mut response = json!({
568 "occupied": occupied
569 });
570
571 if let Some(count) = subscription_count {
572 response["subscription_count"] = json!(count);
573 }
574
575 if let Some(count) = user_count {
576 response["user_count"] = json!(count);
577 }
578
579 if let Some((data, ttl)) = cache_data {
580 response["cache"] = json!({
581 "data": data,
582 "ttl": ttl.as_secs()
583 });
584 }
585
586 response
587 }
588
589 pub fn channels_list(channels_info: AHashMap<String, Value>) -> Value {
591 json!({
592 "channels": channels_info
593 })
594 }
595
596 pub fn user_list(user_ids: Vec<String>) -> Value {
598 let users = user_ids
599 .into_iter()
600 .map(|id| json!({ "id": id }))
601 .collect::<Vec<_>>();
602
603 json!({ "users": users })
604 }
605
606 pub fn batch_response(batch_info: Vec<Value>) -> Value {
608 json!({ "batch": batch_info })
609 }
610
611 pub fn success_response() -> Value {
613 json!({ "ok": true })
614 }
615
616 pub fn watchlist_online_event(user_ids: Vec<String>) -> Self {
617 Self {
618 event: Some("online".to_string()),
619 channel: None, name: None,
621 data: Some(MessageData::Json(json!({
622 "user_ids": user_ids
623 }))),
624 user_id: None,
625 sequence: None,
626 conflation_key: None,
627 tags: None,
628 message_id: None,
629 stream_id: None,
630 serial: None,
631 idempotency_key: None,
632 extras: None,
633 delta_sequence: None,
634 delta_conflation_key: None,
635 }
636 }
637
638 pub fn watchlist_offline_event(user_ids: Vec<String>) -> Self {
639 Self {
640 event: Some("offline".to_string()),
641 channel: None,
642 name: None,
643 data: Some(MessageData::Json(json!({
644 "user_ids": user_ids
645 }))),
646 user_id: None,
647 sequence: None,
648 conflation_key: None,
649 tags: None,
650 message_id: None,
651 stream_id: None,
652 serial: None,
653 idempotency_key: None,
654 extras: None,
655 delta_sequence: None,
656 delta_conflation_key: None,
657 }
658 }
659
660 pub fn cache_miss_event(channel: String) -> Self {
661 Self {
662 event: Some("pusher:cache_miss".to_string()),
663 channel: Some(channel),
664 data: Some(MessageData::String("{}".to_string())),
665 name: None,
666 user_id: None,
667 sequence: None,
668 conflation_key: None,
669 tags: None,
670 message_id: None,
671 stream_id: None,
672 serial: None,
673 idempotency_key: None,
674 extras: None,
675 delta_sequence: None,
676 delta_conflation_key: None,
677 }
678 }
679
680 pub fn signin_success(user_data: String) -> Self {
681 Self {
682 event: Some("pusher:signin_success".to_string()),
683 data: Some(MessageData::Json(json!({
684 "user_data": user_data
685 }))),
686 channel: None,
687 name: None,
688 user_id: None,
689 sequence: None,
690 conflation_key: None,
691 tags: None,
692 message_id: None,
693 stream_id: None,
694 serial: None,
695 idempotency_key: None,
696 extras: None,
697 delta_sequence: None,
698 delta_conflation_key: None,
699 }
700 }
701
702 pub fn delta_message(
704 channel: String,
705 event: String,
706 delta_base64: String,
707 base_sequence: u32,
708 target_sequence: u32,
709 algorithm: &str,
710 ) -> Self {
711 Self {
712 event: Some("pusher:delta".to_string()),
713 channel: Some(channel.clone()),
714 data: Some(MessageData::String(
715 json!({
716 "channel": channel,
717 "event": event,
718 "delta": delta_base64,
719 "base_seq": base_sequence,
720 "target_seq": target_sequence,
721 "algorithm": algorithm,
722 })
723 .to_string(),
724 )),
725 name: None,
726 user_id: None,
727 sequence: None,
728 conflation_key: None,
729 tags: None,
730 message_id: None,
731 stream_id: None,
732 serial: None,
733 idempotency_key: None,
734 extras: None,
735 delta_sequence: None,
736 delta_conflation_key: None,
737 }
738 }
739
740 pub fn rewrite_prefix(&mut self, version: ProtocolVersion) {
743 if let Some(ref event) = self.event {
744 self.event = Some(version.rewrite_event_prefix(event));
745 }
746 }
747
748 pub fn is_ephemeral(&self) -> bool {
750 self.extras
751 .as_ref()
752 .and_then(|e| e.ephemeral)
753 .unwrap_or(false)
754 }
755
756 pub fn extras_idempotency_key(&self) -> Option<&str> {
758 self.extras
759 .as_ref()
760 .and_then(|e| e.idempotency_key.as_deref())
761 }
762
763 pub fn should_echo(&self, connection_default: bool) -> bool {
766 self.extras
767 .as_ref()
768 .and_then(|e| e.echo)
769 .unwrap_or(connection_default)
770 }
771
772 pub fn filter_headers(&self) -> Option<&HashMap<String, ExtrasValue>> {
774 self.extras.as_ref().and_then(|e| e.headers.as_ref())
775 }
776
777 pub fn should_include_extras(protocol: &ProtocolVersion) -> bool {
779 matches!(protocol, ProtocolVersion::V2)
780 }
781
782 pub fn add_base_sequence(mut self, base_sequence: u32) -> Self {
784 if let Some(MessageData::String(ref data_str)) = self.data
785 && let Ok(mut data_obj) = sonic_rs::from_str::<Value>(data_str)
786 && let Some(obj) = data_obj.as_object_mut()
787 {
788 obj.insert("__delta_base_seq", json!(base_sequence));
789 self.data = Some(MessageData::String(data_obj.to_string()));
790 }
791 self
792 }
793
794 pub fn delta_compression_enabled(default_algorithm: &str) -> Self {
796 Self {
797 event: Some("pusher:delta_compression_enabled".to_string()),
798 data: Some(MessageData::Json(json!({
799 "enabled": true,
800 "default_algorithm": default_algorithm,
801 }))),
802 channel: None,
803 name: None,
804 user_id: None,
805 sequence: None,
806 conflation_key: None,
807 tags: None,
808 message_id: None,
809 stream_id: None,
810 serial: None,
811 idempotency_key: None,
812 extras: None,
813 delta_sequence: None,
814 delta_conflation_key: None,
815 }
816 }
817}
818
/// Helpers for reading a comma-separated `info` value.
pub trait InfoQueryParser {
    /// Splits the value into the individual attribute names it lists.
    fn parse_info(&self) -> Vec<&str>;
    /// Returns `true` when `user_count` is one of the listed attributes.
    fn wants_user_count(&self) -> bool;
    /// Returns `true` when `subscription_count` is one of the listed attributes.
    fn wants_subscription_count(&self) -> bool;
    /// Returns `true` when `cache` is one of the listed attributes.
    fn wants_cache(&self) -> bool;
}

impl InfoQueryParser for Option<&String> {
    /// Splits on `,`, trims surrounding whitespace from every entry and drops
    /// entries that are empty after trimming.
    ///
    /// This makes `"user_count, cache"` equivalent to `"user_count,cache"` and
    /// makes an empty string yield an empty list rather than `[""]`. `None`
    /// yields an empty list.
    fn parse_info(&self) -> Vec<&str> {
        self.map(|raw| {
            raw.split(',')
                .map(str::trim)
                .filter(|attribute| !attribute.is_empty())
                .collect::<Vec<_>>()
        })
        .unwrap_or_default()
    }

    fn wants_user_count(&self) -> bool {
        self.parse_info().contains(&"user_count")
    }

    fn wants_subscription_count(&self) -> bool {
        self.parse_info().contains(&"subscription_count")
    }

    fn wants_cache(&self) -> bool {
        self.parse_info().contains(&"cache")
    }
}
845
#[cfg(test)]
mod tests {
    use super::{
        AnnotationEventAction, AnnotationEventData, AnnotationSummaryEnvelope, MessageSummaryData,
        PusherMessage,
    };
    use crate::protocol_version::ProtocolVersion;
    use sonic_rs::JsonValueTrait;
    use std::collections::BTreeMap;

    /// Ping and pong must be detected both with their original event name and
    /// after the prefix has been rewritten for protocol V2.
    #[test]
    fn protocol_heartbeat_detection_matches_both_prefix_families() {
        for mut heartbeat in [PusherMessage::ping(), PusherMessage::pong()] {
            assert!(heartbeat.is_protocol_ping_or_pong());

            heartbeat.rewrite_prefix(ProtocolVersion::V2);
            assert!(heartbeat.is_protocol_ping_or_pong());
        }
    }

    /// An ordinary channel event is not a heartbeat.
    #[test]
    fn protocol_heartbeat_detection_ignores_regular_messages() {
        let payload = sonic_rs::json!({"text": "hello"});
        let message = PusherMessage::channel_event("chat.message", "room", payload);

        assert!(!message.is_protocol_ping_or_pong());
    }

    /// Annotation events use camelCase keys, with `annotation_type` sent as `type`.
    #[test]
    fn annotation_create_serializes_camel_case_contract() {
        let event = AnnotationEventData {
            action: AnnotationEventAction::Create,
            id: Some("ann-1".to_string()),
            serial: "ann:1".to_string(),
            message_serial: "msg:1".to_string(),
            annotation_type: "reactions:distinct.v1".to_string(),
            name: Some("thumbsup".to_string()),
            client_id: Some("user-123".to_string()),
            count: Some(1),
            data: Some(sonic_rs::json!({"raw": true})),
            encoding: None,
            timestamp: 1_700_000_000_000,
        };

        let encoded = sonic_rs::to_value(&event).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("annotation.create"));
        assert_eq!(encoded["messageSerial"].as_str(), Some("msg:1"));
        assert_eq!(encoded["type"].as_str(), Some("reactions:distinct.v1"));
        assert_eq!(encoded["clientId"].as_str(), Some("user-123"));
    }

    /// Summaries are nested under `annotations.summary`, keyed by annotation type.
    #[test]
    fn message_summary_serializes_summary_envelope() {
        let summary = BTreeMap::from([(
            "reactions:distinct.v1".to_string(),
            sonic_rs::json!({"thumbsup": {"total": 5, "clientIds": ["a"], "clipped": false}}),
        )]);
        let message = MessageSummaryData {
            action: "message.summary".to_string(),
            serial: "msg:1".to_string(),
            annotations: AnnotationSummaryEnvelope { summary },
        };

        let encoded = sonic_rs::to_value(&message).unwrap();
        assert_eq!(encoded["action"].as_str(), Some("message.summary"));
        assert_eq!(encoded["serial"].as_str(), Some("msg:1"));
        assert_eq!(
            encoded["annotations"]["summary"]["reactions:distinct.v1"]["thumbsup"]["total"]
                .as_u64(),
            Some(5)
        );
    }
}