Skip to main content

net/
event.rs

//! Event types for the Net event bus.
//!
//! Events are opaque JSON values: the event bus performs no schema validation
//! or interpretation of event content.
//!
//! # Performance Optimization
//!
//! Because Net is schema-agnostic, it offers multiple event representations:
//!
//! - `Event`: a standard wrapper around `serde_json::Value` (convenient, but slower)
//! - `RawEvent`: pre-serialized bytes with a cached hash (fastest for high-throughput ingestion)
//!
//! For maximum performance, use `RawEvent::from_bytes()` when you already have
//! JSON bytes (e.g. from a network buffer or a file).
15
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use serde_json::Value as JsonValue;
19
20/// An opaque event - any valid JSON value.
21///
22/// The event bus does not validate, interpret, or enforce any schema.
23/// Events are treated as opaque binary blobs internally.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[repr(transparent)]
26pub struct Event(pub JsonValue);
27
28impl Event {
29    /// Create a new event from a JSON value.
30    #[inline]
31    pub fn new(value: JsonValue) -> Self {
32        Self(value)
33    }
34
35    /// Create an event from a JSON string.
36    #[inline]
37    #[allow(clippy::should_implement_trait)]
38    pub fn from_str(s: &str) -> Result<Self, serde_json::Error> {
39        serde_json::from_str(s).map(Self)
40    }
41
42    /// Create an event from raw bytes.
43    #[inline]
44    pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
45        serde_json::from_slice(bytes).map(Self)
46    }
47
48    /// Get the inner JSON value.
49    #[inline]
50    pub fn into_inner(self) -> JsonValue {
51        self.0
52    }
53
54    /// Get a reference to the inner JSON value.
55    #[inline]
56    pub fn as_value(&self) -> &JsonValue {
57        &self.0
58    }
59
60    /// Convert to a raw event (serializes once, caches the result).
61    #[inline]
62    pub fn into_raw(self) -> RawEvent {
63        RawEvent::from_value(self.0)
64    }
65}
66
67impl From<JsonValue> for Event {
68    #[inline]
69    fn from(value: JsonValue) -> Self {
70        Self(value)
71    }
72}
73
74impl From<Event> for JsonValue {
75    #[inline]
76    fn from(event: Event) -> Self {
77        event.0
78    }
79}
80
81/// A pre-serialized event with cached hash.
82///
83/// This is the high-performance event type for schema-agnostic ingestion.
84/// By storing the raw bytes and pre-computed hash, we avoid:
85/// - Re-serialization on every operation
86/// - Repeated hashing for shard selection
87///
88/// # Example
89///
90/// ```rust,ignore
91/// // From network buffer or file (zero-copy if Bytes is used)
92/// let raw = RawEvent::from_bytes(network_buffer);
93///
94/// // From existing JSON value (serializes once)
95/// let raw = RawEvent::from_value(json!({"key": "value"}));
96///
97/// // Ingestion uses cached hash - no re-serialization
98/// bus.ingest_raw(raw)?;
99/// ```
100#[derive(Clone)]
101pub struct RawEvent {
102    /// Pre-serialized JSON bytes.
103    bytes: Bytes,
104    /// Pre-computed hash for shard selection.
105    hash: u64,
106}
107
108impl RawEvent {
109    /// Create a raw event from bytes.
110    ///
111    /// The bytes must be valid JSON. No validation is performed for performance.
112    /// Use `from_bytes_validated` if you need validation.
113    #[inline]
114    pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
115        let bytes = bytes.into();
116        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
117        Self { bytes, hash }
118    }
119
120    /// Create a raw event from bytes with a pre-computed hash.
121    ///
122    /// Use this when you've already computed the xxhash (e.g., for reused events).
123    /// The caller is responsible for ensuring the hash matches the bytes.
124    #[inline]
125    pub fn from_bytes_with_hash(bytes: impl Into<Bytes>, hash: u64) -> Self {
126        Self {
127            bytes: bytes.into(),
128            hash,
129        }
130    }
131
132    /// Create a raw event from bytes with JSON validation.
133    #[inline]
134    pub fn from_bytes_validated(bytes: impl Into<Bytes>) -> Result<Self, serde_json::Error> {
135        let bytes = bytes.into();
136        // Validate it's valid JSON by attempting to parse
137        let _: JsonValue = serde_json::from_slice(&bytes)?;
138        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
139        Ok(Self { bytes, hash })
140    }
141
142    /// Create a raw event from a JSON value.
143    ///
144    /// This serializes the value once and caches the result.
145    ///
146    /// `serde_json::to_vec(&JsonValue)` is infallible by
147    /// construction — the value tree is a known-good JSON
148    /// structure with no fallible serializer in the path —
149    /// modulo OOM, which the global allocator handles via
150    /// abort. The `unwrap_or_default()` fallback keeps the
151    /// non-panic contract for a hypothetical future serde-json
152    /// change that introduced a fallible path on `Value`. Pre-
153    /// fix `expect("Value serialization is infallible")` panicked
154    /// at the call site if the assumption ever broke; the panic
155    /// would unwind across `from_value`'s callers (bus ingest,
156    /// FFI ingest paths) where the contract is non-panicking.
157    #[inline]
158    pub fn from_value(value: JsonValue) -> Self {
159        let bytes = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
160        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
161        Self { bytes, hash }
162    }
163
164    /// Creates a RawEvent from a string. No validation is performed for
165    /// performance — see `from_bytes_validated` for a validating alternative.
166    #[inline]
167    #[allow(clippy::should_implement_trait)]
168    pub fn from_str(s: &str) -> Self {
169        Self::from_bytes(Bytes::copy_from_slice(s.as_bytes()))
170    }
171
172    /// Get the raw bytes.
173    #[inline]
174    pub fn as_bytes(&self) -> &[u8] {
175        &self.bytes
176    }
177
178    /// Get the bytes (clone is cheap - reference counted).
179    #[inline]
180    pub fn bytes(&self) -> Bytes {
181        self.bytes.clone()
182    }
183
184    /// Get the pre-computed hash.
185    #[inline]
186    pub fn hash(&self) -> u64 {
187        self.hash
188    }
189
190    /// Parse the bytes into a JSON value (for when you need to inspect).
191    #[inline]
192    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
193        serde_json::from_slice(&self.bytes)
194    }
195
196    /// Get the byte length.
197    #[inline]
198    pub fn len(&self) -> usize {
199        self.bytes.len()
200    }
201
202    /// Check if empty.
203    #[inline]
204    pub fn is_empty(&self) -> bool {
205        self.bytes.is_empty()
206    }
207}
208
209impl std::fmt::Debug for RawEvent {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        f.debug_struct("RawEvent")
212            .field("len", &self.bytes.len())
213            .field("hash", &self.hash)
214            .finish()
215    }
216}
217
218impl From<Event> for RawEvent {
219    #[inline]
220    fn from(event: Event) -> Self {
221        event.into_raw()
222    }
223}
224
225impl From<JsonValue> for RawEvent {
226    #[inline]
227    fn from(value: JsonValue) -> Self {
228        RawEvent::from_value(value)
229    }
230}
231
232/// Internal event representation with metadata assigned at ingestion.
233///
234/// This is the canonical form of an event within the event bus.
235/// The `insertion_ts` provides deterministic ordering within a shard.
236///
237/// Uses `Bytes` for zero-copy, reference-counted storage.
238#[derive(Debug, Clone)]
239pub struct InternalEvent {
240    /// Pre-serialized JSON payload (reference-counted, zero-copy clone).
241    pub raw: Bytes,
242
243    /// Monotonically increasing insertion timestamp (nanoseconds).
244    /// Strictly ordered within a shard, not globally.
245    pub insertion_ts: u64,
246
247    /// Shard this event was assigned to.
248    pub shard_id: u16,
249}
250
251impl InternalEvent {
252    /// Create a new internal event from raw bytes.
253    #[inline]
254    pub fn new(raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
255        Self {
256            raw,
257            insertion_ts,
258            shard_id,
259        }
260    }
261
262    /// Create from a JSON value (serializes once).
263    ///
264    /// See `RawEvent::from_value` for the rationale on
265    /// `unwrap_or_default()` instead of `expect()`.
266    #[inline]
267    pub fn from_value(value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
268        let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
269        Self {
270            raw,
271            insertion_ts,
272            shard_id,
273        }
274    }
275
276    /// Parse the raw bytes into a JSON value.
277    #[inline]
278    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
279        serde_json::from_slice(&self.raw)
280    }
281
282    /// Get the raw bytes as a slice.
283    #[inline]
284    pub fn as_bytes(&self) -> &[u8] {
285        &self.raw
286    }
287}
288
289/// A batch of events for adapter dispatch.
290///
291/// Batches are formed by shard workers and contain strictly ordered
292/// events from a single shard.
293#[derive(Debug, Clone)]
294pub struct Batch {
295    /// The shard this batch belongs to.
296    pub shard_id: u16,
297
298    /// Events in insertion order.
299    pub events: Vec<InternalEvent>,
300
301    /// Sequence number of the first event in this batch.
302    /// Used for idempotent retry handling.
303    pub sequence_start: u64,
304
305    /// Per-process nonce sampled once at process start. Adapters
306    /// that persist `(shard_id, sequence_start)` for dedup
307    /// (JetStream `Nats-Msg-Id`, Redis stream MAXLEN keys, etc.)
308    /// must include this in the dedup key — otherwise a producer
309    /// that restarts within the backend's dedup window collides
310    /// with its prior incarnation on `(shard, 0, 0)` and the new
311    /// batches are silently discarded as duplicates.
312    ///
313    /// `BatchWorker::next_sequence` is process-local and resets to
314    /// zero on restart; the nonce is the global discriminator that
315    /// makes the composite key globally unique across restarts
316    /// even though the per-process counter is not durable.
317    pub process_nonce: u64,
318}
319
320/// Per-process nonce used by [`Batch::process_nonce`]. Sampled once
321/// (lazy) from a mix of entropy sources so two processes launched on
322/// the same machine within a single nanosecond tick are still
323/// distinguishable.
324///
325/// We don't use `getrandom` here because it's a feature-gated
326/// optional dep — `event.rs` is in the always-compiled core. Instead
327/// we run xxh3 over multiple sources whose joint state is effectively
328/// never identical across two adjacent process starts: wall-clock
329/// nanos, monotonic-clock nanos (resilient to wall-clock skew),
330/// pid, the address of a stack-local (gives an ASLR component),
331/// and the current thread id. Plain XOR of two sources (the
332/// previous implementation) collapses to zero whenever the
333/// components happen to share bit patterns; xxh3 mixes them so any
334/// single non-degenerate source dominates the output.
335pub fn batch_process_nonce() -> u64 {
336    use std::sync::OnceLock;
337    static NONCE: OnceLock<u64> = OnceLock::new();
338    *NONCE.get_or_init(|| {
339        use std::hash::{Hash, Hasher};
340        use std::time::Instant;
341
342        let wall_nanos = std::time::SystemTime::now()
343            .duration_since(std::time::UNIX_EPOCH)
344            .map(|d| d.as_nanos() as u64)
345            .unwrap_or(0);
346        // `Instant::now()` is monotonic — distinct entropy from
347        // `SystemTime` (the OS may slew wall-clock backwards but
348        // never the monotonic source). `Instant` doesn't expose a
349        // public u64 accessor, so we mix the bytes of its `Debug`
350        // repr.
351        let mono_marker = format!("{:?}", Instant::now());
352        let pid = std::process::id() as u64;
353        // Address of a stack local — adds an ASLR-derived
354        // component that differs across process starts even when
355        // the time / pid components happen to collide.
356        let stack_marker: usize = &pid as *const u64 as usize;
357        let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
358        std::thread::current().id().hash(&mut tid_hasher);
359        let tid = tid_hasher.finish();
360
361        let mut buf = [0u8; 64];
362        buf[..8].copy_from_slice(&wall_nanos.to_le_bytes());
363        buf[8..16].copy_from_slice(&pid.to_le_bytes());
364        buf[16..24].copy_from_slice(&(stack_marker as u64).to_le_bytes());
365        buf[24..32].copy_from_slice(&tid.to_le_bytes());
366        // Pack as much of the monotonic marker bytes as fits.
367        let mono_bytes = mono_marker.as_bytes();
368        let n = mono_bytes.len().min(32);
369        buf[32..32 + n].copy_from_slice(&mono_bytes[..n]);
370
371        let nonce = xxhash_rust::xxh3::xxh3_64(&buf);
372        // Refuse `0` — some consumers treat 0 as a sentinel.
373        // Probability of xxh3 returning exactly 0 is 2^-64; we
374        // map it to 1.
375        if nonce == 0 {
376            1
377        } else {
378            nonce
379        }
380    })
381}
382
383impl Batch {
384    /// Create a new batch using the per-process nonce
385    /// ([`batch_process_nonce`]). Convenience for tests and for
386    /// callers that don't thread a custom producer nonce through.
387    /// Production paths constructed via the bus go through
388    /// [`Self::with_nonce`] with the bus's loaded
389    /// `producer_nonce_path` value so retries dedup across
390    /// process restart.
391    #[inline]
392    pub fn new(shard_id: u16, events: Vec<InternalEvent>, sequence_start: u64) -> Self {
393        Self::with_nonce(shard_id, events, sequence_start, batch_process_nonce())
394    }
395
396    /// Create a new batch with an explicit producer nonce. Used by
397    /// the bus's `BatchWorker` and `remove_shard_internal`'s
398    /// stranded-flush so adapters keying dedup on
399    /// `(producer_nonce, shard, sequence_start, i)` see the same
400    /// nonce across process restart when the bus is configured
401    /// with a `producer_nonce_path`.
402    ///
403    /// A `producer_nonce == 0` is coerced to `1` to preserve the
404    /// non-zero invariant that `batch_process_nonce` and
405    /// `dedup_state::PersistentProducerNonce::create_new` already
406    /// uphold (each generates non-zero u64s and re-rolls on the
407    /// astronomical 1-in-2^64 zero draw).
408    ///
409    /// The zero coercion is **defense-in-depth against future
410    /// codecs**: a downstream caller that constructs a
411    /// `Batch::with_nonce(..., 0)` directly (e.g. tests, hand-built
412    /// fixtures) would otherwise emit `dedup_id` keys starting
413    /// `0:` — collision-prone with any future codec that reserves
414    /// `0` as "no nonce, use the legacy path." Today's
415    /// `adapter/jetstream.rs::on_batch` just formats
416    /// `process_nonce` as `{:x}` with no special-casing, so the
417    /// hazard is latent rather than active. Coercing to 1 keeps
418    /// the invariant that every shipped batch has a non-zero
419    /// producer nonce regardless of caller hygiene.
420    #[inline]
421    pub fn with_nonce(
422        shard_id: u16,
423        events: Vec<InternalEvent>,
424        sequence_start: u64,
425        producer_nonce: u64,
426    ) -> Self {
427        Self {
428            shard_id,
429            events,
430            sequence_start,
431            process_nonce: if producer_nonce == 0 {
432                1
433            } else {
434                producer_nonce
435            },
436        }
437    }
438
439    /// Returns the number of events in this batch.
440    #[inline]
441    pub fn len(&self) -> usize {
442        self.events.len()
443    }
444
445    /// Returns true if this batch is empty.
446    #[inline]
447    pub fn is_empty(&self) -> bool {
448        self.events.is_empty()
449    }
450}
451
452/// An event retrieved from storage with its backend-specific ID.
453#[derive(Debug, Clone)]
454pub struct StoredEvent {
455    /// Backend-specific identifier.
456    pub id: String,
457
458    /// Raw JSON payload bytes (deferred parsing for performance).
459    pub raw: Bytes,
460
461    /// Insertion timestamp from ingestion.
462    pub insertion_ts: u64,
463
464    /// Shard this event belongs to.
465    pub shard_id: u16,
466
467    /// Application-level idempotency key as written by the producer.
468    /// Adapters carry an opaque dedup token on the wire (Redis Streams
469    /// uses a `dedup_id` field; JetStream uses `Nats-Msg-Id`). The
470    /// trait-level consumer ([`crate::adapter::Adapter::poll_shard`])
471    /// surfaces it here so callers can drive their own dedup table
472    /// without re-reading the raw broker payload. `None` when the
473    /// adapter or the wire entry doesn't carry one.
474    pub dedup_id: Option<String>,
475}
476
477impl StoredEvent {
478    /// Create a new stored event from raw bytes.
479    #[inline]
480    pub fn new(id: String, raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
481        Self {
482            id,
483            raw,
484            insertion_ts,
485            shard_id,
486            dedup_id: None,
487        }
488    }
489
490    /// Create a new stored event from a JSON value (serializes once).
491    ///
492    /// See `RawEvent::from_value` for the rationale on
493    /// `unwrap_or_default()` instead of `expect()`.
494    #[inline]
495    pub fn from_value(id: String, value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
496        let raw = Bytes::from(serde_json::to_vec(&value).unwrap_or_default());
497        Self {
498            id,
499            raw,
500            insertion_ts,
501            shard_id,
502            dedup_id: None,
503        }
504    }
505
506    /// Attach an application-level dedup identifier (the producer's
507    /// `dedup_id` / `Nats-Msg-Id`). Returns `self` for chaining.
508    #[inline]
509    #[must_use]
510    pub fn with_dedup_id(mut self, dedup_id: Option<String>) -> Self {
511        self.dedup_id = dedup_id;
512        self
513    }
514
515    /// Parse the raw bytes into a JSON value on demand.
516    #[inline]
517    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
518        serde_json::from_slice(&self.raw)
519    }
520
521    /// Get the raw bytes as a string slice (for serialization).
522    ///
523    /// Returns `Err` if the raw bytes are not valid UTF-8, rather than
524    /// silently substituting data.
525    #[inline]
526    pub fn raw_str(&self) -> Result<&str, std::str::Utf8Error> {
527        std::str::from_utf8(&self.raw)
528    }
529}
530
531impl Serialize for StoredEvent {
532    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
533    where
534        S: serde::Serializer,
535    {
536        use serde::ser::SerializeStruct;
537        // Always emit `dedup_id` (as `null` when absent) so the
538        // on-wire shape is stable. Pre-fix the field count flipped
539        // between 4 and 5 based on whether `dedup_id` was populated;
540        // downstream Node / Python / Go consumers using
541        // `deny_unknown_fields` (or strict schema validators)
542        // accepted the 4-field shape but rejected events whose
543        // adapter populated the 5th, making the wire compatibility
544        // *data-dependent*. Stable shape > byte-optimal: an extra
545        // `"dedup_id":null` per legacy event is cheap, the rejection
546        // hazard is not.
547        let field_count = 5;
548        let mut state = serializer.serialize_struct("StoredEvent", field_count)?;
549        state.serialize_field("id", &self.id)?;
550        // Serialize raw bytes as a `RawValue` so the on-wire JSON
551        // is byte-for-byte the same as the input. Pre-fix the
552        // bytes were parsed into a `JsonValue` tree and re-
553        // serialized; the round-trip discarded original
554        // whitespace, normalized number formatting (`1.0` → `1`),
555        // and (without `preserve_order`) re-ordered map keys
556        // alphabetically. Any downstream that hashed or signed
557        // the serialized form and expected byte-equality with the
558        // input silently failed verification — a sneaky failure
559        // mode in audit / signing pipelines that look at the
560        // re-emitted JSON.
561        //
562        // `RawValue::from_string` validates the JSON (so the
563        // pre-existing "invalid raw JSON returns a serde error,
564        // not a silent null" guarantee is preserved), but emits
565        // the original bytes verbatim instead of round-tripping
566        // through a value tree.
567        let raw_str = std::str::from_utf8(&self.raw)
568            .map_err(|e| serde::ser::Error::custom(format!("invalid raw UTF-8: {}", e)))?;
569        let raw_value = serde_json::value::RawValue::from_string(raw_str.to_string())
570            .map_err(|e| serde::ser::Error::custom(format!("invalid raw JSON: {}", e)))?;
571        state.serialize_field("raw", &*raw_value)?;
572        state.serialize_field("insertion_ts", &self.insertion_ts)?;
573        state.serialize_field("shard_id", &self.shard_id)?;
574        // Always emit `dedup_id` to keep the wire shape stable —
575        // `None` serializes as JSON `null`.
576        state.serialize_field("dedup_id", &self.dedup_id)?;
577        state.end()
578    }
579}
580
581#[cfg(test)]
582mod tests {
583    use super::*;
584    use serde_json::json;
585
586    #[test]
587    fn test_event_new() {
588        let value = json!({"key": "value"});
589        let event = Event::new(value.clone());
590        assert_eq!(event.as_value(), &value);
591    }
592
593    #[test]
594    fn test_event_from_str() {
595        let event = Event::from_str(r#"{"key": "value"}"#).unwrap();
596        assert_eq!(event.as_value()["key"], "value");
597    }
598
599    #[test]
600    fn test_event_from_str_invalid() {
601        let result = Event::from_str("not valid json");
602        assert!(result.is_err());
603    }
604
605    #[test]
606    fn test_event_from_slice() {
607        let bytes = br#"{"key": "value"}"#;
608        let event = Event::from_slice(bytes).unwrap();
609        assert_eq!(event.as_value()["key"], "value");
610    }
611
612    #[test]
613    fn test_event_into_inner() {
614        let value = json!({"key": "value"});
615        let event = Event::new(value.clone());
616        assert_eq!(event.into_inner(), value);
617    }
618
619    #[test]
620    fn test_event_into_raw() {
621        let event = Event::new(json!({"key": "value"}));
622        let raw = event.into_raw();
623        assert!(!raw.is_empty());
624        assert!(raw.hash() != 0);
625    }
626
627    #[test]
628    fn test_event_from_json_value() {
629        let value = json!({"key": "value"});
630        let event: Event = value.clone().into();
631        assert_eq!(event.as_value(), &value);
632    }
633
634    #[test]
635    fn test_event_into_json_value() {
636        let value = json!({"key": "value"});
637        let event = Event::new(value.clone());
638        let result: JsonValue = event.into();
639        assert_eq!(result, value);
640    }
641
642    #[test]
643    fn test_raw_event_from_bytes() {
644        let bytes = br#"{"key": "value"}"#;
645        let raw = RawEvent::from_bytes(bytes.as_slice());
646        assert_eq!(raw.as_bytes(), bytes);
647        assert!(!raw.is_empty());
648        assert_eq!(raw.len(), bytes.len());
649    }
650
651    #[test]
652    fn test_raw_event_from_str() {
653        let s = r#"{"key": "value"}"#;
654        let raw = RawEvent::from_str(s);
655        assert_eq!(raw.as_bytes(), s.as_bytes());
656    }
657
658    #[test]
659    fn test_raw_event_from_value() {
660        let value = json!({"key": "value"});
661        let raw = RawEvent::from_value(value);
662        let parsed = raw.parse().unwrap();
663        assert_eq!(parsed["key"], "value");
664    }
665
666    #[test]
667    fn test_raw_event_from_bytes_validated() {
668        let valid = br#"{"key": "value"}"#;
669        let result = RawEvent::from_bytes_validated(valid.as_slice());
670        assert!(result.is_ok());
671
672        let invalid = b"not valid json";
673        let result = RawEvent::from_bytes_validated(invalid.as_slice());
674        assert!(result.is_err());
675    }
676
677    #[test]
678    fn test_raw_event_hash_consistency() {
679        let raw1 = RawEvent::from_str(r#"{"key": "value"}"#);
680        let raw2 = RawEvent::from_str(r#"{"key": "value"}"#);
681        assert_eq!(raw1.hash(), raw2.hash());
682
683        let raw3 = RawEvent::from_str(r#"{"key": "other"}"#);
684        assert_ne!(raw1.hash(), raw3.hash());
685    }
686
687    #[test]
688    fn test_raw_event_bytes_clone() {
689        let raw = RawEvent::from_str(r#"{"key": "value"}"#);
690        let bytes1 = raw.bytes();
691        let bytes2 = raw.bytes();
692        assert_eq!(bytes1, bytes2);
693    }
694
695    #[test]
696    fn test_raw_event_debug() {
697        let raw = RawEvent::from_str(r#"{"key": "value"}"#);
698        let debug = format!("{:?}", raw);
699        assert!(debug.contains("RawEvent"));
700        assert!(debug.contains("len"));
701        assert!(debug.contains("hash"));
702    }
703
704    #[test]
705    fn test_raw_event_from_event() {
706        let event = Event::new(json!({"key": "value"}));
707        let raw: RawEvent = event.into();
708        assert!(!raw.is_empty());
709    }
710
711    #[test]
712    fn test_raw_event_from_json_value() {
713        let value = json!({"key": "value"});
714        let raw: RawEvent = value.into();
715        assert!(!raw.is_empty());
716    }
717
718    #[test]
719    fn test_internal_event_new() {
720        let raw = Bytes::from(r#"{"key": "value"}"#);
721        let event = InternalEvent::new(raw.clone(), 12345, 0);
722        assert_eq!(event.raw, raw);
723        assert_eq!(event.insertion_ts, 12345);
724        assert_eq!(event.shard_id, 0);
725    }
726
727    #[test]
728    fn test_internal_event_from_value() {
729        let event = InternalEvent::from_value(json!({"key": "value"}), 12345, 0);
730        assert_eq!(event.insertion_ts, 12345);
731        assert_eq!(event.shard_id, 0);
732        let parsed = event.parse().unwrap();
733        assert_eq!(parsed["key"], "value");
734    }
735
736    #[test]
737    fn test_internal_event_as_bytes() {
738        let raw = Bytes::from(r#"{"key": "value"}"#);
739        let event = InternalEvent::new(raw.clone(), 12345, 0);
740        assert_eq!(event.as_bytes(), raw.as_ref());
741    }
742
743    #[test]
744    fn test_batch_new() {
745        let events = vec![
746            InternalEvent::from_value(json!({"i": 0}), 1, 0),
747            InternalEvent::from_value(json!({"i": 1}), 2, 0),
748        ];
749        let batch = Batch::new(0, events, 100);
750        assert_eq!(batch.shard_id, 0);
751        assert_eq!(batch.len(), 2);
752        assert_eq!(batch.sequence_start, 100);
753        assert!(!batch.is_empty());
754    }
755
756    #[test]
757    fn test_batch_empty() {
758        let batch = Batch::new(0, vec![], 0);
759        assert!(batch.is_empty());
760        assert_eq!(batch.len(), 0);
761    }
762
763    #[test]
764    fn test_stored_event_new() {
765        let raw = Bytes::from(r#"{"key":"value"}"#);
766        let event = StoredEvent::new("stream-123".to_string(), raw, 12345, 0);
767        assert_eq!(event.id, "stream-123");
768        let parsed = event.parse().unwrap();
769        assert_eq!(parsed["key"], "value");
770        assert_eq!(event.insertion_ts, 12345);
771        assert_eq!(event.shard_id, 0);
772    }
773
774    // Regression: raw_str() used to silently return "{}" for invalid UTF-8
775    // instead of reporting an error (BUGS_3 #4).
776    #[test]
777    fn test_stored_event_raw_str_valid_utf8() {
778        let raw = Bytes::from(r#"{"key":"value"}"#);
779        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
780        assert_eq!(event.raw_str().unwrap(), r#"{"key":"value"}"#);
781    }
782
783    #[test]
784    fn test_stored_event_raw_str_invalid_utf8_returns_err() {
785        let raw = Bytes::from(vec![0xff, 0xfe, 0xfd]);
786        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
787        assert!(event.raw_str().is_err());
788    }
789
790    // Regression: Serialize impl used to silently replace invalid raw bytes
791    // with null. Now it returns a serialization error (BUGS_4 #3).
792    #[test]
793    fn test_stored_event_serialize_valid() {
794        let raw = Bytes::from(r#"{"key":"value"}"#);
795        let event = StoredEvent::new("id".to_string(), raw, 123, 0);
796        let json = serde_json::to_string(&event).unwrap();
797        assert!(json.contains("\"key\""));
798        assert!(json.contains("\"value\""));
799    }
800
801    #[test]
802    fn test_stored_event_serialize_invalid_raw_returns_error() {
803        let raw = Bytes::from(b"not valid json".as_slice());
804        let event = StoredEvent::new("id".to_string(), raw, 0, 0);
805        let result = serde_json::to_string(&event);
806        assert!(
807            result.is_err(),
808            "serializing invalid raw bytes should error, not silently return null"
809        );
810    }
811
812    /// Regression: `StoredEvent::Serialize` must preserve the
813    /// raw bytes byte-for-byte instead of round-tripping through
814    /// `serde_json::Value`. Pre-fix the round-trip discarded
815    /// original whitespace, normalized number formatting
816    /// (`1.0` → `1`), and re-ordered map keys alphabetically.
817    /// Any downstream that hashed or signed the serialized form
818    /// and expected byte-equality with the input silently failed
819    /// verification.
820    #[test]
821    fn stored_event_serialize_preserves_raw_byte_for_byte() {
822        // Pin three cases where the JsonValue round-trip
823        // demonstrably mutates the bytes:
824        // 1. Whitespace (round-trip strips internal whitespace).
825        // 2. Number formatting (`1.0` → `1`).
826        // 3. Key ordering (BTreeMap default re-orders alphabetically).
827        let cases: &[&[u8]] = &[
828            // Whitespace: extra spaces inside the object literal.
829            br#"{ "key" : "value" }"#,
830            // Number formatting: 1.0 (would become 1 via Value).
831            br#"{"x":1.0,"y":2.5}"#,
832            // Key ordering: "z" before "a" (BTreeMap would re-order).
833            br#"{"z":1,"a":2}"#,
834        ];
835
836        for raw_bytes in cases {
837            let raw = Bytes::copy_from_slice(raw_bytes);
838            let event = StoredEvent::new("id".into(), raw.clone(), 0, 0);
839            let json = serde_json::to_string(&event).unwrap();
840
841            // The serialized output must contain the input bytes
842            // verbatim (as-is, not re-formatted by serde_json).
843            let expected_raw = std::str::from_utf8(raw_bytes).unwrap();
844            assert!(
845                json.contains(expected_raw),
846                "regression: StoredEvent serialization must contain the raw \
847                 input verbatim (no whitespace stripping, no number \
848                 normalization, no key re-ordering).\n\
849                 input:  {expected_raw}\n\
850                 output: {json}"
851            );
852        }
853    }
854
/// Regression: the serialized `StoredEvent` always carries a `dedup_id`
/// key, written as `null` when the event has none, so the wire shape does
/// not depend on the data. Previously a `None` dedup id dropped the key
/// entirely. Node / Python / Go consumers built with
/// `deny_unknown_fields` then accepted the 4-field shape but rejected any
/// event whose adapter filled in the 5th field.
#[test]
fn stored_event_serialize_always_emits_dedup_id() {
    let payload = r#"{"x":1}"#;
    let to_json = |event: &StoredEvent| serde_json::to_string(event).unwrap();

    // No dedup id: the key must still be present, holding `null`.
    let none_json = to_json(&StoredEvent::new("id-none".into(), Bytes::from(payload), 0, 0));
    assert!(
        none_json.contains("\"dedup_id\":null"),
        "absent dedup_id must serialize as null, not be omitted; got {none_json}",
    );

    // Dedup id present: the key holds the string value.
    let some_json = to_json(
        &StoredEvent::new("id-some".into(), Bytes::from(payload), 0, 0)
            .with_dedup_id(Some("abc".into())),
    );
    assert!(
        some_json.contains("\"dedup_id\":\"abc\""),
        "populated dedup_id must serialize as the string value; got {some_json}",
    );
}
880
/// `Batch::with_nonce` must copy its nonce argument into `process_nonce`
/// unchanged. The bus relies on this to stamp every emitted batch with the
/// loaded persistent nonce. A refactor that ignored the argument would
/// quietly break JetStream dedup across restarts.
#[test]
fn batch_with_nonce_round_trips_the_passed_value() {
    const NONCE: u64 = 0xDEAD_BEEF_CAFE_F00D;

    let events = (0..3)
        .map(|seq| InternalEvent::from_value(serde_json::json!({ "i": seq }), seq, 0))
        .collect::<Vec<InternalEvent>>();
    let batch = Batch::with_nonce(7, events, 42, NONCE);

    // The positional arguments land in their own fields...
    assert_eq!(batch.shard_id, 7);
    assert_eq!(batch.sequence_start, 42);
    // ...and the nonce is stored exactly as given.
    assert_eq!(
        batch.process_nonce, NONCE,
        "Batch::with_nonce must write the passed nonce verbatim",
    );
}
900
/// Regression: `Batch::new` is the constructor that falls back to the
/// per-process nonce. It must stamp every batch built in this process with
/// one and the same `process_nonce`. Adapters that persist
/// `(shard_id, sequence_start)` for dedup combine it with this nonce. That
/// way two processes that both start sequencing at zero (the default after
/// `BatchWorker::new`) do not collide on `(shard, 0, 0…)` inside the
/// backend's dedup window.
///
/// Two contracts are pinned:
/// 1. Repeated `batch_process_nonce` / `Batch::new` calls within one
///    process share the nonce, so in-process retries reuse the same
///    dedup key.
/// 2. The nonce is non-zero. A process started at exactly `UNIX_EPOCH`
///    with pid 0 would defeat the XOR, so this guards against trivially
///    predictable values.
#[test]
fn batch_process_nonce_is_stable_within_process() {
    let nonce = batch_process_nonce();
    assert_eq!(
        nonce,
        batch_process_nonce(),
        "within a single process the nonce must be stable"
    );

    // Every batch, whatever its shard or starting sequence, carries it.
    let batches = [Batch::new(0, vec![], 0), Batch::new(1, vec![], 100)];
    for batch in &batches {
        assert_eq!(batch.process_nonce, nonce);
    }

    // Zero is the "no nonce" sentinel for downstream consumers, so it must
    // never be handed out as the process nonce.
    assert_ne!(nonce, 0, "process nonce should be non-zero");
}
936
/// `Batch::with_nonce` must enforce the non-zero invariant that
/// `batch_process_nonce` already upholds. A caller might pass `0`: an
/// uninitialized field, a default-initialized `u64`, or a misconfigured
/// fallback path. That `0` MUST NOT propagate into `process_nonce`.
/// JetStream and other consumers treat `0` as the sentinel for "no nonce /
/// legacy path", which silently disables cross-restart dedup.
///
/// Expected behavior: `0` is coerced to `1`, mirroring the
/// `PersistentProducerNonce::create_new` and `batch_process_nonce` policy,
/// and every non-zero value passes through unchanged.
#[test]
fn with_nonce_coerces_zero_to_one_to_preserve_dedup_sentinel() {
    // Zero in → one out. This is the canonical replacement, not
    // `batch_process_nonce`: we don't want the function to silently route
    // around an explicit caller error.
    let b = Batch::with_nonce(0, vec![], 0, 0);
    assert_eq!(
        b.process_nonce, 1,
        "with_nonce(producer_nonce=0) must coerce to 1 — \
         letting 0 through would silently disable JetStream \
         cross-restart dedup (consumers treat 0 as sentinel)",
    );

    // Non-zero passes through verbatim.
    let b = Batch::with_nonce(0, vec![], 0, 0xDEAD_BEEF);
    assert_eq!(
        b.process_nonce, 0xDEAD_BEEF,
        "non-zero producer_nonce must pass through unchanged",
    );

    // 0xDEAD_BEEF is odd, so on its own it cannot distinguish a real
    // zero-check from a bogus "coercion" that sets the low bit
    // (`nonce | 1`). Such a bug also maps 0 → 1, yet corrupts every even
    // nonce. Even values, the coercion target itself, and the upper
    // boundary pin true pass-through.
    for nonce in [1, 2, 0xDEAD_BEEE, u64::MAX] {
        let b = Batch::with_nonce(0, vec![], 0, nonce);
        assert_eq!(
            b.process_nonce, nonce,
            "non-zero producer_nonce must pass through unchanged (input {nonce:#x})",
        );
    }
}
968}