Skip to main content

dynamo_runtime/transports/event_plane/
mod.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Generic Event Plane for transport-agnostic pub/sub communication.
5
6mod codec;
7mod dynamic_subscriber;
8mod frame;
9mod nats_transport;
10mod traits;
11mod transport;
12pub mod zmq_transport;
13
14pub use codec::{Codec, MsgpackCodec};
15pub use dynamic_subscriber::DynamicSubscriber;
16pub use frame::{FRAME_HEADER_SIZE, FRAME_VERSION, Frame, FrameError, FrameHeader};
17pub use traits::{EventEnvelope, EventStream, TypedEventStream};
18pub use transport::{EventTransportRx, EventTransportTx, WireStream};
19pub use zmq_transport::{ZmqPubTransport, ZmqSubTransport};
20
21// Re-export transport kind from discovery for convenience
22pub use crate::discovery::EventTransportKind;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use anyhow::Result;
30use bytes::Bytes;
31use futures::{Stream, StreamExt};
32use lru::LruCache;
33use serde::Serialize;
34use serde::de::DeserializeOwned;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37
38use crate::DistributedRuntime;
39use crate::component::{Component, Namespace};
40use crate::discovery::{
41    Discovery, DiscoveryInstance, DiscoveryQuery, DiscoverySpec, EventChannelQuery, EventTransport,
42};
43use crate::traits::DistributedRuntimeProvider;
44use crate::utils::ip_resolver::get_local_ip_for_advertise;
45
46/// Scope of the event plane - determines the subject prefix for pub/sub.
47#[derive(Debug, Clone)]
48pub enum EventScope {
49    /// Namespace-level scope: `namespace.{name}`
50    Namespace { name: String },
51    /// Component-level scope: `namespace.{namespace}.component.{component}`
52    Component {
53        namespace: String,
54        component: String,
55    },
56}
57
58impl EventScope {
59    /// Returns the subject prefix for this scope.
60    pub fn subject_prefix(&self) -> String {
61        match self {
62            EventScope::Namespace { name } => format!("namespace.{}", name),
63            EventScope::Component {
64                namespace,
65                component,
66            } => {
67                format!("namespace.{}.component.{}", namespace, component)
68            }
69        }
70    }
71
72    /// Get the namespace name
73    pub fn namespace(&self) -> &str {
74        match self {
75            EventScope::Namespace { name } => name,
76            EventScope::Component { namespace, .. } => namespace,
77        }
78    }
79
80    /// Get the component name (if component-scoped)
81    pub fn component(&self) -> Option<&str> {
82        match self {
83            EventScope::Namespace { .. } => None,
84            EventScope::Component { component, .. } => Some(component),
85        }
86    }
87}
88
89// ============================================================================
90// Broker Resolution Logic
91// ============================================================================
92
93/// Broker endpoints for ZMQ broker mode
94#[derive(Debug, Clone)]
95struct BrokerEndpoints {
96    xsub_endpoints: Vec<String>,
97    xpub_endpoints: Vec<String>,
98}
99
100/// Resolve ZMQ broker endpoints from environment or discovery
101/// Returns None if broker mode is not configured (direct mode)
102async fn resolve_zmq_broker(
103    drt: &DistributedRuntime,
104    scope: &EventScope,
105) -> Result<Option<BrokerEndpoints>> {
106    // Priority 1: Explicit URL from DYN_ZMQ_BROKER_URL
107    if let Ok(broker_url) =
108        std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_URL)
109    {
110        let (xsub_endpoints, xpub_endpoints) = parse_broker_url(&broker_url)?;
111        tracing::info!(
112            num_xsub = xsub_endpoints.len(),
113            num_xpub = xpub_endpoints.len(),
114            "Using explicit ZMQ broker URL"
115        );
116        return Ok(Some(BrokerEndpoints {
117            xsub_endpoints,
118            xpub_endpoints,
119        }));
120    }
121
122    // Priority 2: Discovery-based lookup if DYN_ZMQ_BROKER_ENABLED=true
123    if std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_ENABLED)
124        .unwrap_or_default()
125        == "true"
126    {
127        let query = DiscoveryQuery::EventChannels(EventChannelQuery::component(
128            scope.namespace().to_string(),
129            "zmq_broker".to_string(),
130        ));
131
132        let instances = drt.discovery().list(query).await?;
133
134        // Collect all broker instances (multiple brokers for HA)
135        let mut xsub_endpoints = Vec::new();
136        let mut xpub_endpoints = Vec::new();
137
138        for instance in instances {
139            if let DiscoveryInstance::EventChannel { transport, .. } = instance
140                && let EventTransport::ZmqBroker {
141                    xsub_endpoints: xsubs,
142                    xpub_endpoints: xpubs,
143                } = transport
144            {
145                xsub_endpoints.extend(xsubs);
146                xpub_endpoints.extend(xpubs);
147            }
148        }
149
150        if xsub_endpoints.is_empty() {
151            anyhow::bail!(
152                "DYN_ZMQ_BROKER_ENABLED=true but no broker found in discovery for namespace '{}'",
153                scope.namespace()
154            );
155        }
156
157        tracing::info!(
158            num_brokers = xsub_endpoints.len(),
159            "Discovered ZMQ brokers from discovery plane"
160        );
161
162        return Ok(Some(BrokerEndpoints {
163            xsub_endpoints,
164            xpub_endpoints,
165        }));
166    }
167
168    // No broker configured - use direct mode
169    Ok(None)
170}
171
172/// Parse broker URL format: "xsub=tcp://host1:5555;tcp://host2:5555 , xpub=tcp://host1:5556;tcp://host2:5556"
173fn parse_broker_url(url: &str) -> Result<(Vec<String>, Vec<String>)> {
174    let parts: Vec<&str> = url.split(',').map(|s| s.trim()).collect();
175    if parts.len() != 2 {
176        anyhow::bail!(
177            "Invalid broker URL format. Expected 'xsub=<urls> , xpub=<urls>', got: {}",
178            url
179        );
180    }
181
182    let mut xsub_endpoints = Vec::new();
183    let mut xpub_endpoints = Vec::new();
184
185    for part in parts {
186        if let Some(urls_str) = part.strip_prefix("xsub=") {
187            xsub_endpoints = urls_str
188                .split(';')
189                .map(|s| s.trim().to_string())
190                .filter(|s| !s.is_empty())
191                .collect();
192        } else if let Some(urls_str) = part.strip_prefix("xpub=") {
193            xpub_endpoints = urls_str
194                .split(';')
195                .map(|s| s.trim().to_string())
196                .filter(|s| !s.is_empty())
197                .collect();
198        } else {
199            anyhow::bail!(
200                "Invalid broker URL part. Expected 'xsub=' or 'xpub=' prefix, got: {}",
201                part
202            );
203        }
204    }
205
206    if xsub_endpoints.is_empty() || xpub_endpoints.is_empty() {
207        anyhow::bail!(
208            "Broker URL must contain at least one xsub and one xpub endpoint. Got xsub={:?}, xpub={:?}",
209            xsub_endpoints,
210            xpub_endpoints
211        );
212    }
213
214    Ok((xsub_endpoints, xpub_endpoints))
215}
216
217/// Deduplicates events based on (publisher_id, sequence) tuple
218/// Required when connecting to multiple brokers in HA mode
219struct DeduplicatingStream {
220    inner: WireStream,
221    codec: Arc<Codec>,
222    seen_events: LruCache<(u64, u64), ()>, // (publisher_id, sequence) -> ()
223}
224
225impl DeduplicatingStream {
226    fn new(inner: WireStream, codec: Arc<Codec>, cache_size: usize) -> Self {
227        Self {
228            inner,
229            codec,
230            seen_events: LruCache::new(
231                NonZeroUsize::new(cache_size).expect("cache_size must be non-zero"),
232            ),
233        }
234    }
235}
236
237impl Stream for DeduplicatingStream {
238    type Item = Result<Bytes>;
239
240    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241        loop {
242            match Pin::new(&mut self.inner).poll_next(cx) {
243                Poll::Ready(Some(Ok(bytes))) => {
244                    // Decode envelope to extract publisher_id and sequence
245                    match self.codec.decode_envelope(&bytes) {
246                        Ok(envelope) => {
247                            let key = (envelope.publisher_id, envelope.sequence);
248
249                            // Check if we've seen this event before
250                            if self.seen_events.contains(&key) {
251                                // Duplicate - skip and continue loop
252                                tracing::debug!(
253                                    publisher_id = envelope.publisher_id,
254                                    sequence = envelope.sequence,
255                                    "Filtered duplicate event from multi-broker setup"
256                                );
257                                continue;
258                            }
259
260                            // New event - record and return
261                            self.seen_events.put(key, ());
262                            return Poll::Ready(Some(Ok(bytes)));
263                        }
264                        Err(e) => {
265                            tracing::warn!(error = %e, "Failed to decode envelope for deduplication");
266                            return Poll::Ready(Some(Err(e)));
267                        }
268                    }
269                }
270                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
271                Poll::Ready(None) => return Poll::Ready(None),
272                Poll::Pending => return Poll::Pending,
273            }
274        }
275    }
276}
277
278/// Event publisher for a specific topic.
279pub struct EventPublisher {
280    transport_kind: EventTransportKind,
281    scope: EventScope,
282    topic: String,
283    publisher_id: u64,
284    sequence: AtomicU64,
285    tx: Arc<dyn EventTransportTx>,
286    codec: Arc<Codec>,
287    runtime_handle: tokio::runtime::Handle,
288    /// Discovery client and registered instance for unregistration on drop
289    discovery_client: Option<Arc<dyn Discovery>>,
290    discovery_instance: Option<crate::discovery::DiscoveryInstance>,
291}
292
293impl EventPublisher {
294    /// Create a publisher for a component-scoped topic.
295    ///
296    /// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
297    /// value is used; otherwise the runtime's default is used (ZMQ for local backends
298    /// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
299    /// Use [`for_component_with_transport`](Self::for_component_with_transport) to
300    /// override explicitly.
301    pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
302        let transport_kind = comp.drt().default_event_transport_kind();
303        Self::for_component_with_transport(comp, topic, transport_kind).await
304    }
305
306    /// Create a publisher with explicit transport.
307    pub async fn for_component_with_transport(
308        comp: &Component,
309        topic: impl Into<String>,
310        transport_kind: EventTransportKind,
311    ) -> Result<Self> {
312        let drt = comp.drt();
313        let scope = EventScope::Component {
314            namespace: comp.namespace().name(),
315            component: comp.name().to_string(),
316        };
317        Self::new_internal(drt, scope, topic.into(), transport_kind).await
318    }
319
320    /// Create a publisher for a namespace-scoped topic.
321    ///
322    /// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
323    /// value is used; otherwise the runtime's default is used (ZMQ for local backends
324    /// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
325    /// Use [`for_namespace_with_transport`](Self::for_namespace_with_transport) to
326    /// override explicitly.
327    pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
328        let transport_kind = ns.drt().default_event_transport_kind();
329        Self::for_namespace_with_transport(ns, topic, transport_kind).await
330    }
331
332    /// Create a namespace publisher with explicit transport.
333    pub async fn for_namespace_with_transport(
334        ns: &Namespace,
335        topic: impl Into<String>,
336        transport_kind: EventTransportKind,
337    ) -> Result<Self> {
338        let drt = ns.drt();
339        let scope = EventScope::Namespace { name: ns.name() };
340        Self::new_internal(drt, scope, topic.into(), transport_kind).await
341    }
342
343    async fn new_internal(
344        drt: &DistributedRuntime,
345        scope: EventScope,
346        topic: String,
347        transport_kind: EventTransportKind,
348    ) -> Result<Self> {
349        let publisher_id = drt.discovery().instance_id();
350        let discovery = Some(drt.discovery());
351        let runtime_handle = drt.runtime().secondary();
352
353        // Use Msgpack codec for all transports
354        enum TransportSetup {
355            Nats(Arc<dyn EventTransportTx>, Arc<Codec>),
356            ZmqDirect(Arc<dyn EventTransportTx>, Arc<Codec>, String), // includes public endpoint
357            ZmqBroker(Arc<dyn EventTransportTx>, Arc<Codec>),
358        }
359
360        let transport_setup = match transport_kind {
361            EventTransportKind::Nats => {
362                let transport = Arc::new(nats_transport::NatsTransport::new(drt.clone()));
363                let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
364                TransportSetup::Nats(transport as Arc<dyn EventTransportTx>, codec)
365            }
366            EventTransportKind::Zmq => {
367                // Check for broker mode
368                if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
369                    // BROKER MODE: Connect to broker (single or multiple endpoints)
370                    let pub_transport = if broker.xsub_endpoints.len() == 1 {
371                        zmq_transport::ZmqPubTransport::connect(&broker.xsub_endpoints[0], &topic)
372                            .await?
373                    } else {
374                        zmq_transport::ZmqPubTransport::connect_multiple(
375                            &broker.xsub_endpoints,
376                            &topic,
377                        )
378                        .await?
379                    };
380
381                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
382                    TransportSetup::ZmqBroker(
383                        Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
384                        codec,
385                    )
386                } else {
387                    // DIRECT MODE: Bind PUB socket
388                    let (pub_transport, actual_bind_endpoint) = std::thread::spawn({
389                        let topic = topic.clone();
390                        move || {
391                            let rt = tokio::runtime::Builder::new_current_thread()
392                                .enable_all()
393                                .build()
394                                .expect("Failed to create Tokio runtime for ZMQ");
395
396                            rt.block_on(async move {
397                                zmq_transport::ZmqPubTransport::bind("tcp://0.0.0.0:0", &topic)
398                                    .await
399                                    .expect("Failed to bind ZMQ publisher")
400                            })
401                        }
402                    })
403                    .join()
404                    .expect("Failed to join ZMQ initialization thread");
405
406                    // Get local IP for public endpoint
407                    let actual_port: u16 = actual_bind_endpoint
408                        .rsplit(':')
409                        .next()
410                        .and_then(|s| s.parse().ok())
411                        .expect("Failed to parse port from bind endpoint");
412                    let local_ip = get_local_ip_for_advertise();
413                    let public_endpoint = format!("tcp://{}:{}", local_ip, actual_port);
414
415                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
416                    TransportSetup::ZmqDirect(
417                        Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
418                        codec,
419                        public_endpoint,
420                    )
421                }
422            }
423        };
424
425        // Extract transport and codec, and register if needed
426        let (tx, codec, discovery_instance) = match transport_setup {
427            TransportSetup::Nats(tx, codec) => {
428                let transport_config = EventTransport::nats(scope.subject_prefix());
429                let spec = DiscoverySpec::EventChannel {
430                    namespace: scope.namespace().to_string(),
431                    component: scope.component().unwrap_or("").to_string(),
432                    topic: topic.clone(),
433                    transport: transport_config,
434                };
435
436                let registered_instance = drt.discovery().register(spec).await?;
437                tracing::info!(
438                    topic = %topic,
439                    transport = ?transport_kind,
440                    instance_id = %registered_instance.instance_id(),
441                    "EventPublisher registered with discovery"
442                );
443                (tx, codec, Some(registered_instance))
444            }
445            TransportSetup::ZmqDirect(tx, codec, public_endpoint) => {
446                let transport_config = EventTransport::zmq(public_endpoint);
447                let spec = DiscoverySpec::EventChannel {
448                    namespace: scope.namespace().to_string(),
449                    component: scope.component().unwrap_or("").to_string(),
450                    topic: topic.clone(),
451                    transport: transport_config,
452                };
453
454                let registered_instance = drt.discovery().register(spec).await?;
455                tracing::info!(
456                    topic = %topic,
457                    transport = ?transport_kind,
458                    instance_id = %registered_instance.instance_id(),
459                    "EventPublisher registered with discovery (direct mode)"
460                );
461                (tx, codec, Some(registered_instance))
462            }
463            TransportSetup::ZmqBroker(tx, codec) => {
464                tracing::info!(
465                    topic = %topic,
466                    transport = ?transport_kind,
467                    "EventPublisher in broker mode - skipping discovery registration"
468                );
469                (tx, codec, None)
470            }
471        };
472
473        Ok(Self {
474            transport_kind,
475            scope,
476            topic,
477            publisher_id,
478            sequence: AtomicU64::new(0),
479            tx,
480            codec,
481            runtime_handle,
482            discovery_client: discovery,
483            discovery_instance,
484        })
485    }
486
487    /// Publish a serializable event.
488    pub async fn publish<T: Serialize + Send + Sync>(&self, event: &T) -> Result<()> {
489        let payload = self.codec.encode_payload(event)?;
490        self.publish_bytes(payload.to_vec()).await
491    }
492
493    /// Publish raw bytes.
494    pub async fn publish_bytes(&self, bytes: Vec<u8>) -> Result<()> {
495        let envelope = EventEnvelope {
496            publisher_id: self.publisher_id,
497            sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
498            published_at: current_timestamp_ms(),
499            topic: self.topic.clone(),
500            payload: Bytes::from(bytes),
501        };
502
503        let envelope_bytes = self.codec.encode_envelope(&envelope)?;
504        let subject = format!("{}.{}", self.scope.subject_prefix(), self.topic);
505
506        self.tx.publish(&subject, envelope_bytes).await
507    }
508
509    /// Get the publisher ID.
510    pub fn publisher_id(&self) -> u64 {
511        self.publisher_id
512    }
513
514    /// Get the topic.
515    pub fn topic(&self) -> &str {
516        &self.topic
517    }
518
519    /// Get the transport kind.
520    pub fn transport_kind(&self) -> EventTransportKind {
521        self.transport_kind
522    }
523}
524
525impl Drop for EventPublisher {
526    fn drop(&mut self) {
527        // Unregister from discovery on drop
528        if let (Some(discovery), Some(instance)) =
529            (self.discovery_client.take(), self.discovery_instance.take())
530        {
531            let topic = self.topic.clone();
532            let instance_id = instance.instance_id();
533            let runtime_handle = self.runtime_handle.clone();
534
535            // Drop can run outside any Tokio context (notably via PyO3 finalizers), so use
536            // the runtime that created the publisher rather than the ambient thread state.
537            let spawn_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
538                runtime_handle.spawn(async move {
539                    match discovery.unregister(instance).await {
540                        Ok(()) => {
541                            tracing::info!(
542                                topic = %topic,
543                                instance_id = %instance_id,
544                                "EventPublisher unregistered from discovery"
545                            );
546                        }
547                        Err(e) => {
548                            tracing::warn!(
549                                topic = %topic,
550                                instance_id = %instance_id,
551                                error = %e,
552                                "Failed to unregister EventPublisher from discovery"
553                            );
554                        }
555                    }
556                });
557            }));
558
559            if spawn_result.is_err() {
560                tracing::warn!(
561                    topic = %self.topic,
562                    instance_id = %instance_id,
563                    "Skipping EventPublisher unregister during drop because the runtime is unavailable"
564                );
565            }
566        }
567    }
568}
569
570/// Event subscriber for a specific topic.
571pub struct EventSubscriber {
572    stream: EventStream,
573    #[allow(dead_code)]
574    scope: EventScope,
575    #[allow(dead_code)]
576    topic: String,
577    codec: Arc<Codec>,
578}
579
580impl EventSubscriber {
581    /// Create a subscriber for a component-scoped topic.
582    ///
583    /// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
584    /// value is used; otherwise the runtime's default is used (ZMQ for local backends
585    /// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
586    /// Use [`for_component_with_transport`](Self::for_component_with_transport) to
587    /// override explicitly.
588    pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
589        let transport_kind = comp.drt().default_event_transport_kind();
590        Self::for_component_with_transport(comp, topic, transport_kind).await
591    }
592
593    /// Create a subscriber with explicit transport.
594    pub async fn for_component_with_transport(
595        comp: &Component,
596        topic: impl Into<String>,
597        transport_kind: EventTransportKind,
598    ) -> Result<Self> {
599        let drt = comp.drt();
600        let scope = EventScope::Component {
601            namespace: comp.namespace().name(),
602            component: comp.name().to_string(),
603        };
604        Self::new_internal(drt, scope, topic.into(), transport_kind).await
605    }
606
607    /// Create a subscriber for a namespace-scoped topic.
608    ///
609    /// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
610    /// value is used; otherwise the runtime's default is used (ZMQ for local backends
611    /// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
612    /// Use [`for_namespace_with_transport`](Self::for_namespace_with_transport) to
613    /// override explicitly.
614    pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
615        let transport_kind = ns.drt().default_event_transport_kind();
616        Self::for_namespace_with_transport(ns, topic, transport_kind).await
617    }
618
619    /// Create a namespace subscriber with explicit transport.
620    pub async fn for_namespace_with_transport(
621        ns: &Namespace,
622        topic: impl Into<String>,
623        transport_kind: EventTransportKind,
624    ) -> Result<Self> {
625        let drt = ns.drt();
626        let scope = EventScope::Namespace { name: ns.name() };
627        Self::new_internal(drt, scope, topic.into(), transport_kind).await
628    }
629
630    async fn new_internal(
631        drt: &DistributedRuntime,
632        scope: EventScope,
633        topic: String,
634        transport_kind: EventTransportKind,
635    ) -> Result<Self> {
636        let discovery = drt.discovery();
637
638        // Use Msgpack codec for all transports
639        let (wire_stream, codec): (WireStream, Arc<Codec>) = match transport_kind {
640            EventTransportKind::Nats => {
641                let transport = nats_transport::NatsTransport::new(drt.clone());
642                let subject = format!("{}.{}", scope.subject_prefix(), topic);
643                let stream = transport.subscribe(&subject).await?;
644                let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
645                (stream, codec)
646            }
647            EventTransportKind::Zmq => {
648                // Check for broker mode
649                if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
650                    // BROKER MODE: Connect to broker's XPUB (single or multiple endpoints)
651                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
652
653                    let stream: WireStream = if broker.xpub_endpoints.len() == 1 {
654                        // Single broker - no deduplication needed
655                        let sub_transport = zmq_transport::ZmqSubTransport::connect_broker(
656                            &broker.xpub_endpoints[0],
657                            &topic,
658                        )
659                        .await?;
660                        sub_transport.subscribe(&topic).await?
661                    } else {
662                        // Multiple brokers - need deduplication
663                        let sub_transport =
664                            zmq_transport::ZmqSubTransport::connect_broker_multiple(
665                                &broker.xpub_endpoints,
666                                &topic,
667                            )
668                            .await?;
669                        let inner_stream = sub_transport.subscribe(&topic).await?;
670
671                        // Wrap with deduplication (default cache size: 100,000 entries)
672                        Box::pin(DeduplicatingStream::new(
673                            inner_stream,
674                            codec.clone(),
675                            100_000,
676                        ))
677                    };
678
679                    (stream, codec)
680                } else {
681                    // DIRECT MODE: Use dynamic subscriber to discover and connect to publishers
682                    let query = match &scope {
683                        EventScope::Namespace { name } => {
684                            crate::discovery::DiscoveryQuery::EventChannels(
685                                crate::discovery::EventChannelQuery::namespace(name.clone()),
686                            )
687                        }
688                        EventScope::Component {
689                            namespace,
690                            component,
691                        } => crate::discovery::DiscoveryQuery::EventChannels(
692                            crate::discovery::EventChannelQuery::topic(
693                                namespace.clone(),
694                                component.clone(),
695                                topic.clone(),
696                            ),
697                        ),
698                    };
699
700                    let subscriber =
701                        Arc::new(DynamicSubscriber::new(discovery, query, topic.clone()));
702
703                    let stream = subscriber.start_zmq().await?;
704                    let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
705                    (stream, codec)
706                }
707            }
708        };
709
710        // Filter by topic and decode envelopes
711        let topic_filter = topic.clone();
712        let codec_for_stream = codec.clone();
713        let stream = wire_stream.filter_map(move |result| {
714            let codec = codec_for_stream.clone();
715            let topic_filter = topic_filter.clone();
716            async move {
717                match result {
718                    Ok(bytes) => match codec.decode_envelope(&bytes) {
719                        Ok(envelope) => {
720                            // Filter by topic for transports that don't support native filtering
721                            if envelope.topic == topic_filter {
722                                Some(Ok(envelope))
723                            } else {
724                                None
725                            }
726                        }
727                        Err(e) => Some(Err(e)),
728                    },
729                    Err(e) => Some(Err(e)),
730                }
731            }
732        });
733
734        tracing::info!(
735            topic = %topic,
736            transport = ?transport_kind,
737            "EventSubscriber created"
738        );
739
740        Ok(Self {
741            stream: Box::pin(stream),
742            scope,
743            topic,
744            codec,
745        })
746    }
747
748    /// Get the next event envelope.
749    pub async fn next(&mut self) -> Option<Result<EventEnvelope>> {
750        self.stream.next().await
751    }
752
753    /// Subscribe with automatic deserialization.
754    pub fn typed<T: DeserializeOwned + Send + 'static>(self) -> TypedEventSubscriber<T> {
755        TypedEventSubscriber {
756            stream: self.stream,
757            codec: self.codec,
758            _marker: std::marker::PhantomData,
759        }
760    }
761}
762
763/// Typed event subscriber that deserializes payloads.
764pub struct TypedEventSubscriber<T> {
765    stream: EventStream,
766    codec: Arc<Codec>,
767    _marker: std::marker::PhantomData<T>,
768}
769
770impl<T: DeserializeOwned + Send + 'static> TypedEventSubscriber<T> {
771    /// Get the next typed event with its envelope.
772    pub async fn next(&mut self) -> Option<Result<(EventEnvelope, T)>> {
773        let envelope = self.stream.next().await?;
774        match envelope {
775            Ok(env) => match self.codec.decode_payload(&env.payload) {
776                Ok(typed) => Some(Ok((env, typed))),
777                Err(e) => Some(Err(e)),
778            },
779            Err(e) => Some(Err(e)),
780        }
781    }
782}
783
784/// Get current timestamp in milliseconds since Unix epoch.
785fn current_timestamp_ms() -> u64 {
786    SystemTime::now()
787        .duration_since(UNIX_EPOCH)
788        .map(|d| d.as_millis() as u64)
789        .unwrap_or(0)
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795    use crate::config::environment_names::event_plane as env;
796
797    #[test]
798    fn test_event_scope_subject_prefix() {
799        let ns_scope = EventScope::Namespace {
800            name: "test-ns".to_string(),
801        };
802        assert_eq!(ns_scope.subject_prefix(), "namespace.test-ns");
803
804        let comp_scope = EventScope::Component {
805            namespace: "test-ns".to_string(),
806            component: "test-comp".to_string(),
807        };
808        assert_eq!(
809            comp_scope.subject_prefix(),
810            "namespace.test-ns.component.test-comp"
811        );
812    }
813
814    #[test]
815    fn test_event_scope_accessors() {
816        let ns_scope = EventScope::Namespace {
817            name: "my-ns".to_string(),
818        };
819        assert_eq!(ns_scope.namespace(), "my-ns");
820        assert_eq!(ns_scope.component(), None);
821
822        let comp_scope = EventScope::Component {
823            namespace: "my-ns".to_string(),
824            component: "my-comp".to_string(),
825        };
826        assert_eq!(comp_scope.namespace(), "my-ns");
827        assert_eq!(comp_scope.component(), Some("my-comp"));
828    }
829
830    #[test]
831    fn test_timestamp_generation() {
832        let ts = current_timestamp_ms();
833
834        // Should be after Jan 1, 2020 (1577836800000) and before Jan 1, 2100 (4102444800000)
835        assert!(ts > 1577836800000, "Timestamp should be after 2020");
836        assert!(ts < 4102444800000, "Timestamp should be before 2100");
837    }
838
839    #[test]
840    fn test_event_envelope_serde() {
841        let envelope = EventEnvelope {
842            publisher_id: 42,
843            sequence: 10,
844            published_at: 1700000000000,
845            topic: "test-topic".to_string(),
846            payload: Bytes::from("test data"),
847        };
848
849        let json = serde_json::to_string(&envelope).expect("serialize");
850        let deserialized: EventEnvelope = serde_json::from_str(&json).expect("deserialize");
851
852        assert_eq!(deserialized.publisher_id, 42);
853        assert_eq!(deserialized.sequence, 10);
854        assert_eq!(deserialized.published_at, 1700000000000);
855        assert_eq!(deserialized.topic, "test-topic");
856        assert_eq!(deserialized.payload, Bytes::from("test data"));
857    }
858}