use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
/// A parsed JSON event: a single-field newtype around [`JsonValue`].
///
/// `#[repr(transparent)]` gives the wrapper the same layout as the wrapped
/// value, and the derived serde impls operate on the tuple struct's single
/// public field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[repr(transparent)]
pub struct Event(pub JsonValue);
impl Event {
#[inline]
pub fn new(value: JsonValue) -> Self {
Self(value)
}
#[inline]
#[allow(clippy::should_implement_trait)]
pub fn from_str(s: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(s).map(Self)
}
#[inline]
pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(bytes).map(Self)
}
#[inline]
pub fn into_inner(self) -> JsonValue {
self.0
}
#[inline]
pub fn as_value(&self) -> &JsonValue {
&self.0
}
#[inline]
pub fn into_raw(self) -> RawEvent {
RawEvent::from_value(self.0)
}
}
impl From<JsonValue> for Event {
#[inline]
fn from(value: JsonValue) -> Self {
Self(value)
}
}
/// Unwraps an [`Event`] back into the JSON value it carries.
impl From<Event> for JsonValue {
    #[inline]
    fn from(event: Event) -> Self {
        event.into_inner()
    }
}
/// An event kept as raw bytes together with a 64-bit hash.
///
/// Most constructors compute the hash as xxh3-64 over the bytes;
/// `from_bytes_with_hash` stores whatever hash the caller supplies without
/// checking it against the bytes.
#[derive(Clone)]
pub struct RawEvent {
/// The event payload. Only `from_bytes_validated` checks that it is JSON.
bytes: Bytes,
/// Hash associated with `bytes` (computed or caller-supplied, see above).
hash: u64,
}
impl RawEvent {
    /// Pairs `bytes` with the xxh3-64 digest of their contents.
    #[inline]
    fn hashed(bytes: Bytes) -> Self {
        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
        Self { bytes, hash }
    }

    /// Builds a raw event from `bytes`, hashing them with xxh3-64.
    ///
    /// The bytes are not checked for JSON validity.
    #[inline]
    pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
        Self::hashed(bytes.into())
    }

    /// Builds a raw event from `bytes` and a caller-provided `hash`.
    ///
    /// Neither the bytes nor the hash are verified; `hash` is stored as given.
    #[inline]
    pub fn from_bytes_with_hash(bytes: impl Into<Bytes>, hash: u64) -> Self {
        let bytes: Bytes = bytes.into();
        Self { bytes, hash }
    }

    /// Builds a raw event from `bytes` after confirming they parse as JSON.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when the bytes are not valid JSON.
    #[inline]
    pub fn from_bytes_validated(bytes: impl Into<Bytes>) -> Result<Self, serde_json::Error> {
        let bytes: Bytes = bytes.into();
        // The parsed tree is only a validity probe and is dropped right away.
        serde_json::from_slice::<JsonValue>(&bytes)?;
        Ok(Self::hashed(bytes))
    }

    /// Serializes `value` to JSON bytes and hashes the result.
    ///
    /// If serialization fails the event is built from an empty buffer.
    #[inline]
    pub fn from_value(value: JsonValue) -> Self {
        let encoded = match serde_json::to_vec(&value) {
            Ok(buf) => buf,
            Err(_) => Vec::new(),
        };
        Self::hashed(Bytes::from(encoded))
    }

    /// Copies `s` into a new buffer and hashes it; no JSON validation is done.
    #[inline]
    // Infallible and returns `Self`, so it deliberately is not `FromStr`.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Self {
        let copied = Bytes::copy_from_slice(s.as_bytes());
        Self::hashed(copied)
    }

    /// Borrows the payload as a byte slice.
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        &self.bytes[..]
    }

    /// Returns a clone of the underlying `Bytes` handle.
    #[inline]
    pub fn bytes(&self) -> Bytes {
        Bytes::clone(&self.bytes)
    }

    /// Returns the stored hash.
    #[inline]
    pub fn hash(&self) -> u64 {
        self.hash
    }

    /// Parses the payload as JSON.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when the payload is not valid JSON.
    #[inline]
    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
        serde_json::from_slice(self.as_bytes())
    }

    /// Payload length in bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.as_bytes().len()
    }

    /// `true` when the payload holds no bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.as_bytes().is_empty()
    }
}
/// Compact debug output: prints the payload length and hash, not the payload.
impl std::fmt::Debug for RawEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let payload_len = self.bytes.len();
        let mut out = f.debug_struct("RawEvent");
        out.field("len", &payload_len);
        out.field("hash", &self.hash);
        out.finish()
    }
}
impl From<Event> for RawEvent {
#[inline]
fn from(event: Event) -> Self {
event.into_raw()
}
}
impl From<JsonValue> for RawEvent {
#[inline]
fn from(value: JsonValue) -> Self {
RawEvent::from_value(value)
}
}
/// A raw event payload tagged with an insertion timestamp and a shard id.
#[derive(Debug, Clone)]
pub struct InternalEvent {
/// Event payload bytes; `parse` treats them as JSON.
pub raw: Bytes,
/// Caller-supplied insertion timestamp (unit is not established in this file).
pub insertion_ts: u64,
/// Caller-supplied shard identifier.
pub shard_id: u16,
}
impl InternalEvent {
    /// Assembles an event from its parts; `raw` is stored without validation.
    #[inline]
    pub fn new(raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
        Self {
            shard_id,
            insertion_ts,
            raw,
        }
    }

