1mod codec;
7mod dynamic_subscriber;
8mod frame;
9mod nats_transport;
10mod traits;
11mod transport;
12pub mod zmq_transport;
13
14pub use codec::{Codec, MsgpackCodec};
15pub use dynamic_subscriber::DynamicSubscriber;
16pub use frame::{FRAME_HEADER_SIZE, FRAME_VERSION, Frame, FrameError, FrameHeader};
17pub use traits::{EventEnvelope, EventStream, TypedEventStream};
18pub use transport::{EventTransportRx, EventTransportTx, WireStream};
19pub use zmq_transport::{ZmqPubTransport, ZmqSubTransport};
20
21pub use crate::discovery::EventTransportKind;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use anyhow::Result;
30use bytes::Bytes;
31use futures::{Stream, StreamExt};
32use lru::LruCache;
33use serde::Serialize;
34use serde::de::DeserializeOwned;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37
38use crate::DistributedRuntime;
39use crate::component::{Component, Namespace};
40use crate::discovery::{
41 Discovery, DiscoveryInstance, DiscoveryQuery, DiscoverySpec, EventChannelQuery, EventTransport,
42};
43use crate::traits::DistributedRuntimeProvider;
44use crate::utils::ip_resolver::get_local_ip_for_advertise;
45
46#[derive(Debug, Clone)]
48pub enum EventScope {
49 Namespace { name: String },
51 Component {
53 namespace: String,
54 component: String,
55 },
56}
57
58impl EventScope {
59 pub fn subject_prefix(&self) -> String {
61 match self {
62 EventScope::Namespace { name } => format!("namespace.{}", name),
63 EventScope::Component {
64 namespace,
65 component,
66 } => {
67 format!("namespace.{}.component.{}", namespace, component)
68 }
69 }
70 }
71
72 pub fn namespace(&self) -> &str {
74 match self {
75 EventScope::Namespace { name } => name,
76 EventScope::Component { namespace, .. } => namespace,
77 }
78 }
79
80 pub fn component(&self) -> Option<&str> {
82 match self {
83 EventScope::Namespace { .. } => None,
84 EventScope::Component { component, .. } => Some(component),
85 }
86 }
87}
88
89#[derive(Debug, Clone)]
95struct BrokerEndpoints {
96 xsub_endpoints: Vec<String>,
97 xpub_endpoints: Vec<String>,
98}
99
100async fn resolve_zmq_broker(
103 drt: &DistributedRuntime,
104 scope: &EventScope,
105) -> Result<Option<BrokerEndpoints>> {
106 if let Ok(broker_url) =
108 std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_URL)
109 {
110 let (xsub_endpoints, xpub_endpoints) = parse_broker_url(&broker_url)?;
111 tracing::info!(
112 num_xsub = xsub_endpoints.len(),
113 num_xpub = xpub_endpoints.len(),
114 "Using explicit ZMQ broker URL"
115 );
116 return Ok(Some(BrokerEndpoints {
117 xsub_endpoints,
118 xpub_endpoints,
119 }));
120 }
121
122 if std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_ENABLED)
124 .unwrap_or_default()
125 == "true"
126 {
127 let query = DiscoveryQuery::EventChannels(EventChannelQuery::component(
128 scope.namespace().to_string(),
129 "zmq_broker".to_string(),
130 ));
131
132 let instances = drt.discovery().list(query).await?;
133
134 let mut xsub_endpoints = Vec::new();
136 let mut xpub_endpoints = Vec::new();
137
138 for instance in instances {
139 if let DiscoveryInstance::EventChannel { transport, .. } = instance
140 && let EventTransport::ZmqBroker {
141 xsub_endpoints: xsubs,
142 xpub_endpoints: xpubs,
143 } = transport
144 {
145 xsub_endpoints.extend(xsubs);
146 xpub_endpoints.extend(xpubs);
147 }
148 }
149
150 if xsub_endpoints.is_empty() {
151 anyhow::bail!(
152 "DYN_ZMQ_BROKER_ENABLED=true but no broker found in discovery for namespace '{}'",
153 scope.namespace()
154 );
155 }
156
157 tracing::info!(
158 num_brokers = xsub_endpoints.len(),
159 "Discovered ZMQ brokers from discovery plane"
160 );
161
162 return Ok(Some(BrokerEndpoints {
163 xsub_endpoints,
164 xpub_endpoints,
165 }));
166 }
167
168 Ok(None)
170}
171
172fn parse_broker_url(url: &str) -> Result<(Vec<String>, Vec<String>)> {
174 let parts: Vec<&str> = url.split(',').map(|s| s.trim()).collect();
175 if parts.len() != 2 {
176 anyhow::bail!(
177 "Invalid broker URL format. Expected 'xsub=<urls> , xpub=<urls>', got: {}",
178 url
179 );
180 }
181
182 let mut xsub_endpoints = Vec::new();
183 let mut xpub_endpoints = Vec::new();
184
185 for part in parts {
186 if let Some(urls_str) = part.strip_prefix("xsub=") {
187 xsub_endpoints = urls_str
188 .split(';')
189 .map(|s| s.trim().to_string())
190 .filter(|s| !s.is_empty())
191 .collect();
192 } else if let Some(urls_str) = part.strip_prefix("xpub=") {
193 xpub_endpoints = urls_str
194 .split(';')
195 .map(|s| s.trim().to_string())
196 .filter(|s| !s.is_empty())
197 .collect();
198 } else {
199 anyhow::bail!(
200 "Invalid broker URL part. Expected 'xsub=' or 'xpub=' prefix, got: {}",
201 part
202 );
203 }
204 }
205
206 if xsub_endpoints.is_empty() || xpub_endpoints.is_empty() {
207 anyhow::bail!(
208 "Broker URL must contain at least one xsub and one xpub endpoint. Got xsub={:?}, xpub={:?}",
209 xsub_endpoints,
210 xpub_endpoints
211 );
212 }
213
214 Ok((xsub_endpoints, xpub_endpoints))
215}
216
217struct DeduplicatingStream {
220 inner: WireStream,
221 codec: Arc<Codec>,
222 seen_events: LruCache<(u64, u64), ()>, }
224
225impl DeduplicatingStream {
226 fn new(inner: WireStream, codec: Arc<Codec>, cache_size: usize) -> Self {
227 Self {
228 inner,
229 codec,
230 seen_events: LruCache::new(
231 NonZeroUsize::new(cache_size).expect("cache_size must be non-zero"),
232 ),
233 }
234 }
235}
236
237impl Stream for DeduplicatingStream {
238 type Item = Result<Bytes>;
239
240 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241 loop {
242 match Pin::new(&mut self.inner).poll_next(cx) {
243 Poll::Ready(Some(Ok(bytes))) => {
244 match self.codec.decode_envelope(&bytes) {
246 Ok(envelope) => {
247 let key = (envelope.publisher_id, envelope.sequence);
248
249 if self.seen_events.contains(&key) {
251 tracing::debug!(
253 publisher_id = envelope.publisher_id,
254 sequence = envelope.sequence,
255 "Filtered duplicate event from multi-broker setup"
256 );
257 continue;
258 }
259
260 self.seen_events.put(key, ());
262 return Poll::Ready(Some(Ok(bytes)));
263 }
264 Err(e) => {
265 tracing::warn!(error = %e, "Failed to decode envelope for deduplication");
266 return Poll::Ready(Some(Err(e)));
267 }
268 }
269 }
270 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
271 Poll::Ready(None) => return Poll::Ready(None),
272 Poll::Pending => return Poll::Pending,
273 }
274 }
275 }
276}
277
278pub struct EventPublisher {
280 transport_kind: EventTransportKind,
281 scope: EventScope,
282 topic: String,
283 publisher_id: u64,
284 sequence: AtomicU64,
285 tx: Arc<dyn EventTransportTx>,
286 codec: Arc<Codec>,
287 discovery_client: Option<Arc<dyn Discovery>>,
289 discovery_instance: Option<crate::discovery::DiscoveryInstance>,
290}
291
292impl EventPublisher {
293 pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
295 Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
296 .await
297 }
298
299 pub async fn for_component_with_transport(
301 comp: &Component,
302 topic: impl Into<String>,
303 transport_kind: EventTransportKind,
304 ) -> Result<Self> {
305 let drt = comp.drt();
306 let scope = EventScope::Component {
307 namespace: comp.namespace().name(),
308 component: comp.name().to_string(),
309 };
310 Self::new_internal(drt, scope, topic.into(), transport_kind).await
311 }
312
313 pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
315 Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
316 .await
317 }
318
319 pub async fn for_namespace_with_transport(
321 ns: &Namespace,
322 topic: impl Into<String>,
323 transport_kind: EventTransportKind,
324 ) -> Result<Self> {
325 let drt = ns.drt();
326 let scope = EventScope::Namespace { name: ns.name() };
327 Self::new_internal(drt, scope, topic.into(), transport_kind).await
328 }
329
330 async fn new_internal(
331 drt: &DistributedRuntime,
332 scope: EventScope,
333 topic: String,
334 transport_kind: EventTransportKind,
335 ) -> Result<Self> {
336 let publisher_id = drt.discovery().instance_id();
337 let discovery = Some(drt.discovery());
338
339 enum TransportSetup {
341 Nats(Arc<dyn EventTransportTx>, Arc<Codec>),
342 ZmqDirect(Arc<dyn EventTransportTx>, Arc<Codec>, String), ZmqBroker(Arc<dyn EventTransportTx>, Arc<Codec>),
344 }
345
346 let transport_setup = match transport_kind {
347 EventTransportKind::Nats => {
348 let transport = Arc::new(nats_transport::NatsTransport::new(drt.clone()));
349 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
350 TransportSetup::Nats(transport as Arc<dyn EventTransportTx>, codec)
351 }
352 EventTransportKind::Zmq => {
353 if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
355 let pub_transport = if broker.xsub_endpoints.len() == 1 {
357 zmq_transport::ZmqPubTransport::connect(&broker.xsub_endpoints[0], &topic)
358 .await?
359 } else {
360 zmq_transport::ZmqPubTransport::connect_multiple(
361 &broker.xsub_endpoints,
362 &topic,
363 )
364 .await?
365 };
366
367 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
368 TransportSetup::ZmqBroker(
369 Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
370 codec,
371 )
372 } else {
373 let (pub_transport, actual_bind_endpoint) = std::thread::spawn({
375 let topic = topic.clone();
376 move || {
377 let rt = tokio::runtime::Builder::new_current_thread()
378 .enable_all()
379 .build()
380 .expect("Failed to create Tokio runtime for ZMQ");
381
382 rt.block_on(async move {
383 zmq_transport::ZmqPubTransport::bind("tcp://0.0.0.0:0", &topic)
384 .await
385 .expect("Failed to bind ZMQ publisher")
386 })
387 }
388 })
389 .join()
390 .expect("Failed to join ZMQ initialization thread");
391
392 let actual_port: u16 = actual_bind_endpoint
394 .rsplit(':')
395 .next()
396 .and_then(|s| s.parse().ok())
397 .expect("Failed to parse port from bind endpoint");
398 let local_ip = get_local_ip_for_advertise();
399 let public_endpoint = format!("tcp://{}:{}", local_ip, actual_port);
400
401 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
402 TransportSetup::ZmqDirect(
403 Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
404 codec,
405 public_endpoint,
406 )
407 }
408 }
409 };
410
411 let (tx, codec, discovery_instance) = match transport_setup {
413 TransportSetup::Nats(tx, codec) => {
414 let transport_config = EventTransport::nats(scope.subject_prefix());
415 let spec = DiscoverySpec::EventChannel {
416 namespace: scope.namespace().to_string(),
417 component: scope.component().unwrap_or("").to_string(),
418 topic: topic.clone(),
419 transport: transport_config,
420 };
421
422 let registered_instance = drt.discovery().register(spec).await?;
423 tracing::info!(
424 topic = %topic,
425 transport = ?transport_kind,
426 instance_id = %registered_instance.instance_id(),
427 "EventPublisher registered with discovery"
428 );
429 (tx, codec, Some(registered_instance))
430 }
431 TransportSetup::ZmqDirect(tx, codec, public_endpoint) => {
432 let transport_config = EventTransport::zmq(public_endpoint);
433 let spec = DiscoverySpec::EventChannel {
434 namespace: scope.namespace().to_string(),
435 component: scope.component().unwrap_or("").to_string(),
436 topic: topic.clone(),
437 transport: transport_config,
438 };
439
440 let registered_instance = drt.discovery().register(spec).await?;
441 tracing::info!(
442 topic = %topic,
443 transport = ?transport_kind,
444 instance_id = %registered_instance.instance_id(),
445 "EventPublisher registered with discovery (direct mode)"
446 );
447 (tx, codec, Some(registered_instance))
448 }
449 TransportSetup::ZmqBroker(tx, codec) => {
450 tracing::info!(
451 topic = %topic,
452 transport = ?transport_kind,
453 "EventPublisher in broker mode - skipping discovery registration"
454 );
455 (tx, codec, None)
456 }
457 };
458
459 Ok(Self {
460 transport_kind,
461 scope,
462 topic,
463 publisher_id,
464 sequence: AtomicU64::new(0),
465 tx,
466 codec,
467 discovery_client: discovery,
468 discovery_instance,
469 })
470 }
471
472 pub async fn publish<T: Serialize + Send + Sync>(&self, event: &T) -> Result<()> {
474 let payload = self.codec.encode_payload(event)?;
475 self.publish_bytes(payload.to_vec()).await
476 }
477
478 pub async fn publish_bytes(&self, bytes: Vec<u8>) -> Result<()> {
480 let envelope = EventEnvelope {
481 publisher_id: self.publisher_id,
482 sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
483 published_at: current_timestamp_ms(),
484 topic: self.topic.clone(),
485 payload: Bytes::from(bytes),
486 };
487
488 let envelope_bytes = self.codec.encode_envelope(&envelope)?;
489 let subject = format!("{}.{}", self.scope.subject_prefix(), self.topic);
490
491 self.tx.publish(&subject, envelope_bytes).await
492 }
493
494 pub fn publisher_id(&self) -> u64 {
496 self.publisher_id
497 }
498
499 pub fn topic(&self) -> &str {
501 &self.topic
502 }
503
504 pub fn transport_kind(&self) -> EventTransportKind {
506 self.transport_kind
507 }
508}
509
510impl Drop for EventPublisher {
511 fn drop(&mut self) {
512 if let (Some(discovery), Some(instance)) =
514 (self.discovery_client.take(), self.discovery_instance.take())
515 {
516 let topic = self.topic.clone();
517 let instance_id = instance.instance_id();
518
519 tokio::spawn(async move {
521 match discovery.unregister(instance).await {
522 Ok(()) => {
523 tracing::info!(
524 topic = %topic,
525 instance_id = %instance_id,
526 "EventPublisher unregistered from discovery"
527 );
528 }
529 Err(e) => {
530 tracing::warn!(
531 topic = %topic,
532 instance_id = %instance_id,
533 error = %e,
534 "Failed to unregister EventPublisher from discovery"
535 );
536 }
537 }
538 });
539 }
540 }
541}
542
543pub struct EventSubscriber {
545 stream: EventStream,
546 #[allow(dead_code)]
547 scope: EventScope,
548 #[allow(dead_code)]
549 topic: String,
550 codec: Arc<Codec>,
551}
552
553impl EventSubscriber {
554 pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
556 Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
557 .await
558 }
559
560 pub async fn for_component_with_transport(
562 comp: &Component,
563 topic: impl Into<String>,
564 transport_kind: EventTransportKind,
565 ) -> Result<Self> {
566 let drt = comp.drt();
567 let scope = EventScope::Component {
568 namespace: comp.namespace().name(),
569 component: comp.name().to_string(),
570 };
571 Self::new_internal(drt, scope, topic.into(), transport_kind).await
572 }
573
574 pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
576 Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
577 .await
578 }
579
580 pub async fn for_namespace_with_transport(
582 ns: &Namespace,
583 topic: impl Into<String>,
584 transport_kind: EventTransportKind,
585 ) -> Result<Self> {
586 let drt = ns.drt();
587 let scope = EventScope::Namespace { name: ns.name() };
588 Self::new_internal(drt, scope, topic.into(), transport_kind).await
589 }
590
591 async fn new_internal(
592 drt: &DistributedRuntime,
593 scope: EventScope,
594 topic: String,
595 transport_kind: EventTransportKind,
596 ) -> Result<Self> {
597 let discovery = drt.discovery();
598
599 let (wire_stream, codec): (WireStream, Arc<Codec>) = match transport_kind {
601 EventTransportKind::Nats => {
602 let transport = nats_transport::NatsTransport::new(drt.clone());
603 let subject = format!("{}.{}", scope.subject_prefix(), topic);
604 let stream = transport.subscribe(&subject).await?;
605 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
606 (stream, codec)
607 }
608 EventTransportKind::Zmq => {
609 if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
611 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
613
614 let stream: WireStream = if broker.xpub_endpoints.len() == 1 {
615 let sub_transport = zmq_transport::ZmqSubTransport::connect_broker(
617 &broker.xpub_endpoints[0],
618 &topic,
619 )
620 .await?;
621 sub_transport.subscribe(&topic).await?
622 } else {
623 let sub_transport =
625 zmq_transport::ZmqSubTransport::connect_broker_multiple(
626 &broker.xpub_endpoints,
627 &topic,
628 )
629 .await?;
630 let inner_stream = sub_transport.subscribe(&topic).await?;
631
632 Box::pin(DeduplicatingStream::new(
634 inner_stream,
635 codec.clone(),
636 100_000,
637 ))
638 };
639
640 (stream, codec)
641 } else {
642 let query = match &scope {
644 EventScope::Namespace { name } => {
645 crate::discovery::DiscoveryQuery::EventChannels(
646 crate::discovery::EventChannelQuery::namespace(name.clone()),
647 )
648 }
649 EventScope::Component {
650 namespace,
651 component,
652 } => crate::discovery::DiscoveryQuery::EventChannels(
653 crate::discovery::EventChannelQuery::topic(
654 namespace.clone(),
655 component.clone(),
656 topic.clone(),
657 ),
658 ),
659 };
660
661 let subscriber =
662 Arc::new(DynamicSubscriber::new(discovery, query, topic.clone()));
663
664 let stream = subscriber.start_zmq().await?;
665 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
666 (stream, codec)
667 }
668 }
669 };
670
671 let topic_filter = topic.clone();
673 let codec_for_stream = codec.clone();
674 let stream = wire_stream.filter_map(move |result| {
675 let codec = codec_for_stream.clone();
676 let topic_filter = topic_filter.clone();
677 async move {
678 match result {
679 Ok(bytes) => match codec.decode_envelope(&bytes) {
680 Ok(envelope) => {
681 if envelope.topic == topic_filter {
683 Some(Ok(envelope))
684 } else {
685 None
686 }
687 }
688 Err(e) => Some(Err(e)),
689 },
690 Err(e) => Some(Err(e)),
691 }
692 }
693 });
694
695 tracing::info!(
696 topic = %topic,
697 transport = ?transport_kind,
698 "EventSubscriber created"
699 );
700
701 Ok(Self {
702 stream: Box::pin(stream),
703 scope,
704 topic,
705 codec,
706 })
707 }
708
709 pub async fn next(&mut self) -> Option<Result<EventEnvelope>> {
711 self.stream.next().await
712 }
713
714 pub fn typed<T: DeserializeOwned + Send + 'static>(self) -> TypedEventSubscriber<T> {
716 TypedEventSubscriber {
717 stream: self.stream,
718 codec: self.codec,
719 _marker: std::marker::PhantomData,
720 }
721 }
722}
723
724pub struct TypedEventSubscriber<T> {
726 stream: EventStream,
727 codec: Arc<Codec>,
728 _marker: std::marker::PhantomData<T>,
729}
730
731impl<T: DeserializeOwned + Send + 'static> TypedEventSubscriber<T> {
732 pub async fn next(&mut self) -> Option<Result<(EventEnvelope, T)>> {
734 let envelope = self.stream.next().await?;
735 match envelope {
736 Ok(env) => match self.codec.decode_payload(&env.payload) {
737 Ok(typed) => Some(Ok((env, typed))),
738 Err(e) => Some(Err(e)),
739 },
740 Err(e) => Some(Err(e)),
741 }
742 }
743}
744
745fn current_timestamp_ms() -> u64 {
747 SystemTime::now()
748 .duration_since(UNIX_EPOCH)
749 .map(|d| d.as_millis() as u64)
750 .unwrap_or(0)
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use crate::config::environment_names::event_plane as env;
757
758 #[test]
759 fn test_event_scope_subject_prefix() {
760 let ns_scope = EventScope::Namespace {
761 name: "test-ns".to_string(),
762 };
763 assert_eq!(ns_scope.subject_prefix(), "namespace.test-ns");
764
765 let comp_scope = EventScope::Component {
766 namespace: "test-ns".to_string(),
767 component: "test-comp".to_string(),
768 };
769 assert_eq!(
770 comp_scope.subject_prefix(),
771 "namespace.test-ns.component.test-comp"
772 );
773 }
774
775 #[test]
776 fn test_event_scope_accessors() {
777 let ns_scope = EventScope::Namespace {
778 name: "my-ns".to_string(),
779 };
780 assert_eq!(ns_scope.namespace(), "my-ns");
781 assert_eq!(ns_scope.component(), None);
782
783 let comp_scope = EventScope::Component {
784 namespace: "my-ns".to_string(),
785 component: "my-comp".to_string(),
786 };
787 assert_eq!(comp_scope.namespace(), "my-ns");
788 assert_eq!(comp_scope.component(), Some("my-comp"));
789 }
790
791 #[test]
792 fn test_timestamp_generation() {
793 let ts = current_timestamp_ms();
794
795 assert!(ts > 1577836800000, "Timestamp should be after 2020");
797 assert!(ts < 4102444800000, "Timestamp should be before 2100");
798 }
799
800 #[test]
801 fn test_event_envelope_serde() {
802 let envelope = EventEnvelope {
803 publisher_id: 42,
804 sequence: 10,
805 published_at: 1700000000000,
806 topic: "test-topic".to_string(),
807 payload: Bytes::from("test data"),
808 };
809
810 let json = serde_json::to_string(&envelope).expect("serialize");
811 let deserialized: EventEnvelope = serde_json::from_str(&json).expect("deserialize");
812
813 assert_eq!(deserialized.publisher_id, 42);
814 assert_eq!(deserialized.sequence, 10);
815 assert_eq!(deserialized.published_at, 1700000000000);
816 assert_eq!(deserialized.topic, "test-topic");
817 assert_eq!(deserialized.payload, Bytes::from("test data"));
818 }
819}