Skip to main content

dynamo_runtime/transports/event_plane/
mod.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Generic Event Plane for transport-agnostic pub/sub communication.
5
6mod codec;
7mod dynamic_subscriber;
8mod frame;
9mod nats_transport;
10mod traits;
11mod transport;
12pub mod zmq_transport;
13
14pub use codec::{Codec, MsgpackCodec};
15pub use dynamic_subscriber::DynamicSubscriber;
16pub use frame::{FRAME_HEADER_SIZE, FRAME_VERSION, Frame, FrameError, FrameHeader};
17pub use traits::{EventEnvelope, EventStream, TypedEventStream};
18pub use transport::{EventTransportRx, EventTransportTx, WireStream};
19pub use zmq_transport::{ZmqPubTransport, ZmqSubTransport};
20
21// Re-export transport kind from discovery for convenience
22pub use crate::discovery::EventTransportKind;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use anyhow::Result;
30use bytes::Bytes;
31use futures::{Stream, StreamExt};
32use lru::LruCache;
33use serde::Serialize;
34use serde::de::DeserializeOwned;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37
38use crate::DistributedRuntime;
39use crate::component::{Component, Namespace};
40use crate::discovery::{
41    Discovery, DiscoveryInstance, DiscoveryQuery, DiscoverySpec, EventChannelQuery, EventTransport,
42};
43use crate::traits::DistributedRuntimeProvider;
44use crate::utils::ip_resolver::get_local_ip_for_advertise;
45
/// Scope of the event plane - determines the subject prefix for pub/sub.
///
/// A scope is either a whole namespace or a single component inside a
/// namespace. Publishers and subscribers build their subject from
/// [`EventScope::subject_prefix`] followed by `.{topic}`.
#[derive(Debug, Clone)]
pub enum EventScope {
    /// Namespace-level scope: `namespace.{name}`
    Namespace { name: String },
    /// Component-level scope: `namespace.{namespace}.component.{component}`
    Component {
        namespace: String,
        component: String,
    },
}

impl EventScope {
    /// Builds the subject prefix for this scope.
    ///
    /// Yields `namespace.{name}` for a namespace scope and
    /// `namespace.{namespace}.component.{component}` for a component scope.
    pub fn subject_prefix(&self) -> String {
        match self {
            Self::Component {
                namespace,
                component,
            } => format!("namespace.{namespace}.component.{component}"),
            Self::Namespace { name } => format!("namespace.{name}"),
        }
    }

    /// Returns the namespace name, which both variants carry.
    pub fn namespace(&self) -> &str {
        // Irrefutable or-pattern: each variant binds its namespace field to `ns`.
        let (Self::Namespace { name: ns } | Self::Component { namespace: ns, .. }) = self;
        ns
    }