    /// Serializes `value` to JSON bytes and builds an event around them.
    ///
    /// If serialization fails the payload is left empty.
    #[inline]
    pub fn from_value(value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
        let encoded = match serde_json::to_vec(&value) {
            Ok(buf) => buf,
            Err(_) => Vec::new(),
        };
        Self::new(Bytes::from(encoded), insertion_ts, shard_id)
    }

    /// Parses the payload as JSON.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when the payload is not valid JSON.
    #[inline]
    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
        serde_json::from_slice(self.as_bytes())
    }

    /// Borrows the payload as a byte slice.
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        &self.raw[..]
    }
}
/// A group of [`InternalEvent`]s tagged with shard, sequence and nonce values.
#[derive(Debug, Clone)]
pub struct Batch {
/// Caller-supplied shard identifier for the batch.
pub shard_id: u16,
/// The events carried by this batch.
pub events: Vec<InternalEvent>,
/// Caller-supplied starting sequence number.
// NOTE(review): presumably the sequence of the first event — confirm with callers.
pub sequence_start: u64,
/// Nonce for the batch: `Batch::new` fills it from `batch_process_nonce()`,
/// while `Batch::with_nonce` stores the caller's value with 0 replaced by 1.
pub process_nonce: u64,
}
/// Returns a non-zero 64-bit nonce that is computed once and then cached.
///
/// The first call mixes several process-local values (wall-clock time, a
/// monotonic clock marker, the process id, a stack address and the calling
/// thread's id) through xxh3-64. Every later call, from any thread, returns
/// that same cached value.
pub fn batch_process_nonce() -> u64 {
    use std::sync::OnceLock;

    static NONCE: OnceLock<u64> = OnceLock::new();

    /// Gathers the entropy sources and hashes them into the nonce.
    fn derive_nonce() -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        use std::time::{Instant, SystemTime, UNIX_EPOCH};

        // Nanoseconds since the Unix epoch, truncated to 64 bits; a clock set
        // before the epoch contributes 0.
        let wall_nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_nanos() as u64,
            Err(_) => 0,
        };
        // `Instant` exposes no numeric value, so its debug rendering is used.
        let mono_marker = format!("{:?}", Instant::now());
        let pid = u64::from(std::process::id());
        // Address of a stack local, used purely as an extra varying input.
        let stack_marker = &pid as *const u64 as usize;
        let tid = {
            let mut hasher = DefaultHasher::new();
            std::thread::current().id().hash(&mut hasher);
            hasher.finish()
        };

        // Layout: four little-endian u64 words, then up to 32 bytes of the
        // monotonic marker; any unused tail stays zeroed.
        let mut buf = [0u8; 64];
        let words = [wall_nanos, pid, stack_marker as u64, tid];
        for (slot, word) in buf.chunks_exact_mut(8).zip(words) {
            slot.copy_from_slice(&word.to_le_bytes());
        }
        for (dst, src) in buf[32..].iter_mut().zip(mono_marker.as_bytes()) {
            *dst = *src;
        }

        // Never hand out 0: remap it to 1.
        match xxhash_rust::xxh3::xxh3_64(&buf) {
            0 => 1,
            nonce => nonce,
        }
    }

    *NONCE.get_or_init(derive_nonce)
}
impl Batch {
    /// Builds a batch stamped with this process's cached nonce
    /// (see `batch_process_nonce`).
    #[inline]
    pub fn new(shard_id: u16, events: Vec<InternalEvent>, sequence_start: u64) -> Self {
        let nonce = batch_process_nonce();
        Self::with_nonce(shard_id, events, sequence_start, nonce)
    }

