1mod codec;
7mod dynamic_subscriber;
8mod frame;
9mod nats_transport;
10mod traits;
11mod transport;
12pub mod zmq_transport;
13
14pub use codec::{Codec, MsgpackCodec};
15pub use dynamic_subscriber::DynamicSubscriber;
16pub use frame::{FRAME_HEADER_SIZE, FRAME_VERSION, Frame, FrameError, FrameHeader};
17pub use traits::{EventEnvelope, EventStream, TypedEventStream};
18pub use transport::{EventTransportRx, EventTransportTx, WireStream};
19pub use zmq_transport::{ZmqPubTransport, ZmqSubTransport};
20
21pub use crate::discovery::EventTransportKind;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use anyhow::Result;
30use bytes::Bytes;
31use futures::{Stream, StreamExt};
32use lru::LruCache;
33use serde::Serialize;
34use serde::de::DeserializeOwned;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37
38use crate::DistributedRuntime;
39use crate::component::{Component, Namespace};
40use crate::discovery::{
41 Discovery, DiscoveryInstance, DiscoveryQuery, DiscoverySpec, EventChannelQuery, EventTransport,
42};
43use crate::traits::DistributedRuntimeProvider;
44use crate::utils::ip_resolver::get_local_ip_for_advertise;
45
/// Addressing scope of an event channel: a whole namespace, or a single component
/// inside a namespace.
#[derive(Debug, Clone)]
pub enum EventScope {
    /// Scope identified only by a namespace name.
    Namespace { name: String },
    /// Scope identified by a namespace name plus a component name.
    Component {
        namespace: String,
        component: String,
    },
}

impl EventScope {
    /// Returns the subject prefix for this scope.
    ///
    /// The result is `namespace.<namespace>` for a namespace scope and
    /// `namespace.<namespace>.component.<component>` for a component scope.
    pub fn subject_prefix(&self) -> String {
        let ns = self.namespace();
        match self.component() {
            Some(comp) => format!("namespace.{}.component.{}", ns, comp),
            None => format!("namespace.{}", ns),
        }
    }

    /// Returns the namespace name, which both variants carry.
    pub fn namespace(&self) -> &str {
        match self {
            EventScope::Namespace { name: ns } | EventScope::Component { namespace: ns, .. } => ns,
        }
    }

