use crate::{EventMetadata, StreamEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsEventMessage {
pub event_id: String,
pub event_type: String,
pub timestamp: u64,
pub data: serde_json::Value,
pub metadata: Option<EventMetadata>,
}
impl From<StreamEvent> for NatsEventMessage {
fn from(event: StreamEvent) -> Self {
let event_type = match &event {
StreamEvent::TripleAdded { .. } => "triple_added",
StreamEvent::TripleRemoved { .. } => "triple_removed",
StreamEvent::GraphCleared { .. } => "graph_cleared",
StreamEvent::GraphCreated { .. } => "graph_created",
StreamEvent::GraphDeleted { .. } => "graph_deleted",
StreamEvent::SparqlUpdate { .. } => "sparql_update",
StreamEvent::TransactionBegin { .. } => "transaction_begin",
StreamEvent::TransactionCommit { .. } => "transaction_commit",
StreamEvent::TransactionAbort { .. } => "transaction_abort",
StreamEvent::SchemaChanged { .. } => "schema_changed",
StreamEvent::Heartbeat { .. } => "heartbeat",
_ => "unknown",
};
let data = serde_json::to_value(&event).unwrap_or_default();
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs();
Self {
event_id: uuid::Uuid::new_v4().to_string(),
event_type: event_type.to_string(),
timestamp,
data,
metadata: Some(event.metadata().clone()),
}
}
}
impl NatsEventMessage {
pub fn new(event_type: String, data: serde_json::Value) -> Self {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs();
Self {
event_id: uuid::Uuid::new_v4().to_string(),
event_type,
timestamp,
data,
metadata: None,
}
}
pub fn with_metadata(mut self, metadata: EventMetadata) -> Self {
self.metadata = Some(metadata);
self
}
pub fn get_subject(&self, prefix: &str) -> String {
format!("{}.{}", prefix, self.event_type)
}
pub fn to_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
serde_json::to_vec(self)
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(bytes)
}
pub fn validate(&self) -> Result<(), String> {
if self.event_id.is_empty() {
return Err("Event ID cannot be empty".to_string());
}
if self.event_type.is_empty() {
return Err("Event type cannot be empty".to_string());
}
if self.timestamp == 0 {
return Err("Timestamp cannot be zero".to_string());
}
Ok(())
}
pub fn size(&self) -> usize {
self.to_bytes().map(|b| b.len()).unwrap_or(0)
}
}
impl TryFrom<NatsEventMessage> for StreamEvent {
type Error = serde_json::Error;
fn try_from(msg: NatsEventMessage) -> Result<Self, Self::Error> {
serde_json::from_value(msg.data)
}
}