    /// Builds a batch with an explicit nonce.
    ///
    /// A `producer_nonce` of 0 is stored as 1, so `process_nonce` is never 0;
    /// every other value is stored unchanged.
    #[inline]
    pub fn with_nonce(
        shard_id: u16,
        events: Vec<InternalEvent>,
        sequence_start: u64,
        producer_nonce: u64,
    ) -> Self {
        // For an unsigned value, `max(1)` changes only 0.
        let process_nonce = producer_nonce.max(1);
        Self {
            shard_id,
            events,
            sequence_start,
            process_nonce,
        }
    }

    /// Number of events in the batch.
    #[inline]
    pub fn len(&self) -> usize {
        self.events.as_slice().len()
    }

    /// `true` when the batch carries no events.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.events.as_slice().is_empty()
    }
}
/// An event with an id, its raw payload, and insertion metadata.
///
/// Serialization is hand-written (see the `Serialize` impl) so that `raw` is
/// emitted as embedded JSON rather than as a byte array.
#[derive(Debug, Clone)]
pub struct StoredEvent {
/// Caller-supplied identifier for the event.
pub id: String,
/// Raw payload bytes; must be valid UTF-8 JSON for serialization to succeed.
pub raw: Bytes,
/// Caller-supplied insertion timestamp (unit is not established in this file).
pub insertion_ts: u64,
/// Caller-supplied shard identifier.
pub shard_id: u16,
/// Optional deduplication id; `None` unless set through `with_dedup_id`.
pub dedup_id: Option<String>,
}
impl StoredEvent {
    /// Assembles a stored event with no dedup id; `raw` is not validated.
    #[inline]
    pub fn new(id: String, raw: Bytes, insertion_ts: u64, shard_id: u16) -> Self {
        Self {
            dedup_id: None,
            id,
            raw,
            insertion_ts,
            shard_id,
        }
    }

    /// Serializes `value` to JSON bytes and builds a stored event around them.
    ///
    /// If serialization fails the payload is left empty. The dedup id starts
    /// out as `None`.
    #[inline]
    pub fn from_value(id: String, value: JsonValue, insertion_ts: u64, shard_id: u16) -> Self {
        let encoded = match serde_json::to_vec(&value) {
            Ok(buf) => buf,
            Err(_) => Vec::new(),
        };
        Self::new(id, Bytes::from(encoded), insertion_ts, shard_id)
    }

    /// Returns the event with its dedup id replaced by `dedup_id`.
    #[inline]
    #[must_use]
    pub fn with_dedup_id(self, dedup_id: Option<String>) -> Self {
        Self { dedup_id, ..self }
    }

    /// Parses the payload as JSON.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when the payload is not valid JSON.
    #[inline]
    pub fn parse(&self) -> Result<JsonValue, serde_json::Error> {
        serde_json::from_slice(&self.raw[..])
    }

