Skip to main content

sockudo_protocol/
versioned_messages.rs

1use crate::messages::{ExtrasValue, MessageData, MessageExtras, PusherMessage};
2use serde::{Deserialize, Serialize};
3use sonic_rs::Value;
4use std::collections::HashMap;
5
6pub const HEADER_ACTION: &str = "sockudo_action";
7pub const HEADER_MESSAGE_SERIAL: &str = "sockudo_message_serial";
8pub const HEADER_VERSION_SERIAL: &str = "sockudo_version_serial";
9pub const HEADER_HISTORY_SERIAL: &str = "sockudo_history_serial";
10pub const HEADER_VERSION_TIMESTAMP_MS: &str = "sockudo_version_timestamp_ms";
11
12#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum MessageAction {
15    Create,
16    Update,
17    Delete,
18    Append,
19    Summary,
20}
21
22impl MessageAction {
23    pub fn as_str(self) -> &'static str {
24        match self {
25            Self::Create => "message.create",
26            Self::Update => "message.update",
27            Self::Delete => "message.delete",
28            Self::Append => "message.append",
29            Self::Summary => "message.summary",
30        }
31    }
32
33    pub fn v2_event_name(self) -> String {
34        format!("sockudo:{}", self.as_str())
35    }
36}
37
38#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
39#[serde(rename_all = "snake_case")]
40pub enum VersionDirection {
41    NewestFirst,
42    OldestFirst,
43}
44
45impl VersionDirection {
46    pub fn as_str(self) -> &'static str {
47        match self {
48            Self::NewestFirst => "newest_first",
49            Self::OldestFirst => "oldest_first",
50        }
51    }
52}
53
54#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
55#[serde(rename_all = "snake_case")]
56pub enum ClearField {
57    Name,
58    Data,
59    Extras,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63pub struct MessageVersionMetadata {
64    pub serial: String,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub client_id: Option<String>,
67    pub timestamp_ms: i64,
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub description: Option<String>,
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub metadata: Option<Value>,
72}
73
74impl MessageVersionMetadata {
75    pub fn validate(&self) -> Result<(), String> {
76        if self.serial.trim().is_empty() {
77            return Err("version.serial must not be empty".to_string());
78        }
79        Ok(())
80    }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84pub struct VersionedRealtimeMessage {
85    #[serde(flatten)]
86    pub message: PusherMessage,
87    pub action: MessageAction,
88    pub message_serial: String,
89    #[serde(skip_serializing_if = "Option::is_none")]
90    pub history_serial: Option<u64>,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub delivery_serial: Option<u64>,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub version: Option<MessageVersionMetadata>,
95}
96
97impl VersionedRealtimeMessage {
98    pub fn validate_v2(&self) -> Result<(), String> {
99        if self.message_serial.trim().is_empty() {
100            return Err("message_serial must not be empty".to_string());
101        }
102
103        let expected_event = self.action.v2_event_name();
104        match self.message.event.as_deref() {
105            Some(event) if event == expected_event => {}
106            Some(event) => {
107                return Err(format!(
108                    "event '{}' does not match action '{}'",
109                    event,
110                    self.action.as_str()
111                ));
112            }
113            None => {
114                return Err(format!(
115                    "event must be present for versioned action '{}'",
116                    self.action.as_str()
117                ));
118            }
119        }
120
121        match self.message.channel.as_deref() {
122            Some(channel) if !channel.trim().is_empty() => {}
123            _ => return Err("channel must be present for versioned messages".to_string()),
124        }
125
126        let version = self
127            .version
128            .as_ref()
129            .ok_or_else(|| "version metadata must be present for versioned messages".to_string())?;
130        version.validate()?;
131
132        if self.history_serial.is_none() {
133            return Err("history_serial must be present for versioned messages".to_string());
134        }
135
136        let delivery_serial = self
137            .delivery_serial
138            .ok_or_else(|| "delivery_serial must be present for versioned messages".to_string())?;
139
140        if self.message.serial != Some(delivery_serial) {
141            return Err(
142                "message.serial must match delivery_serial for versioned messages".to_string(),
143            );
144        }
145
146        Ok(())
147    }
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
151#[serde(default)]
152pub struct UpdateMessageRequest {
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub name: Option<String>,
155    #[serde(skip_serializing_if = "Option::is_none")]
156    pub data: Option<MessageData>,
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub extras: Option<MessageExtras>,
159    #[serde(skip_serializing_if = "Vec::is_empty")]
160    pub clear_fields: Vec<ClearField>,
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub client_id: Option<String>,
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub socket_id: Option<String>,
165    #[serde(skip_serializing_if = "Option::is_none")]
166    pub description: Option<String>,
167    #[serde(skip_serializing_if = "Option::is_none")]
168    pub metadata: Option<Value>,
169}
170
171impl UpdateMessageRequest {
172    pub fn validate(&self) -> Result<(), String> {
173        let has_patch = self.name.is_some()
174            || self.data.is_some()
175            || self.extras.is_some()
176            || !self.clear_fields.is_empty()
177            || self.client_id.is_some()
178            || self.description.is_some()
179            || self.metadata.is_some();
180
181        if !has_patch {
182            return Err(
183                "update request must include at least one patch field or operation metadata"
184                    .to_string(),
185            );
186        }
187
188        validate_unique_clear_fields(&self.clear_fields)?;
189        validate_clear_field_conflicts(
190            self.name.is_some(),
191            self.data.is_some(),
192            self.extras.is_some(),
193            &self.clear_fields,
194            "update",
195        )
196    }
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
200#[serde(default)]
201pub struct DeleteMessageRequest {
202    #[serde(skip_serializing_if = "Option::is_none")]
203    pub data: Option<MessageData>,
204    #[serde(skip_serializing_if = "Option::is_none")]
205    pub extras: Option<MessageExtras>,
206    #[serde(skip_serializing_if = "Vec::is_empty")]
207    pub clear_fields: Vec<ClearField>,
208    #[serde(skip_serializing_if = "Option::is_none")]
209    pub client_id: Option<String>,
210    #[serde(skip_serializing_if = "Option::is_none")]
211    pub socket_id: Option<String>,
212    #[serde(skip_serializing_if = "Option::is_none")]
213    pub description: Option<String>,
214    #[serde(skip_serializing_if = "Option::is_none")]
215    pub metadata: Option<Value>,
216}
217
218impl DeleteMessageRequest {
219    pub fn validate(&self) -> Result<(), String> {
220        validate_unique_clear_fields(&self.clear_fields)?;
221        validate_clear_field_conflicts(
222            false,
223            self.data.is_some(),
224            self.extras.is_some(),
225            &self.clear_fields,
226            "delete",
227        )
228    }
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
232pub struct AppendMessageRequest {
233    pub data: String,
234    #[serde(skip_serializing_if = "Option::is_none")]
235    pub client_id: Option<String>,
236    #[serde(skip_serializing_if = "Option::is_none")]
237    pub socket_id: Option<String>,
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub description: Option<String>,
240    #[serde(skip_serializing_if = "Option::is_none")]
241    pub metadata: Option<Value>,
242}
243
244impl AppendMessageRequest {
245    pub fn validate(&self) -> Result<(), String> {
246        if self.data.is_empty() {
247            return Err("append request data must not be empty".to_string());
248        }
249        Ok(())
250    }
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
254#[serde(default)]
255pub struct MessageVersionsQuery {
256    pub limit: Option<usize>,
257    pub direction: Option<VersionDirection>,
258    pub cursor: Option<String>,
259}
260
261impl MessageVersionsQuery {
262    pub fn validate(&self) -> Result<(), String> {
263        if self.limit == Some(0) {
264            return Err("versions limit must be greater than 0".to_string());
265        }
266        Ok(())
267    }
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
271pub struct MutationResponse {
272    pub channel: String,
273    pub message_serial: String,
274    pub action: MessageAction,
275    pub accepted: bool,
276    #[serde(skip_serializing_if = "Option::is_none")]
277    pub version_serial: Option<String>,
278    pub status: String,
279}
280
281#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
282pub struct GetMessageResponse {
283    pub channel: String,
284    pub item: VersionedRealtimeMessage,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
288pub struct ListMessageVersionsResponse {
289    pub channel: String,
290    pub direction: VersionDirection,
291    pub limit: usize,
292    pub has_more: bool,
293    #[serde(skip_serializing_if = "Option::is_none")]
294    pub next_cursor: Option<String>,
295    pub items: Vec<VersionedRealtimeMessage>,
296}
297
298fn validate_unique_clear_fields(fields: &[ClearField]) -> Result<(), String> {
299    let mut seen = std::collections::HashSet::new();
300    for field in fields {
301        if !seen.insert(*field) {
302            return Err("clear_fields must not contain duplicates".to_string());
303        }
304    }
305    Ok(())
306}
307
308fn validate_clear_field_conflicts(
309    has_name: bool,
310    has_data: bool,
311    has_extras: bool,
312    fields: &[ClearField],
313    request_label: &str,
314) -> Result<(), String> {
315    for field in fields {
316        let conflicts = match field {
317            ClearField::Name => has_name,
318            ClearField::Data => has_data,
319            ClearField::Extras => has_extras,
320        };
321        if conflicts {
322            return Err(format!(
323                "{request_label} request cannot both replace and clear the same field"
324            ));
325        }
326    }
327    Ok(())
328}
329
330pub fn apply_runtime_metadata(
331    message: &mut PusherMessage,
332    action: MessageAction,
333    message_serial: &str,
334    version: &MessageVersionMetadata,
335    history_serial: Option<u64>,
336) {
337    let extras = message.extras.get_or_insert_with(MessageExtras::default);
338    let headers = extras.headers.get_or_insert_with(HashMap::new);
339    headers.insert(
340        HEADER_ACTION.to_string(),
341        ExtrasValue::String(action.as_str().to_string()),
342    );
343    headers.insert(
344        HEADER_MESSAGE_SERIAL.to_string(),
345        ExtrasValue::String(message_serial.to_string()),
346    );
347    headers.insert(
348        HEADER_VERSION_SERIAL.to_string(),
349        ExtrasValue::String(version.serial.clone()),
350    );
351    headers.insert(
352        HEADER_VERSION_TIMESTAMP_MS.to_string(),
353        ExtrasValue::Number(version.timestamp_ms as f64),
354    );
355    if let Some(history_serial) = history_serial {
356        headers.insert(
357            HEADER_HISTORY_SERIAL.to_string(),
358            ExtrasValue::Number(history_serial as f64),
359        );
360    }
361}
362
363pub fn extract_runtime_message_serial(message: &PusherMessage) -> Option<&str> {
364    match message
365        .extras
366        .as_ref()
367        .and_then(|extras| extras.headers.as_ref())
368        .and_then(|headers| headers.get(HEADER_MESSAGE_SERIAL))
369    {
370        Some(ExtrasValue::String(value)) => Some(value.as_str()),
371        _ => None,
372    }
373}
374
375pub fn extract_runtime_action(message: &PusherMessage) -> Option<MessageAction> {
376    match message
377        .extras
378        .as_ref()
379        .and_then(|extras| extras.headers.as_ref())
380        .and_then(|headers| headers.get(HEADER_ACTION))
381    {
382        Some(ExtrasValue::String(value)) => match value.as_str() {
383            "message.create" => Some(MessageAction::Create),
384            "message.update" => Some(MessageAction::Update),
385            "message.delete" => Some(MessageAction::Delete),
386            "message.append" => Some(MessageAction::Append),
387            "message.summary" => Some(MessageAction::Summary),
388            _ => None,
389        },
390        _ => None,
391    }
392}
393
394#[cfg(test)]
395mod tests {
396    use super::*;
397    use crate::messages::MessageData;
398    use sonic_rs::json;
399
400    #[test]
401    fn update_request_rejects_empty_body() {
402        let error = UpdateMessageRequest::default().validate().unwrap_err();
403        assert!(error.contains("at least one patch field"));
404    }
405
406    #[test]
407    fn append_request_rejects_empty_data() {
408        let error = AppendMessageRequest {
409            data: String::new(),
410            client_id: None,
411            socket_id: None,
412            description: None,
413            metadata: None,
414        }
415        .validate()
416        .unwrap_err();
417        assert!(error.contains("must not be empty"));
418    }
419
420    #[test]
421    fn versioned_realtime_message_validates_v2_event_name() {
422        let message = VersionedRealtimeMessage {
423            message: PusherMessage {
424                event: Some("sockudo:message.update".to_string()),
425                channel: Some("chat".to_string()),
426                data: Some(MessageData::Json(json!({"hello": "world"}))),
427                name: Some("chat.message".to_string()),
428                user_id: None,
429                tags: None,
430                sequence: None,
431                conflation_key: None,
432                message_id: None,
433                stream_id: None,
434                serial: Some(3),
435                idempotency_key: None,
436                extras: None,
437                delta_sequence: None,
438                delta_conflation_key: None,
439            },
440            action: MessageAction::Update,
441            message_serial: "msg:1".to_string(),
442            history_serial: Some(1),
443            delivery_serial: Some(3),
444            version: Some(MessageVersionMetadata {
445                serial: "ver:2".to_string(),
446                client_id: None,
447                timestamp_ms: 1,
448                description: None,
449                metadata: None,
450            }),
451        };
452
453        message.validate_v2().unwrap();
454    }
455
456    #[test]
457    fn versioned_realtime_message_rejects_mismatched_event() {
458        let message = VersionedRealtimeMessage {
459            message: PusherMessage {
460                event: Some("sockudo:message.delete".to_string()),
461                channel: Some("chat".to_string()),
462                data: Some(MessageData::String("hello".to_string())),
463                name: Some("chat.message".to_string()),
464                user_id: None,
465                tags: None,
466                sequence: None,
467                conflation_key: None,
468                message_id: None,
469                stream_id: None,
470                serial: Some(3),
471                idempotency_key: None,
472                extras: None,
473                delta_sequence: None,
474                delta_conflation_key: None,
475            },
476            action: MessageAction::Update,
477            message_serial: "msg:1".to_string(),
478            history_serial: Some(1),
479            delivery_serial: Some(3),
480            version: Some(MessageVersionMetadata {
481                serial: "ver:2".to_string(),
482                client_id: None,
483                timestamp_ms: 1,
484                description: None,
485                metadata: None,
486            }),
487        };
488
489        let error = message.validate_v2().unwrap_err();
490        assert!(error.contains("does not match action"));
491    }
492
493    #[test]
494    fn versioned_realtime_message_requires_version_metadata() {
495        let message = VersionedRealtimeMessage {
496            message: PusherMessage {
497                event: Some("sockudo:message.update".to_string()),
498                channel: Some("chat".to_string()),
499                data: Some(MessageData::String("hello".to_string())),
500                name: Some("chat.message".to_string()),
501                user_id: None,
502                tags: None,
503                sequence: None,
504                conflation_key: None,
505                message_id: None,
506                stream_id: None,
507                serial: Some(3),
508                idempotency_key: None,
509                extras: None,
510                delta_sequence: None,
511                delta_conflation_key: None,
512            },
513            action: MessageAction::Update,
514            message_serial: "msg:1".to_string(),
515            history_serial: Some(1),
516            delivery_serial: Some(3),
517            version: None,
518        };
519
520        let error = message.validate_v2().unwrap_err();
521        assert!(error.contains("version metadata"));
522    }
523
524    #[test]
525    fn update_request_rejects_replace_and_clear_same_field() {
526        let error = UpdateMessageRequest {
527            name: Some("chat.message".to_string()),
528            data: None,
529            extras: None,
530            clear_fields: vec![ClearField::Name],
531            client_id: None,
532            socket_id: None,
533            description: None,
534            metadata: None,
535        }
536        .validate()
537        .unwrap_err();
538
539        assert!(error.contains("cannot both replace and clear"));
540    }
541
542    #[test]
543    fn delete_request_rejects_replace_and_clear_same_field() {
544        let error = DeleteMessageRequest {
545            data: Some(MessageData::String("gone".to_string())),
546            extras: None,
547            clear_fields: vec![ClearField::Data],
548            client_id: None,
549            socket_id: None,
550            description: None,
551            metadata: None,
552        }
553        .validate()
554        .unwrap_err();
555
556        assert!(error.contains("cannot both replace and clear"));
557    }
558
559    #[test]
560    fn update_request_deserializes_string_data_via_sonic() {
561        let request: UpdateMessageRequest =
562            sonic_rs::from_str(r#"{"data":"hello brave","description":"replace base"}"#).unwrap();
563
564        assert_eq!(
565            request.data,
566            Some(MessageData::String("hello brave".to_string()))
567        );
568        assert_eq!(request.description.as_deref(), Some("replace base"));
569    }
570
571    #[test]
572    fn update_request_deserializes_string_data_via_serde_json() {
573        let request: UpdateMessageRequest =
574            serde_json::from_str(r#"{"data":"hello brave","description":"replace base"}"#).unwrap();
575
576        assert_eq!(
577            request.data,
578            Some(MessageData::String("hello brave".to_string()))
579        );
580        assert_eq!(request.description.as_deref(), Some("replace base"));
581    }
582
583    #[test]
584    fn delete_request_deserializes_string_data_via_serde_json() {
585        let request: DeleteMessageRequest =
586            serde_json::from_str(r#"{"data":"gone","description":"soft delete"}"#).unwrap();
587
588        assert_eq!(request.data, Some(MessageData::String("gone".to_string())));
589        assert_eq!(request.description.as_deref(), Some("soft delete"));
590    }
591}