oxirs-stream 0.2.2

Real-time streaming support with Kafka/NATS/MQTT/OPC-UA I/O, RDF Patch, and SPARQL Update delta
Documentation
//! NATS Message Types
//!
//! This module contains message format definitions for the NATS backend.

use crate::{EventMetadata, StreamEvent};
use serde::{Deserialize, Serialize};

/// NATS event message format
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsEventMessage {
    pub event_id: String,
    pub event_type: String,
    pub timestamp: u64,
    pub data: serde_json::Value,
    pub metadata: Option<EventMetadata>,
}

impl From<StreamEvent> for NatsEventMessage {
    fn from(event: StreamEvent) -> Self {
        let event_type = match &event {
            StreamEvent::TripleAdded { .. } => "triple_added",
            StreamEvent::TripleRemoved { .. } => "triple_removed",
            StreamEvent::GraphCleared { .. } => "graph_cleared",
            StreamEvent::GraphCreated { .. } => "graph_created",
            StreamEvent::GraphDeleted { .. } => "graph_deleted",
            StreamEvent::SparqlUpdate { .. } => "sparql_update",
            StreamEvent::TransactionBegin { .. } => "transaction_begin",
            StreamEvent::TransactionCommit { .. } => "transaction_commit",
            StreamEvent::TransactionAbort { .. } => "transaction_abort",
            StreamEvent::SchemaChanged { .. } => "schema_changed",
            StreamEvent::Heartbeat { .. } => "heartbeat",
            _ => "unknown",
        };

        let data = serde_json::to_value(&event).unwrap_or_default();
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("SystemTime should be after UNIX_EPOCH")
            .as_secs();

        Self {
            event_id: uuid::Uuid::new_v4().to_string(),
            event_type: event_type.to_string(),
            timestamp,
            data,
            metadata: Some(event.metadata().clone()),
        }
    }
}

impl NatsEventMessage {
    /// Create a new NATS event message
    pub fn new(event_type: String, data: serde_json::Value) -> Self {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("SystemTime should be after UNIX_EPOCH")
            .as_secs();

        Self {
            event_id: uuid::Uuid::new_v4().to_string(),
            event_type,
            timestamp,
            data,
            metadata: None,
        }
    }

    /// Set metadata for the message
    pub fn with_metadata(mut self, metadata: EventMetadata) -> Self {
        self.metadata = Some(metadata);
        self
    }

    /// Get the message subject based on event type
    pub fn get_subject(&self, prefix: &str) -> String {
        format!("{}.{}", prefix, self.event_type)
    }

    /// Convert to bytes for sending
    pub fn to_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }

    /// Create from bytes
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(bytes)
    }

    /// Validate message format
    pub fn validate(&self) -> Result<(), String> {
        if self.event_id.is_empty() {
            return Err("Event ID cannot be empty".to_string());
        }
        if self.event_type.is_empty() {
            return Err("Event type cannot be empty".to_string());
        }
        if self.timestamp == 0 {
            return Err("Timestamp cannot be zero".to_string());
        }
        Ok(())
    }

    /// Get message size in bytes
    pub fn size(&self) -> usize {
        self.to_bytes().map(|b| b.len()).unwrap_or(0)
    }
}

/// Conversion from NatsEventMessage to StreamEvent
impl TryFrom<NatsEventMessage> for StreamEvent {
    type Error = serde_json::Error;

    fn try_from(msg: NatsEventMessage) -> Result<Self, Self::Error> {
        serde_json::from_value(msg.data)
    }
}