Skip to main content

net/
event.rs

1//! Event types for the Net event bus.
2//!
3//! Events are opaque JSON values - the event bus performs no schema validation
4//! or interpretation of event content.
5//!
6//! # Performance Optimization
7//!
8//! Since Net is schema-agnostic, we offer multiple event representations:
9//!
10//! - `Event`: Standard wrapper around `serde_json::Value` (convenient but slower)
11//! - `RawEvent`: Pre-serialized bytes with cached hash (fastest for high-throughput)
12//!
13//! For maximum performance, use `RawEvent::from_bytes()` when you already have
14//! JSON bytes (e.g., from a network buffer or file).
15
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use serde_json::Value as JsonValue;
19
20/// An opaque event - any valid JSON value.
21///
22/// The event bus does not validate, interpret, or enforce any schema.
23/// Events are treated as opaque binary blobs internally.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[repr(transparent)]
26pub struct Event(pub JsonValue);
27
28impl Event {
29    /// Create a new event from a JSON value.
30    #[inline]
31    pub fn new(value: JsonValue) -> Self {
32        Self(value)
33    }
34
35    /// Create an event from a JSON string.
36    #[inline]
37    #[allow(clippy::should_implement_trait)]
38    pub fn from_str(s: &str) -> Result<Self, serde_json::Error> {
39        serde_json::from_str(s).map(Self)
40    }
41
42    /// Create an event from raw bytes.
43    #[inline]
44    pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
45        serde_json::from_slice(bytes).map(Self)
46    }
47
48    /// Get the inner JSON value.
49    #[inline]
50    pub fn into_inner(self) -> JsonValue {
51        self.0
52    }
53
54    /// Get a reference to the inner JSON value.
55    #[inline]
56    pub fn as_value(&self) -> &JsonValue {
57        &self.0
58    }
59
60    /// Convert to a raw event (serializes once, caches the result).
61    #[inline]
62    pub fn into_raw(self) -> RawEvent {
63        RawEvent::from_value(self.0)
64    }
65}
66
67impl From<JsonValue> for Event {
68    #[inline]
69    fn from(value: JsonValue) -> Self {
70        Self(value)
71    }
72}
73
74impl From<Event> for JsonValue {
75    #[inline]
76    fn from(event: Event) -> Self {
77        event.0
78    }
79}
80
81/// A pre-serialized event with cached hash.
82///
83/// This is the high-performance event type for schema-agnostic ingestion.
84/// By storing the raw bytes and pre-computed hash, we avoid:
85/// - Re-serialization on every operation
86/// - Repeated hashing for shard selection
87///
88/// # Example
89///
90/// ```rust,ignore
91/// // From network buffer or file (zero-copy if Bytes is used)
92/// let raw = RawEvent::from_bytes(network_buffer);
93///
94/// // From existing JSON value (serializes once)
95/// let raw = RawEvent::from_value(json!({"key": "value"}));
96///
97/// // Ingestion uses cached hash - no re-serialization
98/// bus.ingest_raw(raw)?;
99/// ```
100#[derive(Clone)]
101pub struct RawEvent {
102    /// Pre-serialized JSON bytes.
103    bytes: Bytes,
104    /// Pre-computed hash for shard selection.
105    hash: u64,
106}
107
108impl RawEvent {
109    /// Create a raw event from bytes.
110    ///
111    /// The bytes must be valid JSON. No validation is performed for performance.
112    /// Use `from_bytes_validated` if you need validation.
113    #[inline]
114    pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
115        let bytes = bytes.into();
116        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
117        Self { bytes, hash }
118    }
119
120    /// Create a raw event from bytes with a pre-computed hash.
121    ///
122    /// Use this when you've already computed the xxhash (e.g., for reused events).
123    /// The caller is responsible for ensuring the hash matches the bytes.
124    #[inline]
125    pub fn from_bytes_with_hash(bytes: impl Into<Bytes>, hash: u64) -> Self {
126        Self {
127            bytes: bytes.into(),
128            hash,
129        }
130    }
131
132    /// Create a raw event from bytes with JSON validation.
133    #[inline]
134    pub fn from_bytes_validated(bytes: impl Into<Bytes>) -> Result<Self, serde_json::Error> {
135        let bytes = bytes.into();
136        // Validate it's valid JSON by attempting to parse
137        let _: JsonValue = serde_json::from_slice(&bytes)?;
138        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
139        Ok(Self { bytes, hash })
140    }
141
142    /// Create a raw event from a JSON value.
143    ///
144    /// This serializes the value once and caches the result.
145    ///
146    /// `serde_json::to_vec(&JsonValue)` is infallible by
147    /// construction — the value tree is a known-good JSON
148    /// structure with no fallible serializer in the path —
149    /// modulo OOM, which the global allocator handles via
150    /// abort. The `unwrap_or_default()` fallback keeps the
151    /// non-panic contract for a hypothetical future serde-json
152    /// change that introduced a fallible path on `Value`. Pre-
153    /// fix `expect("Value serialization is infallible")` panicked
154    /// at the call site if the assumption ever broke; the panic
155    /// would unwind across `from_value`'s callers (bus ingest,
156    /// FFI ingest paths) where the contract is non-panicking.
157    #[inline]
158    pub fn from_value(value: JsonValue) -> Self {
159        let bytes = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
160        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
161        Self { bytes, hash }
162    }
163
164    /// Creates a RawEvent from a string. No validation is performed for
165    /// performance — see `from_bytes_validated` for a validating alternative.
166    #[inline]
167    #[allow(clippy::should_implement_trait)]
168    pub fn from_str(s: &str) -> Self {
169        Self::from_bytes(Bytes::copy_from_slice(s.as_bytes()))
170    }
171
172    /// Get the raw bytes.
173    #[inline]
174    pub fn as_bytes(&self) -> &[u8] {
175        &self.bytes
176    }
177
178    /// Get the bytes (clone is cheap - reference counted).
179    #[inline]
180    pub fn bytes(&self) -> Bytes {
181        self.bytes.clone()
182    }
183
184    /// Get the pre-computed hash.
185    #[inline]
186    pub fn hash(&self) -> u64 {
187        self.hash
188    }
189
190    /// Parse the bytes into a JSON value (for when you need to inspect).
191    #[inline]
192    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
193        serde_json::from_slice(&self.bytes)
194    }
195
196    /// Get the byte length.
197    #[inline]
198    pub fn len(&self) -> usize {
199        self.bytes.len()
200    }
201
202    /// Check if empty.
203    #[inline]
204    pub fn is_empty(&self) -> bool {
205        self.bytes.is_empty()
206    }
207}
208
209impl std::fmt::Debug for RawEvent {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        f.debug_struct("RawEvent")
212            .field("len", &self.bytes.len())
213            .field("hash", &self.hash)
214            .finish()
215    }
216}
217
218impl From<Event> for RawEvent {
219    #[inline]
220    fn from(event: Event) -> Self {
221        event.into_raw()
222    }
223}
224
225impl From<JsonValue> for RawEvent {
226    #[inline]
227    fn from(value: JsonValue) -> Self {
228        RawEvent::from_value(value)
229    }
230}
231
232/// Internal event representation with metadata assigned at ingestion.
233///
234/// This is the canonical form of an event within the event bus.
235/// The `insertion_ts` provides deterministic ordering within a shard.
236///
237/// Uses `Bytes` for zero-copy, reference-counted storage.
238#[derive(Debug, Clone)]
239pub struct InternalEvent {
240    /// Pre-serialized JSON payload (reference-counted, zero-copy clone).
241    pub raw: Bytes,
242
243    /// Monotonically increasing insertion timestamp (nanoseconds).
244    /// Strictly ordered within a shard, not globally.
245    pub insertion_ts: u64,
246
247    /// Shard this event was assigned to.
248    pub shard_id: u16,
249}
250
251impl InternalEvent {
252    /// Create a new internal event from raw bytes.
253    #[inline]
254    pub fn new(raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
255        Self {
256            raw,
257            insertion_ts,
258            shard_id,
259        }
260    }
261
262    /// Create from a JSON value (serializes once).
263    ///
264    /// See `RawEvent::from_value` for the rationale on
265    /// `unwrap_or_default()` instead of `expect()`.
266    #[inline]
267    pub fn from_value(value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
268        let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
269        Self {
270            raw,
271            insertion_ts,
272            shard_id,
273        }
274    }
275
276    /// Parse the raw bytes into a JSON value.
277    #[inline]
278    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
279        serde_json::from_slice(&self.raw)
280    }
281
282    /// Get the raw bytes as a slice.
283    #[inline]
284    pub fn as_bytes(&self) -> &[u8] {
285        &self.raw
286    }
287}
288
289/// A batch of events for adapter dispatch.
290///
291/// Batches are formed by shard workers and contain strictly ordered
292/// events from a single shard.
293#[derive(Debug, Clone)]
294pub struct Batch {
295    /// The shard this batch belongs to.
296    pub shard_id: u16,
297
298    /// Events in insertion order.
299    pub events: Vec<InternalEvent>,
300
301    /// Sequence number of the first event in this batch.
302    /// Used for idempotent retry handling.
303    pub sequence_start: u64,
304
305    /// Per-process nonce sampled once at process start. Adapters
306    /// that persist `(shard_id, sequence_start)` for dedup
307    /// (JetStream `Nats-Msg-Id`, Redis stream MAXLEN keys, etc.)
308    /// must include this in the dedup key — otherwise a producer
309    /// that restarts within the backend's dedup window collides
310    /// with its prior incarnation on `(shard, 0, 0)` and the new
311    /// batches are silently discarded as duplicates.
312    ///
313    /// `BatchWorker::next_sequence` is process-local and resets to
314    /// zero on restart; the nonce is the global discriminator that
315    /// makes the composite key globally unique across restarts
316    /// even though the per-process counter is not durable.
317    pub process_nonce: u64,
318}
319
320/// Per-process nonce used by [`Batch::process_nonce`]. Sampled once
321/// (lazy) from a mix of entropy sources so two processes launched on
322/// the same machine within a single nanosecond tick are still
323/// distinguishable.
324///
325/// We don't use `getrandom` here because it's a feature-gated
326/// optional dep — `event.rs` is in the always-compiled core. Instead
327/// we run xxh3 over multiple sources whose joint state is effectively
328/// never identical across two adjacent process starts: wall-clock
329/// nanos, monotonic-clock nanos (resilient to wall-clock skew),
330/// pid, the address of a stack-local (gives an ASLR component),
331/// and the current thread id. Plain XOR of two sources (the
332/// previous implementation) collapses to zero whenever the
333/// components happen to share bit patterns; xxh3 mixes them so any
334/// single non-degenerate source dominates the output.
335pub fn batch_process_nonce() -> u64 {
336    use std::sync::OnceLock;
337    static NONCE: OnceLock<u64> = OnceLock::new();
338    *NONCE.get_or_init(|| {
339        use std::hash::{Hash, Hasher};
340        use std::time::Instant;
341
342        let wall_nanos = std::time::SystemTime::now()
343            .duration_since(std::time::UNIX_EPOCH)
344            .map(|d| d.as_nanos() as u64)
345            .unwrap_or(0);
346        // `Instant::now()` is monotonic — distinct entropy from
347        // `SystemTime` (the OS may slew wall-clock backwards but
348        // never the monotonic source). `Instant` doesn't expose a
349        // public u64 accessor, so we mix the bytes of its `Debug`
350        // repr.
351        let mono_marker = format!("{:?}", Instant::now());
352        let pid = std::process::id() as u64;
353        // Address of a stack local — adds an ASLR-derived
354        // component that differs across process starts even when
355        // the time / pid components happen to collide.
356        let stack_marker: usize = &pid as *const u64 as usize;
357        let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
358        std::thread::current().id().hash(&mut tid_hasher);
359        let tid = tid_hasher.finish();
360
361        let mut buf = [0u8; 64];
362        buf[..8].copy_from_slice(&wall_nanos.to_le_bytes());
363        buf[8..16].copy_from_slice(&pid.to_le_bytes());
364        buf[16..24].copy_from_slice(&(stack_marker as u64).to_le_bytes());
365        buf[24..32].copy_from_slice(&tid.to_le_bytes());
366        // Pack as much of the monotonic marker bytes as fits.
367        let mono_bytes = mono_marker.as_bytes();
368        let n = mono_bytes.len().min(32);
369        buf[32..32 + n].copy_from_slice(&mono_bytes[..n]);
370
371        let nonce = xxhash_rust::xxh3::xxh3_64(&buf);
372        // Refuse `0` — some consumers treat 0 as a sentinel.
373        // Probability of xxh3 returning exactly 0 is 2^-64; we
374        // map it to 1.
375        if nonce == 0 {
376            1
377        } else {
378            nonce
379        }
380    })
381}
382
383impl Batch {
384    /// Create a new batch using the per-process nonce
385    /// ([`batch_process_nonce`]). Convenience for tests and for
386    /// callers that don't thread a custom producer nonce through.
387    /// Production paths constructed via the bus go through
388    /// [`Self::with_nonce`] with the bus's loaded
389    /// `producer_nonce_path` value so retries dedup across
390    /// process restart.
391    #[inline]
392    pub fn new(shard_id: u16, events: Vec<InternalEvent>, sequence_start: u64) -> Self {
393        Self::with_nonce(shard_id, events, sequence_start, batch_process_nonce())
394    }
395
396    /// Create a new batch with an explicit producer nonce. Used by
397    /// the bus's `BatchWorker` and `remove_shard_internal`'s
398    /// stranded-flush so adapters keying dedup on
399    /// `(producer_nonce, shard, sequence_start, i)` see the same
400    /// nonce across process restart when the bus is configured
401    /// with a `producer_nonce_path`.
402    ///
403    /// A `producer_nonce == 0` is coerced to `1` to preserve the
404    /// non-zero invariant that `batch_process_nonce` and
405    /// `dedup_state::PersistentProducerNonce::create_new` already
406    /// uphold (each generates non-zero u64s and re-rolls on the
407    /// astronomical 1-in-2^64 zero draw).
408    ///
409    /// The zero coercion is **defense-in-depth against future
410    /// codecs**: a downstream caller that constructs a
411    /// `Batch::with_nonce(..., 0)` directly (e.g. tests, hand-built
412    /// fixtures) would otherwise emit `dedup_id` keys starting
413    /// `0:` — collision-prone with any future codec that reserves
414    /// `0` as "no nonce, use the legacy path." Today's
415    /// `adapter/jetstream.rs::on_batch` just formats
416    /// `process_nonce` as `{:x}` with no special-casing, so the
417    /// hazard is latent rather than active. Coercing to 1 keeps
418    /// the invariant that every shipped batch has a non-zero
419    /// producer nonce regardless of caller hygiene.
420    #[inline]
421    pub fn with_nonce(
422        shard_id: u16,
423        events: Vec<InternalEvent>,
424        sequence_start: u64,
425        producer_nonce: u64,
426    ) -> Self {
427        Self {
428            shard_id,
429            events,
430            sequence_start,
431            process_nonce: if producer_nonce == 0 {
432                1
433            } else {
434                producer_nonce
435            },
436        }
437    }
438
439    /// Returns the number of events in this batch.
440    #[inline]
441    pub fn len(&self) -> usize {
442        self.events.len()
443    }
444
445    /// Returns true if this batch is empty.
446    #[inline]
447    pub fn is_empty(&self) -> bool {
448        self.events.is_empty()
449    }
450}
451
452/// An event retrieved from storage with its backend-specific ID.
453#[derive(Debug, Clone)]
454pub struct StoredEvent {
455    /// Backend-specific identifier.
456    pub id: String,
457
458    /// Raw JSON payload bytes (deferred parsing for performance).
459    pub raw: Bytes,
460
461    /// Insertion timestamp from ingestion.
462    pub insertion_ts: u64,
463
464    /// Shard this event belongs to.
465    pub shard_id: u16,
466}
467
468impl StoredEvent {
469    /// Create a new stored event from raw bytes.
470    #[inline]
471    pub fn new(id: String, raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
472        Self {
473            id,
474            raw,
475            insertion_ts,
476            shard_id,
477        }
478    }
479
480    /// Create a new stored event from a JSON value (serializes once).
481    ///
482    /// See `RawEvent::from_value` for the rationale on
483    /// `unwrap_or_default()` instead of `expect()`.
484    #[inline]
485    pub fn from_value(id: String, value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
486        let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
487        Self {
488            id,
489            raw,
490            insertion_ts,
491            shard_id,
492        }
493    }
494
495    /// Parse the raw bytes into a JSON value on demand.
496    #[inline]
497    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
498        serde_json::from_slice(&self.raw)
499    }
500
501    /// Get the raw bytes as a string slice (for serialization).
502    ///
503    /// Returns `Err` if the raw bytes are not valid UTF-8, rather than
504    /// silently substituting data.
505    #[inline]
506    pub fn raw_str(&self) -> Result<&str, std::str::Utf8Error> {
507        std::str::from_utf8(&self.raw)
508    }
509}
510
511impl Serialize for StoredEvent {
512    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
513    where
514        S: serde::Serializer,
515    {
516        use serde::ser::SerializeStruct;
517        let mut state = serializer.serialize_struct("StoredEvent", 4)?;
518        state.serialize_field("id", &self.id)?;
519        // Serialize raw bytes as a `RawValue` so the on-wire JSON
520        // is byte-for-byte the same as the input. Pre-fix the
521        // bytes were parsed into a `JsonValue` tree and re-
522        // serialized; the round-trip discarded original
523        // whitespace, normalized number formatting (`1.0` → `1`),
524        // and (without `preserve_order`) re-ordered map keys
525        // alphabetically. Any downstream that hashed or signed
526        // the serialized form and expected byte-equality with the
527        // input silently failed verification — a sneaky failure
528        // mode in audit / signing pipelines that look at the
529        // re-emitted JSON.
530        //
531        // `RawValue::from_string` validates the JSON (so the
532        // pre-existing "invalid raw JSON returns a serde error,
533        // not a silent null" guarantee is preserved), but emits
534        // the original bytes verbatim instead of round-tripping
535        // through a value tree.
536        let raw_str = std::str::from_utf8(&self.raw)
537            .map_err(|e| serde::ser::Error::custom(format!("invalid raw UTF-8: {}", e)))?;
538        let raw_value = serde_json::value::RawValue::from_string(raw_str.to_string())
539            .map_err(|e| serde::ser::Error::custom(format!("invalid raw JSON: {}", e)))?;
540        state.serialize_field("raw", &*raw_value)?;
541        state.serialize_field("insertion_ts", &self.insertion_ts)?;
542        state.serialize_field("shard_id", &self.shard_id)?;
543        state.end()
544    }
545}
546
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ---- Event -----------------------------------------------------------

    #[test]
    fn test_event_new() {
        let expected = json!({"key": "value"});
        let wrapped = Event::new(expected.clone());
        assert_eq!(wrapped.as_value(), &expected);
    }

    #[test]
    fn test_event_from_str() {
        let parsed = Event::from_str(r#"{"key": "value"}"#).unwrap();
        assert_eq!(parsed.as_value()["key"], "value");
    }

    #[test]
    fn test_event_from_str_invalid() {
        assert!(Event::from_str("not valid json").is_err());
    }

    #[test]
    fn test_event_from_slice() {
        let input = br#"{"key": "value"}"#;
        let parsed = Event::from_slice(input).unwrap();
        assert_eq!(parsed.as_value()["key"], "value");
    }

    #[test]
    fn test_event_into_inner() {
        let expected = json!({"key": "value"});
        let unwrapped = Event::new(expected.clone()).into_inner();
        assert_eq!(unwrapped, expected);
    }

    #[test]
    fn test_event_into_raw() {
        let raw = Event::new(json!({"key": "value"})).into_raw();
        assert!(!raw.is_empty());
        assert_ne!(raw.hash(), 0);
    }

    #[test]
    fn test_event_from_json_value() {
        let expected = json!({"key": "value"});
        let converted = Event::from(expected.clone());
        assert_eq!(converted.as_value(), &expected);
    }

    #[test]
    fn test_event_into_json_value() {
        let expected = json!({"key": "value"});
        let converted = JsonValue::from(Event::new(expected.clone()));
        assert_eq!(converted, expected);
    }

    // ---- RawEvent --------------------------------------------------------

    #[test]
    fn test_raw_event_from_bytes() {
        let input: &'static [u8] = br#"{"key": "value"}"#;
        let raw = RawEvent::from_bytes(input);
        assert_eq!(raw.as_bytes(), input);
        assert!(!raw.is_empty());
        assert_eq!(raw.len(), input.len());
    }

    #[test]
    fn test_raw_event_from_str() {
        let text = r#"{"key": "value"}"#;
        assert_eq!(RawEvent::from_str(text).as_bytes(), text.as_bytes());
    }

    #[test]
    fn test_raw_event_from_value() {
        let raw = RawEvent::from_value(json!({"key": "value"}));
        let round_tripped = raw.parse().unwrap();
        assert_eq!(round_tripped["key"], "value");
    }

    #[test]
    fn test_raw_event_from_bytes_validated() {
        let good: &'static [u8] = br#"{"key": "value"}"#;
        assert!(RawEvent::from_bytes_validated(good).is_ok());

        let bad: &'static [u8] = b"not valid json";
        assert!(RawEvent::from_bytes_validated(bad).is_err());
    }

    #[test]
    fn test_raw_event_hash_consistency() {
        let first = RawEvent::from_str(r#"{"key": "value"}"#);
        let same = RawEvent::from_str(r#"{"key": "value"}"#);
        assert_eq!(first.hash(), same.hash());

        let different = RawEvent::from_str(r#"{"key": "other"}"#);
        assert_ne!(first.hash(), different.hash());
    }

    #[test]
    fn test_raw_event_bytes_clone() {
        let raw = RawEvent::from_str(r#"{"key": "value"}"#);
        let (handle_a, handle_b) = (raw.bytes(), raw.bytes());
        assert_eq!(handle_a, handle_b);
    }

    #[test]
    fn test_raw_event_debug() {
        let rendered = format!("{:?}", RawEvent::from_str(r#"{"key": "value"}"#));
        for needle in ["RawEvent", "len", "hash"] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_raw_event_from_event() {
        let raw = RawEvent::from(Event::new(json!({"key": "value"})));
        assert!(!raw.is_empty());
    }

    #[test]
    fn test_raw_event_from_json_value() {
        let raw = RawEvent::from(json!({"key": "value"}));
        assert!(!raw.is_empty());
    }

    // ---- InternalEvent ---------------------------------------------------

    #[test]
    fn test_internal_event_new() {
        let payload = Bytes::from(r#"{"key": "value"}"#);
        let internal = InternalEvent::new(payload.clone(), 12345, 0);
        assert_eq!(internal.raw, payload);
        assert_eq!(internal.insertion_ts, 12345);
        assert_eq!(internal.shard_id, 0);
    }

    #[test]
    fn test_internal_event_from_value() {
        let internal = InternalEvent::from_value(json!({"key": "value"}), 12345, 0);
        assert_eq!(internal.insertion_ts, 12345);
        assert_eq!(internal.shard_id, 0);
        let round_tripped = internal.parse().unwrap();
        assert_eq!(round_tripped["key"], "value");
    }

    #[test]
    fn test_internal_event_as_bytes() {
        let payload = Bytes::from(r#"{"key": "value"}"#);
        let internal = InternalEvent::new(payload.clone(), 12345, 0);
        assert_eq!(internal.as_bytes(), &payload[..]);
    }

    // ---- Batch -----------------------------------------------------------

    #[test]
    fn test_batch_new() {
        let members = vec![
            InternalEvent::from_value(json!({"i": 0}), 1, 0),
            InternalEvent::from_value(json!({"i": 1}), 2, 0),
        ];
        let batch = Batch::new(0, members, 100);
        assert_eq!(batch.shard_id, 0);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch.sequence_start, 100);
        assert!(!batch.is_empty());
    }

    #[test]
    fn test_batch_empty() {
        let batch = Batch::new(0, Vec::new(), 0);
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
    }

    // ---- StoredEvent -----------------------------------------------------

    #[test]
    fn test_stored_event_new() {
        let payload = Bytes::from(r#"{"key":"value"}"#);
        let stored = StoredEvent::new("stream-123".to_string(), payload, 12345, 0);
        assert_eq!(stored.id, "stream-123");
        let round_tripped = stored.parse().unwrap();
        assert_eq!(round_tripped["key"], "value");
        assert_eq!(stored.insertion_ts, 12345);
        assert_eq!(stored.shard_id, 0);
    }

    // Regression (BUGS_3 #4): `raw_str()` once returned "{}" for payloads
    // that were not valid UTF-8 instead of surfacing an error.
    #[test]
    fn test_stored_event_raw_str_valid_utf8() {
        let payload = Bytes::from(r#"{"key":"value"}"#);
        let stored = StoredEvent::new("id".to_string(), payload, 0, 0);
        assert_eq!(stored.raw_str().unwrap(), r#"{"key":"value"}"#);
    }

    #[test]
    fn test_stored_event_raw_str_invalid_utf8_returns_err() {
        let payload = Bytes::from(vec![0xff, 0xfe, 0xfd]);
        let stored = StoredEvent::new("id".to_string(), payload, 0, 0);
        assert!(stored.raw_str().is_err());
    }

    // Regression (BUGS_4 #3): the `Serialize` impl once replaced an invalid
    // payload with null; it now reports a serialization error.
    #[test]
    fn test_stored_event_serialize_valid() {
        let payload = Bytes::from(r#"{"key":"value"}"#);
        let stored = StoredEvent::new("id".to_string(), payload, 123, 0);
        let json = serde_json::to_string(&stored).unwrap();
        assert!(json.contains("\"key\""));
        assert!(json.contains("\"value\""));
    }

    #[test]
    fn test_stored_event_serialize_invalid_raw_returns_error() {
        let payload = Bytes::from_static(b"not valid json");
        let stored = StoredEvent::new("id".to_string(), payload, 0, 0);
        let outcome = serde_json::to_string(&stored);
        assert!(
            outcome.is_err(),
            "serializing invalid raw bytes should error, not silently return null"
        );
    }

    /// Regression: serializing a `StoredEvent` must emit the payload exactly
    /// as stored rather than re-encoding it via `serde_json::Value`. The
    /// re-encoding path dropped whitespace, rewrote numbers (`1.0` → `1`),
    /// and re-ordered map keys, which broke any downstream that hashed or
    /// signed the output and expected it to equal the input.
    #[test]
    fn stored_event_serialize_preserves_raw_byte_for_byte() {
        // One input per mutation that a `Value` round-trip would introduce.
        let cases: &[&[u8]] = &[
            // Interior whitespace that a re-encode would strip.
            br#"{ "key" : "value" }"#,
            // `1.0`, which a re-encode would print as `1`.
            br#"{"x":1.0,"y":2.5}"#,
            // "z" ahead of "a", which a sorted map would swap.
            br#"{"z":1,"a":2}"#,
        ];

        for raw_bytes in cases.iter().copied() {
            let stored = StoredEvent::new("id".into(), Bytes::copy_from_slice(raw_bytes), 0, 0);
            let json = serde_json::to_string(&stored).unwrap();

            // The input text has to appear unmodified inside the output.
            let expected_raw = std::str::from_utf8(raw_bytes).unwrap();
            assert!(
                json.contains(expected_raw),
                "regression: StoredEvent serialization must contain the raw \
                 input verbatim (no whitespace stripping, no number \
                 normalization, no key re-ordering).\n\
                 input:  {expected_raw}\n\
                 output: {json}"
            );
        }
    }

    // ---- Batch nonce handling --------------------------------------------

    /// `Batch::with_nonce` must store the nonce it is given in
    /// `process_nonce`. The bus depends on this to stamp its persistent
    /// nonce on every batch; a refactor that dropped the parameter would
    /// quietly break JetStream dedup across restarts.
    #[test]
    fn batch_with_nonce_round_trips_the_passed_value() {
        let members: Vec<InternalEvent> = (0..3u64)
            .map(|i| InternalEvent::from_value(json!({"i": i}), i, 0))
            .collect();
        let nonce: u64 = 0xDEAD_BEEF_CAFE_F00D;
        let batch = Batch::with_nonce(7, members, 42, nonce);
        assert_eq!(batch.shard_id, 7);
        assert_eq!(batch.sequence_start, 42);
        assert_eq!(
            batch.process_nonce, nonce,
            "Batch::with_nonce must write the passed nonce verbatim",
        );
    }

    /// Regression: every batch made with `Batch::new` in one process must
    /// carry the same `process_nonce`. Adapters combine it with
    /// `(shard_id, sequence_start)` so that two processes which both start
    /// sequencing at zero do not collide inside the backend's dedup window.
    ///
    /// Two contracts are pinned here:
    /// 1. Repeated calls within the process return the same nonce, so
    ///    retries land on the same dedup key.
    /// 2. The nonce is non-zero; `batch_process_nonce` maps a zero hash
    ///    to 1.
    #[test]
    fn batch_process_nonce_is_stable_within_process() {
        let first = batch_process_nonce();
        let second = batch_process_nonce();
        assert_eq!(
            first, second,
            "within a single process the nonce must be stable"
        );

        // Every batch built through `Batch::new` carries that same value.
        let batch_a = Batch::new(0, Vec::new(), 0);
        let batch_b = Batch::new(1, Vec::new(), 100);
        assert_eq!(batch_a.process_nonce, first);
        assert_eq!(batch_b.process_nonce, first);

        // Cheap sanity check on the non-zero guarantee.
        assert_ne!(first, 0, "process nonce should be non-zero");
    }

    /// Cubic-ai P2: `Batch::with_nonce` has to uphold the same non-zero
    /// invariant as `batch_process_nonce`. A caller passing `0` (an
    /// uninitialized or default `u64`, a misconfigured fallback) must not
    /// see `0` reach `process_nonce`, since a consumer may reserve `0` as a
    /// "no nonce" sentinel.
    ///
    /// The expected behavior is `0 → 1`, matching
    /// `PersistentProducerNonce::create_new` and `batch_process_nonce`.
    #[test]
    fn with_nonce_coerces_zero_to_one_to_preserve_dedup_sentinel() {
        // Zero becomes exactly one. It is deliberately not swapped for
        // `batch_process_nonce()`, which would mask the caller's mistake.
        let coerced = Batch::with_nonce(0, Vec::new(), 0, 0);
        assert_eq!(
            coerced.process_nonce, 1,
            "with_nonce(producer_nonce=0) must coerce to 1 — \
             letting 0 through would silently disable JetStream \
             cross-restart dedup (consumers treat 0 as sentinel)",
        );

        // Any non-zero nonce is stored untouched.
        let untouched = Batch::with_nonce(0, Vec::new(), 0, 0xDEAD_BEEF);
        assert_eq!(
            untouched.process_nonce, 0xDEAD_BEEF,
            "non-zero producer_nonce must pass through unchanged",
        );
    }
}