1mod codec;
7mod dynamic_subscriber;
8mod frame;
9mod nats_transport;
10mod traits;
11mod transport;
12pub mod zmq_transport;
13
14pub use codec::{Codec, MsgpackCodec};
15pub use dynamic_subscriber::DynamicSubscriber;
16pub use frame::{FRAME_HEADER_SIZE, FRAME_VERSION, Frame, FrameError, FrameHeader};
17pub use traits::{EventEnvelope, EventStream, TypedEventStream};
18pub use transport::{EventTransportRx, EventTransportTx, WireStream};
19pub use zmq_transport::{ZmqPubTransport, ZmqSubTransport};
20
21pub use crate::discovery::EventTransportKind;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use anyhow::Result;
30use bytes::Bytes;
31use futures::{Stream, StreamExt};
32use lru::LruCache;
33use serde::Serialize;
34use serde::de::DeserializeOwned;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37
38use crate::DistributedRuntime;
39use crate::component::{Component, Namespace};
40use crate::discovery::{
41 Discovery, DiscoveryInstance, DiscoveryQuery, DiscoverySpec, EventChannelQuery, EventTransport,
42};
43use crate::traits::DistributedRuntimeProvider;
44use crate::utils::ip_resolver::get_local_ip_for_advertise;
45
/// Addressing scope of an event channel: either a whole namespace or one
/// component inside a namespace.
#[derive(Debug, Clone)]
pub enum EventScope {
    /// Events scoped to the namespace called `name`.
    Namespace { name: String },
    /// Events scoped to `component` within `namespace`.
    Component {
        namespace: String,
        component: String,
    },
}

impl EventScope {
    /// Returns the subject prefix for this scope:
    /// `namespace.<name>` or `namespace.<namespace>.component.<component>`.
    pub fn subject_prefix(&self) -> String {
        match self {
            Self::Namespace { name } => format!("namespace.{name}"),
            Self::Component {
                namespace,
                component,
            } => format!("namespace.{namespace}.component.{component}"),
        }
    }

    /// Returns the namespace name; both variants carry one.
    pub fn namespace(&self) -> &str {
        let (Self::Namespace { name: ns } | Self::Component { namespace: ns, .. }) = self;
        ns.as_str()
    }