    /// Returns the component name, or `None` for a namespace-level scope.
    pub fn component(&self) -> Option<&str> {
        if let Self::Component { component, .. } = self {
            Some(component.as_str())
        } else {
            None
        }
    }
}
88
89// ============================================================================
90// Broker Resolution Logic
91// ============================================================================
92
/// Broker endpoints for ZMQ broker mode
///
/// Produced by `resolve_zmq_broker`, either from the explicit broker URL or
/// from broker instances found through discovery. Each list may hold several
/// entries when more than one broker is configured.
#[derive(Debug, Clone)]
struct BrokerEndpoints {
    /// Endpoints that publishers connect to (`EventPublisher` passes these to
    /// `ZmqPubTransport::connect` / `connect_multiple`).
    xsub_endpoints: Vec<String>,
    /// Endpoints that subscribers connect to (`EventSubscriber` passes these to
    /// `ZmqSubTransport::connect_broker` / `connect_broker_multiple`).
    xpub_endpoints: Vec<String>,
}
99
100/// Resolve ZMQ broker endpoints from environment or discovery
101/// Returns None if broker mode is not configured (direct mode)
102async fn resolve_zmq_broker(
103    drt: &DistributedRuntime,
104    scope: &EventScope,
105) -> Result<Option<BrokerEndpoints>> {
106    // Priority 1: Explicit URL from DYN_ZMQ_BROKER_URL
107    if let Ok(broker_url) =
108        std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_URL)
109    {
110        let (xsub_endpoints, xpub_endpoints) = parse_broker_url(&broker_url)?;
111        tracing::info!(
112            num_xsub = xsub_endpoints.len(),
113            num_xpub = xpub_endpoints.len(),
114            "Using explicit ZMQ broker URL"
115        );
116        return Ok(Some(BrokerEndpoints {
117            xsub_endpoints,
118            xpub_endpoints,
119        }));
120    }
121
122    // Priority 2: Discovery-based lookup if DYN_ZMQ_BROKER_ENABLED=true
123    if std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_ENABLED)
124        .unwrap_or_default()
125        == "true"
126    {
127        let query = DiscoveryQuery::EventChannels(EventChannelQuery::component(
128            scope.namespace().to_string(),
129            "zmq_broker".to_string(),
130        ));
131
132        let instances = drt.discovery().list(query).await?;
133
134        // Collect all broker instances (multiple brokers for HA)
135        let mut xsub_endpoints = Vec::new();
136        let mut xpub_endpoints = Vec::new();
137
138        for instance in instances {
139            if let DiscoveryInstance::EventChannel { transport, .. } = instance
140                && let EventTransport::ZmqBroker {
141                    xsub_endpoints: xsubs,
142                    xpub_endpoints: xpubs,
143                } = transport
144            {
145                xsub_endpoints.extend(xsubs);
146                xpub_endpoints.extend(xpubs);
147            }
148        }
149
150        if xsub_endpoints.is_empty() {
151            anyhow::bail!(
152                "DYN_ZMQ_BROKER_ENABLED=true but no broker found in discovery for namespace '{}'",
153                scope.namespace()
154            );
155        }
156
157        tracing::info!(
158            num_brokers = xsub_endpoints.len(),
159            "Discovered ZMQ brokers from discovery plane"
160        );
161
162        return Ok(Some(BrokerEndpoints {
163            xsub_endpoints,
164            xpub_endpoints,
165        }));
166    }
167
168    // No broker configured - use direct mode
169    Ok(None)
170}
171
172/// Parse broker URL format: "xsub=tcp://host1:5555;tcp://host2:5555 , xpub=tcp://host1:5556;tcp://host2:5556"
173fn parse_broker_url(url: &str) -> Result<(Vec<String>, Vec<String>)> {
174    let parts: Vec<&str> = url.split(',').map(|s| s.trim()).collect();
175    if parts.len() != 2 {
176        anyhow::bail!(
177            "Invalid broker URL format. Expected 'xsub=<urls> , xpub=<urls>', got: {}",
178            url
179        );
180    }
181
182    let mut xsub_endpoints = Vec::new();
183    let mut xpub_endpoints = Vec::new();
184
185    for part in parts {
186        if let Some(urls_str) = part.strip_prefix("xsub=") {
187            xsub_endpoints = urls_str
188                .split(';')
189                .map(|s| s.trim().to_string())
190                .filter(|s| !s.is_empty())
191                .collect();
192        } else if let Some(urls_str) = part.strip_prefix("xpub=") {
193            xpub_endpoints = urls_str
194                .split(';')
195                .map(|s| s.trim().to_string())
196                .filter(|s| !s.is_empty())
197                .collect();
198        } else {
199            anyhow::bail!(
200                "Invalid broker URL part. Expected 'xsub=' or 'xpub=' prefix, got: {}",
201                part
202            );
203        }
204    }
205
206    if xsub_endpoints.is_empty() || xpub_endpoints.is_empty() {
207        anyhow::bail!(
208            "Broker URL must contain at least one xsub and one xpub endpoint. Got xsub={:?}, xpub={:?}",
209            xsub_endpoints,
210            xpub_endpoints
211        );
212    }
213
214    Ok((xsub_endpoints, xpub_endpoints))
215}
216
/// Deduplicates events based on (publisher_id, sequence) tuple
/// Required when connecting to multiple brokers in HA mode
///
/// Wraps a wire stream and drops any frame whose envelope key is still present
/// in a bounded LRU cache. Once a key has been evicted from the cache, a late
/// duplicate carrying that key is passed through again.
struct DeduplicatingStream {
    /// Underlying stream of raw, still-encoded envelope frames.
    inner: WireStream,
    /// Codec used to decode each frame far enough to read its identity.
    codec: Arc<Codec>,
    /// Keys of recently forwarded events.
    seen_events: LruCache<(u64, u64), ()>, // (publisher_id, sequence) -> ()
}
224
225impl DeduplicatingStream {
226    fn new(inner: WireStream, codec: Arc<Codec>, cache_size: usize) -> Self {
227        Self {
228            inner,
229            codec,
230            seen_events: LruCache::new(
231                NonZeroUsize::new(cache_size).expect("cache_size must be non-zero"),
232            ),
233        }
234    }
235}
236
237impl Stream for DeduplicatingStream {
238    type Item = Result<Bytes>;
239
240    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241        loop {
242            match Pin::new(&mut self.inner).poll_next(cx) {
243                Poll::Ready(Some(Ok(bytes))) => {
244                    // Decode envelope to extract publisher_id and sequence
245                    match self.codec.decode_envelope(&bytes) {
246                        Ok(envelope) => {
247                            let key = (envelope.publisher_id, envelope.sequence);
248
249                            // Check if we've seen this event before
250                            if self.seen_events.contains(&key) {
251                                // Duplicate - skip and continue loop
252                                tracing::debug!(
253                                    publisher_id = envelope.publisher_id,
254                                    sequence = envelope.sequence,
255                                    "Filtered duplicate event from multi-broker setup"
256                                );
257                                continue;
258                            }
259
260                            // New event - record and return
261                            self.seen_events.put(key, ());
262                            return Poll::Ready(Some(Ok(bytes)));
263                        }
264                        Err(e) => {
265                            tracing::warn!(error = %e, "Failed to decode envelope for deduplication");
266                            return Poll::Ready(Some(Err(e)));
267                        }
268                    }
269                }
270                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
271                Poll::Ready(None) => return Poll::Ready(None),
272                Poll::Pending => return Poll::Pending,
273            }
274        }
275    }
276}
277
/// Event publisher for a specific topic.
///
/// Every published payload is wrapped in an `EventEnvelope` carrying this
/// publisher's id, a per-publisher sequence number, a millisecond timestamp and
/// the topic, then sent on the subject `{scope prefix}.{topic}`.
///
/// When the publisher registered itself with discovery at construction time,
/// dropping it spawns a task that unregisters that instance.
pub struct EventPublisher {
    /// Transport selected at construction; reported by `transport_kind()`.
    transport_kind: EventTransportKind,
    /// Scope whose subject prefix is prepended to the topic on publish.
    scope: EventScope,
    /// Topic name, also stamped into every envelope.
    topic: String,
    /// Identifier stamped into every envelope; taken from the discovery
    /// client's `instance_id()` at construction.
    publisher_id: u64,
    /// Next sequence number; starts at 0 and is incremented on every publish.
    sequence: AtomicU64,
    /// Transport used to send encoded envelopes.
    tx: Arc<dyn EventTransportTx>,
    /// Codec used for both payload and envelope encoding.
    codec: Arc<Codec>,
    /// Handle of the runtime on which the unregister task is spawned during drop.
    runtime_handle: tokio::runtime::Handle,
    /// Discovery client used for unregistration on drop.
    discovery_client: Option<Arc<dyn Discovery>>,
    /// Instance registered with discovery, if any; `None` in ZMQ broker mode,
    /// where registration is skipped.
    discovery_instance: Option<crate::discovery::DiscoveryInstance>,
}
292
293impl EventPublisher {
294    /// Create a publisher for a component-scoped topic.
295    pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
296        Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
297            .await
298    }
299
300    /// Create a publisher with explicit transport.
301    pub async fn for_component_with_transport(
302        comp: &Component,
303        topic: impl Into<String>,
304        transport_kind: EventTransportKind,
305    ) -> Result<Self> {
306        let drt = comp.drt();
307        let scope = EventScope::Component {
308            namespace: comp.namespace().name(),
309            component: comp.name().to_string(),
310        };
311        Self::new_internal(drt, scope, topic.into(), transport_kind).await
312    }
313
314    /// Create a publisher for a namespace-scoped topic.
315    pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
316        Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
317            .await
318    }
319
320    /// Create a namespace publisher with explicit transport.
321    pub async fn for_namespace_with_transport(
322        ns: &Namespace,
323        topic: impl Into<String>,
324        transport_kind: EventTransportKind,
325    ) -> Result<Self> {
326        let drt = ns.drt();
327        let scope = EventScope::Namespace { name: ns.name() };
328        Self::new_internal(drt, scope, topic.into(), transport_kind).await
329    }
330
331    async fn new_internal(
332        drt: &DistributedRuntime,
333        scope: EventScope,
334        topic: String,
335        transport_kind: EventTransportKind,
336    ) -> Result<Self> {
337        let publisher_id = drt.discovery().instance_id();
338        let discovery = Some(drt.discovery());
339        let runtime_handle = drt.runtime().secondary();
340
341        // Use Msgpack codec for all transports
342        enum TransportSetup {
343            Nats(Arc<dyn EventTransportTx>, Arc<Codec>),
344            ZmqDirect(Arc<dyn EventTransportTx>, Arc<Codec>, String), // includes public endpoint
345            ZmqBroker(Arc<dyn EventTransportTx>, Arc<Codec>),
346        }
347
348        let transport_setup = match transport_kind {
349            EventTransportKind::Nats => {
350                let transport = Arc::new(nats_transport::NatsTransport::new(drt.clone()));
351                let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
352                TransportSetup::Nats(transport as Arc<dyn EventTransportTx>, codec)
353            }
354            EventTransportKind::Zmq => {
355                // Check for broker mode
356                if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
357                    // BROKER MODE: Connect to broker (single or multiple endpoints)
358                    let pub_transport = if broker.xsub_endpoints.len() == 1 {
359                        zmq_transport::ZmqPubTransport::connect(&broker.xsub_endpoints[0], &topic)
360                            .await?
361                    } else {
362                        zmq_transport::ZmqPubTransport::connect_multiple(
363                            &broker.xsub_endpoints,
364                            &topic,
365                        )
366                        .await?
367                    };
368
369                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
370                    TransportSetup::ZmqBroker(
371                        Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
372                        codec,
373                    )
374                } else {
375                    // DIRECT MODE: Bind PUB socket
376                    let (pub_transport, actual_bind_endpoint) = std::thread::spawn({
377                        let topic = topic.clone();
378                        move || {
379                            let rt = tokio::runtime::Builder::new_current_thread()
380                                .enable_all()
381                                .build()
382                                .expect("Failed to create Tokio runtime for ZMQ");
383
384                            rt.block_on(async move {
385                                zmq_transport::ZmqPubTransport::bind("tcp://0.0.0.0:0", &topic)
386                                    .await
387                                    .expect("Failed to bind ZMQ publisher")
388                            })
389                        }
390                    })
391                    .join()
392                    .expect("Failed to join ZMQ initialization thread");
393
394                    // Get local IP for public endpoint
395                    let actual_port: u16 = actual_bind_endpoint
396                        .rsplit(':')
397                        .next()
398                        .and_then(|s| s.parse().ok())
399                        .expect("Failed to parse port from bind endpoint");
400                    let local_ip = get_local_ip_for_advertise();
401                    let public_endpoint = format!("tcp://{}:{}", local_ip, actual_port);
402
403                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
404                    TransportSetup::ZmqDirect(
405                        Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
406                        codec,
407                        public_endpoint,
408                    )
409                }
410            }
411        };
412
413        // Extract transport and codec, and register if needed
414        let (tx, codec, discovery_instance) = match transport_setup {
415            TransportSetup::Nats(tx, codec) => {
416                let transport_config = EventTransport::nats(scope.subject_prefix());
417                let spec = DiscoverySpec::EventChannel {
418                    namespace: scope.namespace().to_string(),
419                    component: scope.component().unwrap_or("").to_string(),
420                    topic: topic.clone(),
421                    transport: transport_config,
422                };
423
424                let registered_instance = drt.discovery().register(spec).await?;
425                tracing::info!(
426                    topic = %topic,
427                    transport = ?transport_kind,
428                    instance_id = %registered_instance.instance_id(),
429                    "EventPublisher registered with discovery"
430                );
431                (tx, codec, Some(registered_instance))
432            }
433            TransportSetup::ZmqDirect(tx, codec, public_endpoint) => {
434                let transport_config = EventTransport::zmq(public_endpoint);
435                let spec = DiscoverySpec::EventChannel {
436                    namespace: scope.namespace().to_string(),
437                    component: scope.component().unwrap_or("").to_string(),
438                    topic: topic.clone(),
439                    transport: transport_config,
440                };
441
442                let registered_instance = drt.discovery().register(spec).await?;
443                tracing::info!(
444                    topic = %topic,
445                    transport = ?transport_kind,
446                    instance_id = %registered_instance.instance_id(),
447                    "EventPublisher registered with discovery (direct mode)"
448                );
449                (tx, codec, Some(registered_instance))
450            }
451            TransportSetup::ZmqBroker(tx, codec) => {
452                tracing::info!(
453                    topic = %topic,
454                    transport = ?transport_kind,
455                    "EventPublisher in broker mode - skipping discovery registration"
456                );
457                (tx, codec, None)
458            }
459        };
460
461        Ok(Self {
462            transport_kind,
463            scope,
464            topic,
465            publisher_id,
466            sequence: AtomicU64::new(0),
467            tx,
468            codec,
469            runtime_handle,
470            discovery_client: discovery,
471            discovery_instance,
472        })
473    }
474
475    /// Publish a serializable event.
476    pub async fn publish<T: Serialize + Send + Sync>(&self, event: &T) -> Result<()> {
477        let payload = self.codec.encode_payload(event)?;
478        self.publish_bytes(payload.to_vec()).await
479    }
480
481    /// Publish raw bytes.
482    pub async fn publish_bytes(&self, bytes: Vec<u8>) -> Result<()> {
483        let envelope = EventEnvelope {
484            publisher_id: self.publisher_id,
485            sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
486            published_at: current_timestamp_ms(),
487            topic: self.topic.clone(),
488            payload: Bytes::from(bytes),
489        };
490
491        let envelope_bytes = self.codec.encode_envelope(&envelope)?;
492        let subject = format!("{}.{}", self.scope.subject_prefix(), self.topic);
493
494        self.tx.publish(&subject, envelope_bytes).await
495    }
496
497    /// Get the publisher ID.
498    pub fn publisher_id(&self) -> u64 {
499        self.publisher_id
500    }
501
502    /// Get the topic.
503    pub fn topic(&self) -> &str {
504        &self.topic
505    }
506
507    /// Get the transport kind.
508    pub fn transport_kind(&self) -> EventTransportKind {
509        self.transport_kind
510    }
511}
512
513impl Drop for EventPublisher {
514    fn drop(&mut self) {
515        // Unregister from discovery on drop
516        if let (Some(discovery), Some(instance)) =
517            (self.discovery_client.take(), self.discovery_instance.take())
518        {
519            let topic = self.topic.clone();
520            let instance_id = instance.instance_id();
521            let runtime_handle = self.runtime_handle.clone();
522
523            // Drop can run outside any Tokio context (notably via PyO3 finalizers), so use
524            // the runtime that created the publisher rather than the ambient thread state.
525            let spawn_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
526                runtime_handle.spawn(async move {
527                    match discovery.unregister(instance).await {
528                        Ok(()) => {
529                            tracing::info!(
530                                topic = %topic,
531                                instance_id = %instance_id,
532                                "EventPublisher unregistered from discovery"
533                            );
534                        }
535                        Err(e) => {
536                            tracing::warn!(
537                                topic = %topic,
538                                instance_id = %instance_id,
539                                error = %e,
540                                "Failed to unregister EventPublisher from discovery"
541                            );
542                        }
543                    }
544                });
545            }));
546
547            if spawn_result.is_err() {
548                tracing::warn!(
549                    topic = %self.topic,
550                    instance_id = %instance_id,
551                    "Skipping EventPublisher unregister during drop because the runtime is unavailable"
552                );
553            }
554        }
555    }
556}
557
/// Event subscriber for a specific topic.
///
/// Yields decoded `EventEnvelope`s whose `topic` equals the subscribed topic;
/// decode and transport errors are yielded as `Err` items.
pub struct EventSubscriber {
    /// Stream of decoded, topic-filtered envelopes.
    stream: EventStream,
    /// Scope the subscriber was created for; stored but not read by this type.
    #[allow(dead_code)]
    scope: EventScope,
    /// Subscribed topic; stored but not read by this type after construction.
    #[allow(dead_code)]
    topic: String,
    /// Codec shared with `TypedEventSubscriber` for payload decoding.
    codec: Arc<Codec>,
}
567
568impl EventSubscriber {
569    /// Create a subscriber for a component-scoped topic.
570    pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
571        Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
572            .await
573    }
574
575    /// Create a subscriber with explicit transport.
576    pub async fn for_component_with_transport(
577        comp: &Component,
578        topic: impl Into<String>,
579        transport_kind: EventTransportKind,
580    ) -> Result<Self> {
581        let drt = comp.drt();
582        let scope = EventScope::Component {
583            namespace: comp.namespace().name(),
584            component: comp.name().to_string(),
585        };
586        Self::new_internal(drt, scope, topic.into(), transport_kind).await
587    }
588
589    /// Create a subscriber for a namespace-scoped topic.
590    pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
591        Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
592            .await
593    }
594
595    /// Create a namespace subscriber with explicit transport.
596    pub async fn for_namespace_with_transport(
597        ns: &Namespace,
598        topic: impl Into<String>,
599        transport_kind: EventTransportKind,
600    ) -> Result<Self> {
601        let drt = ns.drt();
602        let scope = EventScope::Namespace { name: ns.name() };
603        Self::new_internal(drt, scope, topic.into(), transport_kind).await
604    }
605
606    async fn new_internal(
607        drt: &DistributedRuntime,
608        scope: EventScope,
609        topic: String,
610        transport_kind: EventTransportKind,
611    ) -> Result<Self> {
612        let discovery = drt.discovery();
613
614        // Use Msgpack codec for all transports
615        let (wire_stream, codec): (WireStream, Arc<Codec>) = match transport_kind {
616            EventTransportKind::Nats => {
617                let transport = nats_transport::NatsTransport::new(drt.clone());
618                let subject = format!("{}.{}", scope.subject_prefix(), topic);
619                let stream = transport.subscribe(&subject).await?;
620                let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
621                (stream, codec)
622            }
623            EventTransportKind::Zmq => {
624                // Check for broker mode
625                if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
626                    // BROKER MODE: Connect to broker's XPUB (single or multiple endpoints)
627                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
628
629                    let stream: WireStream = if broker.xpub_endpoints.len() == 1 {
630                        // Single broker - no deduplication needed
631                        let sub_transport = zmq_transport::ZmqSubTransport::connect_broker(
632                            &broker.xpub_endpoints[0],
633                            &topic,
634                        )
635                        .await?;
636                        sub_transport.subscribe(&topic).await?
637                    } else {
638                        // Multiple brokers - need deduplication
639                        let sub_transport =
640                            zmq_transport::ZmqSubTransport::connect_broker_multiple(
641                                &broker.xpub_endpoints,
642                                &topic,
643                            )
644                            .await?;
645                        let inner_stream = sub_transport.subscribe(&topic).await?;
646
647                        // Wrap with deduplication (default cache size: 100,000 entries)
648                        Box::pin(DeduplicatingStream::new(
649                            inner_stream,
650                            codec.clone(),
651                            100_000,
652                        ))
653                    };
654
655                    (stream, codec)
656                } else {
657                    // DIRECT MODE: Use dynamic subscriber to discover and connect to publishers
658                    let query = match &scope {
659                        EventScope::Namespace { name } => {
660                            crate::discovery::DiscoveryQuery::EventChannels(
661                                crate::discovery::EventChannelQuery::namespace(name.clone()),
662                            )
663                        }
664                        EventScope::Component {
665                            namespace,
666                            component,
667                        } => crate::discovery::DiscoveryQuery::EventChannels(
668                            crate::discovery::EventChannelQuery::topic(
669                                namespace.clone(),
670                                component.clone(),
671                                topic.clone(),
672                            ),
673                        ),
674                    };
675
676                    let subscriber =
677                        Arc::new(DynamicSubscriber::new(discovery, query, topic.clone()));
678
679                    let stream = subscriber.start_zmq().await?;
680                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
681                    (stream, codec)
682                }
683            }
684        };
685
686        // Filter by topic and decode envelopes
687        let topic_filter = topic.clone();
688        let codec_for_stream = codec.clone();
689        let stream = wire_stream.filter_map(move |result| {
690            let codec = codec_for_stream.clone();
691            let topic_filter = topic_filter.clone();
692            async move {
693                match result {
694                    Ok(bytes) => match codec.decode_envelope(&bytes) {
695                        Ok(envelope) => {
696                            // Filter by topic for transports that don't support native filtering
697                            if envelope.topic == topic_filter {
698                                Some(Ok(envelope))
699                            } else {
700                                None
701                            }
702                        }
703                        Err(e) => Some(Err(e)),
704                    },
705                    Err(e) => Some(Err(e)),
706                }
707            }
708        });
709
710        tracing::info!(
711            topic = %topic,
712            transport = ?transport_kind,
713            "EventSubscriber created"
714        );
715
716        Ok(Self {
717            stream: Box::pin(stream),
718            scope,
719            topic,
720            codec,
721        })
722    }
723
724    /// Get the next event envelope.
725    pub async fn next(&mut self) -> Option<Result<EventEnvelope>> {
726        self.stream.next().await
727    }
728
729    /// Subscribe with automatic deserialization.
730    pub fn typed<T: DeserializeOwned + Send + 'static>(self) -> TypedEventSubscriber<T> {
731        TypedEventSubscriber {
732            stream: self.stream,
733            codec: self.codec,
734            _marker: std::marker::PhantomData,
735        }
736    }
737}
738
/// Typed event subscriber that deserializes payloads.
///
/// Created by `EventSubscriber::typed`; each envelope's payload is decoded
/// into `T` with the subscriber's codec.
pub struct TypedEventSubscriber<T> {
    /// Stream of decoded envelopes taken over from the originating `EventSubscriber`.
    stream: EventStream,
    /// Codec used to decode each envelope payload into `T`.
    codec: Arc<Codec>,
    /// Ties the payload type `T` to the subscriber without storing a value of it.
    _marker: std::marker::PhantomData<T>,
}
745
746impl<T: DeserializeOwned + Send + 'static> TypedEventSubscriber<T> {
747    /// Get the next typed event with its envelope.
748    pub async fn next(&mut self) -> Option<Result<(EventEnvelope, T)>> {
749        let envelope = self.stream.next().await?;
750        match envelope {
751            Ok(env) => match self.codec.decode_payload(&env.payload) {
752                Ok(typed) => Some(Ok((env, typed))),
753                Err(e) => Some(Err(e)),
754            },
755            Err(e) => Some(Err(e)),
756        }
757    }
758}
759
/// Get current timestamp in milliseconds since Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn current_timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
767
// Unit tests for the pieces of this module that need no running transport:
// scope formatting and accessors, timestamp generation, and envelope serde.
#[cfg(test)]
mod tests {
    use super::*;
    // NOTE(review): `env` does not appear to be used by any test in this module — confirm
    // before removing.
    use crate::config::environment_names::event_plane as env;

