Skip to main content

dynamo_runtime/transports/event_plane/
mod.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Generic Event Plane for transport-agnostic pub/sub communication.
5
6mod codec;
7mod dynamic_subscriber;
8mod frame;
9mod nats_transport;
10mod traits;
11mod transport;
12pub mod zmq_transport;
13
14pub use codec::{Codec, MsgpackCodec};
15pub use dynamic_subscriber::DynamicSubscriber;
16pub use frame::{FRAME_HEADER_SIZE, FRAME_VERSION, Frame, FrameError, FrameHeader};
17pub use traits::{EventEnvelope, EventStream, TypedEventStream};
18pub use transport::{EventTransportRx, EventTransportTx, WireStream};
19pub use zmq_transport::{ZmqPubTransport, ZmqSubTransport};
20
21// Re-export transport kind from discovery for convenience
22pub use crate::discovery::EventTransportKind;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use anyhow::Result;
30use bytes::Bytes;
31use futures::{Stream, StreamExt};
32use lru::LruCache;
33use serde::Serialize;
34use serde::de::DeserializeOwned;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37
38use crate::DistributedRuntime;
39use crate::component::{Component, Namespace};
40use crate::discovery::{
41    Discovery, DiscoveryInstance, DiscoveryQuery, DiscoverySpec, EventChannelQuery, EventTransport,
42};
43use crate::traits::DistributedRuntimeProvider;
44use crate::utils::ip_resolver::get_local_ip_for_advertise;
45
46/// Scope of the event plane - determines the subject prefix for pub/sub.
47#[derive(Debug, Clone)]
48pub enum EventScope {
49    /// Namespace-level scope: `namespace.{name}`
50    Namespace { name: String },
51    /// Component-level scope: `namespace.{namespace}.component.{component}`
52    Component {
53        namespace: String,
54        component: String,
55    },
56}
57
58impl EventScope {
59    /// Returns the subject prefix for this scope.
60    pub fn subject_prefix(&self) -> String {
61        match self {
62            EventScope::Namespace { name } => format!("namespace.{}", name),
63            EventScope::Component {
64                namespace,
65                component,
66            } => {
67                format!("namespace.{}.component.{}", namespace, component)
68            }
69        }
70    }
71
72    /// Get the namespace name
73    pub fn namespace(&self) -> &str {
74        match self {
75            EventScope::Namespace { name } => name,
76            EventScope::Component { namespace, .. } => namespace,
77        }
78    }
79
80    /// Get the component name (if component-scoped)
81    pub fn component(&self) -> Option<&str> {
82        match self {
83            EventScope::Namespace { .. } => None,
84            EventScope::Component { component, .. } => Some(component),
85        }
86    }
87}
88
89// ============================================================================
90// Broker Resolution Logic
91// ============================================================================
92
93/// Broker endpoints for ZMQ broker mode
94#[derive(Debug, Clone)]
95struct BrokerEndpoints {
96    xsub_endpoints: Vec<String>,
97    xpub_endpoints: Vec<String>,
98}
99
100/// Resolve ZMQ broker endpoints from environment or discovery
101/// Returns None if broker mode is not configured (direct mode)
102async fn resolve_zmq_broker(
103    drt: &DistributedRuntime,
104    scope: &EventScope,
105) -> Result<Option<BrokerEndpoints>> {
106    // Priority 1: Explicit URL from DYN_ZMQ_BROKER_URL
107    if let Ok(broker_url) =
108        std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_URL)
109    {
110        let (xsub_endpoints, xpub_endpoints) = parse_broker_url(&broker_url)?;
111        tracing::info!(
112            num_xsub = xsub_endpoints.len(),
113            num_xpub = xpub_endpoints.len(),
114            "Using explicit ZMQ broker URL"
115        );
116        return Ok(Some(BrokerEndpoints {
117            xsub_endpoints,
118            xpub_endpoints,
119        }));
120    }
121
122    // Priority 2: Discovery-based lookup if DYN_ZMQ_BROKER_ENABLED=true
123    if std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_ENABLED)
124        .unwrap_or_default()
125        == "true"
126    {
127        let query = DiscoveryQuery::EventChannels(EventChannelQuery::component(
128            scope.namespace().to_string(),
129            "zmq_broker".to_string(),
130        ));
131
132        let instances = drt.discovery().list(query).await?;
133
134        // Collect all broker instances (multiple brokers for HA)
135        let mut xsub_endpoints = Vec::new();
136        let mut xpub_endpoints = Vec::new();
137
138        for instance in instances {
139            if let DiscoveryInstance::EventChannel { transport, .. } = instance
140                && let EventTransport::ZmqBroker {
141                    xsub_endpoints: xsubs,
142                    xpub_endpoints: xpubs,
143                } = transport
144            {
145                xsub_endpoints.extend(xsubs);
146                xpub_endpoints.extend(xpubs);
147            }
148        }
149
150        if xsub_endpoints.is_empty() {
151            anyhow::bail!(
152                "DYN_ZMQ_BROKER_ENABLED=true but no broker found in discovery for namespace '{}'",
153                scope.namespace()
154            );
155        }
156
157        tracing::info!(
158            num_brokers = xsub_endpoints.len(),
159            "Discovered ZMQ brokers from discovery plane"
160        );
161
162        return Ok(Some(BrokerEndpoints {
163            xsub_endpoints,
164            xpub_endpoints,
165        }));
166    }
167
168    // No broker configured - use direct mode
169    Ok(None)
170}
171
172/// Parse broker URL format: "xsub=tcp://host1:5555;tcp://host2:5555 , xpub=tcp://host1:5556;tcp://host2:5556"
173fn parse_broker_url(url: &str) -> Result<(Vec<String>, Vec<String>)> {
174    let parts: Vec<&str> = url.split(',').map(|s| s.trim()).collect();
175    if parts.len() != 2 {
176        anyhow::bail!(
177            "Invalid broker URL format. Expected 'xsub=<urls> , xpub=<urls>', got: {}",
178            url
179        );
180    }
181
182    let mut xsub_endpoints = Vec::new();
183    let mut xpub_endpoints = Vec::new();
184
185    for part in parts {
186        if let Some(urls_str) = part.strip_prefix("xsub=") {
187            xsub_endpoints = urls_str
188                .split(';')
189                .map(|s| s.trim().to_string())
190                .filter(|s| !s.is_empty())
191                .collect();
192        } else if let Some(urls_str) = part.strip_prefix("xpub=") {
193            xpub_endpoints = urls_str
194                .split(';')
195                .map(|s| s.trim().to_string())
196                .filter(|s| !s.is_empty())
197                .collect();
198        } else {
199            anyhow::bail!(
200                "Invalid broker URL part. Expected 'xsub=' or 'xpub=' prefix, got: {}",
201                part
202            );
203        }
204    }
205
206    if xsub_endpoints.is_empty() || xpub_endpoints.is_empty() {
207        anyhow::bail!(
208            "Broker URL must contain at least one xsub and one xpub endpoint. Got xsub={:?}, xpub={:?}",
209            xsub_endpoints,
210            xpub_endpoints
211        );
212    }
213
214    Ok((xsub_endpoints, xpub_endpoints))
215}
216
217/// Deduplicates events based on (publisher_id, sequence) tuple
218/// Required when connecting to multiple brokers in HA mode
219struct DeduplicatingStream {
220    inner: WireStream,
221    codec: Arc<Codec>,
222    seen_events: LruCache<(u64, u64), ()>, // (publisher_id, sequence) -> ()
223}
224
225impl DeduplicatingStream {
226    fn new(inner: WireStream, codec: Arc<Codec>, cache_size: usize) -> Self {
227        Self {
228            inner,
229            codec,
230            seen_events: LruCache::new(
231                NonZeroUsize::new(cache_size).expect("cache_size must be non-zero"),
232            ),
233        }
234    }
235}
236
237impl Stream for DeduplicatingStream {
238    type Item = Result<Bytes>;
239
240    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241        loop {
242            match Pin::new(&mut self.inner).poll_next(cx) {
243                Poll::Ready(Some(Ok(bytes))) => {
244                    // Decode envelope to extract publisher_id and sequence
245                    match self.codec.decode_envelope(&bytes) {
246                        Ok(envelope) => {
247                            let key = (envelope.publisher_id, envelope.sequence);
248
249                            // Check if we've seen this event before
250                            if self.seen_events.contains(&key) {
251                                // Duplicate - skip and continue loop
252                                tracing::debug!(
253                                    publisher_id = envelope.publisher_id,
254                                    sequence = envelope.sequence,
255                                    "Filtered duplicate event from multi-broker setup"
256                                );
257                                continue;
258                            }
259
260                            // New event - record and return
261                            self.seen_events.put(key, ());
262                            return Poll::Ready(Some(Ok(bytes)));
263                        }
264                        Err(e) => {
265                            tracing::warn!(error = %e, "Failed to decode envelope for deduplication");
266                            return Poll::Ready(Some(Err(e)));
267                        }
268                    }
269                }
270                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
271                Poll::Ready(None) => return Poll::Ready(None),
272                Poll::Pending => return Poll::Pending,
273            }
274        }
275    }
276}
277
278/// Event publisher for a specific topic.
279pub struct EventPublisher {
280    transport_kind: EventTransportKind,
281    scope: EventScope,
282    topic: String,
283    publisher_id: u64,
284    sequence: AtomicU64,
285    tx: Arc<dyn EventTransportTx>,
286    codec: Arc<Codec>,
287    /// Discovery client and registered instance for unregistration on drop
288    discovery_client: Option<Arc<dyn Discovery>>,
289    discovery_instance: Option<crate::discovery::DiscoveryInstance>,
290}
291
292impl EventPublisher {
293    /// Create a publisher for a component-scoped topic.
294    pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
295        Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
296            .await
297    }
298
299    /// Create a publisher with explicit transport.
300    pub async fn for_component_with_transport(
301        comp: &Component,
302        topic: impl Into<String>,
303        transport_kind: EventTransportKind,
304    ) -> Result<Self> {
305        let drt = comp.drt();
306        let scope = EventScope::Component {
307            namespace: comp.namespace().name(),
308            component: comp.name().to_string(),
309        };
310        Self::new_internal(drt, scope, topic.into(), transport_kind).await
311    }
312
313    /// Create a publisher for a namespace-scoped topic.
314    pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
315        Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
316            .await
317    }
318
319    /// Create a namespace publisher with explicit transport.
320    pub async fn for_namespace_with_transport(
321        ns: &Namespace,
322        topic: impl Into<String>,
323        transport_kind: EventTransportKind,
324    ) -> Result<Self> {
325        let drt = ns.drt();
326        let scope = EventScope::Namespace { name: ns.name() };
327        Self::new_internal(drt, scope, topic.into(), transport_kind).await
328    }
329
330    async fn new_internal(
331        drt: &DistributedRuntime,
332        scope: EventScope,
333        topic: String,
334        transport_kind: EventTransportKind,
335    ) -> Result<Self> {
336        let publisher_id = drt.discovery().instance_id();
337        let discovery = Some(drt.discovery());
338
339        // Use Msgpack codec for all transports
340        enum TransportSetup {
341            Nats(Arc<dyn EventTransportTx>, Arc<Codec>),
342            ZmqDirect(Arc<dyn EventTransportTx>, Arc<Codec>, String), // includes public endpoint
343            ZmqBroker(Arc<dyn EventTransportTx>, Arc<Codec>),
344        }
345
346        let transport_setup = match transport_kind {
347            EventTransportKind::Nats => {
348                let transport = Arc::new(nats_transport::NatsTransport::new(drt.clone()));
349                let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
350                TransportSetup::Nats(transport as Arc<dyn EventTransportTx>, codec)
351            }
352            EventTransportKind::Zmq => {
353                // Check for broker mode
354                if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
355                    // BROKER MODE: Connect to broker (single or multiple endpoints)
356                    let pub_transport = if broker.xsub_endpoints.len() == 1 {
357                        zmq_transport::ZmqPubTransport::connect(&broker.xsub_endpoints[0], &topic)
358                            .await?
359                    } else {
360                        zmq_transport::ZmqPubTransport::connect_multiple(
361                            &broker.xsub_endpoints,
362                            &topic,
363                        )
364                        .await?
365                    };
366
367                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
368                    TransportSetup::ZmqBroker(
369                        Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
370                        codec,
371                    )
372                } else {
373                    // DIRECT MODE: Bind PUB socket
374                    let (pub_transport, actual_bind_endpoint) = std::thread::spawn({
375                        let topic = topic.clone();
376                        move || {
377                            let rt = tokio::runtime::Builder::new_current_thread()
378                                .enable_all()
379                                .build()
380                                .expect("Failed to create Tokio runtime for ZMQ");
381
382                            rt.block_on(async move {
383                                zmq_transport::ZmqPubTransport::bind("tcp://0.0.0.0:0", &topic)
384                                    .await
385                                    .expect("Failed to bind ZMQ publisher")
386                            })
387                        }
388                    })
389                    .join()
390                    .expect("Failed to join ZMQ initialization thread");
391
392                    // Get local IP for public endpoint
393                    let actual_port: u16 = actual_bind_endpoint
394                        .rsplit(':')
395                        .next()
396                        .and_then(|s| s.parse().ok())
397                        .expect("Failed to parse port from bind endpoint");
398                    let local_ip = get_local_ip_for_advertise();
399                    let public_endpoint = format!("tcp://{}:{}", local_ip, actual_port);
400
401                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
402                    TransportSetup::ZmqDirect(
403                        Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
404                        codec,
405                        public_endpoint,
406                    )
407                }
408            }
409        };
410
411        // Extract transport and codec, and register if needed
412        let (tx, codec, discovery_instance) = match transport_setup {
413            TransportSetup::Nats(tx, codec) => {
414                let transport_config = EventTransport::nats(scope.subject_prefix());
415                let spec = DiscoverySpec::EventChannel {
416                    namespace: scope.namespace().to_string(),
417                    component: scope.component().unwrap_or("").to_string(),
418                    topic: topic.clone(),
419                    transport: transport_config,
420                };
421
422                let registered_instance = drt.discovery().register(spec).await?;
423                tracing::info!(
424                    topic = %topic,
425                    transport = ?transport_kind,
426                    instance_id = %registered_instance.instance_id(),
427                    "EventPublisher registered with discovery"
428                );
429                (tx, codec, Some(registered_instance))
430            }
431            TransportSetup::ZmqDirect(tx, codec, public_endpoint) => {
432                let transport_config = EventTransport::zmq(public_endpoint);
433                let spec = DiscoverySpec::EventChannel {
434                    namespace: scope.namespace().to_string(),
435                    component: scope.component().unwrap_or("").to_string(),
436                    topic: topic.clone(),
437                    transport: transport_config,
438                };
439
440                let registered_instance = drt.discovery().register(spec).await?;
441                tracing::info!(
442                    topic = %topic,
443                    transport = ?transport_kind,
444                    instance_id = %registered_instance.instance_id(),
445                    "EventPublisher registered with discovery (direct mode)"
446                );
447                (tx, codec, Some(registered_instance))
448            }
449            TransportSetup::ZmqBroker(tx, codec) => {
450                tracing::info!(
451                    topic = %topic,
452                    transport = ?transport_kind,
453                    "EventPublisher in broker mode - skipping discovery registration"
454                );
455                (tx, codec, None)
456            }
457        };
458
459        Ok(Self {
460            transport_kind,
461            scope,
462            topic,
463            publisher_id,
464            sequence: AtomicU64::new(0),
465            tx,
466            codec,
467            discovery_client: discovery,
468            discovery_instance,
469        })
470    }
471
472    /// Publish a serializable event.
473    pub async fn publish<T: Serialize + Send + Sync>(&self, event: &T) -> Result<()> {
474        let payload = self.codec.encode_payload(event)?;
475        self.publish_bytes(payload.to_vec()).await
476    }
477
478    /// Publish raw bytes.
479    pub async fn publish_bytes(&self, bytes: Vec<u8>) -> Result<()> {
480        let envelope = EventEnvelope {
481            publisher_id: self.publisher_id,
482            sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
483            published_at: current_timestamp_ms(),
484            topic: self.topic.clone(),
485            payload: Bytes::from(bytes),
486        };
487
488        let envelope_bytes = self.codec.encode_envelope(&envelope)?;
489        let subject = format!("{}.{}", self.scope.subject_prefix(), self.topic);
490
491        self.tx.publish(&subject, envelope_bytes).await
492    }
493
494    /// Get the publisher ID.
495    pub fn publisher_id(&self) -> u64 {
496        self.publisher_id
497    }
498
499    /// Get the topic.
500    pub fn topic(&self) -> &str {
501        &self.topic
502    }
503
504    /// Get the transport kind.
505    pub fn transport_kind(&self) -> EventTransportKind {
506        self.transport_kind
507    }
508}
509
510impl Drop for EventPublisher {
511    fn drop(&mut self) {
512        // Unregister from discovery on drop
513        if let (Some(discovery), Some(instance)) =
514            (self.discovery_client.take(), self.discovery_instance.take())
515        {
516            let topic = self.topic.clone();
517            let instance_id = instance.instance_id();
518
519            // Spawn background task for async unregister since Drop is sync
520            tokio::spawn(async move {
521                match discovery.unregister(instance).await {
522                    Ok(()) => {
523                        tracing::info!(
524                            topic = %topic,
525                            instance_id = %instance_id,
526                            "EventPublisher unregistered from discovery"
527                        );
528                    }
529                    Err(e) => {
530                        tracing::warn!(
531                            topic = %topic,
532                            instance_id = %instance_id,
533                            error = %e,
534                            "Failed to unregister EventPublisher from discovery"
535                        );
536                    }
537                }
538            });
539        }
540    }
541}
542
543/// Event subscriber for a specific topic.
544pub struct EventSubscriber {
545    stream: EventStream,
546    #[allow(dead_code)]
547    scope: EventScope,
548    #[allow(dead_code)]
549    topic: String,
550    codec: Arc<Codec>,
551}
552
553impl EventSubscriber {
554    /// Create a subscriber for a component-scoped topic.
555    pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
556        Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
557            .await
558    }
559
560    /// Create a subscriber with explicit transport.
561    pub async fn for_component_with_transport(
562        comp: &Component,
563        topic: impl Into<String>,
564        transport_kind: EventTransportKind,
565    ) -> Result<Self> {
566        let drt = comp.drt();
567        let scope = EventScope::Component {
568            namespace: comp.namespace().name(),
569            component: comp.name().to_string(),
570        };
571        Self::new_internal(drt, scope, topic.into(), transport_kind).await
572    }
573
574    /// Create a subscriber for a namespace-scoped topic.
575    pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
576        Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
577            .await
578    }
579
580    /// Create a namespace subscriber with explicit transport.
581    pub async fn for_namespace_with_transport(
582        ns: &Namespace,
583        topic: impl Into<String>,
584        transport_kind: EventTransportKind,
585    ) -> Result<Self> {
586        let drt = ns.drt();
587        let scope = EventScope::Namespace { name: ns.name() };
588        Self::new_internal(drt, scope, topic.into(), transport_kind).await
589    }
590
591    async fn new_internal(
592        drt: &DistributedRuntime,
593        scope: EventScope,
594        topic: String,
595        transport_kind: EventTransportKind,
596    ) -> Result<Self> {
597        let discovery = drt.discovery();
598
599        // Use Msgpack codec for all transports
600        let (wire_stream, codec): (WireStream, Arc<Codec>) = match transport_kind {
601            EventTransportKind::Nats => {
602                let transport = nats_transport::NatsTransport::new(drt.clone());
603                let subject = format!("{}.{}", scope.subject_prefix(), topic);
604                let stream = transport.subscribe(&subject).await?;
605                let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
606                (stream, codec)
607            }
608            EventTransportKind::Zmq => {
609                // Check for broker mode
610                if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
611                    // BROKER MODE: Connect to broker's XPUB (single or multiple endpoints)
612                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
613
614                    let stream: WireStream = if broker.xpub_endpoints.len() == 1 {
615                        // Single broker - no deduplication needed
616                        let sub_transport = zmq_transport::ZmqSubTransport::connect_broker(
617                            &broker.xpub_endpoints[0],
618                            &topic,
619                        )
620                        .await?;
621                        sub_transport.subscribe(&topic).await?
622                    } else {
623                        // Multiple brokers - need deduplication
624                        let sub_transport =
625                            zmq_transport::ZmqSubTransport::connect_broker_multiple(
626                                &broker.xpub_endpoints,
627                                &topic,
628                            )
629                            .await?;
630                        let inner_stream = sub_transport.subscribe(&topic).await?;
631
632                        // Wrap with deduplication (default cache size: 100,000 entries)
633                        Box::pin(DeduplicatingStream::new(
634                            inner_stream,
635                            codec.clone(),
636                            100_000,
637                        ))
638                    };
639
640                    (stream, codec)
641                } else {
642                    // DIRECT MODE: Use dynamic subscriber to discover and connect to publishers
643                    let query = match &scope {
644                        EventScope::Namespace { name } => {
645                            crate::discovery::DiscoveryQuery::EventChannels(
646                                crate::discovery::EventChannelQuery::namespace(name.clone()),
647                            )
648                        }
649                        EventScope::Component {
650                            namespace,
651                            component,
652                        } => crate::discovery::DiscoveryQuery::EventChannels(
653                            crate::discovery::EventChannelQuery::topic(
654                                namespace.clone(),
655                                component.clone(),
656                                topic.clone(),
657                            ),
658                        ),
659                    };
660
661                    let subscriber =
662                        Arc::new(DynamicSubscriber::new(discovery, query, topic.clone()));
663
664                    let stream = subscriber.start_zmq().await?;
665                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
666                    (stream, codec)
667                }
668            }
669        };
670
671        // Filter by topic and decode envelopes
672        let topic_filter = topic.clone();
673        let codec_for_stream = codec.clone();
674        let stream = wire_stream.filter_map(move |result| {
675            let codec = codec_for_stream.clone();
676            let topic_filter = topic_filter.clone();
677            async move {
678                match result {
679                    Ok(bytes) => match codec.decode_envelope(&bytes) {
680                        Ok(envelope) => {
681                            // Filter by topic for transports that don't support native filtering
682                            if envelope.topic == topic_filter {
683                                Some(Ok(envelope))
684                            } else {
685                                None
686                            }
687                        }
688                        Err(e) => Some(Err(e)),
689                    },
690                    Err(e) => Some(Err(e)),
691                }
692            }
693        });
694
695        tracing::info!(
696            topic = %topic,
697            transport = ?transport_kind,
698            "EventSubscriber created"
699        );
700
701        Ok(Self {
702            stream: Box::pin(stream),
703            scope,
704            topic,
705            codec,
706        })
707    }
708
709    /// Get the next event envelope.
710    pub async fn next(&mut self) -> Option<Result<EventEnvelope>> {
711        self.stream.next().await
712    }
713
714    /// Subscribe with automatic deserialization.
715    pub fn typed<T: DeserializeOwned + Send + 'static>(self) -> TypedEventSubscriber<T> {
716        TypedEventSubscriber {
717            stream: self.stream,
718            codec: self.codec,
719            _marker: std::marker::PhantomData,
720        }
721    }
722}
723
724/// Typed event subscriber that deserializes payloads.
725pub struct TypedEventSubscriber<T> {
726    stream: EventStream,
727    codec: Arc<Codec>,
728    _marker: std::marker::PhantomData<T>,
729}
730
731impl<T: DeserializeOwned + Send + 'static> TypedEventSubscriber<T> {
732    /// Get the next typed event with its envelope.
733    pub async fn next(&mut self) -> Option<Result<(EventEnvelope, T)>> {
734        let envelope = self.stream.next().await?;
735        match envelope {
736            Ok(env) => match self.codec.decode_payload(&env.payload) {
737                Ok(typed) => Some(Ok((env, typed))),
738                Err(e) => Some(Err(e)),
739            },
740            Err(e) => Some(Err(e)),
741        }
742    }
743}
744
745/// Get current timestamp in milliseconds since Unix epoch.
746fn current_timestamp_ms() -> u64 {
747    SystemTime::now()
748        .duration_since(UNIX_EPOCH)
749        .map(|d| d.as_millis() as u64)
750        .unwrap_or(0)
751}
752
753#[cfg(test)]
754mod tests {
755    use super::*;
756    use crate::config::environment_names::event_plane as env;
757
758    #[test]
759    fn test_event_scope_subject_prefix() {
760        let ns_scope = EventScope::Namespace {
761            name: "test-ns".to_string(),
762        };
763        assert_eq!(ns_scope.subject_prefix(), "namespace.test-ns");
764
765        let comp_scope = EventScope::Component {
766            namespace: "test-ns".to_string(),
767            component: "test-comp".to_string(),
768        };
769        assert_eq!(
770            comp_scope.subject_prefix(),
771            "namespace.test-ns.component.test-comp"
772        );
773    }
774
775    #[test]
776    fn test_event_scope_accessors() {
777        let ns_scope = EventScope::Namespace {
778            name: "my-ns".to_string(),
779        };
780        assert_eq!(ns_scope.namespace(), "my-ns");
781        assert_eq!(ns_scope.component(), None);
782
783        let comp_scope = EventScope::Component {
784            namespace: "my-ns".to_string(),
785            component: "my-comp".to_string(),
786        };
787        assert_eq!(comp_scope.namespace(), "my-ns");
788        assert_eq!(comp_scope.component(), Some("my-comp"));
789    }
790
791    #[test]
792    fn test_timestamp_generation() {
793        let ts = current_timestamp_ms();
794
795        // Should be after Jan 1, 2020 (1577836800000) and before Jan 1, 2100 (4102444800000)
796        assert!(ts > 1577836800000, "Timestamp should be after 2020");
797        assert!(ts < 4102444800000, "Timestamp should be before 2100");
798    }
799
800    #[test]
801    fn test_event_envelope_serde() {
802        let envelope = EventEnvelope {
803            publisher_id: 42,
804            sequence: 10,
805            published_at: 1700000000000,
806            topic: "test-topic".to_string(),
807            payload: Bytes::from("test data"),
808        };
809
810        let json = serde_json::to_string(&envelope).expect("serialize");
811        let deserialized: EventEnvelope = serde_json::from_str(&json).expect("deserialize");
812
813        assert_eq!(deserialized.publisher_id, 42);
814        assert_eq!(deserialized.sequence, 10);
815        assert_eq!(deserialized.published_at, 1700000000000);
816        assert_eq!(deserialized.topic, "test-topic");
817        assert_eq!(deserialized.payload, Bytes::from("test data"));
818    }
819}