    /// Returns the component name, or `None` for a namespace-level scope.
    pub fn component(&self) -> Option<&str> {
        if let Self::Component { component, .. } = self {
            Some(component.as_str())
        } else {
            None
        }
    }
}
88
/// ZMQ broker addresses produced by `resolve_zmq_broker`, grouped by the
/// broker socket they refer to.
#[derive(Debug, Clone)]
struct BrokerEndpoints {
    /// Endpoints that publishers connect to (handed to
    /// `ZmqPubTransport::connect` / `connect_multiple`).
    xsub_endpoints: Vec<String>,
    /// Endpoints that subscribers connect to (handed to
    /// `ZmqSubTransport::connect_broker` / `connect_broker_multiple`).
    xpub_endpoints: Vec<String>,
}
99
100async fn resolve_zmq_broker(
103 drt: &DistributedRuntime,
104 scope: &EventScope,
105) -> Result<Option<BrokerEndpoints>> {
106 if let Ok(broker_url) =
108 std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_URL)
109 {
110 let (xsub_endpoints, xpub_endpoints) = parse_broker_url(&broker_url)?;
111 tracing::info!(
112 num_xsub = xsub_endpoints.len(),
113 num_xpub = xpub_endpoints.len(),
114 "Using explicit ZMQ broker URL"
115 );
116 return Ok(Some(BrokerEndpoints {
117 xsub_endpoints,
118 xpub_endpoints,
119 }));
120 }
121
122 if std::env::var(crate::config::environment_names::zmq_broker::DYN_ZMQ_BROKER_ENABLED)
124 .unwrap_or_default()
125 == "true"
126 {
127 let query = DiscoveryQuery::EventChannels(EventChannelQuery::component(
128 scope.namespace().to_string(),
129 "zmq_broker".to_string(),
130 ));
131
132 let instances = drt.discovery().list(query).await?;
133
134 let mut xsub_endpoints = Vec::new();
136 let mut xpub_endpoints = Vec::new();
137
138 for instance in instances {
139 if let DiscoveryInstance::EventChannel { transport, .. } = instance
140 && let EventTransport::ZmqBroker {
141 xsub_endpoints: xsubs,
142 xpub_endpoints: xpubs,
143 } = transport
144 {
145 xsub_endpoints.extend(xsubs);
146 xpub_endpoints.extend(xpubs);
147 }
148 }
149
150 if xsub_endpoints.is_empty() {
151 anyhow::bail!(
152 "DYN_ZMQ_BROKER_ENABLED=true but no broker found in discovery for namespace '{}'",
153 scope.namespace()
154 );
155 }
156
157 tracing::info!(
158 num_brokers = xsub_endpoints.len(),
159 "Discovered ZMQ brokers from discovery plane"
160 );
161
162 return Ok(Some(BrokerEndpoints {
163 xsub_endpoints,
164 xpub_endpoints,
165 }));
166 }
167
168 Ok(None)
170}
171
172fn parse_broker_url(url: &str) -> Result<(Vec<String>, Vec<String>)> {
174 let parts: Vec<&str> = url.split(',').map(|s| s.trim()).collect();
175 if parts.len() != 2 {
176 anyhow::bail!(
177 "Invalid broker URL format. Expected 'xsub=<urls> , xpub=<urls>', got: {}",
178 url
179 );
180 }
181
182 let mut xsub_endpoints = Vec::new();
183 let mut xpub_endpoints = Vec::new();
184
185 for part in parts {
186 if let Some(urls_str) = part.strip_prefix("xsub=") {
187 xsub_endpoints = urls_str
188 .split(';')
189 .map(|s| s.trim().to_string())
190 .filter(|s| !s.is_empty())
191 .collect();
192 } else if let Some(urls_str) = part.strip_prefix("xpub=") {
193 xpub_endpoints = urls_str
194 .split(';')
195 .map(|s| s.trim().to_string())
196 .filter(|s| !s.is_empty())
197 .collect();
198 } else {
199 anyhow::bail!(
200 "Invalid broker URL part. Expected 'xsub=' or 'xpub=' prefix, got: {}",
201 part
202 );
203 }
204 }
205
206 if xsub_endpoints.is_empty() || xpub_endpoints.is_empty() {
207 anyhow::bail!(
208 "Broker URL must contain at least one xsub and one xpub endpoint. Got xsub={:?}, xpub={:?}",
209 xsub_endpoints,
210 xpub_endpoints
211 );
212 }
213
214 Ok((xsub_endpoints, xpub_endpoints))
215}
216
217struct DeduplicatingStream {
220 inner: WireStream,
221 codec: Arc<Codec>,
222 seen_events: LruCache<(u64, u64), ()>, }
224
225impl DeduplicatingStream {
226 fn new(inner: WireStream, codec: Arc<Codec>, cache_size: usize) -> Self {
227 Self {
228 inner,
229 codec,
230 seen_events: LruCache::new(
231 NonZeroUsize::new(cache_size).expect("cache_size must be non-zero"),
232 ),
233 }
234 }
235}
236
237impl Stream for DeduplicatingStream {
238 type Item = Result<Bytes>;
239
240 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241 loop {
242 match Pin::new(&mut self.inner).poll_next(cx) {
243 Poll::Ready(Some(Ok(bytes))) => {
244 match self.codec.decode_envelope(&bytes) {
246 Ok(envelope) => {
247 let key = (envelope.publisher_id, envelope.sequence);
248
249 if self.seen_events.contains(&key) {
251 tracing::debug!(
253 publisher_id = envelope.publisher_id,
254 sequence = envelope.sequence,
255 "Filtered duplicate event from multi-broker setup"
256 );
257 continue;
258 }
259
260 self.seen_events.put(key, ());
262 return Poll::Ready(Some(Ok(bytes)));
263 }
264 Err(e) => {
265 tracing::warn!(error = %e, "Failed to decode envelope for deduplication");
266 return Poll::Ready(Some(Err(e)));
267 }
268 }
269 }
270 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
271 Poll::Ready(None) => return Poll::Ready(None),
272 Poll::Pending => return Poll::Pending,
273 }
274 }
275 }
276}
277
/// Publishes events for one topic within an [`EventScope`] over the
/// selected transport.
pub struct EventPublisher {
    /// Transport kind chosen at construction; exposed via `transport_kind()`.
    transport_kind: EventTransportKind,
    /// Scope whose subject prefix is prepended to the topic when publishing.
    scope: EventScope,
    /// Topic name; also stamped into every envelope.
    topic: String,
    /// Identity stamped into every envelope; taken from the discovery
    /// client's `instance_id()` at construction.
    publisher_id: u64,
    /// Per-publisher counter, starting at 0 and incremented once per
    /// published envelope.
    sequence: AtomicU64,
    /// Sending half of the transport.
    tx: Arc<dyn EventTransportTx>,
    /// Codec used for both payloads and envelopes.
    codec: Arc<Codec>,
    /// Runtime handle on which `Drop` spawns the discovery unregistration.
    runtime_handle: tokio::runtime::Handle,
    /// Discovery client used to unregister on drop (`take()`n by `Drop`).
    discovery_client: Option<Arc<dyn Discovery>>,
    /// Discovery registration to remove on drop; `None` in ZMQ broker mode,
    /// where no registration is made.
    discovery_instance: Option<crate::discovery::DiscoveryInstance>,
}
292
293impl EventPublisher {
294 pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
296 Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
297 .await
298 }
299
300 pub async fn for_component_with_transport(
302 comp: &Component,
303 topic: impl Into<String>,
304 transport_kind: EventTransportKind,
305 ) -> Result<Self> {
306 let drt = comp.drt();
307 let scope = EventScope::Component {
308 namespace: comp.namespace().name(),
309 component: comp.name().to_string(),
310 };
311 Self::new_internal(drt, scope, topic.into(), transport_kind).await
312 }
313
314 pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
316 Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
317 .await
318 }
319
320 pub async fn for_namespace_with_transport(
322 ns: &Namespace,
323 topic: impl Into<String>,
324 transport_kind: EventTransportKind,
325 ) -> Result<Self> {
326 let drt = ns.drt();
327 let scope = EventScope::Namespace { name: ns.name() };
328 Self::new_internal(drt, scope, topic.into(), transport_kind).await
329 }
330
331 async fn new_internal(
332 drt: &DistributedRuntime,
333 scope: EventScope,
334 topic: String,
335 transport_kind: EventTransportKind,
336 ) -> Result<Self> {
337 let publisher_id = drt.discovery().instance_id();
338 let discovery = Some(drt.discovery());
339 let runtime_handle = drt.runtime().secondary();
340
341 enum TransportSetup {
343 Nats(Arc<dyn EventTransportTx>, Arc<Codec>),
344 ZmqDirect(Arc<dyn EventTransportTx>, Arc<Codec>, String), ZmqBroker(Arc<dyn EventTransportTx>, Arc<Codec>),
346 }
347
348 let transport_setup = match transport_kind {
349 EventTransportKind::Nats => {
350 let transport = Arc::new(nats_transport::NatsTransport::new(drt.clone()));
351 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
352 TransportSetup::Nats(transport as Arc<dyn EventTransportTx>, codec)
353 }
354 EventTransportKind::Zmq => {
355 if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
357 let pub_transport = if broker.xsub_endpoints.len() == 1 {
359 zmq_transport::ZmqPubTransport::connect(&broker.xsub_endpoints[0], &topic)
360 .await?
361 } else {
362 zmq_transport::ZmqPubTransport::connect_multiple(
363 &broker.xsub_endpoints,
364 &topic,
365 )
366 .await?
367 };
368
369 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
370 TransportSetup::ZmqBroker(
371 Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
372 codec,
373 )
374 } else {
375 let (pub_transport, actual_bind_endpoint) = std::thread::spawn({
377 let topic = topic.clone();
378 move || {
379 let rt = tokio::runtime::Builder::new_current_thread()
380 .enable_all()
381 .build()
382 .expect("Failed to create Tokio runtime for ZMQ");
383
384 rt.block_on(async move {
385 zmq_transport::ZmqPubTransport::bind("tcp://0.0.0.0:0", &topic)
386 .await
387 .expect("Failed to bind ZMQ publisher")
388 })
389 }
390 })
391 .join()
392 .expect("Failed to join ZMQ initialization thread");
393
394 let actual_port: u16 = actual_bind_endpoint
396 .rsplit(':')
397 .next()
398 .and_then(|s| s.parse().ok())
399 .expect("Failed to parse port from bind endpoint");
400 let local_ip = get_local_ip_for_advertise();
401 let public_endpoint = format!("tcp://{}:{}", local_ip, actual_port);
402
403 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
404 TransportSetup::ZmqDirect(
405 Arc::new(pub_transport) as Arc<dyn EventTransportTx>,
406 codec,
407 public_endpoint,
408 )
409 }
410 }
411 };
412
413 let (tx, codec, discovery_instance) = match transport_setup {
415 TransportSetup::Nats(tx, codec) => {
416 let transport_config = EventTransport::nats(scope.subject_prefix());
417 let spec = DiscoverySpec::EventChannel {
418 namespace: scope.namespace().to_string(),
419 component: scope.component().unwrap_or("").to_string(),
420 topic: topic.clone(),
421 transport: transport_config,
422 };
423
424 let registered_instance = drt.discovery().register(spec).await?;
425 tracing::info!(
426 topic = %topic,
427 transport = ?transport_kind,
428 instance_id = %registered_instance.instance_id(),
429 "EventPublisher registered with discovery"
430 );
431 (tx, codec, Some(registered_instance))
432 }
433 TransportSetup::ZmqDirect(tx, codec, public_endpoint) => {
434 let transport_config = EventTransport::zmq(public_endpoint);
435 let spec = DiscoverySpec::EventChannel {
436 namespace: scope.namespace().to_string(),
437 component: scope.component().unwrap_or("").to_string(),
438 topic: topic.clone(),
439 transport: transport_config,
440 };
441
442 let registered_instance = drt.discovery().register(spec).await?;
443 tracing::info!(
444 topic = %topic,
445 transport = ?transport_kind,
446 instance_id = %registered_instance.instance_id(),
447 "EventPublisher registered with discovery (direct mode)"
448 );
449 (tx, codec, Some(registered_instance))
450 }
451 TransportSetup::ZmqBroker(tx, codec) => {
452 tracing::info!(
453 topic = %topic,
454 transport = ?transport_kind,
455 "EventPublisher in broker mode - skipping discovery registration"
456 );
457 (tx, codec, None)
458 }
459 };
460
461 Ok(Self {
462 transport_kind,
463 scope,
464 topic,
465 publisher_id,
466 sequence: AtomicU64::new(0),
467 tx,
468 codec,
469 runtime_handle,
470 discovery_client: discovery,
471 discovery_instance,
472 })
473 }
474
475 pub async fn publish<T: Serialize + Send + Sync>(&self, event: &T) -> Result<()> {
477 let payload = self.codec.encode_payload(event)?;
478 self.publish_bytes(payload.to_vec()).await
479 }
480
481 pub async fn publish_bytes(&self, bytes: Vec<u8>) -> Result<()> {
483 let envelope = EventEnvelope {
484 publisher_id: self.publisher_id,
485 sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
486 published_at: current_timestamp_ms(),
487 topic: self.topic.clone(),
488 payload: Bytes::from(bytes),
489 };
490
491 let envelope_bytes = self.codec.encode_envelope(&envelope)?;
492 let subject = format!("{}.{}", self.scope.subject_prefix(), self.topic);
493
494 self.tx.publish(&subject, envelope_bytes).await
495 }
496
497 pub fn publisher_id(&self) -> u64 {
499 self.publisher_id
500 }
501
502 pub fn topic(&self) -> &str {
504 &self.topic
505 }
506
507 pub fn transport_kind(&self) -> EventTransportKind {
509 self.transport_kind
510 }
511}
512
513impl Drop for EventPublisher {
514 fn drop(&mut self) {
515 if let (Some(discovery), Some(instance)) =
517 (self.discovery_client.take(), self.discovery_instance.take())
518 {
519 let topic = self.topic.clone();
520 let instance_id = instance.instance_id();
521 let runtime_handle = self.runtime_handle.clone();
522
523 let spawn_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
526 runtime_handle.spawn(async move {
527 match discovery.unregister(instance).await {
528 Ok(()) => {
529 tracing::info!(
530 topic = %topic,
531 instance_id = %instance_id,
532 "EventPublisher unregistered from discovery"
533 );
534 }
535 Err(e) => {
536 tracing::warn!(
537 topic = %topic,
538 instance_id = %instance_id,
539 error = %e,
540 "Failed to unregister EventPublisher from discovery"
541 );
542 }
543 }
544 });
545 }));
546
547 if spawn_result.is_err() {
548 tracing::warn!(
549 topic = %self.topic,
550 instance_id = %instance_id,
551 "Skipping EventPublisher unregister during drop because the runtime is unavailable"
552 );
553 }
554 }
555 }
556}
557
/// Receives decoded [`EventEnvelope`]s for one topic within an
/// [`EventScope`].
pub struct EventSubscriber {
    /// Stream of decoded envelopes, already filtered to this subscriber's topic.
    stream: EventStream,
    // Stored at construction but not read by any method in this module,
    // hence the lint allowance.
    #[allow(dead_code)]
    scope: EventScope,
    // Stored at construction but not read by any method in this module,
    // hence the lint allowance.
    #[allow(dead_code)]
    topic: String,
    /// Codec handed on to `TypedEventSubscriber` for payload decoding.
    codec: Arc<Codec>,
}
567
568impl EventSubscriber {
569 pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
571 Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
572 .await
573 }
574
575 pub async fn for_component_with_transport(
577 comp: &Component,
578 topic: impl Into<String>,
579 transport_kind: EventTransportKind,
580 ) -> Result<Self> {
581 let drt = comp.drt();
582 let scope = EventScope::Component {
583 namespace: comp.namespace().name(),
584 component: comp.name().to_string(),
585 };
586 Self::new_internal(drt, scope, topic.into(), transport_kind).await
587 }
588
589 pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
591 Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
592 .await
593 }
594
595 pub async fn for_namespace_with_transport(
597 ns: &Namespace,
598 topic: impl Into<String>,
599 transport_kind: EventTransportKind,
600 ) -> Result<Self> {
601 let drt = ns.drt();
602 let scope = EventScope::Namespace { name: ns.name() };
603 Self::new_internal(drt, scope, topic.into(), transport_kind).await
604 }
605
606 async fn new_internal(
607 drt: &DistributedRuntime,
608 scope: EventScope,
609 topic: String,
610 transport_kind: EventTransportKind,
611 ) -> Result<Self> {
612 let discovery = drt.discovery();
613
614 let (wire_stream, codec): (WireStream, Arc<Codec>) = match transport_kind {
616 EventTransportKind::Nats => {
617 let transport = nats_transport::NatsTransport::new(drt.clone());
618 let subject = format!("{}.{}", scope.subject_prefix(), topic);
619 let stream = transport.subscribe(&subject).await?;
620 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
621 (stream, codec)
622 }
623 EventTransportKind::Zmq => {
624 if let Some(broker) = resolve_zmq_broker(drt, &scope).await? {
626 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
628
629 let stream: WireStream = if broker.xpub_endpoints.len() == 1 {
630 let sub_transport = zmq_transport::ZmqSubTransport::connect_broker(
632 &broker.xpub_endpoints[0],
633 &topic,
634 )
635 .await?;
636 sub_transport.subscribe(&topic).await?
637 } else {
638 let sub_transport =
640 zmq_transport::ZmqSubTransport::connect_broker_multiple(
641 &broker.xpub_endpoints,
642 &topic,
643 )
644 .await?;
645 let inner_stream = sub_transport.subscribe(&topic).await?;
646
647 Box::pin(DeduplicatingStream::new(
649 inner_stream,
650 codec.clone(),
651 100_000,
652 ))
653 };
654
655 (stream, codec)
656 } else {
657 let query = match &scope {
659 EventScope::Namespace { name } => {
660 crate::discovery::DiscoveryQuery::EventChannels(
661 crate::discovery::EventChannelQuery::namespace(name.clone()),
662 )
663 }
664 EventScope::Component {
665 namespace,
666 component,
667 } => crate::discovery::DiscoveryQuery::EventChannels(
668 crate::discovery::EventChannelQuery::topic(
669 namespace.clone(),
670 component.clone(),
671 topic.clone(),
672 ),
673 ),
674 };
675
676 let subscriber =
677 Arc::new(DynamicSubscriber::new(discovery, query, topic.clone()));
678
679 let stream = subscriber.start_zmq().await?;
680 let codec = Arc::new(Codec::Msgpack(MsgpackCodec));
681 (stream, codec)
682 }
683 }
684 };
685
686 let topic_filter = topic.clone();
688 let codec_for_stream = codec.clone();
689 let stream = wire_stream.filter_map(move |result| {
690 let codec = codec_for_stream.clone();
691 let topic_filter = topic_filter.clone();
692 async move {
693 match result {
694 Ok(bytes) => match codec.decode_envelope(&bytes) {
695 Ok(envelope) => {
696 if envelope.topic == topic_filter {
698 Some(Ok(envelope))
699 } else {
700 None
701 }
702 }
703 Err(e) => Some(Err(e)),
704 },
705 Err(e) => Some(Err(e)),
706 }
707 }
708 });
709
710 tracing::info!(
711 topic = %topic,
712 transport = ?transport_kind,
713 "EventSubscriber created"
714 );
715
716 Ok(Self {
717 stream: Box::pin(stream),
718 scope,
719 topic,
720 codec,
721 })
722 }
723
724 pub async fn next(&mut self) -> Option<Result<EventEnvelope>> {
726 self.stream.next().await
727 }
728
729 pub fn typed<T: DeserializeOwned + Send + 'static>(self) -> TypedEventSubscriber<T> {
731 TypedEventSubscriber {
732 stream: self.stream,
733 codec: self.codec,
734 _marker: std::marker::PhantomData,
735 }
736 }
737}
738
739pub struct TypedEventSubscriber<T> {
741 stream: EventStream,
742 codec: Arc<Codec>,
743 _marker: std::marker::PhantomData<T>,
744}
745
746impl<T: DeserializeOwned + Send + 'static> TypedEventSubscriber<T> {
747 pub async fn next(&mut self) -> Option<Result<(EventEnvelope, T)>> {
749 let envelope = self.stream.next().await?;
750 match envelope {
751 Ok(env) => match self.codec.decode_payload(&env.payload) {
752 Ok(typed) => Some(Ok((env, typed))),
753 Err(e) => Some(Err(e)),
754 },
755 Err(e) => Some(Err(e)),
756 }
757 }
758}
759
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or 0 if the system clock reports a time before the epoch.
fn current_timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
767
#[cfg(test)]
mod tests {
    // The former `event_plane as env` import was never referenced by any test
    // here and only produced an `unused_imports` warning, so it is gone.
    use super::*;

