net/event.rs
1//! Event types for the Net event bus.
2//!
3//! Events are opaque JSON values - the event bus performs no schema validation
4//! or interpretation of event content.
5//!
6//! # Performance Optimization
7//!
8//! Since Net is schema-agnostic, we offer multiple event representations:
9//!
10//! - `Event`: Standard wrapper around `serde_json::Value` (convenient but slower)
11//! - `RawEvent`: Pre-serialized bytes with cached hash (fastest for high-throughput)
12//!
13//! For maximum performance, use `RawEvent::from_bytes()` when you already have
14//! JSON bytes (e.g., from a network buffer or file).
15
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use serde_json::Value as JsonValue;
19
20/// An opaque event - any valid JSON value.
21///
22/// The event bus does not validate, interpret, or enforce any schema.
23/// Events are treated as opaque binary blobs internally.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[repr(transparent)]
26pub struct Event(pub JsonValue);
27
28impl Event {
29 /// Create a new event from a JSON value.
30 #[inline]
31 pub fn new(value: JsonValue) -> Self {
32 Self(value)
33 }
34
35 /// Create an event from a JSON string.
36 #[inline]
37 #[allow(clippy::should_implement_trait)]
38 pub fn from_str(s: &str) -> Result<Self, serde_json::Error> {
39 serde_json::from_str(s).map(Self)
40 }
41
42 /// Create an event from raw bytes.
43 #[inline]
44 pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
45 serde_json::from_slice(bytes).map(Self)
46 }
47
48 /// Get the inner JSON value.
49 #[inline]
50 pub fn into_inner(self) -> JsonValue {
51 self.0
52 }
53
54 /// Get a reference to the inner JSON value.
55 #[inline]
56 pub fn as_value(&self) -> &JsonValue {
57 &self.0
58 }
59
60 /// Convert to a raw event (serializes once, caches the result).
61 #[inline]
62 pub fn into_raw(self) -> RawEvent {
63 RawEvent::from_value(self.0)
64 }
65}
66
67impl From<JsonValue> for Event {
68 #[inline]
69 fn from(value: JsonValue) -> Self {
70 Self(value)
71 }
72}
73
74impl From<Event> for JsonValue {
75 #[inline]
76 fn from(event: Event) -> Self {
77 event.0
78 }
79}
80
81/// A pre-serialized event with cached hash.
82///
83/// This is the high-performance event type for schema-agnostic ingestion.
84/// By storing the raw bytes and pre-computed hash, we avoid:
85/// - Re-serialization on every operation
86/// - Repeated hashing for shard selection
87///
88/// # Example
89///
90/// ```rust,ignore
91/// // From network buffer or file (zero-copy if Bytes is used)
92/// let raw = RawEvent::from_bytes(network_buffer);
93///
94/// // From existing JSON value (serializes once)
95/// let raw = RawEvent::from_value(json!({"key": "value"}));
96///
97/// // Ingestion uses cached hash - no re-serialization
98/// bus.ingest_raw(raw)?;
99/// ```
100#[derive(Clone)]
101pub struct RawEvent {
102 /// Pre-serialized JSON bytes.
103 bytes: Bytes,
104 /// Pre-computed hash for shard selection.
105 hash: u64,
106}
107
108impl RawEvent {
109 /// Create a raw event from bytes.
110 ///
111 /// The bytes must be valid JSON. No validation is performed for performance.
112 /// Use `from_bytes_validated` if you need validation.
113 #[inline]
114 pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
115 let bytes = bytes.into();
116 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
117 Self { bytes, hash }
118 }
119
120 /// Create a raw event from bytes with a pre-computed hash.
121 ///
122 /// Use this when you've already computed the xxhash (e.g., for reused events).
123 /// The caller is responsible for ensuring the hash matches the bytes.
124 #[inline]
125 pub fn from_bytes_with_hash(bytes: impl Into<Bytes>, hash: u64) -> Self {
126 Self {
127 bytes: bytes.into(),
128 hash,
129 }
130 }
131
132 /// Create a raw event from bytes with JSON validation.
133 #[inline]
134 pub fn from_bytes_validated(bytes: impl Into<Bytes>) -> Result<Self, serde_json::Error> {
135 let bytes = bytes.into();
136 // Validate it's valid JSON by attempting to parse
137 let _: JsonValue = serde_json::from_slice(&bytes)?;
138 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
139 Ok(Self { bytes, hash })
140 }
141
142 /// Create a raw event from a JSON value.
143 ///
144 /// This serializes the value once and caches the result.
145 ///
146 /// `serde_json::to_vec(&JsonValue)` is infallible by
147 /// construction — the value tree is a known-good JSON
148 /// structure with no fallible serializer in the path —
149 /// modulo OOM, which the global allocator handles via
150 /// abort. The `unwrap_or_default()` fallback keeps the
151 /// non-panic contract for a hypothetical future serde-json
152 /// change that introduced a fallible path on `Value`. Pre-
153 /// fix `expect("Value serialization is infallible")` panicked
154 /// at the call site if the assumption ever broke; the panic
155 /// would unwind across `from_value`'s callers (bus ingest,
156 /// FFI ingest paths) where the contract is non-panicking.
157 #[inline]
158 pub fn from_value(value: JsonValue) -> Self {
159 let bytes = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
160 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
161 Self { bytes, hash }
162 }
163
164 /// Creates a RawEvent from a string. No validation is performed for
165 /// performance — see `from_bytes_validated` for a validating alternative.
166 #[inline]
167 #[allow(clippy::should_implement_trait)]
168 pub fn from_str(s: &str) -> Self {
169 Self::from_bytes(Bytes::copy_from_slice(s.as_bytes()))
170 }
171
172 /// Get the raw bytes.
173 #[inline]
174 pub fn as_bytes(&self) -> &[u8] {
175 &self.bytes
176 }
177
178 /// Get the bytes (clone is cheap - reference counted).
179 #[inline]
180 pub fn bytes(&self) -> Bytes {
181 self.bytes.clone()
182 }
183
184 /// Get the pre-computed hash.
185 #[inline]
186 pub fn hash(&self) -> u64 {
187 self.hash
188 }
189
190 /// Parse the bytes into a JSON value (for when you need to inspect).
191 #[inline]
192 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
193 serde_json::from_slice(&self.bytes)
194 }
195
196 /// Get the byte length.
197 #[inline]
198 pub fn len(&self) -> usize {
199 self.bytes.len()
200 }
201
202 /// Check if empty.
203 #[inline]
204 pub fn is_empty(&self) -> bool {
205 self.bytes.is_empty()
206 }
207}
208
209impl std::fmt::Debug for RawEvent {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("RawEvent")
212 .field("len", &self.bytes.len())
213 .field("hash", &self.hash)
214 .finish()
215 }
216}
217
218impl From<Event> for RawEvent {
219 #[inline]
220 fn from(event: Event) -> Self {
221 event.into_raw()
222 }
223}
224
225impl From<JsonValue> for RawEvent {
226 #[inline]
227 fn from(value: JsonValue) -> Self {
228 RawEvent::from_value(value)
229 }
230}
231
232/// Internal event representation with metadata assigned at ingestion.
233///
234/// This is the canonical form of an event within the event bus.
235/// The `insertion_ts` provides deterministic ordering within a shard.
236///
237/// Uses `Bytes` for zero-copy, reference-counted storage.
238#[derive(Debug, Clone)]
239pub struct InternalEvent {
240 /// Pre-serialized JSON payload (reference-counted, zero-copy clone).
241 pub raw: Bytes,
242
243 /// Monotonically increasing insertion timestamp (nanoseconds).
244 /// Strictly ordered within a shard, not globally.
245 pub insertion_ts: u64,
246
247 /// Shard this event was assigned to.
248 pub shard_id: u16,
249}
250
251impl InternalEvent {
252 /// Create a new internal event from raw bytes.
253 #[inline]
254 pub fn new(raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
255 Self {
256 raw,
257 insertion_ts,
258 shard_id,
259 }
260 }
261
262 /// Create from a JSON value (serializes once).
263 ///
264 /// See `RawEvent::from_value` for the rationale on
265 /// `unwrap_or_default()` instead of `expect()`.
266 #[inline]
267 pub fn from_value(value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
268 let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
269 Self {
270 raw,
271 insertion_ts,
272 shard_id,
273 }
274 }
275
276 /// Parse the raw bytes into a JSON value.
277 #[inline]
278 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
279 serde_json::from_slice(&self.raw)
280 }
281
282 /// Get the raw bytes as a slice.
283 #[inline]
284 pub fn as_bytes(&self) -> &[u8] {
285 &self.raw
286 }
287}
288
289/// A batch of events for adapter dispatch.
290///
291/// Batches are formed by shard workers and contain strictly ordered
292/// events from a single shard.
293#[derive(Debug, Clone)]
294pub struct Batch {
295 /// The shard this batch belongs to.
296 pub shard_id: u16,
297
298 /// Events in insertion order.
299 pub events: Vec<InternalEvent>,
300
301 /// Sequence number of the first event in this batch.
302 /// Used for idempotent retry handling.
303 pub sequence_start: u64,
304
305 /// Per-process nonce sampled once at process start. Adapters
306 /// that persist `(shard_id, sequence_start)` for dedup
307 /// (JetStream `Nats-Msg-Id`, Redis stream MAXLEN keys, etc.)
308 /// must include this in the dedup key — otherwise a producer
309 /// that restarts within the backend's dedup window collides
310 /// with its prior incarnation on `(shard, 0, 0)` and the new
311 /// batches are silently discarded as duplicates.
312 ///
313 /// `BatchWorker::next_sequence` is process-local and resets to
314 /// zero on restart; the nonce is the global discriminator that
315 /// makes the composite key globally unique across restarts
316 /// even though the per-process counter is not durable.
317 pub process_nonce: u64,
318}
319
320/// Per-process nonce used by [`Batch::process_nonce`]. Sampled once
321/// (lazy) from a mix of entropy sources so two processes launched on
322/// the same machine within a single nanosecond tick are still
323/// distinguishable.
324///
325/// We don't use `getrandom` here because it's a feature-gated
326/// optional dep — `event.rs` is in the always-compiled core. Instead
327/// we run xxh3 over multiple sources whose joint state is effectively
328/// never identical across two adjacent process starts: wall-clock
329/// nanos, monotonic-clock nanos (resilient to wall-clock skew),
330/// pid, the address of a stack-local (gives an ASLR component),
331/// and the current thread id. Plain XOR of two sources (the
332/// previous implementation) collapses to zero whenever the
333/// components happen to share bit patterns; xxh3 mixes them so any
334/// single non-degenerate source dominates the output.
335pub fn batch_process_nonce() -> u64 {
336 use std::sync::OnceLock;
337 static NONCE: OnceLock<u64> = OnceLock::new();
338 *NONCE.get_or_init(|| {
339 use std::hash::{Hash, Hasher};
340 use std::time::Instant;
341
342 let wall_nanos = std::time::SystemTime::now()
343 .duration_since(std::time::UNIX_EPOCH)
344 .map(|d| d.as_nanos() as u64)
345 .unwrap_or(0);
346 // `Instant::now()` is monotonic — distinct entropy from
347 // `SystemTime` (the OS may slew wall-clock backwards but
348 // never the monotonic source). `Instant` doesn't expose a
349 // public u64 accessor, so we mix the bytes of its `Debug`
350 // repr.
351 let mono_marker = format!("{:?}", Instant::now());
352 let pid = std::process::id() as u64;
353 // Address of a stack local — adds an ASLR-derived
354 // component that differs across process starts even when
355 // the time / pid components happen to collide.
356 let stack_marker: usize = &pid as *const u64 as usize;
357 let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
358 std::thread::current().id().hash(&mut tid_hasher);
359 let tid = tid_hasher.finish();
360
361 let mut buf = [0u8; 64];
362 buf[..8].copy_from_slice(&wall_nanos.to_le_bytes());
363 buf[8..16].copy_from_slice(&pid.to_le_bytes());
364 buf[16..24].copy_from_slice(&(stack_marker as u64).to_le_bytes());
365 buf[24..32].copy_from_slice(&tid.to_le_bytes());
366 // Pack as much of the monotonic marker bytes as fits.
367 let mono_bytes = mono_marker.as_bytes();
368 let n = mono_bytes.len().min(32);
369 buf[32..32 + n].copy_from_slice(&mono_bytes[..n]);
370
371 let nonce = xxhash_rust::xxh3::xxh3_64(&buf);
372 // Refuse `0` — some consumers treat 0 as a sentinel.
373 // Probability of xxh3 returning exactly 0 is 2^-64; we
374 // map it to 1.
375 if nonce == 0 {
376 1
377 } else {
378 nonce
379 }
380 })
381}
382
383impl Batch {
384 /// Create a new batch using the per-process nonce
385 /// ([`batch_process_nonce`]). Convenience for tests and for
386 /// callers that don't thread a custom producer nonce through.
387 /// Production paths constructed via the bus go through
388 /// [`Self::with_nonce`] with the bus's loaded
389 /// `producer_nonce_path` value so retries dedup across
390 /// process restart.
391 #[inline]
392 pub fn new(shard_id: u16, events: Vec<InternalEvent>, sequence_start: u64) -> Self {
393 Self::with_nonce(shard_id, events, sequence_start, batch_process_nonce())
394 }
395
396 /// Create a new batch with an explicit producer nonce. Used by
397 /// the bus's `BatchWorker` and `remove_shard_internal`'s
398 /// stranded-flush so adapters keying dedup on
399 /// `(producer_nonce, shard, sequence_start, i)` see the same
400 /// nonce across process restart when the bus is configured
401 /// with a `producer_nonce_path`.
402 ///
403 /// A `producer_nonce == 0` is coerced to `1` to preserve the
404 /// non-zero invariant that `batch_process_nonce` and
405 /// `dedup_state::PersistentProducerNonce::create_new` already
406 /// uphold (each generates non-zero u64s and re-rolls on the
407 /// astronomical 1-in-2^64 zero draw).
408 ///
409 /// The zero coercion is **defense-in-depth against future
410 /// codecs**: a downstream caller that constructs a
411 /// `Batch::with_nonce(..., 0)` directly (e.g. tests, hand-built
412 /// fixtures) would otherwise emit `dedup_id` keys starting
413 /// `0:` — collision-prone with any future codec that reserves
414 /// `0` as "no nonce, use the legacy path." Today's
415 /// `adapter/jetstream.rs::on_batch` just formats
416 /// `process_nonce` as `{:x}` with no special-casing, so the
417 /// hazard is latent rather than active. Coercing to 1 keeps
418 /// the invariant that every shipped batch has a non-zero
419 /// producer nonce regardless of caller hygiene.
420 #[inline]
421 pub fn with_nonce(
422 shard_id: u16,
423 events: Vec<InternalEvent>,
424 sequence_start: u64,
425 producer_nonce: u64,
426 ) -> Self {
427 Self {
428 shard_id,
429 events,
430 sequence_start,
431 process_nonce: if producer_nonce == 0 {
432 1
433 } else {
434 producer_nonce
435 },
436 }
437 }
438
439 /// Returns the number of events in this batch.
440 #[inline]
441 pub fn len(&self) -> usize {
442 self.events.len()
443 }
444
445 /// Returns true if this batch is empty.
446 #[inline]
447 pub fn is_empty(&self) -> bool {
448 self.events.is_empty()
449 }
450}
451
452/// An event retrieved from storage with its backend-specific ID.
453#[derive(Debug, Clone)]
454pub struct StoredEvent {
455 /// Backend-specific identifier.
456 pub id: String,
457
458 /// Raw JSON payload bytes (deferred parsing for performance).
459 pub raw: Bytes,
460
461 /// Insertion timestamp from ingestion.
462 pub insertion_ts: u64,
463
464 /// Shard this event belongs to.
465 pub shard_id: u16,
466
467 /// Application-level idempotency key as written by the producer.
468 /// Adapters carry an opaque dedup token on the wire (Redis Streams
469 /// uses a `dedup_id` field; JetStream uses `Nats-Msg-Id`). The
470 /// trait-level consumer ([`crate::adapter::Adapter::poll_shard`])
471 /// surfaces it here so callers can drive their own dedup table
472 /// without re-reading the raw broker payload. `None` when the
473 /// adapter or the wire entry doesn't carry one.
474 pub dedup_id: Option<String>,
475}
476
477impl StoredEvent {
478 /// Create a new stored event from raw bytes.
479 #[inline]
480 pub fn new(id: String, raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
481 Self {
482 id,
483 raw,
484 insertion_ts,
485 shard_id,
486 dedup_id: None,
487 }
488 }
489
490 /// Create a new stored event from a JSON value (serializes once).
491 ///
492 /// See `RawEvent::from_value` for the rationale on
493 /// `unwrap_or_default()` instead of `expect()`.
494 #[inline]
495 pub fn from_value(id: String, value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
496 let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
497 Self {
498 id,
499 raw,
500 insertion_ts,
501 shard_id,
502 dedup_id: None,
503 }
504 }
505
506 /// Attach an application-level dedup identifier (the producer's
507 /// `dedup_id` / `Nats-Msg-Id`). Returns `self` for chaining.
508 #[inline]
509 #[must_use]
510 pub fn with_dedup_id(mut self, dedup_id: Option<String>) -> Self {
511 self.dedup_id = dedup_id;
512 self
513 }
514
515 /// Parse the raw bytes into a JSON value on demand.
516 #[inline]
517 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
518 serde_json::from_slice(&self.raw)
519 }
520
521 /// Get the raw bytes as a string slice (for serialization).
522 ///
523 /// Returns `Err` if the raw bytes are not valid UTF-8, rather than
524 /// silently substituting data.
525 #[inline]
526 pub fn raw_str(&self) -> Result<&str, std::str::Utf8Error> {
527 std::str::from_utf8(&self.raw)
528 }
529}
530
531impl Serialize for StoredEvent {
532 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
533 where
534 S: serde::Serializer,
535 {
536 use serde::ser::SerializeStruct;
537 // Always emit `dedup_id` (as `null` when absent) so the
538 // on-wire shape is stable. Pre-fix the field count flipped
539 // between 4 and 5 based on whether `dedup_id` was populated;
540 // downstream Node / Python / Go consumers using
541 // `deny_unknown_fields` (or strict schema validators)
542 // accepted the 4-field shape but rejected events whose
543 // adapter populated the 5th, making the wire compatibility
544 // *data-dependent*. Stable shape > byte-optimal: an extra
545 // `"dedup_id":null` per legacy event is cheap, the rejection
546 // hazard is not.
547 let field_count = 5;
548 let mut state = serializer.serialize_struct("StoredEvent", field_count)?;
549 state.serialize_field("id", &self.id)?;
550 // Serialize raw bytes as a `RawValue` so the on-wire JSON
551 // is byte-for-byte the same as the input. Pre-fix the
552 // bytes were parsed into a `JsonValue` tree and re-
553 // serialized; the round-trip discarded original
554 // whitespace, normalized number formatting (`1.0` → `1`),
555 // and (without `preserve_order`) re-ordered map keys
556 // alphabetically. Any downstream that hashed or signed
557 // the serialized form and expected byte-equality with the
558 // input silently failed verification — a sneaky failure
559 // mode in audit / signing pipelines that look at the
560 // re-emitted JSON.
561 //
562 // `RawValue::from_string` validates the JSON (so the
563 // pre-existing "invalid raw JSON returns a serde error,
564 // not a silent null" guarantee is preserved), but emits
565 // the original bytes verbatim instead of round-tripping
566 // through a value tree.
567 let raw_str = std::str::from_utf8(&self.raw)
568 .map_err(|e| serde::ser::Error::custom(format!("invalid raw UTF-8: {}", e)))?;
569 let raw_value = serde_json::value::RawValue::from_string(raw_str.to_string())
570 .map_err(|e| serde::ser::Error::custom(format!("invalid raw JSON: {}", e)))?;
571 state.serialize_field("raw", &*raw_value)?;
572 state.serialize_field("insertion_ts", &self.insertion_ts)?;
573 state.serialize_field("shard_id", &self.shard_id)?;
574 // Always emit `dedup_id` to keep the wire shape stable —
575 // `None` serializes as JSON `null`.
576 state.serialize_field("dedup_id", &self.dedup_id)?;
577 state.end()
578 }
579}
580
581#[cfg(test)]
582mod tests {
583 use super::*;
584 use serde_json::json;
585
586 #[test]
587 fn test_event_new() {
588 let value = json!({"key": "value"});
589 let event = Event::new(value.clone());
590 assert_eq!(event.as_value(), &value);
591 }
592
593 #[test]
594 fn test_event_from_str() {
595 let event = Event::from_str(r#"{"key": "value"}"#).unwrap();
596 assert_eq!(event.as_value()["key"], "value");
597 }
598
599 #[test]
600 fn test_event_from_str_invalid() {
601 let result = Event::from_str("not valid json");
602 assert!(result.is_err());
603 }
604
605 #[test]
606 fn test_event_from_slice() {
607 let bytes = br#"{"key": "value"}"#;
608 let event = Event::from_slice(bytes).unwrap();
609 assert_eq!(event.as_value()["key"], "value");
610 }
611
612 #[test]
613 fn test_event_into_inner() {
614 let value = json!({"key": "value"});
615 let event = Event::new(value.clone());
616 assert_eq!(event.into_inner(), value);
617 }
618
619 #[test]
620 fn test_event_into_raw() {
621 let event = Event::new(json!({"key": "value"}));
622 let raw = event.into_raw();
623 assert!(!raw.is_empty());
624 assert!(raw.hash() != 0);
625 }
626
627 #[test]
628 fn test_event_from_json_value() {
629 let value = json!({"key": "value"});
630 let event: Event = value.clone().into();
631 assert_eq!(event.as_value(), &value);
632 }
633
634 #[test]
635 fn test_event_into_json_value() {
636 let value = json!({"key": "value"});
637 let event = Event::new(value.clone());
638 let result: JsonValue = event.into();
639 assert_eq!(result, value);
640 }
641
642 #[test]
643 fn test_raw_event_from_bytes() {
644 let bytes = br#"{"key": "value"}"#;
645 let raw = RawEvent::from_bytes(bytes.as_slice());
646 assert_eq!(raw.as_bytes(), bytes);
647 assert!(!raw.is_empty());
648 assert_eq!(raw.len(), bytes.len());
649 }
650
651 #[test]
652 fn test_raw_event_from_str() {
653 let s = r#"{"key": "value"}"#;
654 let raw = RawEvent::from_str(s);
655 assert_eq!(raw.as_bytes(), s.as_bytes());
656 }
657
658 #[test]
659 fn test_raw_event_from_value() {
660 let value = json!({"key": "value"});
661 let raw = RawEvent::from_value(value);
662 let parsed = raw.parse().unwrap();
663 assert_eq!(parsed["key"], "value");
664 }
665
666 #[test]
667 fn test_raw_event_from_bytes_validated() {
668 let valid = br#"{"key": "value"}"#;
669 let result = RawEvent::from_bytes_validated(valid.as_slice());
670 assert!(result.is_ok());
671
672 let invalid = b"not valid json";
673 let result = RawEvent::from_bytes_validated(invalid.as_slice());
674 assert!(result.is_err());
675 }
676
677 #[test]
678 fn test_raw_event_hash_consistency() {
679 let raw1 = RawEvent::from_str(r#"{"key": "value"}"#);
680 let raw2 = RawEvent::from_str(r#"{"key": "value"}"#);
681 assert_eq!(raw1.hash(), raw2.hash());
682
683 let raw3 = RawEvent::from_str(r#"{"key": "other"}"#);
684 assert_ne!(raw1.hash(), raw3.hash());
685 }
686
687 #[test]
688 fn test_raw_event_bytes_clone() {
689 let raw = RawEvent::from_str(r#"{"key": "value"}"#);
690 let bytes1 = raw.bytes();
691 let bytes2 = raw.bytes();
692 assert_eq!(bytes1, bytes2);
693 }
694
695 #[test]
696 fn test_raw_event_debug() {
697 let raw = RawEvent::from_str(r#"{"key": "value"}"#);
698 let debug = format!("{:?}", raw);
699 assert!(debug.contains("RawEvent"));
700 assert!(debug.contains("len"));
701 assert!(debug.contains("hash"));
702 }
703
704 #[test]
705 fn test_raw_event_from_event() {
706 let event = Event::new(json!({"key": "value"}));
707 let raw: RawEvent = event.into();
708 assert!(!raw.is_empty());
709 }
710
711 #[test]
712 fn test_raw_event_from_json_value() {
713 let value = json!({"key": "value"});
714 let raw: RawEvent = value.into();
715 assert!(!raw.is_empty());
716 }
717
718 #[test]
719 fn test_internal_event_new() {
720 let raw = Bytes::from(r#"{"key": "value"}"#);
721 let event = InternalEvent::new(raw.clone(), 12345, 0);
722 assert_eq!(event.raw, raw);
723 assert_eq!(event.insertion_ts, 12345);
724 assert_eq!(event.shard_id, 0);
725 }
726
727 #[test]
728 fn test_internal_event_from_value() {
729 let event = InternalEvent::from_value(json!({"key": "value"}), 12345, 0);
730 assert_eq!(event.insertion_ts, 12345);
731 assert_eq!(event.shard_id, 0);
732 let parsed = event.parse().unwrap();
733 assert_eq!(parsed["key"], "value");
734 }
735
736 #[test]
737 fn test_internal_event_as_bytes() {
738 let raw = Bytes::from(r#"{"key": "value"}"#);
739 let event = InternalEvent::new(raw.clone(), 12345, 0);
740 assert_eq!(event.as_bytes(), raw.as_ref());
741 }
742
743 #[test]
744 fn test_batch_new() {
745 let events = vec![
746 InternalEvent::from_value(json!({"i": 0}), 1, 0),
747 InternalEvent::from_value(json!({"i": 1}), 2, 0),
748 ];
749 let batch = Batch::new(0, events, 100);
750 assert_eq!(batch.shard_id, 0);
751 assert_eq!(batch.len(), 2);
752 assert_eq!(batch.sequence_start, 100);
753 assert!(!batch.is_empty());
754 }
755
756 #[test]
757 fn test_batch_empty() {
758 let batch = Batch::new(0, vec![], 0);
759 assert!(batch.is_empty());
760 assert_eq!(batch.len(), 0);
761 }
762
763 #[test]
764 fn test_stored_event_new() {
765 let raw = Bytes::from(r#"{"key":"value"}"#);
766 let event = StoredEvent::new("stream-123".to_string(), raw, 12345, 0);
767 assert_eq!(event.id, "stream-123");
768 let parsed = event.parse().unwrap();
769 assert_eq!(parsed["key"], "value");
770 assert_eq!(event.insertion_ts, 12345);
771 assert_eq!(event.shard_id, 0);
772 }
773
774 // Regression: raw_str() used to silently return "{}" for invalid UTF-8
775 // instead of reporting an error (BUGS_3 #4).
776 #[test]
777 fn test_stored_event_raw_str_valid_utf8() {
778 let raw = Bytes::from(r#"{"key":"value"}"#);
779 let event = StoredEvent::new("id".to_string(), raw, 0, 0);
780 assert_eq!(event.raw_str().unwrap(), r#"{"key":"value"}"#);
781 }
782
783 #[test]
784 fn test_stored_event_raw_str_invalid_utf8_returns_err() {
785 let raw = Bytes::from(vec![0xff, 0xfe, 0xfd]);
786 let event = StoredEvent::new("id".to_string(), raw, 0, 0);
787 assert!(event.raw_str().is_err());
788 }
789
790 // Regression: Serialize impl used to silently replace invalid raw bytes
791 // with null. Now it returns a serialization error (BUGS_4 #3).
792 #[test]
793 fn test_stored_event_serialize_valid() {
794 let raw = Bytes::from(r#"{"key":"value"}"#);
795 let event = StoredEvent::new("id".to_string(), raw, 123, 0);
796 let json = serde_json::to_string(&event).unwrap();
797 assert!(json.contains("\"key\""));
798 assert!(json.contains("\"value\""));
799 }
800
801 #[test]
802 fn test_stored_event_serialize_invalid_raw_returns_error() {
803 let raw = Bytes::from(b"not valid json".as_slice());
804 let event = StoredEvent::new("id".to_string(), raw, 0, 0);
805 let result = serde_json::to_string(&event);
806 assert!(
807 result.is_err(),
808 "serializing invalid raw bytes should error, not silently return null"
809 );
810 }
811
812 /// Regression: `StoredEvent::Serialize` must preserve the
813 /// raw bytes byte-for-byte instead of round-tripping through
814 /// `serde_json::Value`. Pre-fix the round-trip discarded
815 /// original whitespace, normalized number formatting
816 /// (`1.0` → `1`), and re-ordered map keys alphabetically.
817 /// Any downstream that hashed or signed the serialized form
818 /// and expected byte-equality with the input silently failed
819 /// verification.
820 #[test]
821 fn stored_event_serialize_preserves_raw_byte_for_byte() {
822 // Pin three cases where the JsonValue round-trip
823 // demonstrably mutates the bytes:
824 // 1. Whitespace (round-trip strips internal whitespace).
825 // 2. Number formatting (`1.0` → `1`).
826 // 3. Key ordering (BTreeMap default re-orders alphabetically).
827 let cases: &[&[u8]] = &[
828 // Whitespace: extra spaces inside the object literal.
829 br#"{ "key" : "value" }"#,
830 // Number formatting: 1.0 (would become 1 via Value).
831 br#"{"x":1.0,"y":2.5}"#,
832 // Key ordering: "z" before "a" (BTreeMap would re-order).
833 br#"{"z":1,"a":2}"#,
834 ];
835
836 for raw_bytes in cases {
837 let raw = Bytes::copy_from_slice(raw_bytes);
838 let event = StoredEvent::new("id".into(), raw.clone(), 0, 0);
839 let json = serde_json::to_string(&event).unwrap();
840
841 // The serialized output must contain the input bytes
842 // verbatim (as-is, not re-formatted by serde_json).
843 let expected_raw = std::str::from_utf8(raw_bytes).unwrap();
844 assert!(
845 json.contains(expected_raw),
846 "regression: StoredEvent serialization must contain the raw \
847 input verbatim (no whitespace stripping, no number \
848 normalization, no key re-ordering).\n\
849 input: {expected_raw}\n\
850 output: {json}"
851 );
852 }
853 }
854
855 /// Regression: `dedup_id` is always emitted (as `null` when
856 /// absent) so the on-wire shape is stable. Pre-fix the field
857 /// was omitted on `None`, making the shape data-dependent:
858 /// downstream Node / Python / Go consumers using
859 /// `deny_unknown_fields` accepted the 4-field shape but
860 /// rejected events whose adapter happened to populate the 5th.
861 #[test]
862 fn stored_event_serialize_always_emits_dedup_id() {
863 // None → expect `"dedup_id":null` in output.
864 let none_event = StoredEvent::new("id-none".into(), Bytes::from(r#"{"x":1}"#), 0, 0);
865 let none_json = serde_json::to_string(&none_event).unwrap();
866 assert!(
867 none_json.contains("\"dedup_id\":null"),
868 "absent dedup_id must serialize as null, not be omitted; got {none_json}",
869 );
870
871 // Some → expect `"dedup_id":"value"`.
872 let some_event = StoredEvent::new("id-some".into(), Bytes::from(r#"{"x":1}"#), 0, 0)
873 .with_dedup_id(Some("abc".into()));
874 let some_json = serde_json::to_string(&some_event).unwrap();
875 assert!(
876 some_json.contains("\"dedup_id\":\"abc\""),
877 "populated dedup_id must serialize as the string value; got {some_json}",
878 );
879 }
880
881 /// Pin that `Batch::with_nonce` writes the passed value into the
882 /// `process_nonce` field. The bus relies on this to stamp the
883 /// loaded persistent nonce on every emitted batch;
884 /// a future refactor that ignored the parameter would silently
885 /// regress JetStream cross-restart dedup.
886 #[test]
887 fn batch_with_nonce_round_trips_the_passed_value() {
888 let events: Vec<InternalEvent> = (0..3)
889 .map(|i| InternalEvent::from_value(serde_json::json!({"i": i}), i, 0))
890 .collect();
891 let nonce: u64 = 0xDEAD_BEEF_CAFE_F00D;
892 let batch = Batch::with_nonce(7, events, 42, nonce);
893 assert_eq!(batch.shard_id, 7);
894 assert_eq!(batch.sequence_start, 42);
895 assert_eq!(
896 batch.process_nonce, nonce,
897 "Batch::with_nonce must write the passed nonce verbatim",
898 );
899 }
900
901 /// Regression: every `Batch` constructed in this process via
902 /// `Batch::new` (the per-process-fallback constructor) must
903 /// carry the same `process_nonce`. Adapters that persist
904 /// `(shard_id, sequence_start)` for dedup compose it with this
905 /// nonce so two processes that both happen to start sequencing
906 /// at zero (the default after `BatchWorker::new`) don't collide
907 /// on `(shard, 0, 0…)` in the backend's dedup window.
908 ///
909 /// We pin two contracts:
910 /// 1. The nonce is non-zero (a process started at exactly
911 /// `UNIX_EPOCH` with pid 0 would defeat the XOR — defend
912 /// against trivially predictable values).
913 /// 2. Multiple `Batch::new` calls in the same process yield
914 /// the same nonce (so retries within a process land on the
915 /// same dedup key).
916 #[test]
917 fn batch_process_nonce_is_stable_within_process() {
918 let nonce_a = batch_process_nonce();
919 let nonce_b = batch_process_nonce();
920 assert_eq!(
921 nonce_a, nonce_b,
922 "within a single process the nonce must be stable"
923 );
924
925 // And it shows up on every Batch.
926 let b1 = Batch::new(0, vec![], 0);
927 let b2 = Batch::new(1, vec![], 100);
928 assert_eq!(b1.process_nonce, nonce_a);
929 assert_eq!(b2.process_nonce, nonce_a);
930
931 // Best-effort: not-zero. UNIX_EPOCH+pid=0 would leave the
932 // XOR at zero; vanishingly unlikely on any real host but a
933 // cheap sanity check.
934 assert_ne!(nonce_a, 0, "process nonce should be non-zero");
935 }
936
937 /// Cubic-ai P2: `Batch::with_nonce` must enforce the non-zero
938 /// invariant `batch_process_nonce` already upholds. A caller
939 /// that hands in `0` (e.g., uninitialized field, default-init
940 /// of a `u64`, or a misconfigured fallback path) MUST NOT have
941 /// `0` propagate into `process_nonce` — JetStream and other
942 /// consumers treat `0` as a sentinel for "no nonce / legacy
943 /// path", which silently disables cross-restart dedup.
944 ///
945 /// The post-fix behavior coerces `0 → 1`, mirroring the
946 /// `PersistentProducerNonce::create_new` and
947 /// `batch_process_nonce` policy.
948 #[test]
949 fn with_nonce_coerces_zero_to_one_to_preserve_dedup_sentinel() {
950 // Zero in → one out (the canonical replacement, not
951 // `batch_process_nonce` — we don't want the function to
952 // silently route around an explicit caller error).
953 let b = Batch::with_nonce(0, vec![], 0, 0);
954 assert_eq!(
955 b.process_nonce, 1,
956 "with_nonce(producer_nonce=0) must coerce to 1 — \
957 letting 0 through would silently disable JetStream \
958 cross-restart dedup (consumers treat 0 as sentinel)",
959 );
960
961 // Non-zero passes through verbatim.
962 let b = Batch::with_nonce(0, vec![], 0, 0xDEAD_BEEF);
963 assert_eq!(
964 b.process_nonce, 0xDEAD_BEEF,
965 "non-zero producer_nonce must pass through unchanged",
966 );
967 }
968}