net/event.rs
1//! Event types for the Net event bus.
2//!
3//! Events are opaque JSON values - the event bus performs no schema validation
4//! or interpretation of event content.
5//!
6//! # Performance Optimization
7//!
8//! Since Net is schema-agnostic, we offer multiple event representations:
9//!
10//! - `Event`: Standard wrapper around `serde_json::Value` (convenient but slower)
11//! - `RawEvent`: Pre-serialized bytes with cached hash (fastest for high-throughput)
12//!
13//! For maximum performance, use `RawEvent::from_bytes()` when you already have
14//! JSON bytes (e.g., from a network buffer or file).
15
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use serde_json::Value as JsonValue;
19
20/// An opaque event - any valid JSON value.
21///
22/// The event bus does not validate, interpret, or enforce any schema.
23/// Events are treated as opaque binary blobs internally.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[repr(transparent)]
26pub struct Event(pub JsonValue);
27
28impl Event {
29 /// Create a new event from a JSON value.
30 #[inline]
31 pub fn new(value: JsonValue) -> Self {
32 Self(value)
33 }
34
35 /// Create an event from a JSON string.
36 #[inline]
37 #[allow(clippy::should_implement_trait)]
38 pub fn from_str(s: &str) -> Result<Self, serde_json::Error> {
39 serde_json::from_str(s).map(Self)
40 }
41
42 /// Create an event from raw bytes.
43 #[inline]
44 pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
45 serde_json::from_slice(bytes).map(Self)
46 }
47
48 /// Get the inner JSON value.
49 #[inline]
50 pub fn into_inner(self) -> JsonValue {
51 self.0
52 }
53
54 /// Get a reference to the inner JSON value.
55 #[inline]
56 pub fn as_value(&self) -> &JsonValue {
57 &self.0
58 }
59
60 /// Convert to a raw event (serializes once, caches the result).
61 #[inline]
62 pub fn into_raw(self) -> RawEvent {
63 RawEvent::from_value(self.0)
64 }
65}
66
67impl From<JsonValue> for Event {
68 #[inline]
69 fn from(value: JsonValue) -> Self {
70 Self(value)
71 }
72}
73
74impl From<Event> for JsonValue {
75 #[inline]
76 fn from(event: Event) -> Self {
77 event.0
78 }
79}
80
81/// A pre-serialized event with cached hash.
82///
83/// This is the high-performance event type for schema-agnostic ingestion.
84/// By storing the raw bytes and pre-computed hash, we avoid:
85/// - Re-serialization on every operation
86/// - Repeated hashing for shard selection
87///
88/// # Example
89///
90/// ```rust,ignore
91/// // From network buffer or file (zero-copy if Bytes is used)
92/// let raw = RawEvent::from_bytes(network_buffer);
93///
94/// // From existing JSON value (serializes once)
95/// let raw = RawEvent::from_value(json!({"key": "value"}));
96///
97/// // Ingestion uses cached hash - no re-serialization
98/// bus.ingest_raw(raw)?;
99/// ```
100#[derive(Clone)]
101pub struct RawEvent {
102 /// Pre-serialized JSON bytes.
103 bytes: Bytes,
104 /// Pre-computed hash for shard selection.
105 hash: u64,
106}
107
108impl RawEvent {
109 /// Create a raw event from bytes.
110 ///
111 /// The bytes must be valid JSON. No validation is performed for performance.
112 /// Use `from_bytes_validated` if you need validation.
113 #[inline]
114 pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
115 let bytes = bytes.into();
116 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
117 Self { bytes, hash }
118 }
119
120 /// Create a raw event from bytes with a pre-computed hash.
121 ///
122 /// Use this when you've already computed the xxhash (e.g., for reused events).
123 /// The caller is responsible for ensuring the hash matches the bytes.
124 #[inline]
125 pub fn from_bytes_with_hash(bytes: impl Into<Bytes>, hash: u64) -> Self {
126 Self {
127 bytes: bytes.into(),
128 hash,
129 }
130 }
131
132 /// Create a raw event from bytes with JSON validation.
133 #[inline]
134 pub fn from_bytes_validated(bytes: impl Into<Bytes>) -> Result<Self, serde_json::Error> {
135 let bytes = bytes.into();
136 // Validate it's valid JSON by attempting to parse
137 let _: JsonValue = serde_json::from_slice(&bytes)?;
138 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
139 Ok(Self { bytes, hash })
140 }
141
142 /// Create a raw event from a JSON value.
143 ///
144 /// This serializes the value once and caches the result.
145 ///
146 /// `serde_json::to_vec(&JsonValue)` is infallible by
147 /// construction — the value tree is a known-good JSON
148 /// structure with no fallible serializer in the path —
149 /// modulo OOM, which the global allocator handles via
150 /// abort. The `unwrap_or_default()` fallback keeps the
151 /// non-panic contract for a hypothetical future serde-json
152 /// change that introduced a fallible path on `Value`. Pre-
153 /// fix `expect("Value serialization is infallible")` panicked
154 /// at the call site if the assumption ever broke; the panic
155 /// would unwind across `from_value`'s callers (bus ingest,
156 /// FFI ingest paths) where the contract is non-panicking.
157 #[inline]
158 pub fn from_value(value: JsonValue) -> Self {
159 let bytes = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
160 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
161 Self { bytes, hash }
162 }
163
164 /// Creates a RawEvent from a string. No validation is performed for
165 /// performance — see `from_bytes_validated` for a validating alternative.
166 #[inline]
167 #[allow(clippy::should_implement_trait)]
168 pub fn from_str(s: &str) -> Self {
169 Self::from_bytes(Bytes::copy_from_slice(s.as_bytes()))
170 }
171
172 /// Get the raw bytes.
173 #[inline]
174 pub fn as_bytes(&self) -> &[u8] {
175 &self.bytes
176 }
177
178 /// Get the bytes (clone is cheap - reference counted).
179 #[inline]
180 pub fn bytes(&self) -> Bytes {
181 self.bytes.clone()
182 }
183
184 /// Get the pre-computed hash.
185 #[inline]
186 pub fn hash(&self) -> u64 {
187 self.hash
188 }
189
190 /// Parse the bytes into a JSON value (for when you need to inspect).
191 #[inline]
192 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
193 serde_json::from_slice(&self.bytes)
194 }
195
196 /// Get the byte length.
197 #[inline]
198 pub fn len(&self) -> usize {
199 self.bytes.len()
200 }
201
202 /// Check if empty.
203 #[inline]
204 pub fn is_empty(&self) -> bool {
205 self.bytes.is_empty()
206 }
207}
208
209impl std::fmt::Debug for RawEvent {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("RawEvent")
212 .field("len", &self.bytes.len())
213 .field("hash", &self.hash)
214 .finish()
215 }
216}
217
218impl From<Event> for RawEvent {
219 #[inline]
220 fn from(event: Event) -> Self {
221 event.into_raw()
222 }
223}
224
225impl From<JsonValue> for RawEvent {
226 #[inline]
227 fn from(value: JsonValue) -> Self {
228 RawEvent::from_value(value)
229 }
230}
231
232/// Internal event representation with metadata assigned at ingestion.
233///
234/// This is the canonical form of an event within the event bus.
235/// The `insertion_ts` provides deterministic ordering within a shard.
236///
237/// Uses `Bytes` for zero-copy, reference-counted storage.
238#[derive(Debug, Clone)]
239pub struct InternalEvent {
240 /// Pre-serialized JSON payload (reference-counted, zero-copy clone).
241 pub raw: Bytes,
242
243 /// Monotonically increasing insertion timestamp (nanoseconds).
244 /// Strictly ordered within a shard, not globally.
245 pub insertion_ts: u64,
246
247 /// Shard this event was assigned to.
248 pub shard_id: u16,
249}
250
251impl InternalEvent {
252 /// Create a new internal event from raw bytes.
253 #[inline]
254 pub fn new(raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
255 Self {
256 raw,
257 insertion_ts,
258 shard_id,
259 }
260 }
261
262 /// Create from a JSON value (serializes once).
263 ///
264 /// See `RawEvent::from_value` for the rationale on
265 /// `unwrap_or_default()` instead of `expect()`.
266 #[inline]
267 pub fn from_value(value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
268 let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
269 Self {
270 raw,
271 insertion_ts,
272 shard_id,
273 }
274 }
275
276 /// Parse the raw bytes into a JSON value.
277 #[inline]
278 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
279 serde_json::from_slice(&self.raw)
280 }
281
282 /// Get the raw bytes as a slice.
283 #[inline]
284 pub fn as_bytes(&self) -> &[u8] {
285 &self.raw
286 }
287}
288
289/// A batch of events for adapter dispatch.
290///
291/// Batches are formed by shard workers and contain strictly ordered
292/// events from a single shard.
293#[derive(Debug, Clone)]
294pub struct Batch {
295 /// The shard this batch belongs to.
296 pub shard_id: u16,
297
298 /// Events in insertion order.
299 pub events: Vec<InternalEvent>,
300
301 /// Sequence number of the first event in this batch.
302 /// Used for idempotent retry handling.
303 pub sequence_start: u64,
304
305 /// Per-process nonce sampled once at process start. Adapters
306 /// that persist `(shard_id, sequence_start)` for dedup
307 /// (JetStream `Nats-Msg-Id`, Redis stream MAXLEN keys, etc.)
308 /// must include this in the dedup key — otherwise a producer
309 /// that restarts within the backend's dedup window collides
310 /// with its prior incarnation on `(shard, 0, 0)` and the new
311 /// batches are silently discarded as duplicates.
312 ///
313 /// `BatchWorker::next_sequence` is process-local and resets to
314 /// zero on restart; the nonce is the global discriminator that
315 /// makes the composite key globally unique across restarts
316 /// even though the per-process counter is not durable.
317 pub process_nonce: u64,
318}
319
320/// Per-process nonce used by [`Batch::process_nonce`]. Sampled once
321/// (lazy) from a mix of entropy sources so two processes launched on
322/// the same machine within a single nanosecond tick are still
323/// distinguishable.
324///
325/// We don't use `getrandom` here because it's a feature-gated
326/// optional dep — `event.rs` is in the always-compiled core. Instead
327/// we run xxh3 over multiple sources whose joint state is effectively
328/// never identical across two adjacent process starts: wall-clock
329/// nanos, monotonic-clock nanos (resilient to wall-clock skew),
330/// pid, the address of a stack-local (gives an ASLR component),
331/// and the current thread id. Plain XOR of two sources (the
332/// previous implementation) collapses to zero whenever the
333/// components happen to share bit patterns; xxh3 mixes them so any
334/// single non-degenerate source dominates the output.
335pub fn batch_process_nonce() -> u64 {
336 use std::sync::OnceLock;
337 static NONCE: OnceLock<u64> = OnceLock::new();
338 *NONCE.get_or_init(|| {
339 use std::hash::{Hash, Hasher};
340 use std::time::Instant;
341
342 let wall_nanos = std::time::SystemTime::now()
343 .duration_since(std::time::UNIX_EPOCH)
344 .map(|d| d.as_nanos() as u64)
345 .unwrap_or(0);
346 // `Instant::now()` is monotonic — distinct entropy from
347 // `SystemTime` (the OS may slew wall-clock backwards but
348 // never the monotonic source). `Instant` doesn't expose a
349 // public u64 accessor, so we mix the bytes of its `Debug`
350 // repr.
351 let mono_marker = format!("{:?}", Instant::now());
352 let pid = std::process::id() as u64;
353 // Address of a stack local — adds an ASLR-derived
354 // component that differs across process starts even when
355 // the time / pid components happen to collide.
356 let stack_marker: usize = &pid as *const u64 as usize;
357 let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
358 std::thread::current().id().hash(&mut tid_hasher);
359 let tid = tid_hasher.finish();
360
361 let mut buf = [0u8; 64];
362 buf[..8].copy_from_slice(&wall_nanos.to_le_bytes());
363 buf[8..16].copy_from_slice(&pid.to_le_bytes());
364 buf[16..24].copy_from_slice(&(stack_marker as u64).to_le_bytes());
365 buf[24..32].copy_from_slice(&tid.to_le_bytes());
366 // Pack as much of the monotonic marker bytes as fits.
367 let mono_bytes = mono_marker.as_bytes();
368 let n = mono_bytes.len().min(32);
369 buf[32..32 + n].copy_from_slice(&mono_bytes[..n]);
370
371 let nonce = xxhash_rust::xxh3::xxh3_64(&buf);
372 // Refuse `0` — some consumers treat 0 as a sentinel.
373 // Probability of xxh3 returning exactly 0 is 2^-64; we
374 // map it to 1.
375 if nonce == 0 {
376 1
377 } else {
378 nonce
379 }
380 })
381}
382
383impl Batch {
384 /// Create a new batch using the per-process nonce
385 /// ([`batch_process_nonce`]). Convenience for tests and for
386 /// callers that don't thread a custom producer nonce through.
387 /// Production paths constructed via the bus go through
388 /// [`Self::with_nonce`] with the bus's loaded
389 /// `producer_nonce_path` value so retries dedup across
390 /// process restart.
391 #[inline]
392 pub fn new(shard_id: u16, events: Vec<InternalEvent>, sequence_start: u64) -> Self {
393 Self::with_nonce(shard_id, events, sequence_start, batch_process_nonce())
394 }
395
396 /// Create a new batch with an explicit producer nonce. Used by
397 /// the bus's `BatchWorker` and `remove_shard_internal`'s
398 /// stranded-flush so adapters keying dedup on
399 /// `(producer_nonce, shard, sequence_start, i)` see the same
400 /// nonce across process restart when the bus is configured
401 /// with a `producer_nonce_path`.
402 ///
403 /// A `producer_nonce == 0` is coerced to `1` to preserve the
404 /// non-zero invariant that `batch_process_nonce` and
405 /// `dedup_state::PersistentProducerNonce::create_new` already
406 /// uphold (each generates non-zero u64s and re-rolls on the
407 /// astronomical 1-in-2^64 zero draw).
408 ///
409 /// The zero coercion is **defense-in-depth against future
410 /// codecs**: a downstream caller that constructs a
411 /// `Batch::with_nonce(..., 0)` directly (e.g. tests, hand-built
412 /// fixtures) would otherwise emit `dedup_id` keys starting
413 /// `0:` — collision-prone with any future codec that reserves
414 /// `0` as "no nonce, use the legacy path." Today's
415 /// `adapter/jetstream.rs::on_batch` just formats
416 /// `process_nonce` as `{:x}` with no special-casing, so the
417 /// hazard is latent rather than active. Coercing to 1 keeps
418 /// the invariant that every shipped batch has a non-zero
419 /// producer nonce regardless of caller hygiene.
420 #[inline]
421 pub fn with_nonce(
422 shard_id: u16,
423 events: Vec<InternalEvent>,
424 sequence_start: u64,
425 producer_nonce: u64,
426 ) -> Self {
427 Self {
428 shard_id,
429 events,
430 sequence_start,
431 process_nonce: if producer_nonce == 0 {
432 1
433 } else {
434 producer_nonce
435 },
436 }
437 }
438
439 /// Returns the number of events in this batch.
440 #[inline]
441 pub fn len(&self) -> usize {
442 self.events.len()
443 }
444
445 /// Returns true if this batch is empty.
446 #[inline]
447 pub fn is_empty(&self) -> bool {
448 self.events.is_empty()
449 }
450}
451
452/// An event retrieved from storage with its backend-specific ID.
453#[derive(Debug, Clone)]
454pub struct StoredEvent {
455 /// Backend-specific identifier.
456 pub id: String,
457
458 /// Raw JSON payload bytes (deferred parsing for performance).
459 pub raw: Bytes,
460
461 /// Insertion timestamp from ingestion.
462 pub insertion_ts: u64,
463
464 /// Shard this event belongs to.
465 pub shard_id: u16,
466}
467
468impl StoredEvent {
469 /// Create a new stored event from raw bytes.
470 #[inline]
471 pub fn new(id: String, raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
472 Self {
473 id,
474 raw,
475 insertion_ts,
476 shard_id,
477 }
478 }
479
480 /// Create a new stored event from a JSON value (serializes once).
481 ///
482 /// See `RawEvent::from_value` for the rationale on
483 /// `unwrap_or_default()` instead of `expect()`.
484 #[inline]
485 pub fn from_value(id: String, value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
486 let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
487 Self {
488 id,
489 raw,
490 insertion_ts,
491 shard_id,
492 }
493 }
494
495 /// Parse the raw bytes into a JSON value on demand.
496 #[inline]
497 pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
498 serde_json::from_slice(&self.raw)
499 }
500
501 /// Get the raw bytes as a string slice (for serialization).
502 ///
503 /// Returns `Err` if the raw bytes are not valid UTF-8, rather than
504 /// silently substituting data.
505 #[inline]
506 pub fn raw_str(&self) -> Result<&str, std::str::Utf8Error> {
507 std::str::from_utf8(&self.raw)
508 }
509}
510
511impl Serialize for StoredEvent {
512 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
513 where
514 S: serde::Serializer,
515 {
516 use serde::ser::SerializeStruct;
517 let mut state = serializer.serialize_struct("StoredEvent", 4)?;
518 state.serialize_field("id", &self.id)?;
519 // Serialize raw bytes as a `RawValue` so the on-wire JSON
520 // is byte-for-byte the same as the input. Pre-fix the
521 // bytes were parsed into a `JsonValue` tree and re-
522 // serialized; the round-trip discarded original
523 // whitespace, normalized number formatting (`1.0` → `1`),
524 // and (without `preserve_order`) re-ordered map keys
525 // alphabetically. Any downstream that hashed or signed
526 // the serialized form and expected byte-equality with the
527 // input silently failed verification — a sneaky failure
528 // mode in audit / signing pipelines that look at the
529 // re-emitted JSON.
530 //
531 // `RawValue::from_string` validates the JSON (so the
532 // pre-existing "invalid raw JSON returns a serde error,
533 // not a silent null" guarantee is preserved), but emits
534 // the original bytes verbatim instead of round-tripping
535 // through a value tree.
536 let raw_str = std::str::from_utf8(&self.raw)
537 .map_err(|e| serde::ser::Error::custom(format!("invalid raw UTF-8: {}", e)))?;
538 let raw_value = serde_json::value::RawValue::from_string(raw_str.to_string())
539 .map_err(|e| serde::ser::Error::custom(format!("invalid raw JSON: {}", e)))?;
540 state.serialize_field("raw", &*raw_value)?;
541 state.serialize_field("insertion_ts", &self.insertion_ts)?;
542 state.serialize_field("shard_id", &self.shard_id)?;
543 state.end()
544 }
545}
546
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_event_new() {
        let value = json!({"key": "value"});
        let event = Event::new(value.clone());
        assert_eq!(event.as_value(), &value);
    }

    #[test]
    fn test_event_from_str() {
        let event = Event::from_str(r#"{"key": "value"}"#).unwrap();
        assert_eq!(event.as_value()["key"], "value");
    }

    #[test]
    fn test_event_from_str_invalid() {
        let result = Event::from_str("not valid json");
        assert!(result.is_err());
    }

    #[test]
    fn test_event_from_slice() {
        let bytes = br#"{"key": "value"}"#;
        let event = Event::from_slice(bytes).unwrap();
        assert_eq!(event.as_value()["key"], "value");
    }

    #[test]
    fn test_event_into_inner() {
        let value = json!({"key": "value"});
        let event = Event::new(value.clone());
        assert_eq!(event.into_inner(), value);
    }

    #[test]
    fn test_event_into_raw() {
        let event = Event::new(json!({"key": "value"}));
        let raw = event.into_raw();
        assert!(!raw.is_empty());
        assert!(raw.hash() != 0);
    }

    #[test]
    fn test_event_from_json_value() {
        let value = json!({"key": "value"});
        let event: Event = value.clone().into();
        assert_eq!(event.as_value(), &value);
    }

    #[test]
    fn test_event_into_json_value() {
        let value = json!({"key": "value"});
        let event = Event::new(value.clone());
        let result: JsonValue = event.into();
        assert_eq!(result, value);
    }

    #[test]
    fn test_raw_event_from_bytes() {
        let bytes = br#"{"key": "value"}"#;
        let raw = RawEvent::from_bytes(bytes.as_slice());
        assert_eq!(raw.as_bytes(), bytes);
        assert!(!raw.is_empty());
        assert_eq!(raw.len(), bytes.len());
    }

    #[test]
    fn test_raw_event_from_str() {
        let s = r#"{"key": "value"}"#;
        let raw = RawEvent::from_str(s);
        assert_eq!(raw.as_bytes(), s.as_bytes());
    }

    #[test]
    fn test_raw_event_from_value() {
        let value = json!({"key": "value"});
        let raw = RawEvent::from_value(value);
        let parsed = raw.parse().unwrap();
        assert_eq!(parsed["key"], "value");
    }

    #[test]
    fn test_raw_event_from_bytes_validated() {
        let valid = br#"{"key": "value"}"#;
        let result = RawEvent::from_bytes_validated(valid.as_slice());
        assert!(result.is_ok());

        let invalid = b"not valid json";
        let result = RawEvent::from_bytes_validated(invalid.as_slice());
        assert!(result.is_err());
    }

    #[test]
    fn test_raw_event_hash_consistency() {
        let raw1 = RawEvent::from_str(r#"{"key": "value"}"#);
        let raw2 = RawEvent::from_str(r#"{"key": "value"}"#);
        assert_eq!(raw1.hash(), raw2.hash());

        let raw3 = RawEvent::from_str(r#"{"key": "other"}"#);
        assert_ne!(raw1.hash(), raw3.hash());
    }

    #[test]
    fn test_raw_event_bytes_clone() {
        let raw = RawEvent::from_str(r#"{"key": "value"}"#);
        let bytes1 = raw.bytes();
        let bytes2 = raw.bytes();
        assert_eq!(bytes1, bytes2);
    }

    #[test]
    fn test_raw_event_debug() {
        let raw = RawEvent::from_str(r#"{"key": "value"}"#);
        let debug = format!("{:?}", raw);
        assert!(debug.contains("RawEvent"));
        assert!(debug.contains("len"));
        assert!(debug.contains("hash"));
    }

    #[test]
    fn test_raw_event_from_event() {
        let event = Event::new(json!({"key": "value"}));
        let raw: RawEvent = event.into();
        assert!(!raw.is_empty());
    }

    #[test]
    fn test_raw_event_from_json_value() {
        let value = json!({"key": "value"});
        let raw: RawEvent = value.into();
        assert!(!raw.is_empty());
    }

    #[test]
    fn test_internal_event_new() {
        let raw = Bytes::from(r#"{"key": "value"}"#);
        let event = InternalEvent::new(raw.clone(), 12345, 0);
        assert_eq!(event.raw, raw);
        assert_eq!(event.insertion_ts, 12345);
        assert_eq!(event.shard_id, 0);
    }

    #[test]
    fn test_internal_event_from_value() {
        let event = InternalEvent::from_value(json!({"key": "value"}), 12345, 0);
        assert_eq!(event.insertion_ts, 12345);
        assert_eq!(event.shard_id, 0);
        let parsed = event.parse().unwrap();
        assert_eq!(parsed["key"], "value");
    }

    #[test]
    fn test_internal_event_as_bytes() {
        let raw = Bytes::from(r#"{"key": "value"}"#);
        let event = InternalEvent::new(raw.clone(), 12345, 0);
        assert_eq!(event.as_bytes(), raw.as_ref());
    }

    #[test]
    fn test_batch_new() {
        let events = vec![
            InternalEvent::from_value(json!({"i": 0}), 1, 0),
            InternalEvent::from_value(json!({"i": 1}), 2, 0),
        ];
        let batch = Batch::new(0, events, 100);
        assert_eq!(batch.shard_id, 0);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch.sequence_start, 100);
        assert!(!batch.is_empty());
    }

    #[test]
    fn test_batch_empty() {
        let batch = Batch::new(0, vec![], 0);
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
    }

    #[test]
    fn test_stored_event_new() {
        let raw = Bytes::from(r#"{"key":"value"}"#);
        let event = StoredEvent::new("stream-123".to_string(), raw, 12345, 0);
        assert_eq!(event.id, "stream-123");
        let parsed = event.parse().unwrap();
        assert_eq!(parsed["key"], "value");
        assert_eq!(event.insertion_ts, 12345);
        assert_eq!(event.shard_id, 0);
    }

    // Regression: raw_str() used to silently return "{}" for invalid UTF-8
    // instead of reporting an error (BUGS_3 #4).
    #[test]
    fn test_stored_event_raw_str_valid_utf8() {
        let raw = Bytes::from(r#"{"key":"value"}"#);
        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
        assert_eq!(event.raw_str().unwrap(), r#"{"key":"value"}"#);
    }

    #[test]
    fn test_stored_event_raw_str_invalid_utf8_returns_err() {
        let raw = Bytes::from(vec![0xff, 0xfe, 0xfd]);
        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
        assert!(event.raw_str().is_err());
    }

    // Regression: Serialize impl used to silently replace invalid raw bytes
    // with null. Now it returns a serialization error (BUGS_4 #3).
    #[test]
    fn test_stored_event_serialize_valid() {
        let raw = Bytes::from(r#"{"key":"value"}"#);
        let event = StoredEvent::new("id".to_string(), raw, 123, 0);
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"key\""));
        assert!(json.contains("\"value\""));
    }

    #[test]
    fn test_stored_event_serialize_invalid_raw_returns_error() {
        let raw = Bytes::from(b"not valid json".as_slice());
        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
        let result = serde_json::to_string(&event);
        assert!(
            result.is_err(),
            "serializing invalid raw bytes should error, not silently return null"
        );
    }

    /// Regression: `StoredEvent::Serialize` must preserve the
    /// raw bytes byte-for-byte instead of round-tripping through
    /// `serde_json::Value`. Pre-fix the round-trip discarded
    /// original whitespace, normalized number formatting
    /// (`1.0` → `1`), and re-ordered map keys alphabetically.
    /// Any downstream that hashed or signed the serialized form
    /// and expected byte-equality with the input silently failed
    /// verification.
    #[test]
    fn stored_event_serialize_preserves_raw_byte_for_byte() {
        // Pin three cases where the JsonValue round-trip
        // demonstrably mutates the bytes:
        // 1. Whitespace (round-trip strips internal whitespace).
        // 2. Number formatting (`1.0` → `1`).
        // 3. Key ordering (BTreeMap default re-orders alphabetically).
        let cases: &[&[u8]] = &[
            // Whitespace: extra spaces inside the object literal.
            br#"{ "key" : "value" }"#,
            // Number formatting: 1.0 (would become 1 via Value).
            br#"{"x":1.0,"y":2.5}"#,
            // Key ordering: "z" before "a" (BTreeMap would re-order).
            br#"{"z":1,"a":2}"#,
        ];

        for raw_bytes in cases {
            let raw = Bytes::copy_from_slice(raw_bytes);
            let event = StoredEvent::new("id".into(), raw.clone(), 0, 0);
            let json = serde_json::to_string(&event).unwrap();

            // The serialized output must contain the input bytes
            // verbatim (as-is, not re-formatted by serde_json).
            let expected_raw = std::str::from_utf8(raw_bytes).unwrap();
            assert!(
                json.contains(expected_raw),
                "regression: StoredEvent serialization must contain the raw \
                 input verbatim (no whitespace stripping, no number \
                 normalization, no key re-ordering).\n\
                 input: {expected_raw}\n\
                 output: {json}"
            );
        }
    }

    /// Pin that `Batch::with_nonce` writes the passed value into the
    /// `process_nonce` field. The bus relies on this to stamp the
    /// loaded persistent nonce on every emitted batch;
    /// a future refactor that ignored the parameter would silently
    /// regress JetStream cross-restart dedup.
    #[test]
    fn batch_with_nonce_round_trips_the_passed_value() {
        let events: Vec<InternalEvent> = (0..3)
            .map(|i| InternalEvent::from_value(serde_json::json!({"i": i}), i, 0))
            .collect();
        let nonce: u64 = 0xDEAD_BEEF_CAFE_F00D;
        let batch = Batch::with_nonce(7, events, 42, nonce);
        assert_eq!(batch.shard_id, 7);
        assert_eq!(batch.sequence_start, 42);
        assert_eq!(
            batch.process_nonce, nonce,
            "Batch::with_nonce must write the passed nonce verbatim",
        );
    }

    /// Regression: every `Batch` constructed in this process via
    /// `Batch::new` (the per-process-fallback constructor) must
    /// carry the same `process_nonce`. Adapters that persist
    /// `(shard_id, sequence_start)` for dedup compose it with this
    /// nonce so two processes that both happen to start sequencing
    /// at zero (the default after `BatchWorker::new`) don't collide
    /// on `(shard, 0, 0…)` in the backend's dedup window.
    ///
    /// We pin two contracts:
    /// 1. The nonce is non-zero (`batch_process_nonce` maps a zero
    ///    xxh3 output to 1, so a value some consumers might read as
    ///    a sentinel never escapes).
    /// 2. Multiple `Batch::new` calls in the same process yield
    ///    the same nonce (so retries within a process land on the
    ///    same dedup key).
    #[test]
    fn batch_process_nonce_is_stable_within_process() {
        let nonce_a = batch_process_nonce();
        let nonce_b = batch_process_nonce();
        assert_eq!(
            nonce_a, nonce_b,
            "within a single process the nonce must be stable"
        );

        // And it shows up on every Batch.
        let b1 = Batch::new(0, vec![], 0);
        let b2 = Batch::new(1, vec![], 100);
        assert_eq!(b1.process_nonce, nonce_a);
        assert_eq!(b2.process_nonce, nonce_a);

        // Non-zero: `batch_process_nonce` maps a zero hash to 1, so
        // this must always hold.
        assert_ne!(nonce_a, 0, "process nonce should be non-zero");
    }

    /// Cubic-ai P2: `Batch::with_nonce` must enforce the non-zero
    /// invariant `batch_process_nonce` already upholds. A caller
    /// that hands in `0` (e.g., uninitialized field, default-init
    /// of a `u64`, or a misconfigured fallback path) MUST NOT have
    /// `0` propagate into `process_nonce`. A consumer that treats
    /// `0` as a sentinel for "no nonce / legacy path" would then
    /// silently lose cross-restart dedup. None does today (the
    /// JetStream adapter formats the nonce with no special-casing),
    /// but a future codec might.
    ///
    /// The post-fix behavior coerces `0 → 1`, mirroring the
    /// `PersistentProducerNonce::create_new` and
    /// `batch_process_nonce` policy.
    #[test]
    fn with_nonce_coerces_zero_to_one_to_preserve_dedup_sentinel() {
        // Zero in → one out (the canonical replacement, not
        // `batch_process_nonce` — we don't want the function to
        // silently route around an explicit caller error).
        let b = Batch::with_nonce(0, vec![], 0, 0);
        assert_eq!(
            b.process_nonce, 1,
            "with_nonce(producer_nonce=0) must coerce to 1 — \
             letting 0 through would silently disable JetStream \
             cross-restart dedup (consumers treat 0 as sentinel)",
        );

        // Non-zero passes through verbatim.
        let b = Batch::with_nonce(0, vec![], 0, 0xDEAD_BEEF);
        assert_eq!(
            b.process_nonce, 0xDEAD_BEEF,
            "non-zero producer_nonce must pass through unchanged",
        );
    }
}