    /// Subject prefixes follow `namespace.<ns>[.component.<comp>]`.
    #[test]
    fn test_event_scope_subject_prefix() {
        let ns_scope = EventScope::Namespace {
            name: "test-ns".to_string(),
        };
        assert_eq!(ns_scope.subject_prefix(), "namespace.test-ns");

        let comp_scope = EventScope::Component {
            namespace: "test-ns".to_string(),
            component: "test-comp".to_string(),
        };
        assert_eq!(
            comp_scope.subject_prefix(),
            "namespace.test-ns.component.test-comp"
        );
    }

    /// `namespace()` is available for both variants; `component()` only for
    /// component scopes.
    #[test]
    fn test_event_scope_accessors() {
        let ns_scope = EventScope::Namespace {
            name: "my-ns".to_string(),
        };
        assert_eq!(ns_scope.namespace(), "my-ns");
        assert_eq!(ns_scope.component(), None);

        let comp_scope = EventScope::Component {
            namespace: "my-ns".to_string(),
            component: "my-comp".to_string(),
        };
        assert_eq!(comp_scope.namespace(), "my-ns");
        assert_eq!(comp_scope.component(), Some("my-comp"));
    }

    /// Sanity-checks that the timestamp is in milliseconds and in a
    /// plausible range.
    #[test]
    fn test_timestamp_generation() {
        let ts = current_timestamp_ms();

        // 2020-01-01T00:00:00Z in milliseconds.
        assert!(ts > 1577836800000, "Timestamp should be after 2020");
        // 2100-01-01T00:00:00Z in milliseconds.
        assert!(ts < 4102444800000, "Timestamp should be before 2100");
    }

    /// An envelope survives a JSON serialize/deserialize round trip with
    /// every field intact.
    #[test]
    fn test_event_envelope_serde() {
        let envelope = EventEnvelope {
            publisher_id: 42,
            sequence: 10,
            published_at: 1700000000000,
            topic: "test-topic".to_string(),
            payload: Bytes::from("test data"),
        };

        let json = serde_json::to_string(&envelope).expect("serialize");
        let deserialized: EventEnvelope = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(deserialized.publisher_id, 42);
        assert_eq!(deserialized.sequence, 10);
        assert_eq!(deserialized.published_at, 1700000000000);
        assert_eq!(deserialized.topic, "test-topic");
        assert_eq!(deserialized.payload, Bytes::from("test data"));
    }
}