1use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(rename_all = "camelCase")]
14pub struct Event {
15 pub id: String,
17
18 pub subject: String,
20
21 pub category: String,
23
24 #[serde(default)]
29 pub event_type: String,
30
31 #[serde(default = "default_version")]
36 pub version: u32,
37
38 pub payload: serde_json::Value,
40
41 pub summary: String,
43
44 pub source: String,
46
47 pub timestamp: u64,
49
50 #[serde(default)]
52 pub metadata: HashMap<String, String>,
53}
54
55fn default_version() -> u32 {
56 1
57}
58
59impl Event {
60 pub fn new(
62 subject: impl Into<String>,
63 category: impl Into<String>,
64 summary: impl Into<String>,
65 source: impl Into<String>,
66 payload: serde_json::Value,
67 ) -> Self {
68 Self {
69 id: format!("evt-{}", uuid::Uuid::new_v4()),
70 subject: subject.into(),
71 category: category.into(),
72 event_type: String::new(),
73 version: 1,
74 payload,
75 summary: summary.into(),
76 source: source.into(),
77 timestamp: now_millis(),
78 metadata: HashMap::new(),
79 }
80 }
81
82 pub fn typed(
84 subject: impl Into<String>,
85 category: impl Into<String>,
86 event_type: impl Into<String>,
87 version: u32,
88 summary: impl Into<String>,
89 source: impl Into<String>,
90 payload: serde_json::Value,
91 ) -> Self {
92 Self {
93 id: format!("evt-{}", uuid::Uuid::new_v4()),
94 subject: subject.into(),
95 category: category.into(),
96 event_type: event_type.into(),
97 version,
98 payload,
99 summary: summary.into(),
100 source: source.into(),
101 timestamp: now_millis(),
102 metadata: HashMap::new(),
103 }
104 }
105
106 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
108 self.metadata.insert(key.into(), value.into());
109 self
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct ReceivedEvent {
116 pub event: Event,
118
119 pub sequence: u64,
121
122 pub num_delivered: u64,
124
125 pub stream: String,
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize)]
131#[serde(rename_all = "camelCase")]
132pub struct SubscriptionFilter {
133 pub subscriber_id: String,
135
136 pub subjects: Vec<String>,
138
139 pub durable: bool,
141
142 #[serde(default, skip_serializing_if = "Option::is_none")]
144 pub options: Option<SubscribeOptions>,
145}
146
147#[derive(Debug, Clone, Default, Serialize, Deserialize)]
149#[serde(rename_all = "camelCase")]
150pub struct EventCounts {
151 pub categories: HashMap<String, u64>,
153
154 pub total: u64,
156}
157
158#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
163#[serde(rename_all = "camelCase", tag = "type")]
164pub enum DeliverPolicy {
165 #[default]
167 All,
168 Last,
170 New,
172 ByStartSequence { sequence: u64 },
174 ByStartTime { timestamp: u64 },
176 LastPerSubject,
178}
179
180#[derive(Debug, Clone, Default, Serialize, Deserialize)]
185#[serde(rename_all = "camelCase")]
186pub struct PublishOptions {
187 #[serde(default, skip_serializing_if = "Option::is_none")]
192 pub msg_id: Option<String>,
193
194 #[serde(default, skip_serializing_if = "Option::is_none")]
199 pub expected_sequence: Option<u64>,
200
201 #[serde(default, skip_serializing_if = "Option::is_none")]
203 pub timeout_secs: Option<u64>,
204}
205
206#[derive(Debug, Clone, Default, Serialize, Deserialize)]
211#[serde(rename_all = "camelCase")]
212pub struct SubscribeOptions {
213 #[serde(default, skip_serializing_if = "Option::is_none")]
218 pub max_deliver: Option<i64>,
219
220 #[serde(default, skip_serializing_if = "Vec::is_empty")]
225 pub backoff_secs: Vec<u64>,
226
227 #[serde(default, skip_serializing_if = "Option::is_none")]
233 pub max_ack_pending: Option<i64>,
234
235 #[serde(default)]
237 pub deliver_policy: DeliverPolicy,
238
239 #[serde(default, skip_serializing_if = "Option::is_none")]
243 pub ack_wait_secs: Option<u64>,
244}
245
246fn now_millis() -> u64 {
248 std::time::SystemTime::now()
249 .duration_since(std::time::UNIX_EPOCH)
250 .unwrap_or_default()
251 .as_millis() as u64
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257
258 #[test]
259 fn test_event_creation() {
260 let event = Event::new(
261 "events.market.forex",
262 "market",
263 "USD/CNY rate change",
264 "reuters",
265 serde_json::json!({"rate": 7.35}),
266 );
267
268 assert!(event.id.starts_with("evt-"));
269 assert_eq!(event.subject, "events.market.forex");
270 assert_eq!(event.category, "market");
271 assert_eq!(event.source, "reuters");
272 assert!(event.timestamp > 0);
273 assert!(event.metadata.is_empty());
274 }
275
276 #[test]
277 fn test_event_with_metadata() {
278 let event = Event::new(
279 "events.system.deploy",
280 "system",
281 "Deployed v1.2",
282 "ci",
283 serde_json::json!({}),
284 )
285 .with_metadata("env", "production")
286 .with_metadata("version", "1.2.0");
287
288 assert_eq!(event.metadata.len(), 2);
289 assert_eq!(event.metadata["env"], "production");
290 assert_eq!(event.metadata["version"], "1.2.0");
291 }
292
293 #[test]
294 fn test_event_serialization_roundtrip() {
295 let event = Event::new(
296 "events.market.forex",
297 "market",
298 "Rate change",
299 "reuters",
300 serde_json::json!({"rate": 7.35}),
301 )
302 .with_metadata("region", "asia");
303
304 let json = serde_json::to_string(&event).unwrap();
305 assert!(json.contains("\"subject\":\"events.market.forex\""));
306 assert!(json.contains("\"category\":\"market\""));
307
308 let parsed: Event = serde_json::from_str(&json).unwrap();
309 assert_eq!(parsed.id, event.id);
310 assert_eq!(parsed.subject, event.subject);
311 assert_eq!(parsed.metadata["region"], "asia");
312 }
313
314 #[test]
315 fn test_event_counts_default() {
316 let counts = EventCounts::default();
317 assert_eq!(counts.total, 0);
318 assert!(counts.categories.is_empty());
319 }
320
321 #[test]
322 fn test_subscription_filter_serialization() {
323 let filter = SubscriptionFilter {
324 subscriber_id: "financial-analyst".to_string(),
325 subjects: vec!["events.market.>".to_string()],
326 durable: true,
327 options: None,
328 };
329
330 let json = serde_json::to_string(&filter).unwrap();
331 assert!(json.contains("\"subscriberId\":\"financial-analyst\""));
332 assert!(json.contains("\"durable\":true"));
333
334 let parsed: SubscriptionFilter = serde_json::from_str(&json).unwrap();
335 assert_eq!(parsed.subscriber_id, "financial-analyst");
336 assert!(parsed.durable);
337 }
338
339 #[test]
340 fn test_publish_options_default() {
341 let opts = PublishOptions::default();
342 assert!(opts.msg_id.is_none());
343 assert!(opts.expected_sequence.is_none());
344 assert!(opts.timeout_secs.is_none());
345 }
346
347 #[test]
348 fn test_publish_options_serialization() {
349 let opts = PublishOptions {
350 msg_id: Some("dedup-123".to_string()),
351 expected_sequence: Some(42),
352 timeout_secs: Some(5),
353 };
354
355 let json = serde_json::to_string(&opts).unwrap();
356 assert!(json.contains("\"msgId\":\"dedup-123\""));
357 assert!(json.contains("\"expectedSequence\":42"));
358 assert!(json.contains("\"timeoutSecs\":5"));
359
360 let parsed: PublishOptions = serde_json::from_str(&json).unwrap();
361 assert_eq!(parsed.msg_id.unwrap(), "dedup-123");
362 assert_eq!(parsed.expected_sequence.unwrap(), 42);
363 }
364
365 #[test]
366 fn test_publish_options_skip_none_fields() {
367 let opts = PublishOptions::default();
368 let json = serde_json::to_string(&opts).unwrap();
369 assert!(!json.contains("msgId"));
370 assert!(!json.contains("expectedSequence"));
371 assert!(!json.contains("timeoutSecs"));
372 }
373
374 #[test]
375 fn test_subscribe_options_default() {
376 let opts = SubscribeOptions::default();
377 assert!(opts.max_deliver.is_none());
378 assert!(opts.backoff_secs.is_empty());
379 assert!(opts.max_ack_pending.is_none());
380 assert_eq!(opts.deliver_policy, DeliverPolicy::All);
381 assert!(opts.ack_wait_secs.is_none());
382 }
383
384 #[test]
385 fn test_subscribe_options_serialization() {
386 let opts = SubscribeOptions {
387 max_deliver: Some(5),
388 backoff_secs: vec![1, 5, 30],
389 max_ack_pending: Some(1000),
390 deliver_policy: DeliverPolicy::New,
391 ack_wait_secs: Some(30),
392 };
393
394 let json = serde_json::to_string(&opts).unwrap();
395 assert!(json.contains("\"maxDeliver\":5"));
396 assert!(json.contains("\"backoffSecs\":[1,5,30]"));
397 assert!(json.contains("\"maxAckPending\":1000"));
398 assert!(json.contains("\"ackWaitSecs\":30"));
399
400 let parsed: SubscribeOptions = serde_json::from_str(&json).unwrap();
401 assert_eq!(parsed.max_deliver.unwrap(), 5);
402 assert_eq!(parsed.backoff_secs, vec![1, 5, 30]);
403 assert_eq!(parsed.max_ack_pending.unwrap(), 1000);
404 assert_eq!(parsed.deliver_policy, DeliverPolicy::New);
405 }
406
407 #[test]
408 fn test_subscribe_options_skip_empty_fields() {
409 let opts = SubscribeOptions::default();
410 let json = serde_json::to_string(&opts).unwrap();
411 assert!(!json.contains("maxDeliver"));
412 assert!(!json.contains("backoffSecs"));
413 assert!(!json.contains("maxAckPending"));
414 assert!(!json.contains("ackWaitSecs"));
415 }
416
417 #[test]
418 fn test_deliver_policy_variants() {
419 let cases = vec![
420 (DeliverPolicy::All, "All"),
421 (DeliverPolicy::Last, "Last"),
422 (DeliverPolicy::New, "New"),
423 (DeliverPolicy::LastPerSubject, "LastPerSubject"),
424 ];
425
426 for (policy, _) in &cases {
427 let json = serde_json::to_string(policy).unwrap();
428 let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
429 assert_eq!(&parsed, policy);
430 }
431 }
432
433 #[test]
434 fn test_deliver_policy_by_start_sequence() {
435 let policy = DeliverPolicy::ByStartSequence { sequence: 100 };
436 let json = serde_json::to_string(&policy).unwrap();
437 assert!(json.contains("\"sequence\":100"));
438
439 let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
440 assert_eq!(parsed, DeliverPolicy::ByStartSequence { sequence: 100 });
441 }
442
443 #[test]
444 fn test_deliver_policy_by_start_time() {
445 let ts = 1700000000000u64;
446 let policy = DeliverPolicy::ByStartTime { timestamp: ts };
447 let json = serde_json::to_string(&policy).unwrap();
448 assert!(json.contains(&format!("\"timestamp\":{}", ts)));
449
450 let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
451 assert_eq!(parsed, DeliverPolicy::ByStartTime { timestamp: ts });
452 }
453
454 #[test]
455 fn test_event_default_version() {
456 let event = Event::new(
457 "events.test.a",
458 "test",
459 "Test",
460 "test",
461 serde_json::json!({}),
462 );
463 assert_eq!(event.version, 1);
464 assert_eq!(event.event_type, "");
465 }
466
467 #[test]
468 fn test_event_typed() {
469 let event = Event::typed(
470 "events.market.forex",
471 "market",
472 "forex.rate_change",
473 2,
474 "USD/CNY rate change",
475 "reuters",
476 serde_json::json!({"rate": 7.35}),
477 );
478
479 assert!(event.id.starts_with("evt-"));
480 assert_eq!(event.event_type, "forex.rate_change");
481 assert_eq!(event.version, 2);
482 assert_eq!(event.category, "market");
483 }
484
485 #[test]
486 fn test_event_version_serialization() {
487 let event = Event::typed(
488 "events.test.a",
489 "test",
490 "test.created",
491 3,
492 "Test",
493 "test",
494 serde_json::json!({}),
495 );
496
497 let json = serde_json::to_string(&event).unwrap();
498 assert!(json.contains("\"eventType\":\"test.created\""));
499 assert!(json.contains("\"version\":3"));
500
501 let parsed: Event = serde_json::from_str(&json).unwrap();
502 assert_eq!(parsed.event_type, "test.created");
503 assert_eq!(parsed.version, 3);
504 }
505
506 #[test]
507 fn test_event_version_backward_compat() {
508 let json = r#"{
510 "id": "evt-123",
511 "subject": "events.test.a",
512 "category": "test",
513 "payload": {},
514 "summary": "Test",
515 "source": "test",
516 "timestamp": 1700000000000
517 }"#;
518
519 let event: Event = serde_json::from_str(json).unwrap();
520 assert_eq!(event.event_type, "");
521 assert_eq!(event.version, 1);
522 }
523}