1use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9
/// Heap-allocated, pinned future that is `Send`, may borrow data for `'a`,
/// and resolves to a value of type `T`.
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
12
/// An event record that serializes to and from JSON with camelCase keys.
///
/// `event_type`, `version` and `metadata` are optional on input: when they
/// are missing they fall back to an empty string, `1` and an empty map
/// respectively. Every other field is required when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Event {
    /// Event identifier. The constructors in this file generate
    /// `evt-<uuid v4>`.
    pub id: String,

    /// Subject the event is filed under (the tests in this file use dotted
    /// names such as `events.market.forex`).
    pub subject: String,

    /// Category label (e.g. `market` or `system` in this file's tests).
    pub category: String,

    /// Event type name, serialized as `eventType`. Defaults to an empty
    /// string when absent from the input.
    #[serde(default)]
    pub event_type: String,

    /// Version number; defaults to `1` (via `default_version`) when absent
    /// from the input.
    /// NOTE(review): presumably the schema version of `payload` — confirm.
    #[serde(default = "default_version")]
    pub version: u32,

    /// Arbitrary JSON payload carried by the event.
    pub payload: serde_json::Value,

    /// Summary text (presumably a short human-readable description).
    pub summary: String,

    /// Source label (e.g. `reuters` or `ci` in this file's tests).
    pub source: String,

    /// Timestamp. The constructors in this file set it to `now_millis()`,
    /// i.e. milliseconds since the Unix epoch.
    pub timestamp: u64,

    /// String key/value metadata; defaults to an empty map when absent from
    /// the input.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
59
/// Serde fallback for `Event::version`: the value used when the serialized
/// input carries no `version` key.
fn default_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
63
64impl Event {
65 pub fn new(
67 subject: impl Into<String>,
68 category: impl Into<String>,
69 summary: impl Into<String>,
70 source: impl Into<String>,
71 payload: serde_json::Value,
72 ) -> Self {
73 Self {
74 id: format!("evt-{}", uuid::Uuid::new_v4()),
75 subject: subject.into(),
76 category: category.into(),
77 event_type: String::new(),
78 version: 1,
79 payload,
80 summary: summary.into(),
81 source: source.into(),
82 timestamp: now_millis(),
83 metadata: HashMap::new(),
84 }
85 }
86
87 pub fn typed(
89 subject: impl Into<String>,
90 category: impl Into<String>,
91 event_type: impl Into<String>,
92 version: u32,
93 summary: impl Into<String>,
94 source: impl Into<String>,
95 payload: serde_json::Value,
96 ) -> Self {
97 Self {
98 id: format!("evt-{}", uuid::Uuid::new_v4()),
99 subject: subject.into(),
100 category: category.into(),
101 event_type: event_type.into(),
102 version,
103 payload,
104 summary: summary.into(),
105 source: source.into(),
106 timestamp: now_millis(),
107 metadata: HashMap::new(),
108 }
109 }
110
111 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
113 self.metadata.insert(key.into(), value.into());
114 self
115 }
116}
117
/// An [`Event`] paired with delivery information.
///
/// This type derives only `Debug` and `Clone`; unlike `Event` it is not
/// serializable.
#[derive(Debug, Clone)]
pub struct ReceivedEvent {
    /// The event itself.
    pub event: Event,

    /// Sequence number.
    /// NOTE(review): presumably the event's position within `stream` — confirm
    /// against the code that constructs this type.
    pub sequence: u64,

    /// Delivery counter.
    /// NOTE(review): presumably how many times this event has been delivered
    /// to the subscriber — confirm whether it starts at 0 or 1.
    pub num_delivered: u64,