    /// Subject prefixes follow `namespace.{name}` and
    /// `namespace.{namespace}.component.{component}`.
    #[test]
    fn test_event_scope_subject_prefix() {
        let ns_scope = EventScope::Namespace {
            name: "test-ns".to_string(),
        };
        assert_eq!(ns_scope.subject_prefix(), "namespace.test-ns");

        let comp_scope = EventScope::Component {
            namespace: "test-ns".to_string(),
            component: "test-comp".to_string(),
        };
        assert_eq!(
            comp_scope.subject_prefix(),
            "namespace.test-ns.component.test-comp"
        );
    }

    /// `namespace()` works for both variants; `component()` is `Some` only for
    /// component scopes.
    #[test]
    fn test_event_scope_accessors() {
        let ns_scope = EventScope::Namespace {
            name: "my-ns".to_string(),
        };
        assert_eq!(ns_scope.namespace(), "my-ns");
        assert_eq!(ns_scope.component(), None);

        let comp_scope = EventScope::Component {
            namespace: "my-ns".to_string(),
            component: "my-comp".to_string(),
        };
        assert_eq!(comp_scope.namespace(), "my-ns");
        assert_eq!(comp_scope.component(), Some("my-comp"));
    }

    /// Sanity-checks that the generated timestamp is in milliseconds and within
    /// a plausible wall-clock range.
    #[test]
    fn test_timestamp_generation() {
        let ts = current_timestamp_ms();

        // Should be after Jan 1, 2020 (1577836800000) and before Jan 1, 2100 (4102444800000)
        assert!(ts > 1577836800000, "Timestamp should be after 2020");
        assert!(ts < 4102444800000, "Timestamp should be before 2100");
    }

    /// Round-trips an envelope through JSON and checks every field survives.
    #[test]
    fn test_event_envelope_serde() {
        let envelope = EventEnvelope {
            publisher_id: 42,
            sequence: 10,
            published_at: 1700000000000,
            topic: "test-topic".to_string(),
            payload: Bytes::from("test data"),
        };

        let json = serde_json::to_string(&envelope).expect("serialize");
        let deserialized: EventEnvelope = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(deserialized.publisher_id, 42);
        assert_eq!(deserialized.sequence, 10);
        assert_eq!(deserialized.published_at, 1700000000000);
        assert_eq!(deserialized.topic, "test-topic");
        assert_eq!(deserialized.payload, Bytes::from("test data"));
    }
}