1use crate::messages::{ExtrasValue, MessageData, MessageExtras, PusherMessage};
2use serde::{Deserialize, Serialize};
3use sonic_rs::Value;
4use std::collections::HashMap;
5
6pub const HEADER_ACTION: &str = "sockudo_action";
7pub const HEADER_MESSAGE_SERIAL: &str = "sockudo_message_serial";
8pub const HEADER_VERSION_SERIAL: &str = "sockudo_version_serial";
9pub const HEADER_HISTORY_SERIAL: &str = "sockudo_history_serial";
10pub const HEADER_VERSION_TIMESTAMP_MS: &str = "sockudo_version_timestamp_ms";
11
12#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum MessageAction {
15 Create,
16 Update,
17 Delete,
18 Append,
19 Summary,
20}
21
22impl MessageAction {
23 pub fn as_str(self) -> &'static str {
24 match self {
25 Self::Create => "message.create",
26 Self::Update => "message.update",
27 Self::Delete => "message.delete",
28 Self::Append => "message.append",
29 Self::Summary => "message.summary",
30 }
31 }
32
33 pub fn v2_event_name(self) -> String {
34 format!("sockudo:{}", self.as_str())
35 }
36}
37
38#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
39#[serde(rename_all = "snake_case")]
40pub enum VersionDirection {
41 NewestFirst,
42 OldestFirst,
43}
44
45impl VersionDirection {
46 pub fn as_str(self) -> &'static str {
47 match self {
48 Self::NewestFirst => "newest_first",
49 Self::OldestFirst => "oldest_first",
50 }
51 }
52}
53
54#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
55#[serde(rename_all = "snake_case")]
56pub enum ClearField {
57 Name,
58 Data,
59 Extras,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63pub struct MessageVersionMetadata {
64 pub serial: String,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub client_id: Option<String>,
67 pub timestamp_ms: i64,
68 #[serde(skip_serializing_if = "Option::is_none")]
69 pub description: Option<String>,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub metadata: Option<Value>,
72}
73
74impl MessageVersionMetadata {
75 pub fn validate(&self) -> Result<(), String> {
76 if self.serial.trim().is_empty() {
77 return Err("version.serial must not be empty".to_string());
78 }
79 Ok(())
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84pub struct VersionedRealtimeMessage {
85 #[serde(flatten)]
86 pub message: PusherMessage,
87 pub action: MessageAction,
88 pub message_serial: String,
89 #[serde(skip_serializing_if = "Option::is_none")]
90 pub history_serial: Option<u64>,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub delivery_serial: Option<u64>,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 pub version: Option<MessageVersionMetadata>,
95}
96
97impl VersionedRealtimeMessage {
98 pub fn validate_v2(&self) -> Result<(), String> {
99 if self.message_serial.trim().is_empty() {
100 return Err("message_serial must not be empty".to_string());
101 }
102
103 let expected_event = self.action.v2_event_name();
104 match self.message.event.as_deref() {
105 Some(event) if event == expected_event => {}
106 Some(event) => {
107 return Err(format!(
108 "event '{}' does not match action '{}'",
109 event,
110 self.action.as_str()
111 ));
112 }
113 None => {
114 return Err(format!(
115 "event must be present for versioned action '{}'",
116 self.action.as_str()
117 ));
118 }
119 }
120
121 match self.message.channel.as_deref() {
122 Some(channel) if !channel.trim().is_empty() => {}
123 _ => return Err("channel must be present for versioned messages".to_string()),
124 }
125
126 let version = self
127 .version
128 .as_ref()
129 .ok_or_else(|| "version metadata must be present for versioned messages".to_string())?;
130 version.validate()?;
131
132 if self.history_serial.is_none() {
133 return Err("history_serial must be present for versioned messages".to_string());
134 }
135
136 let delivery_serial = self
137 .delivery_serial
138 .ok_or_else(|| "delivery_serial must be present for versioned messages".to_string())?;
139
140 if self.message.serial != Some(delivery_serial) {
141 return Err(
142 "message.serial must match delivery_serial for versioned messages".to_string(),
143 );
144 }
145
146 Ok(())
147 }
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
151#[serde(default)]
152pub struct UpdateMessageRequest {
153 #[serde(skip_serializing_if = "Option::is_none")]
154 pub name: Option<String>,
155 #[serde(skip_serializing_if = "Option::is_none")]
156 pub data: Option<MessageData>,
157 #[serde(skip_serializing_if = "Option::is_none")]
158 pub extras: Option<MessageExtras>,
159 #[serde(skip_serializing_if = "Vec::is_empty")]
160 pub clear_fields: Vec<ClearField>,
161 #[serde(skip_serializing_if = "Option::is_none")]
162 pub client_id: Option<String>,
163 #[serde(skip_serializing_if = "Option::is_none")]
164 pub socket_id: Option<String>,
165 #[serde(skip_serializing_if = "Option::is_none")]
166 pub description: Option<String>,
167 #[serde(skip_serializing_if = "Option::is_none")]
168 pub metadata: Option<Value>,
169}
170
171impl UpdateMessageRequest {
172 pub fn validate(&self) -> Result<(), String> {
173 let has_patch = self.name.is_some()
174 || self.data.is_some()
175 || self.extras.is_some()
176 || !self.clear_fields.is_empty()
177 || self.client_id.is_some()
178 || self.description.is_some()
179 || self.metadata.is_some();
180
181 if !has_patch {
182 return Err(
183 "update request must include at least one patch field or operation metadata"
184 .to_string(),
185 );
186 }
187
188 validate_unique_clear_fields(&self.clear_fields)?;
189 validate_clear_field_conflicts(
190 self.name.is_some(),
191 self.data.is_some(),
192 self.extras.is_some(),
193 &self.clear_fields,
194 "update",
195 )
196 }
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
200#[serde(default)]
201pub struct DeleteMessageRequest {
202 #[serde(skip_serializing_if = "Option::is_none")]
203 pub data: Option<MessageData>,
204 #[serde(skip_serializing_if = "Option::is_none")]
205 pub extras: Option<MessageExtras>,
206 #[serde(skip_serializing_if = "Vec::is_empty")]
207 pub clear_fields: Vec<ClearField>,
208 #[serde(skip_serializing_if = "Option::is_none")]
209 pub client_id: Option<String>,
210 #[serde(skip_serializing_if = "Option::is_none")]
211 pub socket_id: Option<String>,
212 #[serde(skip_serializing_if = "Option::is_none")]
213 pub description: Option<String>,
214 #[serde(skip_serializing_if = "Option::is_none")]
215 pub metadata: Option<Value>,
216}
217
218impl DeleteMessageRequest {
219 pub fn validate(&self) -> Result<(), String> {
220 validate_unique_clear_fields(&self.clear_fields)?;
221 validate_clear_field_conflicts(
222 false,
223 self.data.is_some(),
224 self.extras.is_some(),
225 &self.clear_fields,
226 "delete",
227 )
228 }
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
232pub struct AppendMessageRequest {
233 pub data: String,
234 #[serde(skip_serializing_if = "Option::is_none")]
235 pub client_id: Option<String>,
236 #[serde(skip_serializing_if = "Option::is_none")]
237 pub socket_id: Option<String>,
238 #[serde(skip_serializing_if = "Option::is_none")]
239 pub description: Option<String>,
240 #[serde(skip_serializing_if = "Option::is_none")]
241 pub metadata: Option<Value>,
242}
243
244impl AppendMessageRequest {
245 pub fn validate(&self) -> Result<(), String> {
246 if self.data.is_empty() {
247 return Err("append request data must not be empty".to_string());
248 }
249 Ok(())
250 }
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
254#[serde(default)]
255pub struct MessageVersionsQuery {
256 pub limit: Option<usize>,
257 pub direction: Option<VersionDirection>,
258 pub cursor: Option<String>,
259}
260
261impl MessageVersionsQuery {
262 pub fn validate(&self) -> Result<(), String> {
263 if self.limit == Some(0) {
264 return Err("versions limit must be greater than 0".to_string());
265 }
266 Ok(())
267 }
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
271pub struct MutationResponse {
272 pub channel: String,
273 pub message_serial: String,
274 pub action: MessageAction,
275 pub accepted: bool,
276 #[serde(skip_serializing_if = "Option::is_none")]
277 pub version_serial: Option<String>,
278 pub status: String,
279}
280
281#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
282pub struct GetMessageResponse {
283 pub channel: String,
284 pub item: VersionedRealtimeMessage,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
288pub struct ListMessageVersionsResponse {
289 pub channel: String,
290 pub direction: VersionDirection,
291 pub limit: usize,
292 pub has_more: bool,
293 #[serde(skip_serializing_if = "Option::is_none")]
294 pub next_cursor: Option<String>,
295 pub items: Vec<VersionedRealtimeMessage>,
296}
297
298fn validate_unique_clear_fields(fields: &[ClearField]) -> Result<(), String> {
299 let mut seen = std::collections::HashSet::new();
300 for field in fields {
301 if !seen.insert(*field) {
302 return Err("clear_fields must not contain duplicates".to_string());
303 }
304 }
305 Ok(())
306}
307
308fn validate_clear_field_conflicts(
309 has_name: bool,
310 has_data: bool,
311 has_extras: bool,
312 fields: &[ClearField],
313 request_label: &str,
314) -> Result<(), String> {
315 for field in fields {
316 let conflicts = match field {
317 ClearField::Name => has_name,
318 ClearField::Data => has_data,
319 ClearField::Extras => has_extras,
320 };
321 if conflicts {
322 return Err(format!(
323 "{request_label} request cannot both replace and clear the same field"
324 ));
325 }
326 }
327 Ok(())
328}
329
330pub fn apply_runtime_metadata(
331 message: &mut PusherMessage,
332 action: MessageAction,
333 message_serial: &str,
334 version: &MessageVersionMetadata,
335 history_serial: Option<u64>,
336) {
337 let extras = message.extras.get_or_insert_with(MessageExtras::default);
338 let headers = extras.headers.get_or_insert_with(HashMap::new);
339 headers.insert(
340 HEADER_ACTION.to_string(),
341 ExtrasValue::String(action.as_str().to_string()),
342 );
343 headers.insert(
344 HEADER_MESSAGE_SERIAL.to_string(),
345 ExtrasValue::String(message_serial.to_string()),
346 );
347 headers.insert(
348 HEADER_VERSION_SERIAL.to_string(),
349 ExtrasValue::String(version.serial.clone()),
350 );
351 headers.insert(
352 HEADER_VERSION_TIMESTAMP_MS.to_string(),
353 ExtrasValue::Number(version.timestamp_ms as f64),
354 );
355 if let Some(history_serial) = history_serial {
356 headers.insert(
357 HEADER_HISTORY_SERIAL.to_string(),
358 ExtrasValue::Number(history_serial as f64),
359 );
360 }
361}
362
363pub fn extract_runtime_message_serial(message: &PusherMessage) -> Option<&str> {
364 match message
365 .extras
366 .as_ref()
367 .and_then(|extras| extras.headers.as_ref())
368 .and_then(|headers| headers.get(HEADER_MESSAGE_SERIAL))
369 {
370 Some(ExtrasValue::String(value)) => Some(value.as_str()),
371 _ => None,
372 }
373}
374
375pub fn extract_runtime_action(message: &PusherMessage) -> Option<MessageAction> {
376 match message
377 .extras
378 .as_ref()
379 .and_then(|extras| extras.headers.as_ref())
380 .and_then(|headers| headers.get(HEADER_ACTION))
381 {
382 Some(ExtrasValue::String(value)) => match value.as_str() {
383 "message.create" => Some(MessageAction::Create),
384 "message.update" => Some(MessageAction::Update),
385 "message.delete" => Some(MessageAction::Delete),
386 "message.append" => Some(MessageAction::Append),
387 "message.summary" => Some(MessageAction::Summary),
388 _ => None,
389 },
390 _ => None,
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397 use crate::messages::MessageData;
398 use sonic_rs::json;
399
400 #[test]
401 fn update_request_rejects_empty_body() {
402 let error = UpdateMessageRequest::default().validate().unwrap_err();
403 assert!(error.contains("at least one patch field"));
404 }
405
406 #[test]
407 fn append_request_rejects_empty_data() {
408 let error = AppendMessageRequest {
409 data: String::new(),
410 client_id: None,
411 socket_id: None,
412 description: None,
413 metadata: None,
414 }
415 .validate()
416 .unwrap_err();
417 assert!(error.contains("must not be empty"));
418 }
419
420 #[test]
421 fn versioned_realtime_message_validates_v2_event_name() {
422 let message = VersionedRealtimeMessage {
423 message: PusherMessage {
424 event: Some("sockudo:message.update".to_string()),
425 channel: Some("chat".to_string()),
426 data: Some(MessageData::Json(json!({"hello": "world"}))),
427 name: Some("chat.message".to_string()),
428 user_id: None,
429 tags: None,
430 sequence: None,
431 conflation_key: None,
432 message_id: None,
433 stream_id: None,
434 serial: Some(3),
435 idempotency_key: None,
436 extras: None,
437 delta_sequence: None,
438 delta_conflation_key: None,
439 },
440 action: MessageAction::Update,
441 message_serial: "msg:1".to_string(),
442 history_serial: Some(1),
443 delivery_serial: Some(3),
444 version: Some(MessageVersionMetadata {
445 serial: "ver:2".to_string(),
446 client_id: None,
447 timestamp_ms: 1,
448 description: None,
449 metadata: None,
450 }),
451 };
452
453 message.validate_v2().unwrap();
454 }
455
456 #[test]
457 fn versioned_realtime_message_rejects_mismatched_event() {
458 let message = VersionedRealtimeMessage {
459 message: PusherMessage {
460 event: Some("sockudo:message.delete".to_string()),
461 channel: Some("chat".to_string()),
462 data: Some(MessageData::String("hello".to_string())),
463 name: Some("chat.message".to_string()),
464 user_id: None,
465 tags: None,
466 sequence: None,
467 conflation_key: None,
468 message_id: None,
469 stream_id: None,
470 serial: Some(3),
471 idempotency_key: None,
472 extras: None,
473 delta_sequence: None,
474 delta_conflation_key: None,
475 },
476 action: MessageAction::Update,
477 message_serial: "msg:1".to_string(),
478 history_serial: Some(1),
479 delivery_serial: Some(3),
480 version: Some(MessageVersionMetadata {
481 serial: "ver:2".to_string(),
482 client_id: None,
483 timestamp_ms: 1,
484 description: None,
485 metadata: None,
486 }),
487 };
488
489 let error = message.validate_v2().unwrap_err();
490 assert!(error.contains("does not match action"));
491 }
492
493 #[test]
494 fn versioned_realtime_message_requires_version_metadata() {
495 let message = VersionedRealtimeMessage {
496 message: PusherMessage {
497 event: Some("sockudo:message.update".to_string()),
498 channel: Some("chat".to_string()),
499 data: Some(MessageData::String("hello".to_string())),
500 name: Some("chat.message".to_string()),
501 user_id: None,
502 tags: None,
503 sequence: None,
504 conflation_key: None,
505 message_id: None,
506 stream_id: None,
507 serial: Some(3),
508 idempotency_key: None,
509 extras: None,
510 delta_sequence: None,
511 delta_conflation_key: None,
512 },
513 action: MessageAction::Update,
514 message_serial: "msg:1".to_string(),
515 history_serial: Some(1),
516 delivery_serial: Some(3),
517 version: None,
518 };
519
520 let error = message.validate_v2().unwrap_err();
521 assert!(error.contains("version metadata"));
522 }
523
524 #[test]
525 fn update_request_rejects_replace_and_clear_same_field() {
526 let error = UpdateMessageRequest {
527 name: Some("chat.message".to_string()),
528 data: None,
529 extras: None,
530 clear_fields: vec![ClearField::Name],
531 client_id: None,
532 socket_id: None,
533 description: None,
534 metadata: None,
535 }
536 .validate()
537 .unwrap_err();
538
539 assert!(error.contains("cannot both replace and clear"));
540 }
541
542 #[test]
543 fn delete_request_rejects_replace_and_clear_same_field() {
544 let error = DeleteMessageRequest {
545 data: Some(MessageData::String("gone".to_string())),
546 extras: None,
547 clear_fields: vec![ClearField::Data],
548 client_id: None,
549 socket_id: None,
550 description: None,
551 metadata: None,
552 }
553 .validate()
554 .unwrap_err();
555
556 assert!(error.contains("cannot both replace and clear"));
557 }
558
559 #[test]
560 fn update_request_deserializes_string_data_via_sonic() {
561 let request: UpdateMessageRequest =
562 sonic_rs::from_str(r#"{"data":"hello brave","description":"replace base"}"#).unwrap();
563
564 assert_eq!(
565 request.data,
566 Some(MessageData::String("hello brave".to_string()))
567 );
568 assert_eq!(request.description.as_deref(), Some("replace base"));
569 }
570
571 #[test]
572 fn update_request_deserializes_string_data_via_serde_json() {
573 let request: UpdateMessageRequest =
574 serde_json::from_str(r#"{"data":"hello brave","description":"replace base"}"#).unwrap();
575
576 assert_eq!(
577 request.data,
578 Some(MessageData::String("hello brave".to_string()))
579 );
580 assert_eq!(request.description.as_deref(), Some("replace base"));
581 }
582
583 #[test]
584 fn delete_request_deserializes_string_data_via_serde_json() {
585 let request: DeleteMessageRequest =
586 serde_json::from_str(r#"{"data":"gone","description":"soft delete"}"#).unwrap();
587
588 assert_eq!(request.data, Some(MessageData::String("gone".to_string())));
589 assert_eq!(request.description.as_deref(), Some("soft delete"));
590 }
591}