use crate::engine::error::{DataflowError, ErrorInfo};
use chrono::{DateTime, Utc};
use datavalue::OwnedDataValue;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::sync::Arc;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct Message {
pub(crate) id: String,
pub(crate) payload: Arc<OwnedDataValue>,
pub context: OwnedDataValue,
pub(crate) audit_trail: Vec<AuditTrail>,
pub(crate) errors: Vec<ErrorInfo>,
pub(crate) capture_changes: bool,
}
impl Serialize for Message {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut state = serializer.serialize_struct("Message", 5)?;
state.serialize_field("id", &self.id)?;
state.serialize_field("payload", &self.payload)?;
state.serialize_field("context", &self.context)?;
state.serialize_field("audit_trail", &self.audit_trail)?;
state.serialize_field("errors", &self.errors)?;
state.end()
}
}
impl<'de> Deserialize<'de> for Message {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct MessageData {
id: String,
payload: Arc<OwnedDataValue>,
context: OwnedDataValue,
audit_trail: Vec<AuditTrail>,
errors: Vec<ErrorInfo>,
}
let data = MessageData::deserialize(deserializer)?;
Ok(Message {
id: data.id,
payload: data.payload,
context: data.context,
audit_trail: data.audit_trail,
errors: data.errors,
capture_changes: true,
})
}
}
impl Message {
pub fn builder() -> MessageBuilder {
MessageBuilder::new()
}
pub fn new(payload: Arc<OwnedDataValue>) -> Self {
Self {
id: Uuid::now_v7().to_string(),
payload,
context: empty_context(),
audit_trail: vec![],
errors: vec![],
capture_changes: true,
}
}
pub fn from_value(payload: &JsonValue) -> Self {
Self::new(Arc::new(OwnedDataValue::from(payload)))
}
pub fn from_json_str(payload: &str) -> crate::engine::error::Result<Self> {
let value: JsonValue = serde_json::from_str(payload).map_err(DataflowError::from_serde)?;
Ok(Self::from_value(&value))
}
pub fn add_error(&mut self, error: ErrorInfo) {
self.errors.push(error);
}
pub fn has_errors(&self) -> bool {
!self.errors.is_empty()
}
#[inline]
pub fn id(&self) -> &str {
&self.id
}
#[inline]
pub fn payload(&self) -> &OwnedDataValue {
&self.payload
}
#[inline]
pub fn payload_arc(&self) -> &Arc<OwnedDataValue> {
&self.payload
}
#[inline]
pub fn audit_trail(&self) -> &[AuditTrail] {
&self.audit_trail
}
#[inline]
pub fn errors(&self) -> &[ErrorInfo] {
&self.errors
}
#[inline]
pub fn capture_changes(&self) -> bool {
self.capture_changes
}
pub fn data(&self) -> &OwnedDataValue {
&self.context["data"]
}
pub fn metadata(&self) -> &OwnedDataValue {
&self.context["metadata"]
}
pub fn temp_data(&self) -> &OwnedDataValue {
&self.context["temp_data"]
}
}
#[must_use = "MessageBuilder must be `.build()` to produce a Message"]
#[derive(Default)]
pub struct MessageBuilder {
id: Option<String>,
payload: Option<Arc<OwnedDataValue>>,
capture_changes: Option<bool>,
}
impl MessageBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn id(mut self, id: impl Into<String>) -> Self {
self.id = Some(id.into());
self
}
pub fn payload(mut self, payload: Arc<OwnedDataValue>) -> Self {
self.payload = Some(payload);
self
}
pub fn payload_json(mut self, payload: &JsonValue) -> Self {
self.payload = Some(Arc::new(OwnedDataValue::from(payload)));
self
}
pub fn capture_changes(mut self, on: bool) -> Self {
self.capture_changes = Some(on);
self
}
pub fn build(self) -> Message {
Message {
id: self.id.unwrap_or_else(|| Uuid::now_v7().to_string()),
payload: self
.payload
.unwrap_or_else(|| Arc::new(OwnedDataValue::Null)),
context: empty_context(),
audit_trail: vec![],
errors: vec![],
capture_changes: self.capture_changes.unwrap_or(true),
}
}
}
fn empty_context() -> OwnedDataValue {
OwnedDataValue::Object(vec![
("data".to_string(), OwnedDataValue::Object(Vec::new())),
("metadata".to_string(), OwnedDataValue::Object(Vec::new())),
("temp_data".to_string(), OwnedDataValue::Object(Vec::new())),
])
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AuditTrail {
pub workflow_id: Arc<str>,
pub task_id: Arc<str>,
pub timestamp: DateTime<Utc>,
pub changes: Vec<Change>,
pub status: usize,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Change {
pub path: Arc<str>,
pub old_value: OwnedDataValue,
pub new_value: OwnedDataValue,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn from_json_str_parses_valid_payload() {
let msg =
Message::from_json_str(r#"{"order": {"total": 42}}"#).expect("valid JSON should parse");
let payload_json = serde_json::to_value(msg.payload()).unwrap();
assert_eq!(payload_json, serde_json::json!({"order": {"total": 42}}));
}
#[test]
fn from_json_str_rejects_malformed_payload() {
let err = Message::from_json_str("{ not json").expect_err("malformed input should fail");
assert!(matches!(err, DataflowError::Deserialization(_)));
}
}