    /// Returns the component name, or `None` for a namespace-wide scope.
    pub fn component(&self) -> Option<&str> {
        if let EventScope::Component { component, .. } = self {
            Some(component.as_str())
        } else {
            None
        }
    }
}
88
/// Endpoint lists of the ZMQ broker(s) selected for a scope.
#[derive(Debug, Clone)]
struct BrokerEndpoints {
    /// Broker endpoints that publishers connect to.
    xsub_endpoints: Vec<String>,
    /// Broker endpoints that subscribers connect to.
    xpub_endpoints: Vec<String>,
}
99
100async fn resolve_zmq_broker(
103 drt: &DistributedRuntime,
104 scope: &EventScope,
105) -> Result<Option<BrokerEndpoints>> {
106 if let Ok(broker_url) =
108 std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_URL)
109 {
110 let (xsub_endpoints, xpub_endpoints) = parse_broker_url(&broker_url)?;
111 tracing::info!(
112 num_xsub = xsub_endpoints.len(),
113 num_xpub = xpub_endpoints.len(),
114 "Using explicit ZMQ broker URL"
115 );
116 return Ok(Some(BrokerEndpoints {
117 xsub_endpoints,
118 xpub_endpoints,
119 }));
120 }
121
122 if std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_ENABLED)
124 .unwrap_or_default()
125 == "true"
126 {
127 let query = DiscoveryQuery::EventChannels(EventChannelQuery::component(
128 scope.namespace().to_string(),
129 "zmq_broker".to_string(),
130 ));
131
132 let instances = drt.discovery().list(query).await?;
133
134 let mut xsub_endpoints = Vec::new();
136 let mut xpub_endpoints = Vec::new();
137
138 for instance in instances {
139 if let DiscoveryInstance::EventChannel { transport, .. } = instance
140 && let EventTransport::ZmqBroker {
141 xsub_endpoints: xsubs,
142 xpub_endpoints: xpubs,
143 } = transport
144 {
145 xsub_endpoints.extend(xsubs);
146 xpub_endpoints.extend(xpubs);
147 }
148 }
149
150 if xsub_endpoints.is_empty() {
151 anyhow::bail!(
152 "DYN_ZMQ_BROKER_ENABLED=true but no broker found in discovery for namespace '{}'",
153 scope.namespace()
154 );
155 }
156
157 tracing::info!(
158 num_brokers = xsub_endpoints.len(),
159 "Discovered ZMQ brokers from discovery plane"
160 );
161
162 return Ok(Some(BrokerEndpoints {
163 xsub_endpoints,
164 xpub_endpoints,
165 }));
166 }
167
168 Ok(None)
170}
171
172fn parse_broker_url(url: &str) -> Result<(Vec<String>, Vec<String>)> {
174 let parts: Vec<&str> = url.split(',').map(|s| s.trim()).collect();
175 if parts.len() != 2 {
176 anyhow::bail!(
177 "Invalid broker URL format. Expected 'xsub=<urls> , xpub=<urls>', got: {}",
178 url
179 );
180 }
181
182 let mut xsub_endpoints = Vec::new();
183 let mut xpub_endpoints = Vec::new();
184
185 for part in parts {
186 if let Some(urls_str) = part.strip_prefix("xsub=") {
187 xsub_endpoints = urls_str
188 .split(';')
189 .map(|s| s.trim().to_string())
190 .filter(|s| !s.is_empty())
191 .collect();
192 } else if let Some(urls_str) = part.strip_prefix("xpub=") {
193 xpub_endpoints = urls_str
194 .split(';')
195 .map(|s| s.trim().to_string())
196 .filter(|s| !s.is_empty())
197 .collect();
198 } else {
199 anyhow::bail!(
200 "Invalid broker URL part. Expected 'xsub=' or 'xpub=' prefix, got: {}",
201 part
202 );
203 }
204 }
205
206 if xsub_endpoints.is_empty() || xpub_endpoints.is_empty() {
207 anyhow::bail!(
208 "Broker URL must contain at least one xsub and one xpub endpoint. Got xsub={:?}, xpub={:?}",
209 xsub_endpoints,
210 xpub_endpoints
211 );
212 }
213
214 Ok((xsub_endpoints, xpub_endpoints))
215}
216
217struct DeduplicatingStream {
220 inner: WireStream,
221 codec: Arc<Codec>,
222 seen_events: LruCache<(u64, u64), ()>, }
224
225impl DeduplicatingStream {
226 fn new(inner: WireStream, codec: Arc<Codec>, cache_size: usize) -> Self {
227 Self {
228 inner,
229 codec,
230 seen_events: LruCache::new(
231 NonZeroUsize::new(cache_size).expect("cache_size must be non-zero"),
232 ),
233 }
234 }
235}
236
237impl Stream for DeduplicatingStream {
238 type Item = Result<Bytes>;
239
240 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241 loop {
242 match Pin::new(&mut self.inner).poll_next(cx) {
243 Poll::Ready(Some(Ok(bytes))) => {
244 match self.codec.decode_envelope(&bytes) {
246 Ok(envelope) => {
247 let key = (envelope.publisher_id, envelope.sequence);
248
249 if self.seen_events.contains(&key) {
251 tracing::debug!(
253 publisher_id = envelope.publisher_id,
254 sequence = envelope.sequence,
255 "Filtered duplicate event from multi-broker setup"
256 );
257 continue;
258 }
259
260 self.seen_events.put(key, ());
262 return Poll::Ready(Some(Ok(bytes)));
263 }
264 Err(e) => {
265 tracing::warn!(error = %e, "Failed to decode envelope for deduplication");
266 return Poll::Ready(Some(Err(e)));
267 }
268 }
269 }
270 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
271 Poll::Ready(None) => return Poll::Ready(None),
272 Poll::Pending => return Poll::Pending,
273 }
274 }
275 }
276}
277
278pub struct EventPublisher {
280 transport_kind: EventTransportKind,
281 scope: EventScope,
282 topic: String,
283 publisher_id: u64,
284 sequence: AtomicU64,
285 tx: Arc<dyn EventTransportTx>,
286 codec: Arc<Codec>,
287 runtime_handle: tokio::runtime::Handle,
288 discovery_client: Option<Arc<dyn Discovery>>,
290 discovery_instance: Option<crate::discovery::DiscoveryInstance>,
291}
292
293impl EventPublisher {
294 pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
302 let transport_kind = comp.drt().default_event_transport_kind();
303 Self::for_component_with_transport(comp, topic, transport_kind).await
304 }
305
306 pub async fn for_component_with_transport(
308 comp: &Component,
309 topic: impl Into<String>,
310 transport_kind: EventTransportKind,
311 ) -> Result<Self> {
312 let drt = comp.drt();
313 let scope = EventScope::Component {
314 namespace: comp.namespace().name(),
315 component: comp.name().to_string(),
316 };
317 Self::new_internal(drt, scope, topic.into(), transport_kind).await
318 }
319
320 pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
328 let transport_kind = ns.drt().default_event_transport_kind();
329 Self::for_namespace_with_transport(ns, topic, transport_kind).await
330 }
331
332 pub async fn for_namespace_with_transport(
334 ns: &Namespace,
335 topic: impl Into<String>,
336 transport_kind: EventTransportKind,
337 ) -> Result<Self> {
338 let drt = ns.drt();
339 let scope = EventScope::Namespace { name: ns.name() };
340 Self::new_internal(drt, scope, topic.into(), transport_kind).await
341 }
342
343 async fn new_internal(
344 drt: &DistributedRuntime,
345 scope: EventScope,
346 topic: String,
347 transport_kind: EventTransportKind,
348 ) -> Result<Self> {
349 let publisher_id = drt.discovery().instance_id();
350 let discovery = Some(drt.discovery());
351 let runtime_handle = drt.runtime().secondary();
352
353 enum TransportSetup {
355 Nats(Arc<dyn EventTransportTx>, Arc<Codec>),
356 ZmqDirect(Arc<dyn EventTransportTx>, Arc<Codec>, String), ZmqBroker(Arc<dyn EventTransportTx>, Arc<Codec>),
358 }
359
360 let transport_setup = match transport_kind {
361 EventTransportKind::Nats => {
362 let transport = Arc::new(nats_transport::NatsTransport::new(drt.clone()));
363 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
364 TransportSetup::Nats(transport as Arc<dyn EventTransportTx>, codec)
365 }
366 EventTransportKind::Zmq => {
367 if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
369 let pub_transport = if broker.xsub_endpoints.len() == 1 {
371 zmq_transport::ZmqPubTransport::connect(&broker.xsub_endpoints[0], &topic)
372 .await?
373 } else {
374 zmq_transport::ZmqPubTransport::connect_multiple(
375 &broker.xsub_endpoints,
376 &topic,
377 )
378 .await?
379 };
380
381 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
382 TransportSetup::ZmqBroker(
383 Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
384 codec,
385 )
386 } else {
387 let (pub_transport, actual_bind_endpoint) = std::thread::spawn({
389 let topic = topic.clone();
390 move || {
391 let rt = tokio::runtime::Builder::new_current_thread()
392 .enable_all()
393 .build()
394 .expect("Failed to create Tokio runtime for ZMQ");
395
396 rt.block_on(async move {
397 zmq_transport::ZmqPubTransport::bind("tcp://0.0.0.0:0", &topic)
398 .await
399 .expect("Failed to bind ZMQ publisher")
400 })
401 }
402 })
403 .join()
404 .expect("Failed to join ZMQ initialization thread");
405
406 let actual_port: u16 = actual_bind_endpoint
408 .rsplit(':')
409 .next()
410 .and_then(|s| s.parse().ok())
411 .expect("Failed to parse port from bind endpoint");
412 let local_ip = get_local_ip_for_advertise();
413 let public_endpoint = format!("tcp://{}:{}", local_ip, actual_port);
414
415 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
416 TransportSetup::ZmqDirect(
417 Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
418 codec,
419 public_endpoint,
420 )
421 }
422 }
423 };
424
425 let (tx, codec, discovery_instance) = match transport_setup {
427 TransportSetup::Nats(tx, codec) => {
428 let transport_config = EventTransport::nats(scope.subject_prefix());
429 let spec = DiscoverySpec::EventChannel {
430 namespace: scope.namespace().to_string(),
431 component: scope.component().unwrap_or("").to_string(),
432 topic: topic.clone(),
433 transport: transport_config,
434 };
435
436 let registered_instance = drt.discovery().register(spec).await?;
437 tracing::info!(
438 topic = %topic,
439 transport = ?transport_kind,
440 instance_id = %registered_instance.instance_id(),
441 "EventPublisher registered with discovery"
442 );
443 (tx, codec, Some(registered_instance))
444 }
445 TransportSetup::ZmqDirect(tx, codec, public_endpoint) => {
446 let transport_config = EventTransport::zmq(public_endpoint);
447 let spec = DiscoverySpec::EventChannel {
448 namespace: scope.namespace().to_string(),
449 component: scope.component().unwrap_or("").to_string(),
450 topic: topic.clone(),
451 transport: transport_config,
452 };
453
454 let registered_instance = drt.discovery().register(spec).await?;
455 tracing::info!(
456 topic = %topic,
457 transport = ?transport_kind,
458 instance_id = %registered_instance.instance_id(),
459 "EventPublisher registered with discovery (direct mode)"
460 );
461 (tx, codec, Some(registered_instance))
462 }
463 TransportSetup::ZmqBroker(tx, codec) => {
464 tracing::info!(
465 topic = %topic,
466 transport = ?transport_kind,
467 "EventPublisher in broker mode - skipping discovery registration"
468 );
469 (tx, codec, None)
470 }
471 };
472
473 Ok(Self {
474 transport_kind,
475 scope,
476 topic,
477 publisher_id,
478 sequence: AtomicU64::new(0),
479 tx,
480 codec,
481 runtime_handle,
482 discovery_client: discovery,
483 discovery_instance,
484 })
485 }
486
487 pub async fn publish<T: Serialize + Send + Sync>(&self, event: &T) -> Result<()> {
489 let payload = self.codec.encode_payload(event)?;
490 self.publish_bytes(payload.to_vec()).await
491 }
492
493 pub async fn publish_bytes(&self, bytes: Vec<u8>) -> Result<()> {
495 let envelope = EventEnvelope {
496 publisher_id: self.publisher_id,
497 sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
498 published_at: current_timestamp_ms(),
499 topic: self.topic.clone(),
500 payload: Bytes::from(bytes),
501 };
502
503 let envelope_bytes = self.codec.encode_envelope(&envelope)?;
504 let subject = format!("{}.{}", self.scope.subject_prefix(), self.topic);
505
506 self.tx.publish(&subject, envelope_bytes).await
507 }
508
509 pub fn publisher_id(&self) -> u64 {
511 self.publisher_id
512 }
513
514 pub fn topic(&self) -> &str {
516 &self.topic
517 }
518
519 pub fn transport_kind(&self) -> EventTransportKind {
521 self.transport_kind
522 }
523}
524
525impl Drop for EventPublisher {
526 fn drop(&mut self) {
527 if let (Some(discovery), Some(instance)) =
529 (self.discovery_client.take(), self.discovery_instance.take())
530 {
531 let topic = self.topic.clone();
532 let instance_id = instance.instance_id();
533 let runtime_handle = self.runtime_handle.clone();
534
535 let spawn_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
538 runtime_handle.spawn(async move {
539 match discovery.unregister(instance).await {
540 Ok(()) => {
541 tracing::info!(
542 topic = %topic,
543 instance_id = %instance_id,
544 "EventPublisher unregistered from discovery"
545 );
546 }
547 Err(e) => {
548 tracing::warn!(
549 topic = %topic,
550 instance_id = %instance_id,
551 error = %e,
552 "Failed to unregister EventPublisher from discovery"
553 );
554 }
555 }
556 });
557 }));
558
559 if spawn_result.is_err() {
560 tracing::warn!(
561 topic = %self.topic,
562 instance_id = %instance_id,
563 "Skipping EventPublisher unregister during drop because the runtime is unavailable"
564 );
565 }
566 }
567 }
568}
569
570pub struct EventSubscriber {
572 stream: EventStream,
573 #[allow(dead_code)]
574 scope: EventScope,
575 #[allow(dead_code)]
576 topic: String,
577 codec: Arc<Codec>,
578}
579
580impl EventSubscriber {
581 pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
589 let transport_kind = comp.drt().default_event_transport_kind();
590 Self::for_component_with_transport(comp, topic, transport_kind).await
591 }
592
593 pub async fn for_component_with_transport(
595 comp: &Component,
596 topic: impl Into<String>,
597 transport_kind: EventTransportKind,
598 ) -> Result<Self> {
599 let drt = comp.drt();
600 let scope = EventScope::Component {
601 namespace: comp.namespace().name(),
602 component: comp.name().to_string(),
603 };
604 Self::new_internal(drt, scope, topic.into(), transport_kind).await
605 }
606
607 pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
615 let transport_kind = ns.drt().default_event_transport_kind();
616 Self::for_namespace_with_transport(ns, topic, transport_kind).await
617 }
618
619 pub async fn for_namespace_with_transport(
621 ns: &Namespace,
622 topic: impl Into<String>,
623 transport_kind: EventTransportKind,
624 ) -> Result<Self> {
625 let drt = ns.drt();
626 let scope = EventScope::Namespace { name: ns.name() };
627 Self::new_internal(drt, scope, topic.into(), transport_kind).await
628 }
629
630 async fn new_internal(
631 drt: &DistributedRuntime,
632 scope: EventScope,
633 topic: String,
634 transport_kind: EventTransportKind,
635 ) -> Result<Self> {
636 let discovery = drt.discovery();
637
638 let (wire_stream, codec): (WireStream, Arc<Codec>) = match transport_kind {
640 EventTransportKind::Nats => {
641 let transport = nats_transport::NatsTransport::new(drt.clone());
642 let subject = format!("{}.{}", scope.subject_prefix(), topic);
643 let stream = transport.subscribe(&subject).await?;
644 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
645 (stream, codec)
646 }
647 EventTransportKind::Zmq => {
648 if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
650 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
652
653 let stream: WireStream = if broker.xpub_endpoints.len() == 1 {
654 let sub_transport = zmq_transport::ZmqSubTransport::connect_broker(
656 &broker.xpub_endpoints[0],
657 &topic,
658 )
659 .await?;
660 sub_transport.subscribe(&topic).await?
661 } else {
662 let sub_transport =
664 zmq_transport::ZmqSubTransport::connect_broker_multiple(
665 &broker.xpub_endpoints,
666 &topic,
667 )
668 .await?;
669 let inner_stream = sub_transport.subscribe(&topic).await?;
670
671 Box::pin(DeduplicatingStream::new(
673 inner_stream,
674 codec.clone(),
675 100_000,
676 ))
677 };
678
679 (stream, codec)
680 } else {
681 let query = match &scope {
683 EventScope::Namespace { name } => {
684 crate::discovery::DiscoveryQuery::EventChannels(
685 crate::discovery::EventChannelQuery::namespace(name.clone()),
686 )
687 }
688 EventScope::Component {
689 namespace,
690 component,
691 } => crate::discovery::DiscoveryQuery::EventChannels(
692 crate::discovery::EventChannelQuery::topic(
693 namespace.clone(),
694 component.clone(),
695 topic.clone(),
696 ),
697 ),
698 };
699
700 let subscriber =
701 Arc::new(DynamicSubscriber::new(discovery, query, topic.clone()));
702
703 let stream = subscriber.start_zmq().await?;
704 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
705 (stream, codec)
706 }
707 }
708 };
709
710 let topic_filter = topic.clone();
712 let codec_for_stream = codec.clone();
713 let stream = wire_stream.filter_map(move |result| {
714 let codec = codec_for_stream.clone();
715 let topic_filter = topic_filter.clone();
716 async move {
717 match result {
718 Ok(bytes) => match codec.decode_envelope(&bytes) {
719 Ok(envelope) => {
720 if envelope.topic == topic_filter {
722 Some(Ok(envelope))
723 } else {
724 None
725 }
726 }
727 Err(e) => Some(Err(e)),
728 },
729 Err(e) => Some(Err(e)),
730 }
731 }
732 });
733
734 tracing::info!(
735 topic = %topic,
736 transport = ?transport_kind,
737 "EventSubscriber created"
738 );
739
740 Ok(Self {
741 stream: Box::pin(stream),
742 scope,
743 topic,
744 codec,
745 })
746 }
747
748 pub async fn next(&mut self) -> Option<Result<EventEnvelope>> {
750 self.stream.next().await
751 }
752
753 pub fn typed<T: DeserializeOwned + Send + 'static>(self) -> TypedEventSubscriber<T> {
755 TypedEventSubscriber {
756 stream: self.stream,
757 codec: self.codec,
758 _marker: std::marker::PhantomData,
759 }
760 }
761}
762
763pub struct TypedEventSubscriber<T> {
765 stream: EventStream,
766 codec: Arc<Codec>,
767 _marker: std::marker::PhantomData<T>,
768}
769
770impl<T: DeserializeOwned + Send + 'static> TypedEventSubscriber<T> {
771 pub async fn next(&mut self) -> Option<Result<(EventEnvelope, T)>> {
773 let envelope = self.stream.next().await?;
774 match envelope {
775 Ok(env) => match self.codec.decode_payload(&env.payload) {
776 Ok(typed) => Some(Ok((env, typed))),
777 Err(e) => Some(Err(e)),
778 },
779 Err(e) => Some(Err(e)),
780 }
781 }
782}
783
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn current_timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
791
#[cfg(test)]
mod tests {
    use super::*;