    /// Name of the stream (presumably the one the event was received from).
    pub stream: String,
}
133
/// Description of a subscription, serialized to and from JSON with camelCase
/// keys (`subscriberId`, `subjects`, `durable`, `options`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionFilter {
    /// Identifier of the subscriber (e.g. `financial-analyst` in this file's
    /// tests).
    pub subscriber_id: String,

    /// Subjects to subscribe to. The tests in this file use a wildcard-style
    /// entry (`events.market.>`).
    /// NOTE(review): the wildcard syntax is interpreted elsewhere — confirm.
    pub subjects: Vec<String>,

    /// Durability flag.
    /// NOTE(review): presumably whether the subscription outlives the
    /// subscriber's connection — confirm.
    pub durable: bool,

    /// Optional subscribe options; omitted from the output when `None` and
    /// `None` when absent from the input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub options: Option<SubscribeOptions>,
}
151
/// Counters keyed by category plus an overall total, serialized with
/// camelCase keys. `Default` yields an empty map and a total of `0`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EventCounts {
    /// Count per category name (presumably keyed by `Event::category`).
    pub categories: HashMap<String, u64>,

    /// Overall count.
    /// NOTE(review): presumably the sum of `categories` — nothing in this
    /// file enforces that.
    pub total: u64,
}
162
/// Delivery policy selected in [`SubscribeOptions`]; defaults to `All`.
///
/// Serialized as an internally tagged enum: a JSON object whose `type` key
/// holds the camelCase variant name, with any variant fields alongside it,
/// e.g. `{"type":"all"}` or `{"type":"byStartSequence","sequence":100}`.
///
/// NOTE(review): the variant names resemble NATS JetStream deliver policies;
/// the per-variant descriptions below are inferred from the names only —
/// confirm against the code that consumes this enum.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum DeliverPolicy {
    /// Default variant (presumably: deliver every available event).
    #[default]
    All,
    /// Presumably: start with the most recent event.
    Last,
    /// Presumably: deliver only events published after subscribing.
    New,
    /// Carries a starting sequence number (presumably: deliver from that
    /// sequence onwards).
    ByStartSequence { sequence: u64 },
    /// Carries a starting timestamp.
    /// NOTE(review): the unit is not stated here; the test in this file uses
    /// a millisecond-scale value (`1700000000000`) — confirm.
    ByStartTime { timestamp: u64 },
    /// Presumably: deliver the most recent event for each subject.
    LastPerSubject,
}
184
/// Optional settings for publishing, serialized with camelCase keys.
///
/// Every field is optional: it is omitted from the output when `None` and
/// becomes `None` when absent from the input. `Default` leaves all unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PublishOptions {
    /// Message id, serialized as `msgId`.
    /// NOTE(review): presumably used for de-duplication (the test value in
    /// this file is `dedup-123`) — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub msg_id: Option<String>,

    /// Expected sequence number, serialized as `expectedSequence`.
    /// NOTE(review): presumably an optimistic-concurrency check on the
    /// stream's last sequence — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expected_sequence: Option<u64>,

    /// Timeout, serialized as `timeoutSecs` (seconds, going by the name).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_secs: Option<u64>,
}
210
/// Optional settings for a subscription, serialized with camelCase keys.
///
/// Unset `Option` fields and an empty `backoff_secs` are omitted from the
/// output; `deliver_policy` is always written. On input every field may be
/// absent and then takes its default.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeOptions {
    /// Serialized as `maxDeliver`.
    /// NOTE(review): presumably the maximum number of delivery attempts per
    /// event; the meaning of negative values is not defined here — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_deliver: Option<i64>,

    /// Serialized as `backoffSecs`; omitted from the output when empty.
    /// NOTE(review): presumably the delays, in seconds, between successive
    /// redelivery attempts — confirm.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub backoff_secs: Vec<u64>,

    /// Serialized as `maxAckPending`.
    /// NOTE(review): presumably the cap on delivered-but-unacknowledged
    /// events — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_ack_pending: Option<i64>,

    /// Delivery policy; `DeliverPolicy::All` when absent from the input.
    #[serde(default)]
    pub deliver_policy: DeliverPolicy,

    /// Serialized as `ackWaitSecs` (seconds, going by the name).
    /// NOTE(review): presumably how long to wait for an acknowledgement
    /// before redelivering — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ack_wait_secs: Option<u64>,
}
250
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
pub(crate) fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a u128; the cast keeps the low 64 bits, which
        // hold any realistic timestamp.
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// `Event::new` copies the caller-supplied fields and generates the id
    /// and timestamp itself.
    #[test]
    fn test_event_creation() {
        let event = Event::new(
            "events.market.forex",
            "market",
            "USD/CNY rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        );

        assert!(event.id.starts_with("evt-"));
        assert_eq!(event.subject, "events.market.forex");
        assert_eq!(event.category, "market");
        assert_eq!(event.source, "reuters");
        assert!(event.timestamp > 0);
        assert!(event.metadata.is_empty());
    }

    /// Chained `with_metadata` calls accumulate entries.
    #[test]
    fn test_event_with_metadata() {
        let event = Event::new(
            "events.system.deploy",
            "system",
            "Deployed v1.2",
            "ci",
            serde_json::json!({}),
        )
        .with_metadata("env", "production")
        .with_metadata("version", "1.2.0");

        assert_eq!(event.metadata.len(), 2);
        assert_eq!(event.metadata["env"], "production");
        assert_eq!(event.metadata["version"], "1.2.0");
    }

    /// An event survives a JSON round trip.
    #[test]
    fn test_event_serialization_roundtrip() {
        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        )
        .with_metadata("region", "asia");

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"subject\":\"events.market.forex\""));
        assert!(json.contains("\"category\":\"market\""));

        let parsed: Event = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.id, event.id);
        assert_eq!(parsed.subject, event.subject);
        assert_eq!(parsed.metadata["region"], "asia");
    }

    /// `EventCounts::default()` is empty.
    #[test]
    fn test_event_counts_default() {
        let counts = EventCounts::default();
        assert_eq!(counts.total, 0);
        assert!(counts.categories.is_empty());
    }

    /// `SubscriptionFilter` uses camelCase keys and omits unset options.
    #[test]
    fn test_subscription_filter_serialization() {
        let filter = SubscriptionFilter {
            subscriber_id: "financial-analyst".to_string(),
            subjects: vec!["events.market.>".to_string()],
            durable: true,
            options: None,
        };

        let json = serde_json::to_string(&filter).unwrap();
        assert!(json.contains("\"subscriberId\":\"financial-analyst\""));
        assert!(json.contains("\"durable\":true"));
        // `options: None` must be skipped entirely.
        assert!(!json.contains("options"));

        let parsed: SubscriptionFilter = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.subscriber_id, "financial-analyst");
        assert_eq!(parsed.subjects, vec!["events.market.>".to_string()]);
        assert!(parsed.durable);
        assert!(parsed.options.is_none());
    }

    /// `PublishOptions::default()` leaves every field unset.
    #[test]
    fn test_publish_options_default() {
        let opts = PublishOptions::default();
        assert!(opts.msg_id.is_none());
        assert!(opts.expected_sequence.is_none());
        assert!(opts.timeout_secs.is_none());
    }

    /// Fully populated `PublishOptions` round-trips through JSON.
    #[test]
    fn test_publish_options_serialization() {
        let opts = PublishOptions {
            msg_id: Some("dedup-123".to_string()),
            expected_sequence: Some(42),
            timeout_secs: Some(5),
        };

        let json = serde_json::to_string(&opts).unwrap();
        assert!(json.contains("\"msgId\":\"dedup-123\""));
        assert!(json.contains("\"expectedSequence\":42"));
        assert!(json.contains("\"timeoutSecs\":5"));

        let parsed: PublishOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.msg_id.as_deref(), Some("dedup-123"));
        assert_eq!(parsed.expected_sequence, Some(42));
        assert_eq!(parsed.timeout_secs, Some(5));
    }

    /// Unset `PublishOptions` fields do not appear in the output.
    #[test]
    fn test_publish_options_skip_none_fields() {
        let opts = PublishOptions::default();
        let json = serde_json::to_string(&opts).unwrap();
        assert!(!json.contains("msgId"));
        assert!(!json.contains("expectedSequence"));
        assert!(!json.contains("timeoutSecs"));
    }

    /// `SubscribeOptions::default()` is unset apart from the `All` policy.
    #[test]
    fn test_subscribe_options_default() {
        let opts = SubscribeOptions::default();
        assert!(opts.max_deliver.is_none());
        assert!(opts.backoff_secs.is_empty());
        assert!(opts.max_ack_pending.is_none());
        assert_eq!(opts.deliver_policy, DeliverPolicy::All);
        assert!(opts.ack_wait_secs.is_none());
    }

    /// Fully populated `SubscribeOptions` round-trips through JSON.
    #[test]
    fn test_subscribe_options_serialization() {
        let opts = SubscribeOptions {
            max_deliver: Some(5),
            backoff_secs: vec![1, 5, 30],
            max_ack_pending: Some(1000),
            deliver_policy: DeliverPolicy::New,
            ack_wait_secs: Some(30),
        };

        let json = serde_json::to_string(&opts).unwrap();
        assert!(json.contains("\"maxDeliver\":5"));
        assert!(json.contains("\"backoffSecs\":[1,5,30]"));
        assert!(json.contains("\"maxAckPending\":1000"));
        assert!(json.contains("\"ackWaitSecs\":30"));

        let parsed: SubscribeOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.max_deliver, Some(5));
        assert_eq!(parsed.backoff_secs, vec![1, 5, 30]);
        assert_eq!(parsed.max_ack_pending, Some(1000));
        assert_eq!(parsed.deliver_policy, DeliverPolicy::New);
        assert_eq!(parsed.ack_wait_secs, Some(30));
    }

    /// Unset or empty `SubscribeOptions` fields do not appear in the output.
    #[test]
    fn test_subscribe_options_skip_empty_fields() {
        let opts = SubscribeOptions::default();
        let json = serde_json::to_string(&opts).unwrap();
        assert!(!json.contains("maxDeliver"));
        assert!(!json.contains("backoffSecs"));
        assert!(!json.contains("maxAckPending"));
        assert!(!json.contains("ackWaitSecs"));
    }

    /// Unit variants serialize to `{"type":"<camelCase name>"}` and round-trip.
    #[test]
    fn test_deliver_policy_variants() {
        // The expected tags follow `rename_all = "camelCase"` on the enum.
        let cases = [
            (DeliverPolicy::All, "all"),
            (DeliverPolicy::Last, "last"),
            (DeliverPolicy::New, "new"),
            (DeliverPolicy::LastPerSubject, "lastPerSubject"),
        ];

        for (policy, tag) in &cases {
            let json = serde_json::to_string(policy).unwrap();
            assert_eq!(json, format!("{{\"type\":\"{}\"}}", tag));

            let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
            assert_eq!(&parsed, policy);
        }
    }

    /// `ByStartSequence` carries its sequence next to the type tag.
    #[test]
    fn test_deliver_policy_by_start_sequence() {
        let policy = DeliverPolicy::ByStartSequence { sequence: 100 };
        let json = serde_json::to_string(&policy).unwrap();
        assert!(json.contains("\"type\":\"byStartSequence\""));
        assert!(json.contains("\"sequence\":100"));

        let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, DeliverPolicy::ByStartSequence { sequence: 100 });
    }

    /// `ByStartTime` carries its timestamp next to the type tag.
    #[test]
    fn test_deliver_policy_by_start_time() {
        let ts = 1700000000000u64;
        let policy = DeliverPolicy::ByStartTime { timestamp: ts };
        let json = serde_json::to_string(&policy).unwrap();
        assert!(json.contains("\"type\":\"byStartTime\""));
        assert!(json.contains(&format!("\"timestamp\":{}", ts)));

        let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, DeliverPolicy::ByStartTime { timestamp: ts });
    }

    /// `Event::new` produces version 1 and an empty event type.
    #[test]
    fn test_event_default_version() {
        let event = Event::new(
            "events.test.a",
            "test",
            "Test",
            "test",
            serde_json::json!({}),
        );
        assert_eq!(event.version, 1);
        assert_eq!(event.event_type, "");
    }

    /// `Event::typed` stores the supplied event type and version.
    #[test]
    fn test_event_typed() {
        let event = Event::typed(
            "events.market.forex",
            "market",
            "forex.rate_change",
            2,
            "USD/CNY rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        );

        assert!(event.id.starts_with("evt-"));
        assert_eq!(event.event_type, "forex.rate_change");
        assert_eq!(event.version, 2);
        assert_eq!(event.category, "market");
    }

    /// `eventType` and `version` survive a JSON round trip.
    #[test]
    fn test_event_version_serialization() {
        let event = Event::typed(
            "events.test.a",
            "test",
            "test.created",
            3,
            "Test",
            "test",
            serde_json::json!({}),
        );

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"eventType\":\"test.created\""));
        assert!(json.contains("\"version\":3"));

        let parsed: Event = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_type, "test.created");
        assert_eq!(parsed.version, 3);
    }

    /// Input without `eventType`, `version` or `metadata` still deserializes,
    /// falling back to the declared defaults.
    #[test]
    fn test_event_version_backward_compat() {
        let json = r#"{
            "id": "evt-123",
            "subject": "events.test.a",
            "category": "test",
            "payload": {},
            "summary": "Test",
            "source": "test",
            "timestamp": 1700000000000
        }"#;

        let event: Event = serde_json::from_str(json).unwrap();
        assert_eq!(event.event_type, "");
        assert_eq!(event.version, 1);
        assert!(event.metadata.is_empty());
    }
}