    /// Views the payload as a string slice.
    ///
    /// # Errors
    ///
    /// Returns `Utf8Error` when the payload is not valid UTF-8.
    #[inline]
    pub fn raw_str(&self) -> Result<&str, std::str::Utf8Error> {
        std::str::from_utf8(&self.raw[..])
    }
}
impl Serialize for StoredEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let field_count = 5;
let mut state = serializer.serialize_struct("StoredEvent", field_count)?;
state.serialize_field("id", &self.id)?;
let raw_str = std::str::from_utf8(&self.raw)
.map_err(|e| serde::ser::Error::custom(format!("invalid raw UTF-8: {}", e)))?;
let raw_value = serde_json::value::RawValue::from_string(raw_str.to_string())
.map_err(|e| serde::ser::Error::custom(format!("invalid raw JSON: {}", e)))?;
state.serialize_field("raw", &*raw_value)?;
state.serialize_field("insertion_ts", &self.insertion_ts)?;
state.serialize_field("shard_id", &self.shard_id)?;
state.serialize_field("dedup_id", &self.dedup_id)?;
state.end()
}
}
// Unit tests for the event types defined in this file: construction,
// conversions, hashing, serialization, and batch nonce handling.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// ---- Event: construction, parsing and conversions ----
#[test]
fn test_event_new() {
let value = json!({"key": "value"});
let event = Event::new(value.clone());
assert_eq!(event.as_value(), &value);
}
#[test]
fn test_event_from_str() {
let event = Event::from_str(r#"{"key": "value"}"#).unwrap();
assert_eq!(event.as_value()["key"], "value");
}
#[test]
fn test_event_from_str_invalid() {
let result = Event::from_str("not valid json");
assert!(result.is_err());
}
#[test]
fn test_event_from_slice() {
let bytes = br#"{"key": "value"}"#;
let event = Event::from_slice(bytes).unwrap();
assert_eq!(event.as_value()["key"], "value");
}
#[test]
fn test_event_into_inner() {
let value = json!({"key": "value"});
let event = Event::new(value.clone());
assert_eq!(event.into_inner(), value);
}
#[test]
fn test_event_into_raw() {
let event = Event::new(json!({"key": "value"}));
let raw = event.into_raw();
assert!(!raw.is_empty());
assert!(raw.hash() != 0);
}
#[test]
fn test_event_from_json_value() {
let value = json!({"key": "value"});
let event: Event = value.clone().into();
assert_eq!(event.as_value(), &value);
}
#[test]
fn test_event_into_json_value() {
let value = json!({"key": "value"});
let event = Event::new(value.clone());
let result: JsonValue = event.into();
assert_eq!(result, value);
}
// ---- RawEvent: constructors, hashing, accessors and conversions ----
#[test]
fn test_raw_event_from_bytes() {
let bytes = br#"{"key": "value"}"#;
let raw = RawEvent::from_bytes(bytes.as_slice());
assert_eq!(raw.as_bytes(), bytes);
assert!(!raw.is_empty());
assert_eq!(raw.len(), bytes.len());
}
#[test]
fn test_raw_event_from_str() {
let s = r#"{"key": "value"}"#;
let raw = RawEvent::from_str(s);
assert_eq!(raw.as_bytes(), s.as_bytes());
}
#[test]
fn test_raw_event_from_value() {
let value = json!({"key": "value"});
let raw = RawEvent::from_value(value);
let parsed = raw.parse().unwrap();
assert_eq!(parsed["key"], "value");
}
// Only the validated constructor rejects non-JSON input.
#[test]
fn test_raw_event_from_bytes_validated() {
let valid = br#"{"key": "value"}"#;
let result = RawEvent::from_bytes_validated(valid.as_slice());
assert!(result.is_ok());
let invalid = b"not valid json";
let result = RawEvent::from_bytes_validated(invalid.as_slice());
assert!(result.is_err());
}
// Equal payloads hash equally; a different payload hashes differently.
#[test]
fn test_raw_event_hash_consistency() {
let raw1 = RawEvent::from_str(r#"{"key": "value"}"#);
let raw2 = RawEvent::from_str(r#"{"key": "value"}"#);
assert_eq!(raw1.hash(), raw2.hash());
let raw3 = RawEvent::from_str(r#"{"key": "other"}"#);
assert_ne!(raw1.hash(), raw3.hash());
}
#[test]
fn test_raw_event_bytes_clone() {
let raw = RawEvent::from_str(r#"{"key": "value"}"#);
let bytes1 = raw.bytes();
let bytes2 = raw.bytes();
assert_eq!(bytes1, bytes2);
}
// The hand-written Debug impl names the type and its `len` / `hash` fields.
#[test]
fn test_raw_event_debug() {
let raw = RawEvent::from_str(r#"{"key": "value"}"#);
let debug = format!("{:?}", raw);
assert!(debug.contains("RawEvent"));
assert!(debug.contains("len"));
assert!(debug.contains("hash"));
}
#[test]
fn test_raw_event_from_event() {
let event = Event::new(json!({"key": "value"}));
let raw: RawEvent = event.into();
assert!(!raw.is_empty());
}
#[test]
fn test_raw_event_from_json_value() {
let value = json!({"key": "value"});
let raw: RawEvent = value.into();
assert!(!raw.is_empty());
}
// ---- InternalEvent ----
#[test]
fn test_internal_event_new() {
let raw = Bytes::from(r#"{"key": "value"}"#);
let event = InternalEvent::new(raw.clone(), 12345, 0);
assert_eq!(event.raw, raw);
assert_eq!(event.insertion_ts, 12345);
assert_eq!(event.shard_id, 0);
}
#[test]
fn test_internal_event_from_value() {
let event = InternalEvent::from_value(json!({"key": "value"}), 12345, 0);
assert_eq!(event.insertion_ts, 12345);
assert_eq!(event.shard_id, 0);
let parsed = event.parse().unwrap();
assert_eq!(parsed["key"], "value");
}
#[test]
fn test_internal_event_as_bytes() {
let raw = Bytes::from(r#"{"key": "value"}"#);
let event = InternalEvent::new(raw.clone(), 12345, 0);
assert_eq!(event.as_bytes(), raw.as_ref());
}
// ---- Batch: basic construction ----
#[test]
fn test_batch_new() {
let events = vec![
InternalEvent::from_value(json!({"i": 0}), 1, 0),
InternalEvent::from_value(json!({"i": 1}), 2, 0),
];
let batch = Batch::new(0, events, 100);
assert_eq!(batch.shard_id, 0);
assert_eq!(batch.len(), 2);
assert_eq!(batch.sequence_start, 100);
assert!(!batch.is_empty());
}
#[test]
fn test_batch_empty() {
let batch = Batch::new(0, vec![], 0);
assert!(batch.is_empty());
assert_eq!(batch.len(), 0);
}
// ---- StoredEvent: construction and raw access ----
#[test]
fn test_stored_event_new() {
let raw = Bytes::from(r#"{"key":"value"}"#);
let event = StoredEvent::new("stream-123".to_string(), raw, 12345, 0);
assert_eq!(event.id, "stream-123");
let parsed = event.parse().unwrap();
assert_eq!(parsed["key"], "value");
assert_eq!(event.insertion_ts, 12345);
assert_eq!(event.shard_id, 0);
}
#[test]
fn test_stored_event_raw_str_valid_utf8() {
let raw = Bytes::from(r#"{"key":"value"}"#);
let event = StoredEvent::new("id".to_string(), raw, 0, 0);
assert_eq!(event.raw_str().unwrap(), r#"{"key":"value"}"#);
}
#[test]
fn test_stored_event_raw_str_invalid_utf8_returns_err() {
let raw = Bytes::from(vec![0xff, 0xfe, 0xfd]);
let event = StoredEvent::new("id".to_string(), raw, 0, 0);
assert!(event.raw_str().is_err());
}
// ---- StoredEvent: serialization ----
#[test]
fn test_stored_event_serialize_valid() {
let raw = Bytes::from(r#"{"key":"value"}"#);
let event = StoredEvent::new("id".to_string(), raw, 123, 0);
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("\"key\""));
assert!(json.contains("\"value\""));
}
// A payload that is not JSON must make serialization fail.
#[test]
fn test_stored_event_serialize_invalid_raw_returns_error() {
let raw = Bytes::from(b"not valid json".as_slice());
let event = StoredEvent::new("id".to_string(), raw, 0, 0);
let result = serde_json::to_string(&event);
assert!(
result.is_err(),
"serializing invalid raw bytes should error, not silently return null"
);
}
// The three cases cover interior whitespace, float formatting and key order:
// each payload must appear in the output exactly as it was given.
#[test]
fn stored_event_serialize_preserves_raw_byte_for_byte() {
let cases: &[&[u8]] = &[
br#"{ "key" : "value" }"#,
br#"{"x":1.0,"y":2.5}"#,
br#"{"z":1,"a":2}"#,
];
for raw_bytes in cases {
let raw = Bytes::copy_from_slice(raw_bytes);
let event = StoredEvent::new("id".into(), raw.clone(), 0, 0);
let json = serde_json::to_string(&event).unwrap();
let expected_raw = std::str::from_utf8(raw_bytes).unwrap();
assert!(
json.contains(expected_raw),
"regression: StoredEvent serialization must contain the raw \
input verbatim (no whitespace stripping, no number \
normalization, no key re-ordering).\n\
input: {expected_raw}\n\
output: {json}"
);
}
}
// `dedup_id` is always present in the output: `null` when unset, the string
// value when set.
#[test]
fn stored_event_serialize_always_emits_dedup_id() {
let none_event = StoredEvent::new("id-none".into(), Bytes::from(r#"{"x":1}"#), 0, 0);
let none_json = serde_json::to_string(&none_event).unwrap();
assert!(
none_json.contains("\"dedup_id\":null"),
"absent dedup_id must serialize as null, not be omitted; got {none_json}",
);
let some_event = StoredEvent::new("id-some".into(), Bytes::from(r#"{"x":1}"#), 0, 0)
.with_dedup_id(Some("abc".into()));
let some_json = serde_json::to_string(&some_event).unwrap();
assert!(
some_json.contains("\"dedup_id\":\"abc\""),
"populated dedup_id must serialize as the string value; got {some_json}",
);
}
// ---- Batch: nonce handling ----
#[test]
fn batch_with_nonce_round_trips_the_passed_value() {
let events: Vec<InternalEvent> = (0..3)
.map(|i| InternalEvent::from_value(serde_json::json!({"i": i}), i, 0))
.collect();
let nonce: u64 = 0xDEAD_BEEF_CAFE_F00D;
let batch = Batch::with_nonce(7, events, 42, nonce);
assert_eq!(batch.shard_id, 7);
assert_eq!(batch.sequence_start, 42);
assert_eq!(
batch.process_nonce, nonce,
"Batch::with_nonce must write the passed nonce verbatim",
);
}
// The nonce is computed once, so repeated calls and every `Batch::new`
// observe the same non-zero value.
#[test]
fn batch_process_nonce_is_stable_within_process() {
let nonce_a = batch_process_nonce();
let nonce_b = batch_process_nonce();
assert_eq!(
nonce_a, nonce_b,
"within a single process the nonce must be stable"
);
let b1 = Batch::new(0, vec![], 0);
let b2 = Batch::new(1, vec![], 100);
assert_eq!(b1.process_nonce, nonce_a);
assert_eq!(b2.process_nonce, nonce_a);
assert_ne!(nonce_a, 0, "process nonce should be non-zero");
}
// A zero nonce is stored as 1; any non-zero nonce passes through unchanged.
#[test]
fn with_nonce_coerces_zero_to_one_to_preserve_dedup_sentinel() {
let b = Batch::with_nonce(0, vec![], 0, 0);
assert_eq!(
b.process_nonce, 1,
"with_nonce(producer_nonce=0) must coerce to 1 — \
letting 0 through would silently disable JetStream \
cross-restart dedup (consumers treat 0 as sentinel)",
);
let b = Batch::with_nonce(0, vec![], 0, 0xDEAD_BEEF);
assert_eq!(
b.process_nonce, 0xDEAD_BEEF,
"non-zero producer_nonce must pass through unchanged",
);
}
}