    /// Both scope variants produce the documented subject prefix.
    #[test]
    fn test_event_scope_subject_prefix() {
        let ns_scope = EventScope::Namespace {
            name: "test-ns".to_string(),
        };
        assert_eq!(ns_scope.subject_prefix(), "namespace.test-ns");

        let comp_scope = EventScope::Component {
            namespace: "test-ns".to_string(),
            component: "test-comp".to_string(),
        };
        assert_eq!(
            comp_scope.subject_prefix(),
            "namespace.test-ns.component.test-comp"
        );
    }

    /// `namespace()` works for both variants; `component()` is `Some` only for components.
    #[test]
    fn test_event_scope_accessors() {
        let ns_scope = EventScope::Namespace {
            name: "my-ns".to_string(),
        };
        assert_eq!(ns_scope.namespace(), "my-ns");
        assert_eq!(ns_scope.component(), None);

        let comp_scope = EventScope::Component {
            namespace: "my-ns".to_string(),
            component: "my-comp".to_string(),
        };
        assert_eq!(comp_scope.namespace(), "my-ns");
        assert_eq!(comp_scope.component(), Some("my-comp"));
    }

    /// The generated timestamp falls between 2020-01-01 and 2100-01-01 (in milliseconds).
    #[test]
    fn test_timestamp_generation() {
        let ts = current_timestamp_ms();

        assert!(ts > 1577836800000, "Timestamp should be after 2020");
        assert!(ts < 4102444800000, "Timestamp should be before 2100");
    }

