use super::types::{ResultChangeEvent, ResultEvent, UpdatePayload};
use anyhow::{anyhow, Result};
use drasi_lib::channels::{QueryResult, ResultDiff};
use drasi_lib::profiling::ProfilingMetadata;
use serde_json::{json, Map, Value};
fn build_tracking_metadata(profiling: &ProfilingMetadata, sequence: u64) -> Map<String, Value> {
let mut tracking = Map::new();
let mut source = Map::new();
if let Some(source_send_ns) = profiling.source_send_ns {
source.insert("changeDispatcherEnd_ns".to_string(), json!(source_send_ns));
source.insert(
"changeDispatcherStart_ns".to_string(),
json!(source_send_ns),
);
source.insert("changeRouterEnd_ns".to_string(), json!(source_send_ns));
}
if let Some(source_receive_ns) = profiling.source_receive_ns {
source.insert("changeRouterStart_ns".to_string(), json!(source_receive_ns));
}
if let Some(reactivator_end_ns) = profiling.reactivator_end_ns {
source.insert("reactivatorEnd_ns".to_string(), json!(reactivator_end_ns));
} else if let Some(source_ns) = profiling.source_ns {
source.insert("reactivatorEnd_ns".to_string(), json!(source_ns));
}
if let Some(reactivator_start_ns) = profiling.reactivator_start_ns {
source.insert(
"reactivatorStart_ns".to_string(),
json!(reactivator_start_ns),
);
} else if let Some(source_ns) = profiling.source_ns {
source.insert("reactivatorStart_ns".to_string(), json!(source_ns));
}
if let Some(source_ns) = profiling.source_ns {
source.insert("source_ns".to_string(), json!(source_ns));
}
source.insert("seq".to_string(), json!(sequence));
let mut query = Map::new();
if let Some(source_send_ns) = profiling.source_send_ns {
query.insert("enqueue_ns".to_string(), json!(source_send_ns));
}
if let Some(query_receive_ns) = profiling.query_receive_ns {
query.insert("dequeue_ns".to_string(), json!(query_receive_ns));
}
if let Some(query_core_call_ns) = profiling.query_core_call_ns {
query.insert("queryStart_ns".to_string(), json!(query_core_call_ns));
}
if let Some(query_core_return_ns) = profiling.query_core_return_ns {
query.insert("queryEnd_ns".to_string(), json!(query_core_return_ns));
}
if !source.is_empty() {
tracking.insert("source".to_string(), Value::Object(source));
}
if !query.is_empty() {
tracking.insert("query".to_string(), Value::Object(query));
}
let mut result = Map::new();
result.insert("tracking".to_string(), Value::Object(tracking));
result
}
pub fn transform_query_result(
query_result: QueryResult,
sequence: i64,
sequence_u64: u64,
) -> Result<ResultEvent> {
let mut added_results = Vec::new();
let mut updated_results = Vec::new();
let mut deleted_results = Vec::new();
let profiling = query_result.profiling.as_ref();
for result_item in query_result.results {
match result_item {
ResultDiff::Add { data } => {
let data = data
.as_object()
.ok_or_else(|| anyhow!("'data' field must be an object"))?
.clone();
added_results.push(data);
}
ResultDiff::Update {
before,
after,
grouping_keys,
..
} => {
let before = before
.as_object()
.ok_or_else(|| anyhow!("'before' field must be an object"))?
.clone();
let after = after
.as_object()
.ok_or_else(|| anyhow!("'after' field must be an object"))?
.clone();
updated_results.push(UpdatePayload {
before: Some(before),
after: Some(after),
grouping_keys,
});
}
ResultDiff::Delete { data } => {
let data = data
.as_object()
.ok_or_else(|| anyhow!("'data' field must be an object"))?
.clone();
deleted_results.push(data);
}
ResultDiff::Aggregation { .. } | ResultDiff::Noop => {
log::warn!("Unknown result type: aggregation/noop, skipping");
continue;
}
}
}
let source_time_ms = query_result.timestamp.timestamp_millis() as u64;
let mut filtered_metadata: Map<String, Value> = query_result
.metadata
.into_iter()
.filter(|(key, _)| {
!matches!(
key.as_str(),
"query" | "processed_by" | "source_id" | "result_count"
)
})
.collect();
if let Some(prof) = profiling {
let tracking_metadata = build_tracking_metadata(prof, sequence_u64);
for (key, value) in tracking_metadata {
filtered_metadata.insert(key, value);
}
}
let metadata = if filtered_metadata.is_empty() {
None
} else {
Some(filtered_metadata)
};
let change_event = ResultChangeEvent {
query_id: query_result.query_id,
sequence: sequence as u64,
source_time_ms,
added_results,
updated_results,
deleted_results,
metadata,
};
Ok(ResultEvent::Change(change_event))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::collections::HashMap;
#[test]
fn test_transform_add_results() {
let timestamp = chrono::DateTime::from_timestamp_millis(1609459200000).unwrap();
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp,
results: vec![
ResultDiff::Add {
data: json!({"id": "1", "value": "test1"}),
},
ResultDiff::Add {
data: json!({"id": "2", "value": "test2"}),
},
],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 1, 1).unwrap();
match result {
ResultEvent::Change(change) => {
assert_eq!(change.query_id, "test-query");
assert_eq!(change.sequence, 1);
assert_eq!(change.source_time_ms, 1609459200000);
assert_eq!(change.added_results.len(), 2);
assert_eq!(change.updated_results.len(), 0);
assert_eq!(change.deleted_results.len(), 0);
assert_eq!(change.added_results[0]["id"], "1");
assert_eq!(change.added_results[1]["id"], "2");
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_update_results() {
let timestamp = chrono::DateTime::from_timestamp_millis(1609459200000).unwrap();
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp,
results: vec![ResultDiff::Update {
data: json!({"id": "1", "value": 20}),
before: json!({"id": "1", "value": 10}),
after: json!({"id": "1", "value": 20}),
grouping_keys: None,
}],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 2, 2).unwrap();
match result {
ResultEvent::Change(change) => {
assert_eq!(change.updated_results.len(), 1);
assert_eq!(
change.updated_results[0].before.as_ref().unwrap()["value"],
10
);
assert_eq!(
change.updated_results[0].after.as_ref().unwrap()["value"],
20
);
assert_eq!(change.updated_results[0].grouping_keys, None);
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_update_with_grouping_keys() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Update {
data: json!({"id": "1", "value": 20}),
before: json!({"id": "1", "value": 10}),
after: json!({"id": "1", "value": 20}),
grouping_keys: Some(vec!["key1".to_string(), "key2".to_string()]),
}],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 2, 2).unwrap();
match result {
ResultEvent::Change(change) => {
assert_eq!(change.updated_results.len(), 1);
assert_eq!(
change.updated_results[0].grouping_keys,
Some(vec!["key1".to_string(), "key2".to_string()])
);
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_delete_results() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Delete {
data: json!({"id": "1"}),
}],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 3, 3).unwrap();
match result {
ResultEvent::Change(change) => {
assert_eq!(change.deleted_results.len(), 1);
assert_eq!(change.deleted_results[0]["id"], "1");
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_mixed_results() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![
ResultDiff::Add {
data: json!({"id": "1"}),
},
ResultDiff::Update {
data: json!({"id": "2", "value": 20}),
before: json!({"id": "2", "value": 10}),
after: json!({"id": "2", "value": 20}),
grouping_keys: None,
},
ResultDiff::Delete {
data: json!({"id": "3"}),
},
],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 4, 4).unwrap();
match result {
ResultEvent::Change(change) => {
assert_eq!(change.added_results.len(), 1);
assert_eq!(change.updated_results.len(), 1);
assert_eq!(change.deleted_results.len(), 1);
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_with_metadata() {
let mut metadata = HashMap::new();
metadata.insert("tracking".to_string(), json!({"timing": 100}));
metadata.insert("query".to_string(), json!({"execution_time": 50})); metadata.insert("source_id".to_string(), json!("test-source"));
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Add {
data: json!({"id": "1"}),
}],
metadata,
profiling: None,
};
let result = transform_query_result(query_result, 5, 5).unwrap();
match result {
ResultEvent::Change(change) => {
assert!(change.metadata.is_some());
let meta = change.metadata.unwrap();
assert!(meta.contains_key("tracking"));
assert!(!meta.contains_key("query")); assert!(!meta.contains_key("source_id")); }
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_invalid_add_data_field() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Add {
data: json!(["not-an-object"]),
}],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 1, 1);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("'data' field must be an object"));
}
#[test]
fn test_transform_invalid_update_before_field() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Update {
data: json!({"id": "1", "value": 20}),
before: json!(true),
after: json!({"id": "1", "value": 20}),
grouping_keys: None,
}],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 1, 1);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("'before' field must be an object"));
}
#[test]
fn test_transform_invalid_update_after_field() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Update {
data: json!({"id": "1", "value": 20}),
before: json!({"id": "1", "value": 10}),
after: json!(42),
grouping_keys: None,
}],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 1, 1);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("'after' field must be an object"));
}
#[test]
fn test_transform_unknown_type_skipped() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![
ResultDiff::Add {
data: json!({"id": "1"}),
},
ResultDiff::Noop,
ResultDiff::Delete {
data: json!({"id": "3"}),
},
],
metadata: HashMap::new(),
profiling: None,
};
let result = transform_query_result(query_result, 1, 1).unwrap();
match result {
ResultEvent::Change(change) => {
assert_eq!(change.added_results.len(), 1);
assert_eq!(change.deleted_results.len(), 1);
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_empty_metadata_filtered() {
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Add {
data: json!({"id": "1"}),
}],
metadata: HashMap::new(), profiling: None,
};
let result = transform_query_result(query_result, 1, 1).unwrap();
match result {
ResultEvent::Change(change) => {
assert!(change.metadata.is_none());
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_build_tracking_metadata_all_fields() {
use drasi_lib::profiling::ProfilingMetadata;
let profiling = ProfilingMetadata {
source_ns: Some(1000),
reactivator_start_ns: Some(500),
reactivator_end_ns: Some(750),
source_receive_ns: Some(2000),
source_send_ns: Some(3000),
query_receive_ns: Some(4000),
query_core_call_ns: Some(5000),
query_core_return_ns: Some(6000),
query_send_ns: Some(7000),
reaction_receive_ns: Some(8000),
reaction_complete_ns: Some(9000),
};
let result = build_tracking_metadata(&profiling, 42);
assert!(result.contains_key("tracking"));
let tracking = result.get("tracking").unwrap().as_object().unwrap();
assert!(tracking.contains_key("source"));
let source = tracking.get("source").unwrap().as_object().unwrap();
assert_eq!(
source.get("changeDispatcherEnd_ns").unwrap().as_u64(),
Some(3000)
);
assert_eq!(
source.get("changeDispatcherStart_ns").unwrap().as_u64(),
Some(3000)
);
assert_eq!(
source.get("changeRouterEnd_ns").unwrap().as_u64(),
Some(3000)
);
assert_eq!(
source.get("changeRouterStart_ns").unwrap().as_u64(),
Some(2000)
);
assert_eq!(source.get("reactivatorEnd_ns").unwrap().as_u64(), Some(750));
assert_eq!(
source.get("reactivatorStart_ns").unwrap().as_u64(),
Some(500)
);
assert_eq!(source.get("source_ns").unwrap().as_u64(), Some(1000));
assert_eq!(source.get("seq").unwrap().as_u64(), Some(42));
assert!(tracking.contains_key("query"));
let query = tracking.get("query").unwrap().as_object().unwrap();
assert_eq!(query.get("enqueue_ns").unwrap().as_u64(), Some(3000));
assert_eq!(query.get("dequeue_ns").unwrap().as_u64(), Some(4000));
assert_eq!(query.get("queryStart_ns").unwrap().as_u64(), Some(5000));
assert_eq!(query.get("queryEnd_ns").unwrap().as_u64(), Some(6000));
}
#[test]
fn test_build_tracking_metadata_partial_fields() {
use drasi_lib::profiling::ProfilingMetadata;
let profiling = ProfilingMetadata {
source_ns: Some(1000),
source_send_ns: Some(3000),
query_receive_ns: Some(4000),
..Default::default()
};
let result = build_tracking_metadata(&profiling, 10);
let tracking = result.get("tracking").unwrap().as_object().unwrap();
let source = tracking.get("source").unwrap().as_object().unwrap();
assert_eq!(
source.get("changeDispatcherEnd_ns").unwrap().as_u64(),
Some(3000)
);
assert_eq!(source.get("source_ns").unwrap().as_u64(), Some(1000));
assert_eq!(source.get("seq").unwrap().as_u64(), Some(10));
assert!(!source.contains_key("changeRouterStart_ns"));
let query = tracking.get("query").unwrap().as_object().unwrap();
assert_eq!(query.get("enqueue_ns").unwrap().as_u64(), Some(3000));
assert_eq!(query.get("dequeue_ns").unwrap().as_u64(), Some(4000));
assert!(!query.contains_key("queryStart_ns")); }
#[test]
fn test_build_tracking_metadata_minimal_fields() {
use drasi_lib::profiling::ProfilingMetadata;
let profiling = ProfilingMetadata::default();
let result = build_tracking_metadata(&profiling, 99);
let tracking = result.get("tracking").unwrap().as_object().unwrap();
let source = tracking.get("source").unwrap().as_object().unwrap();
assert_eq!(source.get("seq").unwrap().as_u64(), Some(99));
assert_eq!(source.len(), 1);
}
#[test]
fn test_transform_with_profiling_metadata() {
use drasi_lib::profiling::ProfilingMetadata;
let profiling = ProfilingMetadata {
source_ns: Some(1744055144490466971),
source_receive_ns: Some(1744055159124143047),
source_send_ns: Some(1744055173551481387),
query_receive_ns: Some(1744055178510629042),
query_core_call_ns: Some(1744055178510650750),
query_core_return_ns: Some(1744055178510848750),
..Default::default()
};
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Add {
data: json!({"id": "1"}),
}],
metadata: HashMap::new(),
profiling: Some(profiling),
};
let result = transform_query_result(query_result, 24851, 24851).unwrap();
match result {
ResultEvent::Change(change) => {
assert!(change.metadata.is_some());
let metadata = change.metadata.unwrap();
assert!(metadata.contains_key("tracking"));
let tracking = metadata.get("tracking").unwrap().as_object().unwrap();
assert!(tracking.contains_key("source"));
let source = tracking.get("source").unwrap().as_object().unwrap();
assert_eq!(source.get("seq").unwrap().as_u64(), Some(24851));
assert_eq!(
source.get("source_ns").unwrap().as_u64(),
Some(1744055144490466971)
);
assert!(tracking.contains_key("query"));
let query = tracking.get("query").unwrap().as_object().unwrap();
assert_eq!(
query.get("enqueue_ns").unwrap().as_u64(),
Some(1744055173551481387)
);
assert_eq!(
query.get("dequeue_ns").unwrap().as_u64(),
Some(1744055178510629042)
);
}
_ => panic!("Expected Change event"),
}
}
#[test]
fn test_transform_profiling_merged_with_existing_metadata() {
use drasi_lib::profiling::ProfilingMetadata;
let mut metadata = HashMap::new();
metadata.insert("custom_field".to_string(), json!({"value": 123}));
let profiling = ProfilingMetadata {
source_ns: Some(1000),
source_send_ns: Some(2000),
..Default::default()
};
let query_result = QueryResult {
query_id: "test-query".to_string(),
timestamp: chrono::DateTime::from_timestamp_millis(1609459200000).unwrap(),
results: vec![ResultDiff::Add {
data: json!({"id": "1"}),
}],
metadata,
profiling: Some(profiling),
};
let result = transform_query_result(query_result, 1, 1).unwrap();
match result {
ResultEvent::Change(change) => {
let meta = change.metadata.unwrap();
assert!(meta.contains_key("custom_field"));
assert!(meta.contains_key("tracking"));
}
_ => panic!("Expected Change event"),
}
}
}