Skip to main content

a3s_event/
types.rs

//! Core event types for the a3s-event system
//!
//! All serializable types use camelCase JSON field names for wire compatibility.
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9
/// Heap-allocated, pinned future that is `Send` and may borrow for `'a`.
///
/// Crate-local stand-in for `futures::future::BoxFuture`, so the `futures`
/// crate is not needed just for this alias.
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
12
13/// A single event in the system
14///
15/// Events are published to subjects following the dot-separated convention:
16/// `events.<category>.<topic>` (e.g., `events.market.forex.usd_cny`)
17#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(rename_all = "camelCase")]
19pub struct Event {
20    /// Unique event identifier (evt-<uuid>)
21    pub id: String,
22
23    /// Subject this event was published to
24    pub subject: String,
25
26    /// Top-level category for grouping (e.g., "market", "system")
27    pub category: String,
28
29    /// Event type identifier (e.g., "forex.rate_change", "deploy.completed")
30    ///
31    /// Used by schema registry to look up validation rules.
32    /// Defaults to empty string for untyped events.
33    #[serde(default)]
34    pub event_type: String,
35
36    /// Schema version for this event type (e.g., 1, 2, 3)
37    ///
38    /// Incremented when the payload schema changes.
39    /// Defaults to 1 for new events.
40    #[serde(default = "default_version")]
41    pub version: u32,
42
43    /// Event payload — arbitrary JSON data
44    pub payload: serde_json::Value,
45
46    /// Human-readable summary
47    pub summary: String,
48
49    /// Source system or service that produced this event
50    pub source: String,
51
52    /// Unix timestamp in milliseconds
53    pub timestamp: u64,
54
55    /// Optional key-value metadata
56    #[serde(default)]
57    pub metadata: HashMap<String, String>,
58}
59
/// Serde fallback for `Event::version` when the field is missing from input.
fn default_version() -> u32 {
    // Events that predate versioning are treated as schema version 1.
    const INITIAL_SCHEMA_VERSION: u32 = 1;
    INITIAL_SCHEMA_VERSION
}
63
64impl Event {
65    /// Create a new event with auto-generated id and timestamp
66    pub fn new(
67        subject: impl Into<String>,
68        category: impl Into<String>,
69        summary: impl Into<String>,
70        source: impl Into<String>,
71        payload: serde_json::Value,
72    ) -> Self {
73        Self {
74            id: format!("evt-{}", uuid::Uuid::new_v4()),
75            subject: subject.into(),
76            category: category.into(),
77            event_type: String::new(),
78            version: 1,
79            payload,
80            summary: summary.into(),
81            source: source.into(),
82            timestamp: now_millis(),
83            metadata: HashMap::new(),
84        }
85    }
86
87    /// Create a typed event with explicit event_type and version
88    pub fn typed(
89        subject: impl Into<String>,
90        category: impl Into<String>,
91        event_type: impl Into<String>,
92        version: u32,
93        summary: impl Into<String>,
94        source: impl Into<String>,
95        payload: serde_json::Value,
96    ) -> Self {
97        Self {
98            id: format!("evt-{}", uuid::Uuid::new_v4()),
99            subject: subject.into(),
100            category: category.into(),
101            event_type: event_type.into(),
102            version,
103            payload,
104            summary: summary.into(),
105            source: source.into(),
106            timestamp: now_millis(),
107            metadata: HashMap::new(),
108        }
109    }
110
111    /// Add a metadata entry
112    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
113        self.metadata.insert(key.into(), value.into());
114        self
115    }
116}
117
118/// A received event with delivery context
119#[derive(Debug, Clone)]
120pub struct ReceivedEvent {
121    /// The event data
122    pub event: Event,
123
124    /// Provider-assigned sequence number
125    pub sequence: u64,
126
127    /// Number of delivery attempts
128    pub num_delivered: u64,
129
130    /// Stream/topic name
131    pub stream: String,
132}
133
134/// Subscription filter for creating consumers
135#[derive(Debug, Clone, Serialize, Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct SubscriptionFilter {
138    /// Subscriber identifier (e.g., persona id)
139    pub subscriber_id: String,
140
141    /// Subject filter patterns (e.g., ["events.market.>", "events.system.>"])
142    pub subjects: Vec<String>,
143
144    /// Whether this is a durable subscription (survives reconnects)
145    pub durable: bool,
146
147    /// Provider-specific subscription options (optional)
148    #[serde(default, skip_serializing_if = "Option::is_none")]
149    pub options: Option<SubscribeOptions>,
150}
151
152/// Event counts grouped by category
153#[derive(Debug, Clone, Default, Serialize, Deserialize)]
154#[serde(rename_all = "camelCase")]
155pub struct EventCounts {
156    /// Counts per category
157    pub categories: HashMap<String, u64>,
158
159    /// Total event count
160    pub total: u64,
161}
162
163/// Delivery policy for subscriptions
164///
165/// Controls where a new consumer starts reading from the stream.
166/// Maps to provider-native delivery policies (e.g., NATS `DeliverPolicy`).
167#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
168#[serde(rename_all = "camelCase", tag = "type")]
169pub enum DeliverPolicy {
170    /// Deliver all available messages
171    #[default]
172    All,
173    /// Deliver starting from the last message
174    Last,
175    /// Deliver only new messages published after subscription
176    New,
177    /// Deliver starting from a specific sequence number
178    ByStartSequence { sequence: u64 },
179    /// Deliver starting from a specific timestamp (Unix milliseconds)
180    ByStartTime { timestamp: u64 },
181    /// Deliver the last message per subject
182    LastPerSubject,
183}
184
185/// Options for publishing events
186///
187/// Exposes provider-native publish capabilities. Unsupported options
188/// are ignored by providers that don't support them.
189#[derive(Debug, Clone, Default, Serialize, Deserialize)]
190#[serde(rename_all = "camelCase")]
191pub struct PublishOptions {
192    /// Deduplication message ID (NATS: `Nats-Msg-Id` header)
193    ///
194    /// If set, the provider uses this to deduplicate messages within
195    /// its deduplication window.
196    #[serde(default, skip_serializing_if = "Option::is_none")]
197    pub msg_id: Option<String>,
198
199    /// Expected last sequence number (optimistic concurrency)
200    ///
201    /// Publish fails if the stream's last sequence doesn't match.
202    /// NATS: `Nats-Expected-Last-Sequence` header.
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub expected_sequence: Option<u64>,
205
206    /// Publish timeout in seconds (overrides provider default)
207    #[serde(default, skip_serializing_if = "Option::is_none")]
208    pub timeout_secs: Option<u64>,
209}
210
211/// Options for creating subscriptions
212///
213/// Exposes provider-native consumer capabilities. Unsupported options
214/// are ignored by providers that don't support them.
215#[derive(Debug, Clone, Default, Serialize, Deserialize)]
216#[serde(rename_all = "camelCase")]
217pub struct SubscribeOptions {
218    /// Maximum delivery attempts before giving up (NATS: `MaxDeliver`)
219    ///
220    /// After this many failed deliveries, the message is dropped or
221    /// routed to a dead letter queue (if configured).
222    #[serde(default, skip_serializing_if = "Option::is_none")]
223    pub max_deliver: Option<i64>,
224
225    /// Backoff intervals in seconds between redelivery attempts
226    ///
227    /// NATS: maps to consumer `BackOff` durations.
228    /// Example: `vec![1, 5, 30]` — retry after 1s, 5s, 30s.
229    #[serde(default, skip_serializing_if = "Vec::is_empty")]
230    pub backoff_secs: Vec<u64>,
231
232    /// Maximum number of unacknowledged messages in flight
233    ///
234    /// Provides backpressure — consumer won't receive new messages
235    /// until pending acks drop below this limit.
236    /// NATS: `MaxAckPending`.
237    #[serde(default, skip_serializing_if = "Option::is_none")]
238    pub max_ack_pending: Option<i64>,
239
240    /// Where to start consuming from
241    #[serde(default)]
242    pub deliver_policy: DeliverPolicy,
243
244    /// How long to wait for an ack before redelivery (seconds)
245    ///
246    /// NATS: `AckWait`. Default depends on provider.
247    #[serde(default, skip_serializing_if = "Option::is_none")]
248    pub ack_wait_secs: Option<u64>,
249}
250
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns 0 if the system clock reports a time before the epoch.
pub(crate) fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    // A clock set before 1970 makes `duration_since` fail; fall back to a
    // zero duration rather than panicking.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    // `as_millis` yields u128; the value only exceeds u64 hundreds of
    // millions of years from now, so the narrowing cast is harmless here.
    since_epoch.as_millis() as u64
}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// `Event::new` fills in id, timestamp and empty metadata.
    #[test]
    fn test_event_creation() {
        let event = Event::new(
            "events.market.forex",
            "market",
            "USD/CNY rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        );

        assert!(event.id.starts_with("evt-"));
        assert_eq!(event.subject, "events.market.forex");
        assert_eq!(event.category, "market");
        assert_eq!(event.source, "reuters");
        assert!(event.timestamp > 0);
        assert!(event.metadata.is_empty());
    }

    /// Chained `with_metadata` calls accumulate entries.
    #[test]
    fn test_event_with_metadata() {
        let event = Event::new(
            "events.system.deploy",
            "system",
            "Deployed v1.2",
            "ci",
            serde_json::json!({}),
        )
        .with_metadata("env", "production")
        .with_metadata("version", "1.2.0");

        assert_eq!(event.metadata.len(), 2);
        assert_eq!(event.metadata["env"], "production");
        assert_eq!(event.metadata["version"], "1.2.0");
    }

    /// An event survives a JSON round-trip.
    #[test]
    fn test_event_serialization_roundtrip() {
        let event = Event::new(
            "events.market.forex",
            "market",
            "Rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        )
        .with_metadata("region", "asia");

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"subject\":\"events.market.forex\""));
        assert!(json.contains("\"category\":\"market\""));

        let parsed: Event = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.id, event.id);
        assert_eq!(parsed.subject, event.subject);
        assert_eq!(parsed.metadata["region"], "asia");
    }

    #[test]
    fn test_event_counts_default() {
        let counts = EventCounts::default();
        assert_eq!(counts.total, 0);
        assert!(counts.categories.is_empty());
    }

    /// Filter fields use camelCase names and `options: None` is omitted.
    #[test]
    fn test_subscription_filter_serialization() {
        let filter = SubscriptionFilter {
            subscriber_id: "financial-analyst".to_string(),
            subjects: vec!["events.market.>".to_string()],
            durable: true,
            options: None,
        };

        let json = serde_json::to_string(&filter).unwrap();
        assert!(json.contains("\"subscriberId\":\"financial-analyst\""));
        assert!(json.contains("\"durable\":true"));
        assert!(!json.contains("options"));

        let parsed: SubscriptionFilter = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.subscriber_id, "financial-analyst");
        assert_eq!(parsed.subjects, vec!["events.market.>".to_string()]);
        assert!(parsed.durable);
        assert!(parsed.options.is_none());
    }

    #[test]
    fn test_publish_options_default() {
        let opts = PublishOptions::default();
        assert!(opts.msg_id.is_none());
        assert!(opts.expected_sequence.is_none());
        assert!(opts.timeout_secs.is_none());
    }

    #[test]
    fn test_publish_options_serialization() {
        let opts = PublishOptions {
            msg_id: Some("dedup-123".to_string()),
            expected_sequence: Some(42),
            timeout_secs: Some(5),
        };

        let json = serde_json::to_string(&opts).unwrap();
        assert!(json.contains("\"msgId\":\"dedup-123\""));
        assert!(json.contains("\"expectedSequence\":42"));
        assert!(json.contains("\"timeoutSecs\":5"));

        let parsed: PublishOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.msg_id.unwrap(), "dedup-123");
        assert_eq!(parsed.expected_sequence.unwrap(), 42);
        assert_eq!(parsed.timeout_secs, Some(5));
    }

    #[test]
    fn test_publish_options_skip_none_fields() {
        let opts = PublishOptions::default();
        let json = serde_json::to_string(&opts).unwrap();
        assert!(!json.contains("msgId"));
        assert!(!json.contains("expectedSequence"));
        assert!(!json.contains("timeoutSecs"));
    }

    #[test]
    fn test_subscribe_options_default() {
        let opts = SubscribeOptions::default();
        assert!(opts.max_deliver.is_none());
        assert!(opts.backoff_secs.is_empty());
        assert!(opts.max_ack_pending.is_none());
        assert_eq!(opts.deliver_policy, DeliverPolicy::All);
        assert!(opts.ack_wait_secs.is_none());
    }

    #[test]
    fn test_subscribe_options_serialization() {
        let opts = SubscribeOptions {
            max_deliver: Some(5),
            backoff_secs: vec![1, 5, 30],
            max_ack_pending: Some(1000),
            deliver_policy: DeliverPolicy::New,
            ack_wait_secs: Some(30),
        };

        let json = serde_json::to_string(&opts).unwrap();
        assert!(json.contains("\"maxDeliver\":5"));
        assert!(json.contains("\"backoffSecs\":[1,5,30]"));
        assert!(json.contains("\"maxAckPending\":1000"));
        assert!(json.contains("\"ackWaitSecs\":30"));

        let parsed: SubscribeOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.max_deliver.unwrap(), 5);
        assert_eq!(parsed.backoff_secs, vec![1, 5, 30]);
        assert_eq!(parsed.max_ack_pending.unwrap(), 1000);
        assert_eq!(parsed.deliver_policy, DeliverPolicy::New);
        assert_eq!(parsed.ack_wait_secs, Some(30));
    }

    #[test]
    fn test_subscribe_options_skip_empty_fields() {
        let opts = SubscribeOptions::default();
        let json = serde_json::to_string(&opts).unwrap();
        assert!(!json.contains("maxDeliver"));
        assert!(!json.contains("backoffSecs"));
        assert!(!json.contains("maxAckPending"));
        assert!(!json.contains("ackWaitSecs"));
    }

    /// Unit variants serialize to `{"type":"<camelCase name>"}` and round-trip.
    #[test]
    fn test_deliver_policy_variants() {
        // The second column is the wire tag produced by
        // `rename_all = "camelCase"` together with `tag = "type"`.
        let cases = [
            (DeliverPolicy::All, "all"),
            (DeliverPolicy::Last, "last"),
            (DeliverPolicy::New, "new"),
            (DeliverPolicy::LastPerSubject, "lastPerSubject"),
        ];

        for (policy, tag) in &cases {
            let json = serde_json::to_string(policy).unwrap();
            assert_eq!(json, format!("{{\"type\":\"{}\"}}", tag));

            let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
            assert_eq!(&parsed, policy);
        }
    }

    #[test]
    fn test_deliver_policy_by_start_sequence() {
        let policy = DeliverPolicy::ByStartSequence { sequence: 100 };
        let json = serde_json::to_string(&policy).unwrap();
        assert!(json.contains("\"type\":\"byStartSequence\""));
        assert!(json.contains("\"sequence\":100"));

        let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, DeliverPolicy::ByStartSequence { sequence: 100 });
    }

    #[test]
    fn test_deliver_policy_by_start_time() {
        let ts = 1700000000000u64;
        let policy = DeliverPolicy::ByStartTime { timestamp: ts };
        let json = serde_json::to_string(&policy).unwrap();
        assert!(json.contains("\"type\":\"byStartTime\""));
        assert!(json.contains(&format!("\"timestamp\":{}", ts)));

        let parsed: DeliverPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, DeliverPolicy::ByStartTime { timestamp: ts });
    }

    /// `Event::new` produces an untyped event at version 1.
    #[test]
    fn test_event_default_version() {
        let event = Event::new(
            "events.test.a",
            "test",
            "Test",
            "test",
            serde_json::json!({}),
        );
        assert_eq!(event.version, 1);
        assert_eq!(event.event_type, "");
    }

    #[test]
    fn test_event_typed() {
        let event = Event::typed(
            "events.market.forex",
            "market",
            "forex.rate_change",
            2,
            "USD/CNY rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        );

        assert!(event.id.starts_with("evt-"));
        assert_eq!(event.event_type, "forex.rate_change");
        assert_eq!(event.version, 2);
        assert_eq!(event.category, "market");
    }

    #[test]
    fn test_event_version_serialization() {
        let event = Event::typed(
            "events.test.a",
            "test",
            "test.created",
            3,
            "Test",
            "test",
            serde_json::json!({}),
        );

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"eventType\":\"test.created\""));
        assert!(json.contains("\"version\":3"));

        let parsed: Event = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_type, "test.created");
        assert_eq!(parsed.version, 3);
    }

    #[test]
    fn test_event_version_backward_compat() {
        // Old events without event_type/version should deserialize with defaults
        let json = r#"{
            "id": "evt-123",
            "subject": "events.test.a",
            "category": "test",
            "payload": {},
            "summary": "Test",
            "source": "test",
            "timestamp": 1700000000000
        }"#;

        let event: Event = serde_json::from_str(json).unwrap();
        assert_eq!(event.event_type, "");
        assert_eq!(event.version, 1);
        assert!(event.metadata.is_empty());
    }
}