    /// An envelope survives a JSON round trip with all fields intact.
    #[test]
    fn test_event_envelope_serde() {
        let envelope = EventEnvelope {
            publisher_id: 42,
            sequence: 10,
            published_at: 1700000000000,
            topic: "test-topic".to_string(),
            payload: Bytes::from("test data"),
        };

        let json = serde_json::to_string(&envelope).expect("serialize");
        let deserialized: EventEnvelope = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(deserialized.publisher_id, 42);
        assert_eq!(deserialized.sequence, 10);
        assert_eq!(deserialized.published_at, 1700000000000);
        assert_eq!(deserialized.topic, "test-topic");
        assert_eq!(deserialized.payload, Bytes::from("test data"));
    }

    /// A well-formed broker URL yields trimmed endpoint lists, in either section order.
    #[test]
    fn test_parse_broker_url_valid() {
        let (xsub, xpub) =
            parse_broker_url("xsub=tcp://a:5555; tcp://b:5555 , xpub=tcp://a:5556")
                .expect("valid broker URL");
        assert_eq!(xsub, vec!["tcp://a:5555", "tcp://b:5555"]);
        assert_eq!(xpub, vec!["tcp://a:5556"]);

        let (xsub, xpub) = parse_broker_url("xpub=tcp://a:5556,xsub=tcp://a:5555")
            .expect("section order must not matter");
        assert_eq!(xsub, vec!["tcp://a:5555"]);
        assert_eq!(xpub, vec!["tcp://a:5556"]);
    }

    /// Malformed broker URLs are rejected rather than partially accepted.
    #[test]
    fn test_parse_broker_url_invalid() {
        // Wrong number of comma-separated sections.
        assert!(parse_broker_url("").is_err());
        assert!(parse_broker_url("xsub=tcp://a:5555").is_err());
        assert!(parse_broker_url("xsub=a,xpub=b,xpub=c").is_err());
        // Unknown section prefix.
        assert!(parse_broker_url("xsub=tcp://a:5555,bogus=tcp://b:1").is_err());
        // One side has no usable endpoint.
        assert!(parse_broker_url("xsub=;,xpub=tcp://a:5556").is_err());
        assert!(parse_broker_url("xsub=tcp://a:5555,xsub=tcp://b:5555").is_err());
    }
}