use super::types::{TopicRdfMapping, TransformOperation, ValueTransformation};
use crate::error::{StreamError, StreamResult};
use crate::event::{EventMetadata, StreamEvent};
use chrono::Utc;
use std::collections::HashMap;
pub struct TopicMapper {
pattern_cache: std::sync::Mutex<HashMap<String, regex::Regex>>,
}
impl TopicMapper {
pub fn new() -> Self {
Self {
pattern_cache: std::sync::Mutex::new(HashMap::new()),
}
}
pub fn matches(&self, topic: &str, pattern: &str) -> bool {
let topic_parts: Vec<&str> = topic.split('/').collect();
let pattern_parts: Vec<&str> = pattern.split('/').collect();
self.matches_parts(&topic_parts, &pattern_parts)
}
fn matches_parts(&self, topic: &[&str], pattern: &[&str]) -> bool {
let mut ti = 0;
let mut pi = 0;
while pi < pattern.len() && ti < topic.len() {
match pattern[pi] {
"#" => {
return pi == pattern.len() - 1; }
"+" => {
ti += 1;
pi += 1;
}
part if part == topic[ti] => {
ti += 1;
pi += 1;
}
_ => {
return false;
}
}
}
ti == topic.len() && pi == pattern.len()
}
pub fn to_stream_events(
&self,
topic: &str,
parsed: &HashMap<String, serde_json::Value>,
mapping: &TopicRdfMapping,
) -> StreamResult<Vec<StreamEvent>> {
let mut events = Vec::new();
let topic_segments: Vec<&str> = topic.split('/').collect();
let subject = self.apply_pattern(&mapping.subject_pattern, &topic_segments, parsed)?;
let graph = mapping
.graph_pattern
.as_ref()
.map(|pattern| self.apply_pattern(pattern, &topic_segments, parsed))
.transpose()?;
if let Some(ref type_uri) = mapping.type_uri {
let metadata = EventMetadata {
event_id: uuid::Uuid::new_v4().to_string(),
timestamp: Utc::now(),
source: format!("mqtt:{}", topic),
user: None,
context: graph.clone(),
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
};
events.push(StreamEvent::TripleAdded {
subject: subject.clone(),
predicate: "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
object: type_uri.clone(),
graph: graph.clone(),
metadata,
});
}
for (field, predicate) in &mapping.predicate_map {
if let Some(value) = parsed.get(field) {
let transformed_value =
self.apply_transformations(value.clone(), field, &mapping.transformations)?;
let object = self.json_to_rdf_literal(&transformed_value)?;
let metadata = EventMetadata {
event_id: uuid::Uuid::new_v4().to_string(),
timestamp: Utc::now(),
source: format!("mqtt:{}", topic),
user: None,
context: graph.clone(),
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
};
events.push(StreamEvent::TripleAdded {
subject: subject.clone(),
predicate: predicate.clone(),
object,
graph: graph.clone(),
metadata,
});
}
}
Ok(events)
}
fn apply_pattern(
&self,
pattern: &str,
topic_segments: &[&str],
payload: &HashMap<String, serde_json::Value>,
) -> StreamResult<String> {
let mut result = pattern.to_string();
for (i, segment) in topic_segments.iter().enumerate() {
result = result.replace(&format!("{{topic.{}}}", i), segment);
}
for (field, value) in payload {
if let Some(s) = value.as_str() {
result = result.replace(&format!("{{payload.{}}}", field), s);
} else if let Some(n) = value.as_i64() {
result = result.replace(&format!("{{payload.{}}}", field), &n.to_string());
} else if let Some(f) = value.as_f64() {
result = result.replace(&format!("{{payload.{}}}", field), &f.to_string());
}
}
Ok(result)
}
fn apply_transformations(
&self,
mut value: serde_json::Value,
field: &str,
transformations: &[ValueTransformation],
) -> StreamResult<serde_json::Value> {
for transform in transformations {
if transform.field == field {
value = match &transform.operation {
TransformOperation::Scale { factor } => {
if let Some(n) = value.as_f64() {
serde_json::json!(n * factor)
} else {
value
}
}
TransformOperation::Offset { value: offset } => {
if let Some(n) = value.as_f64() {
serde_json::json!(n + offset)
} else {
value
}
}
TransformOperation::LookupTable { table } => {
if let Some(s) = value.as_str() {
table.get(s).map(|v| serde_json::json!(v)).unwrap_or(value)
} else {
value
}
}
_ => value, };
}
}
Ok(value)
}
fn json_to_rdf_literal(&self, value: &serde_json::Value) -> StreamResult<String> {
match value {
serde_json::Value::String(s) => Ok(format!("\"{}\"", s)),
serde_json::Value::Number(n) => {
if n.is_i64() {
Ok(format!(
"\"{}\"^^<http://www.w3.org/2001/XMLSchema#integer>",
n
))
} else if n.is_f64() {
Ok(format!(
"\"{}\"^^<http://www.w3.org/2001/XMLSchema#double>",
n
))
} else {
Ok(format!("\"{}\"", n))
}
}
serde_json::Value::Bool(b) => Ok(format!(
"\"{}\"^^<http://www.w3.org/2001/XMLSchema#boolean>",
b
)),
serde_json::Value::Null => Ok("\"\"".to_string()),
_ => {
let json_str = serde_json::to_string(value).map_err(|e| {
StreamError::Serialization(format!("JSON serialization failed: {}", e))
})?;
Ok(format!(
"\"{}\"^^<http://www.w3.org/1999/02/22-rdf-syntax-ns#JSON>",
json_str.replace('"', "\\\"")
))
}
}
}
}
impl Default for TopicMapper {
fn default() -> Self {
Self::new()
}
}
pub struct TopicMatch {
pub pattern: String,
pub segments: HashMap<String, String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_topic_matching() {
let mapper = TopicMapper::new();
assert!(mapper.matches("factory/sensor1/temperature", "factory/sensor1/temperature"));
assert!(mapper.matches("factory/sensor1/temperature", "factory/+/temperature"));
assert!(!mapper.matches(
"factory/floor1/sensor1/temperature",
"factory/+/temperature"
));
assert!(mapper.matches("factory/sensor1/temperature", "factory/#"));
assert!(mapper.matches("factory/floor1/sensor1/temperature", "factory/#"));
assert!(!mapper.matches("plant/sensor1/temperature", "factory/#"));
assert!(mapper.matches(
"factory/floor1/sensor1/temperature",
"factory/+/+/temperature"
));
assert!(mapper.matches("factory/floor1/sensor1/temperature", "factory/+/#"));
}
#[test]
fn test_pattern_application() {
let mapper = TopicMapper::new();
let topic_segments = vec!["factory", "sensor1", "temperature"];
let mut payload = HashMap::new();
payload.insert("device_id".to_string(), serde_json::json!("DEV001"));
let result = mapper
.apply_pattern(
"urn:factory:{topic.0}:device:{payload.device_id}",
&topic_segments,
&payload,
)
.unwrap();
assert_eq!(result, "urn:factory:factory:device:DEV001");
}
#[test]
fn test_json_to_rdf_literal() {
let mapper = TopicMapper::new();
assert_eq!(
mapper
.json_to_rdf_literal(&serde_json::json!("hello"))
.unwrap(),
"\"hello\""
);
assert!(mapper
.json_to_rdf_literal(&serde_json::json!(42))
.unwrap()
.contains("integer"));
assert!(mapper
.json_to_rdf_literal(&serde_json::json!(1.5))
.unwrap()
.contains("double"));
assert!(mapper
.json_to_rdf_literal(&serde_json::json!(true))
.unwrap()
.contains("boolean"));
}
#[test]
fn test_value_transformations() {
let mapper = TopicMapper::new();
let transform = ValueTransformation {
field: "temperature".to_string(),
operation: TransformOperation::Scale { factor: 1.8 },
};
let result = mapper
.apply_transformations(serde_json::json!(100.0), "temperature", &[transform])
.unwrap();
assert_eq!(result.as_f64().unwrap(), 180.0);
let transform = ValueTransformation {
field: "temperature".to_string(),
operation: TransformOperation::Offset { value: 32.0 },
};
let result = mapper
.apply_transformations(serde_json::json!(0.0), "temperature", &[transform])
.unwrap();
assert_eq!(result.as_f64().unwrap(), 32.0);
}
}