use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::stream_name::{Category, StreamName};
/// Owned message metadata: stream location, causation and correlation
/// references, reply routing, a schema version, and free-form properties.
///
/// Serde behavior follows from the attributes below: fields missing from the
/// input fall back to their `Default` values when deserializing, while `None`
/// options and empty maps are omitted when serializing.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct Metadata {
    /// Stream name; combined with `position` by [`Metadata::identifier`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_name: Option<StreamName>,

    /// Position paired with `stream_name`
    /// (presumably the position within that stream; confirm).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub position: Option<i64>,

    /// Global position (presumably an ordering across all streams; confirm).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub global_position: Option<i64>,

    /// Stream name of the preceding metadata, as recorded by
    /// [`Metadata::follow`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub causation_message_stream_name: Option<StreamName>,

    /// Position of the preceding metadata, as recorded by
    /// [`Metadata::follow`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub causation_message_position: Option<i64>,

    /// Global position of the preceding metadata, as recorded by
    /// [`Metadata::follow`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub causation_message_global_position: Option<i64>,

    /// Correlation stream name; copied forward by [`Metadata::follow`] and
    /// examined by [`Metadata::is_correlated`].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_stream_name: Option<String>,

    /// Reply stream name; copied forward by [`Metadata::follow`]. Its
    /// presence is what [`Metadata::is_reply`] reports.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply_stream_name: Option<String>,

    /// Optional schema version string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_version: Option<String>,

    /// Key/value properties; [`Metadata::follow`] merges the preceding
    /// metadata's properties into this map.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub properties: HashMap<String, Value>,

    /// Key/value properties that [`Metadata::follow`] does not carry over.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub local_properties: HashMap<String, Value>,
}
/// Borrowed, serialize-only counterpart of [`Metadata`].
///
/// It declares the same fields in the same order, but holds references with
/// lifetime `'a` instead of owned values. It derives `Serialize` only, so it
/// cannot be deserialized into. As with [`Metadata`], `None` options and empty
/// maps are omitted when serializing.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct MetadataRef<'a> {
    /// Borrowed stream name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_name: Option<&'a StreamName>,

    /// Position paired with `stream_name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub position: Option<i64>,

    /// Global position.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub global_position: Option<i64>,

    /// Borrowed stream name of the causation message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub causation_message_stream_name: Option<&'a StreamName>,

    /// Position of the causation message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub causation_message_position: Option<i64>,

    /// Global position of the causation message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub causation_message_global_position: Option<i64>,

    /// Borrowed correlation stream name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_stream_name: Option<&'a str>,

    /// Borrowed reply stream name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply_stream_name: Option<&'a str>,

    /// Borrowed schema version string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_version: Option<&'a str>,

    /// Borrowed key/value properties.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub properties: HashMap<&'a str, &'a Value>,

    /// Borrowed key/value local properties.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub local_properties: HashMap<&'a str, &'a Value>,
}
impl Metadata {
pub fn identifier(&self) -> Option<String> {
Option::zip(self.stream_name.as_ref(), self.position)
.map(|(stream_name, position)| format!("{stream_name}/{position}"))
}
pub fn causation_message_identifier(&self) -> Option<String> {
Option::zip(
self.causation_message_stream_name.as_ref(),
self.causation_message_position,
)
.map(|(stream_name, position)| format!("{stream_name}/{position}"))
}
pub fn follow(&mut self, preceding_metadata: Metadata) {
self.causation_message_stream_name = preceding_metadata.stream_name;
self.causation_message_position = preceding_metadata.position;
self.causation_message_global_position = preceding_metadata.global_position;
self.correlation_stream_name = preceding_metadata.correlation_stream_name;
self.reply_stream_name = preceding_metadata.reply_stream_name;
self.properties.extend(preceding_metadata.properties);
}
pub fn follows(&self, preceding_metadata: &Metadata) -> bool {
if self.causation_message_stream_name.is_none() && preceding_metadata.stream_name.is_none()
{
return false;
}
if self.causation_message_stream_name != preceding_metadata.stream_name {
return false;
}
if self.causation_message_position.is_none() && preceding_metadata.position.is_none() {
return false;
}
if self.causation_message_position != preceding_metadata.position {
return false;
}
if self.causation_message_global_position.is_none()
&& preceding_metadata.global_position.is_none()
{
return false;
}
if self.causation_message_global_position != preceding_metadata.global_position {
return false;
}
if preceding_metadata.correlation_stream_name.is_some()
&& self.correlation_stream_name != preceding_metadata.correlation_stream_name
{
return false;
}
if preceding_metadata.reply_stream_name.is_some()
&& self.reply_stream_name != preceding_metadata.reply_stream_name
{
return false;
}
true
}
pub fn clear_reply_stream_name(&mut self) {
self.reply_stream_name = None;
}
pub fn is_reply(&self) -> bool {
self.reply_stream_name.is_some()
}
pub fn is_correlated(&self, stream_name: &str) -> bool {
let Some(correlation_stream_name) = &self.correlation_stream_name else {
return false;
};
let stream_name = Category::normalize(stream_name);
if StreamName::is_category(&stream_name) {
StreamName::category(correlation_stream_name) == stream_name
} else {
correlation_stream_name == &stream_name
}
}
}
/// Builds [`Metadata`] from an optional JSON value.
///
/// A missing value (`None`) yields `Metadata::default()`. A present value is
/// deserialized with `serde_json::from_value`, and any deserialization error
/// is returned unchanged.
impl TryFrom<Option<Value>> for Metadata {
    type Error = serde_json::Error;

    fn try_from(value: Option<Value>) -> Result<Self, Self::Error> {
        let Some(json) = value else {
            return Ok(Self::default());
        };
        serde_json::from_value(json)
    }
}