Skip to main content

a3s_event/
types.rs

1//! Core event types for the a3s-event system
2//!
3//! All types use camelCase JSON serialization for wire compatibility.
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8/// A single event in the system
9///
10/// Events are published to subjects following the dot-separated convention:
11/// `events.<category>.<topic>` (e.g., `events.market.forex.usd_cny`)
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(rename_all = "camelCase")]
14pub struct Event {
15    /// Unique event identifier (evt-<uuid>)
16    pub id: String,
17
18    /// Subject this event was published to
19    pub subject: String,
20
21    /// Top-level category for grouping (e.g., "market", "system")
22    pub category: String,
23
24    /// Event type identifier (e.g., "forex.rate_change", "deploy.completed")
25    ///
26    /// Used by schema registry to look up validation rules.
27    /// Defaults to empty string for untyped events.
28    #[serde(default)]
29    pub event_type: String,
30
31    /// Schema version for this event type (e.g., 1, 2, 3)
32    ///
33    /// Incremented when the payload schema changes.
34    /// Defaults to 1 for new events.
35    #[serde(default = "default_version")]
36    pub version: u32,
37
38    /// Event payload — arbitrary JSON data
39    pub payload: serde_json::Value,
40
41    /// Human-readable summary
42    pub summary: String,
43
44    /// Source system or service that produced this event
45    pub source: String,
46
47    /// Unix timestamp in milliseconds
48    pub timestamp: u64,
49
50    /// Optional key-value metadata
51    #[serde(default)]
52    pub metadata: HashMap<String, String>,
53}
54
55fn default_version() -> u32 {
56    1
57}
58
59impl Event {
60    /// Create a new event with auto-generated id and timestamp
61    pub fn new(
62        subject: impl Into<String>,
63        category: impl Into<String>,
64        summary: impl Into<String>,
65        source: impl Into<String>,
66        payload: serde_json::Value,
67    ) -> Self {
68        Self {
69            id: format!("evt-{}", uuid::Uuid::new_v4()),
70            subject: subject.into(),
71            category: category.into(),
72            event_type: String::new(),
73            version: 1,
74            payload,
75            summary: summary.into(),
76            source: source.into(),
77            timestamp: now_millis(),
78            metadata: HashMap::new(),
79        }
80    }
81
82    /// Create a typed event with explicit event_type and version
83    pub fn typed(
84        subject: impl Into<String>,
85        category: impl Into<String>,
86        event_type: impl Into<String>,
87        version: u32,
88        summary: impl Into<String>,
89        source: impl Into<String>,
90        payload: serde_json::Value,
91    ) -> Self {
92        Self {
93            id: format!("evt-{}", uuid::Uuid::new_v4()),
94            subject: subject.into(),
95            category: category.into(),
96            event_type: event_type.into(),
97            version,
98            payload,
99            summary: summary.into(),
100            source: source.into(),
101            timestamp: now_millis(),
102            metadata: HashMap::new(),
103        }
104    }
105
106    /// Add a metadata entry
107    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
108        self.metadata.insert(key.into(), value.into());
109        self
110    }
111}
112
113/// A received event with delivery context
114#[derive(Debug, Clone)]
115pub struct ReceivedEvent {
116    /// The event data
117    pub event: Event,
118
119    /// Provider-assigned sequence number
120    pub sequence: u64,
121
122    /// Number of delivery attempts
123    pub num_delivered: u64,
124
125    /// Stream/topic name
126    pub stream: String,
127}
128
129/// Subscription filter for creating consumers
130#[derive(Debug, Clone, Serialize, Deserialize)]
131#[serde(rename_all = "camelCase")]
132pub struct SubscriptionFilter {
133    /// Subscriber identifier (e.g., persona id)
134    pub subscriber_id: String,
135
136    /// Subject filter patterns (e.g., ["events.market.>", "events.system.>"])
137    pub subjects: Vec<String>,
138
139    /// Whether this is a durable subscription (survives reconnects)
140    pub durable: bool,
141
142    /// Provider-specific subscription options (optional)
143    #[serde(default, skip_serializing_if = "Option::is_none")]
144    pub options: Option<SubscribeOptions>,
145}
146
147/// Event counts grouped by category
148#[derive(Debug, Clone, Default, Serialize, Deserialize)]
149#[serde(rename_all = "camelCase")]
150pub struct EventCounts {
151    /// Counts per category
152    pub categories: HashMap<String, u64>,
153
154    /// Total event count
155    pub total: u64,
156}
157
158/// Delivery policy for subscriptions
159///
160/// Controls where a new consumer starts reading from the stream.
161/// Maps to provider-native delivery policies (e.g., NATS `DeliverPolicy`).
162#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
163#[serde(rename_all = "camelCase", tag = "type")]
164pub enum DeliverPolicy {
165    /// Deliver all available messages
166    #[default]
167    All,
168    /// Deliver starting from the last message
169    Last,
170    /// Deliver only new messages published after subscription
171    New,
172    /// Deliver starting from a specific sequence number
173    ByStartSequence { sequence: u64 },
174    /// Deliver starting from a specific timestamp (Unix milliseconds)
175    ByStartTime { timestamp: u64 },
176    /// Deliver the last message per subject
177    LastPerSubject,
178}
179
180/// Options for publishing events
181///
182/// Exposes provider-native publish capabilities. Unsupported options
183/// are ignored by providers that don't support them.
184#[derive(Debug, Clone, Default, Serialize, Deserialize)]
185#[serde(rename_all = "camelCase")]
186pub struct PublishOptions {
187    /// Deduplication message ID (NATS: `Nats-Msg-Id` header)
188    ///
189    /// If set, the provider uses this to deduplicate messages within
190    /// its deduplication window.
191    #[serde(default, skip_serializing_if = "Option::is_none")]
192    pub msg_id: Option<String>,
193
194    /// Expected last sequence number (optimistic concurrency)
195    ///
196    /// Publish fails if the stream's last sequence doesn't match.
197    /// NATS: `Nats-Expected-Last-Sequence` header.
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub expected_sequence: Option<u64>,
200
201    /// Publish timeout in seconds (overrides provider default)
202    #[serde(default, skip_serializing_if = "Option::is_none")]
203    pub timeout_secs: Option<u64>,
204}
205
206/// Options for creating subscriptions
207///
208/// Exposes provider-native consumer capabilities. Unsupported options
209/// are ignored by providers that don't support them.
210#[derive(Debug, Clone, Default, Serialize, Deserialize)]
211#[serde(rename_all = "camelCase")]
212pub struct SubscribeOptions {
213    /// Maximum delivery attempts before giving up (NATS: `MaxDeliver`)
214    ///
215    /// After this many failed deliveries, the message is dropped or
216    /// routed to a dead letter queue (if configured).
217    #[serde(default, skip_serializing_if = "Option::is_none")]
218    pub max_deliver: Option<i64>,
219
220    /// Backoff intervals in seconds between redelivery attempts
221    ///
222    /// NATS: maps to consumer `BackOff` durations.
223    /// Example: `vec![1, 5, 30]` — retry after 1s, 5s, 30s.
224    #[serde(default, skip_serializing_if = "Vec::is_empty")]
225    pub backoff_secs: Vec<u64>,
226
227    /// Maximum number of unacknowledged messages in flight
228    ///
229    /// Provides backpressure — consumer won't receive new messages
230    /// until pending acks drop below this limit.
231    /// NATS: `MaxAckPending`.
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    pub max_ack_pending: Option<i64>,
234
235    /// Where to start consuming from
236    #[serde(default)]
237    pub deliver_policy: DeliverPolicy,
238
239    /// How long to wait for an ack before redelivery (seconds)
240    ///
241    /// NATS: `AckWait`. Default depends on provider.
242    #[serde(default, skip_serializing_if = "Option::is_none")]
243    pub ack_wait_secs: Option<u64>,
244}
245
246/// Current time in Unix milliseconds
247fn now_millis() -> u64 {
248    std::time::SystemTime::now()
249        .duration_since(std::time::UNIX_EPOCH)
250        .unwrap_or_default()
251        .as_millis() as u64
252}
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257
258    #[test]
259    fn test_event_creation() {
260        let event = Event::new(
261            "events.market.forex",
262            "market",
263            "USD/CNY rate change",
264            "reuters",
265            serde_json::json!({"rate": 7.35}),
266        );
267
268        assert!(event.id.starts_with("evt-"));
269        assert_eq!(event.subject, "events.market.forex");
270        assert_eq!(event.category, "market");
271        assert_eq!(event.source, "reuters");
272        assert!(event.timestamp > 0);
273        assert!(event.metadata.is_empty());
274    }
275
276    #[test]
277    fn test_event_with_metadata() {
278        let event = Event::new(
279            "events.system.deploy",
280            "system",
281            "Deployed v1.2",
282            "ci",
283            serde_json::json!({}),
284        )
285        .with_metadata("env", "production")
286        .with_metadata("version", "1.2.0");
287
288        assert_eq!(event.metadata.len(), 2);
289        assert_eq!(event.metadata["env"], "production");
290        assert_eq!(event.metadata["version"], "1.2.0");
291    }
292
293    #[test]
294    fn test_event_serialization_roundtrip() {
295        let event = Event::new(
296            "events.market.forex",
297            "market",
298            "Rate change",
299            "reuters",
300            serde_json::json!({"rate": 7.35}),
301        )
302        .with_metadata("region", "asia");
303
304        let json = serde_json::to_string(&event).unwrap();
305        assert!(json.contains("\"subject\":\"events.market.forex\""));
306        assert!(json.contains("\"category\":\"market\""));
307
308        let parsed: Event = serde_json::from_str(&json).unwrap();
309        assert_eq!(parsed.id, event.id);
310        assert_eq!(parsed.subject, event.subject);
311        assert_eq!(parsed.metadata["region"], "asia");
312    }
313
314    #[test]
315    fn test_event_counts_default() {
316        let counts = EventCounts::default();
317        assert_eq!(counts.total, 0);
318        assert!(counts.categories.is_empty());
319    }
320
321    #[test]
322    fn test_subscription_filter_serialization() {
323        let filter = SubscriptionFilter {
324            subscriber_id: "financial-analyst".to_string(),
325            subjects: vec!["events.market.>".to_string()],
326            durable: true,
327            options: None,
328        };
329
330        let json = serde_json::to_string(&filter).unwrap();
331        assert!(json.contains("\"subscriberId\":\"financial-analyst\""));
332        assert!(json.contains("\"durable\":true"));
333
334        let parsed: SubscriptionFilter = serde_json::from_str(&json).unwrap();
335        assert_eq!(parsed.subscriber_id, "financial-analyst");
336        assert!(parsed.durable);
337    }
338
339    #[test]
340    fn test_publish_options_default() {
341        let opts = PublishOptions::default();
342        assert!(opts.msg_id.is_none());
343        assert!(opts.expected_sequence.is_none());
344        assert!(opts.timeout_secs.is_none());
345    }
346
347    #[test]
348    fn test_publish_options_serialization() {
349        let opts = PublishOptions {
350            msg_id: Some("dedup-123".to_string()),
351            expected_sequence: Some(42),
352            timeout_secs: Some(5),
353        };
354
355        let json = serde_json::to_string(&opts).unwrap();
356        assert!(json.contains("\"msgId\":\"dedup-123\""));
357        assert!(json.contains("\"expectedSequence\":42"));
358        assert!(json.contains("\"timeoutSecs\":5"));
359
360        let parsed: PublishOptions = serde_json::from_str(&json).unwrap();
361        assert_eq!(parsed.msg_id.unwrap(), "dedup-123");
362        assert_eq!(parsed.expected_sequence.unwrap(), 42);
363    }
364
365    #[test]
366    fn test_publish_options_skip_none_fields() {
367        let opts = PublishOptions::default();
368        let json = serde_json::to_string(&opts).unwrap();
369        assert!(!json.contains("msgId"));
370        assert!(!json.contains("expectedSequence"));
371        assert!(!json.contains("timeoutSecs"));
372    }
373
374    #[test]
375    fn test_subscribe_options_default() {
376        let opts = SubscribeOptions::default();
377        assert!(opts.max_deliver.is_none());
378        assert!(opts.backoff_secs.is_empty());
379        assert!(opts.max_ack_pending.is_none());
380        assert_eq!(opts.deliver_policy, DeliverPolicy::All);
381        assert!(opts.ack_wait_secs.is_none());
382    }
383
384    #[test]
385    fn test_subscribe_options_serialization() {
386        let opts = SubscribeOptions {
387            max_deliver: Some(5),
388            backoff_secs: vec![1, 5, 30],
389            max_ack_pending: Some(1000),
390            deliver_policy: DeliverPolicy::New,
391            ack_wait_secs: Some(30),
392        };
393
394        let json = serde_json::to_string(&opts).unwrap();
395        assert!(json.contains("\"maxDeliver\":5"));
396        assert!(json.contains("\"backoffSecs\":[1,5,30]"));
397        assert!(json.contains("\"maxAckPending\":1000"));
398        assert!(json.contains("\"ackWaitSecs\":30"));
399
400        let parsed: SubscribeOptions = serde_json::from_str(&json).unwrap();
401        assert_eq!(parsed.max_deliver.unwrap(), 5);
402        assert_eq!(parsed.backoff_secs, vec![1, 5, 30]);
403        assert_eq!(parsed.max_ack_pending.unwrap(), 1000);
404        assert_eq!(parsed.deliver_policy, DeliverPolicy::New);
405    }
406
407    #[test]
408    fn test_subscribe_options_skip_empty_fields() {
409        let opts = SubscribeOptions::default();
410        let json = serde_json::to_string(&opts).unwrap();
411        assert!(!json.contains("maxDeliver"));
412        assert!(!json.contains("backoffSecs"));
413        assert!(!json.contains("maxAckPending"));
414        assert!(!json.contains("ackWaitSecs"));
415    }
416
417    #[test]
418    fn test_deliver_policy_variants() {
419        let cases = vec![
420            (DeliverPolicy::All, "All"),
421            (DeliverPolicy::Last, "Last"),
422            (DeliverPolicy::New, "New"),
423            (DeliverPolicy::LastPerSubject, "LastPerSubject"),
424        ];
425
426        for (policy, _) in &cases {
427            let json = serde_json::to_string(policy).unwrap();
428            let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
429            assert_eq!(&parsed, policy);
430        }
431    }
432
433    #[test]
434    fn test_deliver_policy_by_start_sequence() {
435        let policy = DeliverPolicy::ByStartSequence { sequence: 100 };
436        let json = serde_json::to_string(&policy).unwrap();
437        assert!(json.contains("\"sequence\":100"));
438
439        let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
440        assert_eq!(parsed, DeliverPolicy::ByStartSequence { sequence: 100 });
441    }
442
443    #[test]
444    fn test_deliver_policy_by_start_time() {
445        let ts = 1700000000000u64;
446        let policy = DeliverPolicy::ByStartTime { timestamp: ts };
447        let json = serde_json::to_string(&policy).unwrap();
448        assert!(json.contains(&format!("\"timestamp\":{}", ts)));
449
450        let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
451        assert_eq!(parsed, DeliverPolicy::ByStartTime { timestamp: ts });
452    }
453
454    #[test]
455    fn test_event_default_version() {
456        let event = Event::new(
457            "events.test.a",
458            "test",
459            "Test",
460            "test",
461            serde_json::json!({}),
462        );
463        assert_eq!(event.version, 1);
464        assert_eq!(event.event_type, "");
465    }
466
467    #[test]
468    fn test_event_typed() {
469        let event = Event::typed(
470            "events.market.forex",
471            "market",
472            "forex.rate_change",
473            2,
474            "USD/CNY rate change",
475            "reuters",
476            serde_json::json!({"rate": 7.35}),
477        );
478
479        assert!(event.id.starts_with("evt-"));
480        assert_eq!(event.event_type, "forex.rate_change");
481        assert_eq!(event.version, 2);
482        assert_eq!(event.category, "market");
483    }
484
485    #[test]
486    fn test_event_version_serialization() {
487        let event = Event::typed(
488            "events.test.a",
489            "test",
490            "test.created",
491            3,
492            "Test",
493            "test",
494            serde_json::json!({}),
495        );
496
497        let json = serde_json::to_string(&event).unwrap();
498        assert!(json.contains("\"eventType\":\"test.created\""));
499        assert!(json.contains("\"version\":3"));
500
501        let parsed: Event = serde_json::from_str(&json).unwrap();
502        assert_eq!(parsed.event_type, "test.created");
503        assert_eq!(parsed.version, 3);
504    }
505
506    #[test]
507    fn test_event_version_backward_compat() {
508        // Old events without event_type/version should deserialize with defaults
509        let json = r#"{
510            "id": "evt-123",
511            "subject": "events.test.a",
512            "category": "test",
513            "payload": {},
514            "summary": "Test",
515            "source": "test",
516            "timestamp": 1700000000000
517        }"#;
518
519        let event: Event = serde_json::from_str(json).unwrap();
520        assert_eq!(event.event_type, "");
521        assert_eq!(event.version, 1);
522    }
523}