net/event.rs
//! Event types for the Net event bus.
//!
//! Events are opaque JSON values: the event bus performs no schema validation
//! or interpretation of event content.
//!
//! # Performance Optimization
//!
//! Because Net is schema-agnostic, it offers multiple event representations:
//!
//! - `Event`: a standard wrapper around `serde_json::Value` (convenient but slower)
//! - `RawEvent`: pre-serialized bytes with a cached hash (fastest for high throughput)
//!
//! For maximum performance, use `RawEvent::from_bytes()` when you already have
//! JSON bytes (e.g., from a network buffer or file).
15
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use serde_json::Value as JsonValue;
19
20/// An opaque event - any valid JSON value.
21///
22/// The event bus does not validate, interpret, or enforce any schema.
23/// Events are treated as opaque binary blobs internally.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[repr(transparent)]
26pub struct Event(pub JsonValue);
27
28impl Event {
29 /// Create a new event from a JSON value.
30 #[inline]
31 pub fn new(value: JsonValue) -> Self {
32 Self(value)
33 }
34
35 /// Create an event from a JSON string.
36 #[inline]
37 #[allow(clippy::should_implement_trait)]
38 pub fn from_str(s: &str) -> Result<Self, serde_json::Error> {
39 serde_json::from_str(s).map(Self)
40 }
41
42 /// Create an event from raw bytes.
43 #[inline]
44 pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
45 serde_json::from_slice(bytes).map(Self)
46 }
47
48 /// Get the inner JSON value.
49 #[inline]
50 pub fn into_inner(self) -> JsonValue {
51 self.0
52 }
53
54 /// Get a reference to the inner JSON value.
55 #[inline]
56 pub fn as_value(&self) -> &JsonValue {
57 &self.0
58 }
59
60 /// Convert to a raw event (serializes once, caches the result).
61 #[inline]
62 pub fn into_raw(self) -> RawEvent {
63 RawEvent::from_value(self.0)
64 }
65}
66
67impl From<JsonValue> for Event {
68 #[inline]
69 fn from(value: JsonValue) -> Self {
70 Self(value)
71 }
72}
73
74impl From<Event> for JsonValue {
75 #[inline]
76 fn from(event: Event) -> Self {
77 event.0
78 }
79}
80
81/// A pre-serialized event with cached hash.
82///
83/// This is the high-performance event type for schema-agnostic ingestion.
84/// By storing the raw bytes and pre-computed hash, we avoid:
85/// - Re-serialization on every operation
86/// - Repeated hashing for shard selection
87///
88/// # Example
89///
90/// ```rust,ignore
91/// // From network buffer or file (zero-copy if Bytes is used)
92/// let raw = RawEvent::from_bytes(network_buffer);
93///
94/// // From existing JSON value (serializes once)
95/// let raw = RawEvent::from_value(json!({"key": "value"}));
96///
97/// // Ingestion uses cached hash - no re-serialization
98/// bus.ingest_raw(raw)?;
99/// ```
100#[derive(Clone)]
101pub struct RawEvent {
102 /// Pre-serialized JSON bytes.
103 bytes: Bytes,
104 /// Pre-computed hash for shard selection.
105 hash: u64,
106}
107
108impl RawEvent {
109 /// Create a raw event from bytes.
110 ///
111 /// The bytes must be valid JSON. No validation is performed for performance.
112 /// Use `from_bytes_validated` if you need validation.
113 #[inline]
114 pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
115 let bytes = bytes.into();
116 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
117 Self { bytes, hash }
118 }
119
120 /// Create a raw event from bytes with a pre-computed hash.
121 ///
122 /// Use this when you've already computed the xxhash (e.g., for reused events).
123 /// The caller is responsible for ensuring the hash matches the bytes.
124 #[inline]
125 pub fn from_bytes_with_hash(bytes: impl Into<Bytes>, hash: u64) -> Self {
126 Self {
127 bytes: bytes.into(),
128 hash,
129 }
130 }
131
132 /// Create a raw event from bytes with JSON validation.
133 #[inline]
134 pub fn from_bytes_validated(bytes: impl Into<Bytes>) -> Result<Self, serde_json::Error> {
135 let bytes = bytes.into();
136 // Validate it's valid JSON by attempting to parse
137 let _: JsonValue = serde_json::from_slice(&bytes)?;
138 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
139 Ok(Self { bytes, hash })
140 }
141
142 /// Create a raw event from a JSON value.
143 ///
144 /// This serializes the value once and caches the result.
145 ///
146 /// `serde_json::to_vec(&JsonValue)` is infallible by
147 /// construction — the value tree is a known-good JSON
148 /// structure with no fallible serializer in the path —
149 /// modulo OOM, which the global allocator handles via
150 /// abort. The `unwrap_or_default()` fallback keeps the
151 /// non-panic contract for a hypothetical future serde-json
152 /// change that introduced a fallible path on `Value`. Pre-
153 /// fix `expect("Value serialization is infallible")` panicked
154 /// at the call site if the assumption ever broke; the panic
155 /// would unwind across `from_value`'s callers (bus ingest,
156 /// FFI ingest paths) where the contract is non-panicking.
157 #[inline]
158 pub fn from_value(value: JsonValue) -> Self {
159 let bytes = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
160 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
161 Self { bytes, hash }
162 }
163
164 /// Creates a RawEvent from a string. No validation is performed for
165 /// performance — see `from_bytes_validated` for a validating alternative.
166 #[inline]
167 #[allow(clippy::should_implement_trait)]
168 pub fn from_str(s: &str) -> Self {
169 Self::from_bytes(Bytes::copy_from_slice(s.as_bytes()))
170 }
171
172 /// Get the raw bytes.
173 #[inline]
174 pub fn as_bytes(&self) -> &[u8] {
175 &self.bytes
176 }
177
178 /// Get the bytes (clone is cheap - reference counted).
179 #[inline]
180 pub fn bytes(&self) -> Bytes {
181 self.bytes.clone()
182 }
183
184 /// Get the pre-computed hash.
185 #[inline]
186 pub fn hash(&self) -> u64 {
187 self.hash
188 }
189
190 /// Parse the bytes into a JSON value (for when you need to inspect).
191 #[inline]
192 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
193 serde_json::from_slice(&self.bytes)
194 }
195
196 /// Get the byte length.
197 #[inline]
198 pub fn len(&self) -> usize {
199 self.bytes.len()
200 }
201
202 /// Check if empty.
203 #[inline]
204 pub fn is_empty(&self) -> bool {
205 self.bytes.is_empty()
206 }
207}
208
209impl std::fmt::Debug for RawEvent {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("RawEvent")
212 .field("len", &self.bytes.len())
213 .field("hash", &self.hash)
214 .finish()
215 }
216}
217
218impl From<Event> for RawEvent {
219 #[inline]
220 fn from(event: Event) -> Self {
221 event.into_raw()
222 }
223}
224
225impl From<JsonValue> for RawEvent {
226 #[inline]
227 fn from(value: JsonValue) -> Self {
228 RawEvent::from_value(value)
229 }
230}
231
232/// Internal event representation with metadata assigned at ingestion.
233///
234/// This is the canonical form of an event within the event bus.
235/// The `insertion_ts` provides deterministic ordering within a shard.
236///
237/// Uses `Bytes` for zero-copy, reference-counted storage.
238#[derive(Debug, Clone)]
239pub struct InternalEvent {
240 /// Pre-serialized JSON payload (reference-counted, zero-copy clone).
241 pub raw: Bytes,
242
243 /// Monotonically increasing insertion timestamp (nanoseconds).
244 /// Strictly ordered within a shard, not globally.
245 pub insertion_ts: u64,
246
247 /// Shard this event was assigned to.
248 pub shard_id: u16,
249}
250
251impl InternalEvent {
252 /// Create a new internal event from raw bytes.
253 #[inline]
254 pub fn new(raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
255 Self {
256 raw,
257 insertion_ts,
258 shard_id,
259 }
260 }
261
262 /// Create from a JSON value (serializes once).
263 ///
264 /// See `RawEvent::from_value` for the rationale on
265 /// `unwrap_or_default()` instead of `expect()`.
266 #[inline]
267 pub fn from_value(value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
268 let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
269 Self {
270 raw,
271 insertion_ts,
272 shard_id,
273 }
274 }
275
276 /// Parse the raw bytes into a JSON value.
277 #[inline]
278 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
279 serde_json::from_slice(&self.raw)
280 }
281
282 /// Get the raw bytes as a slice.
283 #[inline]
284 pub fn as_bytes(&self) -> &[u8] {
285 &self.raw
286 }
287}
288
289/// A batch of events for adapter dispatch.
290///
291/// Batches are formed by shard workers and contain strictly ordered
292/// events from a single shard.
293#[derive(Debug, Clone)]
294pub struct Batch {
295 /// The shard this batch belongs to.
296 pub shard_id: u16,
297
298 /// Events in insertion order.
299 pub events: Vec<InternalEvent>,
300
301 /// Sequence number of the first event in this batch.
302 /// Used for idempotent retry handling.
303 pub sequence_start: u64,
304
305 /// Per-process nonce sampled once at process start. Adapters
306 /// that persist `(shard_id, sequence_start)` for dedup
307 /// (JetStream `Nats-Msg-Id`, Redis stream MAXLEN keys, etc.)
308 /// must include this in the dedup key — otherwise a producer
309 /// that restarts within the backend's dedup window collides
310 /// with its prior incarnation on `(shard, 0, 0)` and the new
311 /// batches are silently discarded as duplicates.
312 ///
313 /// `BatchWorker::next_sequence` is process-local and resets to
314 /// zero on restart; the nonce is the global discriminator that
315 /// makes the composite key globally unique across restarts
316 /// even though the per-process counter is not durable.
317 pub process_nonce: u64,
318}
319
/// Per-process nonce used by [`Batch::process_nonce`].
///
/// The value is computed lazily on the first call and cached for the rest
/// of the process, so every call in one process returns the same value.
/// It is never `0`: a zero hash is mapped to `1`, because some consumers
/// treat `0` as a sentinel.
///
/// We don't use `getrandom` because it is a feature-gated optional
/// dependency, and `event.rs` is in the always-compiled core. Instead,
/// every entropy source is streamed through one hasher built from
/// `std::collections::hash_map::RandomState`. The standard library seeds
/// that hasher's keys from the host's randomness source on a best-effort
/// basis (the same seeding that protects `HashMap` against HashDoS). The
/// sources are:
///
/// - wall-clock nanoseconds;
/// - a monotonic `std::time::Instant` reading, which survives wall-clock
///   slew, hashed through its `Hash` impl;
/// - the process id;
/// - the address of a stack local, an ASLR-derived component;
/// - the current thread id.
///
/// Streaming the sources through a `Hasher` means none of them is
/// truncated. The earlier version packed at most 32 bytes of `Instant`'s
/// `Debug` string into a fixed buffer. On Unix that string reads
/// `Instant { tv_sec: …, tv_nsec: … }`, so the cut typically dropped the
/// entire sub-second part of the monotonic reading.
pub fn batch_process_nonce() -> u64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::sync::OnceLock;
    use std::time::{Instant, SystemTime, UNIX_EPOCH};

    static NONCE: OnceLock<u64> = OnceLock::new();
    *NONCE.get_or_init(|| {
        // The hasher's keys are randomly seeded by std, so even identical
        // inputs in two processes hash differently (best effort).
        let mut hasher = RandomState::new().build_hasher();

        // Wall clock; a clock set before the epoch contributes 0 instead
        // of failing.
        let wall_nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        wall_nanos.hash(&mut hasher);

        // Monotonic clock, hashed whole (seconds and sub-second part).
        Instant::now().hash(&mut hasher);

        let pid = std::process::id();
        pid.hash(&mut hasher);

        // Address of a stack local: differs across process starts under
        // ASLR even when the time and pid components collide.
        let stack_marker = &pid as *const u32 as usize;
        stack_marker.hash(&mut hasher);

        std::thread::current().id().hash(&mut hasher);

        // Refuse `0` — some consumers treat 0 as a sentinel.
        match hasher.finish() {
            0 => 1,
            nonce => nonce,
        }
    })
}
382
383impl Batch {
384 /// Create a new batch using the per-process nonce
385 /// ([`batch_process_nonce`]). Convenience for tests and for
386 /// callers that don't thread a custom producer nonce through.
387 /// Production paths constructed via the bus go through
388 /// [`Self::with_nonce`] with the bus's loaded
389 /// `producer_nonce_path` value so retries dedup across
390 /// process restart.
391 #[inline]
392 pub fn new(shard_id: u16, events: Vec<InternalEvent>, sequence_start: u64) -> Self {
393 Self::with_nonce(shard_id, events, sequence_start, batch_process_nonce())
394 }
395
396 /// Create a new batch with an explicit producer nonce. Used by
397 /// the bus's `BatchWorker` and `remove_shard_internal`'s
398 /// stranded-flush so adapters keying dedup on
399 /// `(producer_nonce, shard, sequence_start, i)` see the same
400 /// nonce across process restart when the bus is configured
401 /// with a `producer_nonce_path`.
402 ///
403 /// A `producer_nonce == 0` is coerced to `1` to preserve the
404 /// non-zero invariant that `batch_process_nonce` and
405 /// `dedup_state::PersistentProducerNonce::create_new` already
406 /// uphold (each generates non-zero u64s and re-rolls on the
407 /// astronomical 1-in-2^64 zero draw).
408 ///
409 /// The zero coercion is **defense-in-depth against future
410 /// codecs**: a downstream caller that constructs a
411 /// `Batch::with_nonce(..., 0)` directly (e.g. tests, hand-built
412 /// fixtures) would otherwise emit `dedup_id` keys starting
413 /// `0:` — collision-prone with any future codec that reserves
414 /// `0` as "no nonce, use the legacy path." Today's
415 /// `adapter/jetstream.rs::on_batch` just formats
416 /// `process_nonce` as `{:x}` with no special-casing, so the
417 /// hazard is latent rather than active. Coercing to 1 keeps
418 /// the invariant that every shipped batch has a non-zero
419 /// producer nonce regardless of caller hygiene.
420 #[inline]
421 pub fn with_nonce(
422 shard_id: u16,
423 events: Vec<InternalEvent>,
424 sequence_start: u64,
425 producer_nonce: u64,
426 ) -> Self {
427 Self {
428 shard_id,
429 events,
430 sequence_start,
431 process_nonce: if producer_nonce == 0 {
432 1
433 } else {
434 producer_nonce
435 },
436 }
437 }
438
439 /// Returns the number of events in this batch.
440 #[inline]
441 pub fn len(&self) -> usize {
442 self.events.len()
443 }
444
445 /// Returns true if this batch is empty.
446 #[inline]
447 pub fn is_empty(&self) -> bool {
448 self.events.is_empty()
449 }
450}
451
452/// An event retrieved from storage with its backend-specific ID.
453#[derive(Debug, Clone)]
454pub struct StoredEvent {
455 /// Backend-specific identifier.
456 pub id: String,
457
458 /// Raw JSON payload bytes (deferred parsing for performance).
459 pub raw: Bytes,
460
461 /// Insertion timestamp from ingestion.
462 pub insertion_ts: u64,
463
464 /// Shard this event belongs to.
465 pub shard_id: u16,
466
467 /// Application-level idempotency key as written by the producer.
468 /// Adapters carry an opaque dedup token on the wire (Redis Streams
469 /// uses a `dedup_id` field; JetStream uses `Nats-Msg-Id`). The
470 /// trait-level consumer ([`crate::adapter::Adapter::poll_shard`])
471 /// surfaces it here so callers can drive their own dedup table
472 /// without re-reading the raw broker payload. `None` when the
473 /// adapter or the wire entry doesn't carry one.
474 pub dedup_id: Option<String>,
475}
476
477impl StoredEvent {
478 /// Create a new stored event from raw bytes.
479 #[inline]
480 pub fn new(id: String, raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
481 Self {
482 id,
483 raw,
484 insertion_ts,
485 shard_id,
486 dedup_id: None,
487 }
488 }
489
490 /// Create a new stored event from a JSON value (serializes once).
491 ///
492 /// See `RawEvent::from_value` for the rationale on
493 /// `unwrap_or_default()` instead of `expect()`.
494 #[inline]
495 pub fn from_value(id: String, value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
496 let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
497 Self {
498 id,
499 raw,
500 insertion_ts,
501 shard_id,
502 dedup_id: None,
503 }
504 }
505
506 /// Attach an application-level dedup identifier (the producer's
507 /// `dedup_id` / `Nats-Msg-Id`). Returns `self` for chaining.
508 #[inline]
509 #[must_use]
510 pub fn with_dedup_id(mut self, dedup_id: Option<String>) -> Self {
511 self.dedup_id = dedup_id;
512 self
513 }
514
515 /// Parse the raw bytes into a JSON value on demand.
516 #[inline]
517 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
518 serde_json::from_slice(&self.raw)
519 }
520
521 /// Get the raw bytes as a string slice (for serialization).
522 ///
523 /// Returns `Err` if the raw bytes are not valid UTF-8, rather than
524 /// silently substituting data.
525 #[inline]
526 pub fn raw_str(&self) -> Result<&str, std::str::Utf8Error> {
527 std::str::from_utf8(&self.raw)
528 }
529}
530
531impl Serialize for StoredEvent {
532 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
533 where
534 S: serde::Serializer,
535 {
536 use serde::ser::SerializeStruct;
537 // Always emit `dedup_id` (as `null` when absent) so the
538 // on-wire shape is stable. Pre-fix the field count flipped
539 // between 4 and 5 based on whether `dedup_id` was populated;
540 // downstream Node / Python / Go consumers using
541 // `deny_unknown_fields` (or strict schema validators)
542 // accepted the 4-field shape but rejected events whose
543 // adapter populated the 5th, making the wire compatibility
544 // *data-dependent*. Stable shape > byte-optimal: an extra
545 // `"dedup_id":null` per legacy event is cheap, the rejection
546 // hazard is not.
547 let field_count = 5;
548 let mut state = serializer.serialize_struct("StoredEvent", field_count)?;
549 state.serialize_field("id", &self.id)?;
550 // Serialize raw bytes as a `RawValue` so the on-wire JSON
551 // is byte-for-byte the same as the input. Pre-fix the
552 // bytes were parsed into a `JsonValue` tree and re-
553 // serialized; the round-trip discarded original
554 // whitespace, normalized number formatting (`1.0` → `1`),
555 // and (without `preserve_order`) re-ordered map keys
556 // alphabetically. Any downstream that hashed or signed
557 // the serialized form and expected byte-equality with the
558 // input silently failed verification — a sneaky failure
559 // mode in audit / signing pipelines that look at the
560 // re-emitted JSON.
561 //
562 // `from_str::<&RawValue>` validates the JSON (so the
563 // pre-existing "invalid raw JSON returns a serde error,
564 // not a silent null" guarantee is preserved) AND borrows
565 // the input bytes — no allocation.
566 //
567 // PERF_AUDIT §1.8 — pre-fix this used
568 // `RawValue::from_string(raw_str.to_string())`, which
569 // allocated a fresh `String` copy of the entire payload
570 // per serialized event. The borrowed form is byte-for-byte
571 // identical on the wire but skips the copy.
572 let raw_str = std::str::from_utf8(&self.raw)
573 .map_err(|e| serde::ser::Error::custom(format!("invalid raw UTF-8: {}", e)))?;
574 let raw_value: &serde_json::value::RawValue = serde_json::from_str(raw_str)
575 .map_err(|e| serde::ser::Error::custom(format!("invalid raw JSON: {}", e)))?;
576 state.serialize_field("raw", raw_value)?;
577 state.serialize_field("insertion_ts", &self.insertion_ts)?;
578 state.serialize_field("shard_id", &self.shard_id)?;
579 // Always emit `dedup_id` to keep the wire shape stable —
580 // `None` serializes as JSON `null`.
581 state.serialize_field("dedup_id", &self.dedup_id)?;
582 state.end()
583 }
584}
585
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_event_new() {
        let value = json!({"key": "value"});
        let event = Event::new(value.clone());
        assert_eq!(event.as_value(), &value);
    }

    #[test]
    fn test_event_from_str() {
        let event = Event::from_str(r#"{"key": "value"}"#).unwrap();
        assert_eq!(event.as_value()["key"], "value");
    }

    #[test]
    fn test_event_from_str_invalid() {
        let result = Event::from_str("not valid json");
        assert!(result.is_err());
    }

    #[test]
    fn test_event_from_slice() {
        let bytes = br#"{"key": "value"}"#;
        let event = Event::from_slice(bytes).unwrap();
        assert_eq!(event.as_value()["key"], "value");
    }

    #[test]
    fn test_event_into_inner() {
        let value = json!({"key": "value"});
        let event = Event::new(value.clone());
        assert_eq!(event.into_inner(), value);
    }

    #[test]
    fn test_event_into_raw() {
        let event = Event::new(json!({"key": "value"}));
        let raw = event.into_raw();
        assert!(!raw.is_empty());
        assert!(raw.hash() != 0);
    }

    #[test]
    fn test_event_from_json_value() {
        let value = json!({"key": "value"});
        let event: Event = value.clone().into();
        assert_eq!(event.as_value(), &value);
    }

    #[test]
    fn test_event_into_json_value() {
        let value = json!({"key": "value"});
        let event = Event::new(value.clone());
        let result: JsonValue = event.into();
        assert_eq!(result, value);
    }

    /// `Event` is a transparent newtype: serializing it must produce the
    /// wrapped JSON, not a wrapper object.
    #[test]
    fn test_event_serializes_transparently() {
        let event = Event::new(json!({"key": "value"}));
        assert_eq!(serde_json::to_string(&event).unwrap(), r#"{"key":"value"}"#);
    }

    #[test]
    fn test_raw_event_from_bytes() {
        let bytes = br#"{"key": "value"}"#;
        let raw = RawEvent::from_bytes(bytes.as_slice());
        assert_eq!(raw.as_bytes(), bytes);
        assert!(!raw.is_empty());
        assert_eq!(raw.len(), bytes.len());
    }

    /// `from_bytes_with_hash` must keep the caller's hash verbatim rather
    /// than recomputing it.
    #[test]
    fn test_raw_event_from_bytes_with_hash_keeps_supplied_hash() {
        let raw = RawEvent::from_bytes_with_hash(Bytes::from_static(b"{}"), 42);
        assert_eq!(raw.hash(), 42);
        assert_eq!(raw.as_bytes(), b"{}");
    }

    #[test]
    fn test_raw_event_from_str() {
        let s = r#"{"key": "value"}"#;
        let raw = RawEvent::from_str(s);
        assert_eq!(raw.as_bytes(), s.as_bytes());
    }

    #[test]
    fn test_raw_event_from_value() {
        let value = json!({"key": "value"});
        let raw = RawEvent::from_value(value);
        let parsed = raw.parse().unwrap();
        assert_eq!(parsed["key"], "value");
    }

    /// Every hashing constructor must agree: re-wrapping the bytes
    /// produced by `from_value` must yield the same shard-selection hash.
    #[test]
    fn test_raw_event_from_value_hash_matches_from_bytes() {
        let raw = RawEvent::from_value(json!({"key": "value"}));
        let rebuilt = RawEvent::from_bytes(raw.bytes());
        assert_eq!(raw.hash(), rebuilt.hash());
    }

    #[test]
    fn test_raw_event_from_bytes_validated() {
        let valid = br#"{"key": "value"}"#;
        let result = RawEvent::from_bytes_validated(valid.as_slice());
        assert!(result.is_ok());

        let invalid = b"not valid json";
        let result = RawEvent::from_bytes_validated(invalid.as_slice());
        assert!(result.is_err());
    }

    #[test]
    fn test_raw_event_hash_consistency() {
        let raw1 = RawEvent::from_str(r#"{"key": "value"}"#);
        let raw2 = RawEvent::from_str(r#"{"key": "value"}"#);
        assert_eq!(raw1.hash(), raw2.hash());

        let raw3 = RawEvent::from_str(r#"{"key": "other"}"#);
        assert_ne!(raw1.hash(), raw3.hash());
    }

    #[test]
    fn test_raw_event_bytes_clone() {
        let raw = RawEvent::from_str(r#"{"key": "value"}"#);
        let bytes1 = raw.bytes();
        let bytes2 = raw.bytes();
        assert_eq!(bytes1, bytes2);
    }

    #[test]
    fn test_raw_event_debug() {
        let raw = RawEvent::from_str(r#"{"key": "value"}"#);
        let debug = format!("{:?}", raw);
        assert!(debug.contains("RawEvent"));
        assert!(debug.contains("len"));
        assert!(debug.contains("hash"));
    }

    #[test]
    fn test_raw_event_from_event() {
        let event = Event::new(json!({"key": "value"}));
        let raw: RawEvent = event.into();
        assert!(!raw.is_empty());
    }

    #[test]
    fn test_raw_event_from_json_value() {
        let value = json!({"key": "value"});
        let raw: RawEvent = value.into();
        assert!(!raw.is_empty());
    }

    #[test]
    fn test_internal_event_new() {
        let raw = Bytes::from(r#"{"key": "value"}"#);
        let event = InternalEvent::new(raw.clone(), 12345, 0);
        assert_eq!(event.raw, raw);
        assert_eq!(event.insertion_ts, 12345);
        assert_eq!(event.shard_id, 0);
    }

    #[test]
    fn test_internal_event_from_value() {
        let event = InternalEvent::from_value(json!({"key": "value"}), 12345, 0);
        assert_eq!(event.insertion_ts, 12345);
        assert_eq!(event.shard_id, 0);
        let parsed = event.parse().unwrap();
        assert_eq!(parsed["key"], "value");
    }

    #[test]
    fn test_internal_event_as_bytes() {
        let raw = Bytes::from(r#"{"key": "value"}"#);
        let event = InternalEvent::new(raw.clone(), 12345, 0);
        assert_eq!(event.as_bytes(), raw.as_ref());
    }

    #[test]
    fn test_batch_new() {
        let events = vec![
            InternalEvent::from_value(json!({"i": 0}), 1, 0),
            InternalEvent::from_value(json!({"i": 1}), 2, 0),
        ];
        let batch = Batch::new(0, events, 100);
        assert_eq!(batch.shard_id, 0);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch.sequence_start, 100);
        assert!(!batch.is_empty());
    }

    #[test]
    fn test_batch_empty() {
        let batch = Batch::new(0, vec![], 0);
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
    }

    #[test]
    fn test_stored_event_new() {
        let raw = Bytes::from(r#"{"key":"value"}"#);
        let event = StoredEvent::new("stream-123".to_string(), raw, 12345, 0);
        assert_eq!(event.id, "stream-123");
        let parsed = event.parse().unwrap();
        assert_eq!(parsed["key"], "value");
        assert_eq!(event.insertion_ts, 12345);
        assert_eq!(event.shard_id, 0);
    }

    // Regression: raw_str() used to silently return "{}" for invalid UTF-8
    // instead of reporting an error (BUGS_3 #4).
    #[test]
    fn test_stored_event_raw_str_valid_utf8() {
        let raw = Bytes::from(r#"{"key":"value"}"#);
        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
        assert_eq!(event.raw_str().unwrap(), r#"{"key":"value"}"#);
    }

    #[test]
    fn test_stored_event_raw_str_invalid_utf8_returns_err() {
        let raw = Bytes::from(vec![0xff, 0xfe, 0xfd]);
        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
        assert!(event.raw_str().is_err());
    }

    // Regression: Serialize impl used to silently replace invalid raw bytes
    // with null. Now it returns a serialization error (BUGS_4 #3).
    #[test]
    fn test_stored_event_serialize_valid() {
        let raw = Bytes::from(r#"{"key":"value"}"#);
        let event = StoredEvent::new("id".to_string(), raw, 123, 0);
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"key\""));
        assert!(json.contains("\"value\""));
    }

    #[test]
    fn test_stored_event_serialize_invalid_raw_returns_error() {
        let raw = Bytes::from(b"not valid json".as_slice());
        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
        let result = serde_json::to_string(&event);
        assert!(
            result.is_err(),
            "serializing invalid raw bytes should error, not silently return null"
        );
    }

    /// Regression: `StoredEvent::Serialize` must preserve the raw bytes
    /// byte-for-byte instead of round-tripping through
    /// `serde_json::Value`. Before the fix, the round-trip discarded the
    /// original whitespace, normalized number formatting (`1.0` → `1`) and
    /// re-ordered map keys alphabetically. Any downstream that hashed or
    /// signed the serialized form and expected byte-equality with the
    /// input silently failed verification.
    #[test]
    fn stored_event_serialize_preserves_raw_byte_for_byte() {
        // Pin three cases where the JsonValue round-trip demonstrably
        // mutates the bytes:
        //   1. Whitespace (the round-trip strips internal whitespace).
        //   2. Number formatting (`1.0` → `1`).
        //   3. Key ordering (the BTreeMap default re-orders alphabetically).
        let cases: &[&[u8]] = &[
            // Whitespace: extra spaces inside the object literal.
            br#"{ "key" : "value" }"#,
            // Number formatting: 1.0 (would become 1 via Value).
            br#"{"x":1.0,"y":2.5}"#,
            // Key ordering: "z" before "a" (BTreeMap would re-order).
            br#"{"z":1,"a":2}"#,
        ];

        for raw_bytes in cases {
            let raw = Bytes::copy_from_slice(raw_bytes);
            let event = StoredEvent::new("id".into(), raw.clone(), 0, 0);
            let json = serde_json::to_string(&event).unwrap();

            // The serialized output must contain the input bytes verbatim
            // (as-is, not re-formatted by serde_json).
            let expected_raw = std::str::from_utf8(raw_bytes).unwrap();
            assert!(
                json.contains(expected_raw),
                "regression: StoredEvent serialization must contain the raw \
                 input verbatim (no whitespace stripping, no number \
                 normalization, no key re-ordering).\n\
                 input: {expected_raw}\n\
                 output: {json}"
            );
        }
    }

    /// Regression: `dedup_id` is always emitted (as `null` when absent),
    /// so the set of serialized fields never changes. Before the fix the
    /// field was omitted on `None`. Downstream Node / Python / Go consumers
    /// using `deny_unknown_fields` accepted the 4-field output but rejected
    /// events whose adapter happened to populate the 5th.
    #[test]
    fn stored_event_serialize_always_emits_dedup_id() {
        // None → expect `"dedup_id":null` in the output.
        let none_event = StoredEvent::new("id-none".into(), Bytes::from(r#"{"x":1}"#), 0, 0);
        let none_json = serde_json::to_string(&none_event).unwrap();
        assert!(
            none_json.contains("\"dedup_id\":null"),
            "absent dedup_id must serialize as null, not be omitted; got {none_json}",
        );

        // Some → expect `"dedup_id":"value"`.
        let some_event = StoredEvent::new("id-some".into(), Bytes::from(r#"{"x":1}"#), 0, 0)
            .with_dedup_id(Some("abc".into()));
        let some_json = serde_json::to_string(&some_event).unwrap();
        assert!(
            some_json.contains("\"dedup_id\":\"abc\""),
            "populated dedup_id must serialize as the string value; got {some_json}",
        );
    }

    /// Pin that `Batch::with_nonce` writes the passed value into the
    /// `process_nonce` field. The bus relies on this to stamp the loaded
    /// persistent nonce on every emitted batch. A future refactor that
    /// ignored the parameter would silently break JetStream cross-restart
    /// dedup.
    #[test]
    fn batch_with_nonce_round_trips_the_passed_value() {
        let events: Vec<InternalEvent> = (0..3)
            .map(|i| InternalEvent::from_value(serde_json::json!({"i": i}), i, 0))
            .collect();
        let nonce: u64 = 0xDEAD_BEEF_CAFE_F00D;
        let batch = Batch::with_nonce(7, events, 42, nonce);
        assert_eq!(batch.shard_id, 7);
        assert_eq!(batch.sequence_start, 42);
        assert_eq!(
            batch.process_nonce, nonce,
            "Batch::with_nonce must write the passed nonce verbatim",
        );
    }

    /// Regression: every `Batch` constructed in this process via
    /// `Batch::new` (the per-process fallback constructor) must carry the
    /// same `process_nonce`. Adapters that persist
    /// `(shard_id, sequence_start)` for dedup compose it with this nonce.
    /// That way two processes that both start sequencing at zero (the
    /// default after `BatchWorker::new`) don't collide on `(shard, 0, 0…)`
    /// in the backend's dedup window.
    ///
    /// We pin two contracts:
    /// 1. The nonce is non-zero. `batch_process_nonce` explicitly maps a
    ///    zero hash result to `1`, because `0` is reserved as a potential
    ///    "no nonce" sentinel.
    /// 2. Multiple `Batch::new` calls in the same process yield the same
    ///    nonce, so retries within a process land on the same dedup key.
    #[test]
    fn batch_process_nonce_is_stable_within_process() {
        let nonce_a = batch_process_nonce();
        let nonce_b = batch_process_nonce();
        assert_eq!(
            nonce_a, nonce_b,
            "within a single process the nonce must be stable"
        );

        // And it shows up on every Batch.
        let b1 = Batch::new(0, vec![], 0);
        let b2 = Batch::new(1, vec![], 100);
        assert_eq!(b1.process_nonce, nonce_a);
        assert_eq!(b2.process_nonce, nonce_a);

        // `batch_process_nonce` never returns zero: a zero hash is
        // replaced by 1 before it is cached.
        assert_ne!(nonce_a, 0, "process nonce should be non-zero");
    }

    /// Cubic-ai P2: `Batch::with_nonce` must enforce the non-zero
    /// invariant that `batch_process_nonce` already upholds. A caller that
    /// hands in `0` (an uninitialized field, a default-initialized `u64`,
    /// a misconfigured fallback path) MUST NOT have `0` propagate into
    /// `process_nonce`.
    ///
    /// `0` is reserved for a possible future "no nonce / legacy path"
    /// codec, and a batch carrying it would be indistinguishable from that
    /// path. Today the JetStream adapter formats the nonce without
    /// special-casing zero, so the hazard is latent; this test keeps it
    /// that way.
    ///
    /// The required behavior coerces `0 → 1`, mirroring the
    /// `PersistentProducerNonce::create_new` and `batch_process_nonce`
    /// policy.
    #[test]
    fn with_nonce_coerces_zero_to_one_to_preserve_dedup_sentinel() {
        // Zero in → one out. The canonical replacement is 1, not
        // `batch_process_nonce`: we don't want the function to silently
        // route around an explicit caller error.
        let b = Batch::with_nonce(0, vec![], 0, 0);
        assert_eq!(
            b.process_nonce, 1,
            "with_nonce(producer_nonce=0) must coerce to 1 — 0 is reserved \
             as a potential \"no nonce\" sentinel for dedup codecs",
        );

        // Non-zero passes through verbatim.
        let b = Batch::with_nonce(0, vec![], 0, 0xDEAD_BEEF);
        assert_eq!(
            b.process_nonce, 0xDEAD_BEEF,
            "non-zero producer_nonce must pass through unchanged",
        );
    }
}