use super::config::{DataType, MockSourceConfig};
use anyhow::Result;
use async_trait::async_trait;
use drasi_core::models::{
Element, ElementMetadata, ElementPropertyMap, ElementReference, SourceChange,
};
use log::{debug, info};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use drasi_lib::channels::*;
use drasi_lib::managers::{log_component_start, log_component_stop};
use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
use drasi_lib::Source;
use tracing::Instrument;
pub struct MockSource {
base: SourceBase,
config: MockSourceConfig,
seen_sensors: Arc<RwLock<HashSet<u32>>>,
}
impl MockSource {
pub fn new(id: impl Into<String>, config: MockSourceConfig) -> Result<Self> {
config.validate()?;
let id = id.into();
let params = SourceBaseParams::new(id);
Ok(Self {
base: SourceBase::new(params)?,
config,
seen_sensors: Arc::new(RwLock::new(HashSet::new())),
})
}
pub fn with_dispatch(
id: impl Into<String>,
config: MockSourceConfig,
dispatch_mode: Option<DispatchMode>,
dispatch_buffer_capacity: Option<usize>,
) -> Result<Self> {
config.validate()?;
let id = id.into();
let mut params = SourceBaseParams::new(id);
if let Some(mode) = dispatch_mode {
params = params.with_dispatch_mode(mode);
}
if let Some(capacity) = dispatch_buffer_capacity {
params = params.with_dispatch_buffer_capacity(capacity);
}
Ok(Self {
base: SourceBase::new(params)?,
config,
seen_sensors: Arc::new(RwLock::new(HashSet::new())),
})
}
}
#[async_trait]
impl Source for MockSource {
fn id(&self) -> &str {
&self.base.id
}
fn type_name(&self) -> &str {
"mock"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
use crate::descriptor::{DataTypeDto, MockSourceConfigDto};
use drasi_plugin_sdk::ConfigValue;
let data_type_dto = match &self.config.data_type {
DataType::Counter => DataTypeDto::Counter,
DataType::SensorReading { sensor_count } => DataTypeDto::SensorReading {
sensor_count: *sensor_count,
},
DataType::Generic => DataTypeDto::Generic,
};
let dto = MockSourceConfigDto {
data_type: data_type_dto,
interval_ms: ConfigValue::Static(self.config.interval_ms),
};
match serde_json::to_value(&dto) {
Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
_ => HashMap::new(),
}
}
fn auto_start(&self) -> bool {
self.base.get_auto_start()
}
async fn start(&self) -> Result<()> {
log_component_start("Mock Source", &self.base.id);
self.base
.set_status(
ComponentStatus::Starting,
Some("Starting mock source".to_string()),
)
.await;
let base_dispatchers = self.base.dispatchers.clone();
let source_id = self.base.id.clone();
let data_type = self.config.data_type.clone();
let interval_ms = self.config.interval_ms;
let seen_sensors = Arc::clone(&self.seen_sensors);
let seen_sensors = Arc::clone(&self.seen_sensors);
let instance_id = self
.base
.context()
.await
.map(|c| c.instance_id)
.unwrap_or_default();
let status_handle = self.base.status_handle();
let source_name = self.base.id.clone();
let source_id_for_span = source_id.clone();
let span = tracing::info_span!(
"mock_source_task",
instance_id = %instance_id,
component_id = %source_id_for_span,
component_type = "source"
);
let task = tokio::spawn(
async move {
status_handle
.set_status(
ComponentStatus::Running,
Some("Mock source started successfully".to_string()),
)
.await;
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(interval_ms));
let mut seq = 0u64;
loop {
interval.tick().await;
if !matches!(status_handle.get_status().await, ComponentStatus::Running) {
break;
}
seq += 1;
let source_change = match data_type {
DataType::Counter => {
let element_id = format!("counter_{seq}");
let reference = ElementReference::new(&source_name, &element_id);
let mut property_map = ElementPropertyMap::new();
property_map.insert(
"value",
crate::conversion::json_to_element_value_or_default(
&Value::Number(seq.into()),
drasi_core::models::ElementValue::Null,
),
);
property_map.insert(
"timestamp",
crate::conversion::json_to_element_value_or_default(
&Value::String(chrono::Utc::now().to_rfc3339()),
drasi_core::models::ElementValue::Null,
),
);
let metadata = ElementMetadata {
reference,
labels: Arc::from(vec![Arc::from("Counter")]),
effective_from: crate::time::get_system_time_millis()
.unwrap_or_else(|e| {
log::warn!("Failed to get timestamp for mock counter: {e}");
chrono::Utc::now().timestamp_millis() as u64
}),
};
let element = Element::Node {
metadata,
properties: property_map,
};
SourceChange::Insert { element }
}
DataType::SensorReading { sensor_count } => {
let sensor_id = rand::random::<u32>() % sensor_count;
let element_id = format!("sensor_{sensor_id}");
let reference = ElementReference::new(&source_name, &element_id);
let mut property_map = ElementPropertyMap::new();
property_map.insert(
"sensor_id",
crate::conversion::json_to_element_value_or_default(
&Value::String(format!("sensor_{sensor_id}")),
drasi_core::models::ElementValue::Null,
),
);
property_map.insert(
"temperature",
crate::conversion::json_to_element_value_or_default(
&Value::Number(
serde_json::Number::from_f64(
20.0 + rand::random::<f64>() * 10.0,
)
.unwrap_or(serde_json::Number::from(25)),
),
drasi_core::models::ElementValue::Null,
),
);
property_map.insert(
"humidity",
crate::conversion::json_to_element_value_or_default(
&Value::Number(
serde_json::Number::from_f64(
40.0 + rand::random::<f64>() * 20.0,
)
.unwrap_or(serde_json::Number::from(50)),
),
drasi_core::models::ElementValue::Null,
),
);
property_map.insert(
"timestamp",
crate::conversion::json_to_element_value_or_default(
&Value::String(chrono::Utc::now().to_rfc3339()),
drasi_core::models::ElementValue::Null,
),
);
let metadata = ElementMetadata {
reference,
labels: Arc::from(vec![Arc::from("SensorReading")]),
effective_from: crate::time::get_system_time_millis()
.unwrap_or_else(|e| {
log::warn!("Failed to get timestamp for mock sensor: {e}");
chrono::Utc::now().timestamp_millis() as u64
}),
};
let element = Element::Node {
metadata,
properties: property_map,
};
let is_new = {
let mut seen = seen_sensors.write().await;
seen.insert(sensor_id)
};
if is_new {
SourceChange::Insert { element }
} else {
SourceChange::Update { element }
}
}
DataType::Generic => {
let element_id = format!("generic_{seq}");
let reference = ElementReference::new(&source_name, &element_id);
let mut property_map = ElementPropertyMap::new();
property_map.insert(
"value",
crate::conversion::json_to_element_value_or_default(
&Value::Number(rand::random::<i32>().into()),
drasi_core::models::ElementValue::Null,
),
);
property_map.insert(
"message",
crate::conversion::json_to_element_value_or_default(
&Value::String("Generic mock data".to_string()),
drasi_core::models::ElementValue::Null,
),
);
property_map.insert(
"timestamp",
crate::conversion::json_to_element_value_or_default(
&Value::String(chrono::Utc::now().to_rfc3339()),
drasi_core::models::ElementValue::Null,
),
);
let metadata = ElementMetadata {
reference,
labels: Arc::from(vec![Arc::from("Generic")]),
effective_from: crate::time::get_system_time_millis()
.unwrap_or_else(|e| {
log::warn!("Failed to get timestamp for mock generic: {e}");
chrono::Utc::now().timestamp_millis() as u64
}),
};
let element = Element::Node {
metadata,
properties: property_map,
};
SourceChange::Insert { element }
}
};
let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
let wrapper = SourceEventWrapper::with_profiling(
source_id.clone(),
SourceEvent::Change(source_change),
chrono::Utc::now(),
profiling,
);
if let Err(e) = SourceBase::dispatch_from_task(
base_dispatchers.clone(),
wrapper,
&source_id,
)
.await
{
debug!("Failed to dispatch change: {e}");
}
}
info!("Mock source task completed");
}
.instrument(span),
);
*self.base.task_handle.write().await = Some(task);
Ok(())
}
async fn stop(&self) -> Result<()> {
log_component_stop("Mock Source", &self.base.id);
self.base
.set_status(
ComponentStatus::Stopping,
Some("Stopping mock source".to_string()),
)
.await;
if let Some(handle) = self.base.task_handle.write().await.take() {
handle.abort();
let _ = handle.await;
}
self.base
.set_status(
ComponentStatus::Stopped,
Some("Mock source stopped successfully".to_string()),
)
.await;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.get_status().await
}
async fn subscribe(
&self,
settings: drasi_lib::config::SourceSubscriptionSettings,
) -> Result<SubscriptionResponse> {
self.base.subscribe_with_bootstrap(&settings, "Mock").await
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
self.base.initialize(context).await;
}
async fn set_bootstrap_provider(
&self,
provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
) {
self.base.set_bootstrap_provider(provider).await;
}
}
impl MockSource {
pub async fn inject_event(&self, change: SourceChange) -> Result<()> {
self.base.dispatch_source_change(change).await
}
pub fn test_subscribe(
&self,
) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
self.base.test_subscribe()
}
}
pub struct MockSourceBuilder {
id: String,
data_type: DataType,
interval_ms: u64,
dispatch_mode: Option<DispatchMode>,
dispatch_buffer_capacity: Option<usize>,
bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
auto_start: bool,
}
impl MockSourceBuilder {
pub fn new(id: impl Into<String>) -> Self {
Self {
id: id.into(),
data_type: DataType::Generic,
interval_ms: 5000,
dispatch_mode: None,
dispatch_buffer_capacity: None,
bootstrap_provider: None,
auto_start: true,
}
}
pub fn with_data_type(mut self, data_type: DataType) -> Self {
self.data_type = data_type;
self
}
pub fn with_interval_ms(mut self, interval_ms: u64) -> Self {
self.interval_ms = interval_ms;
self
}
pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
self.dispatch_mode = Some(mode);
self
}
pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
self.dispatch_buffer_capacity = Some(capacity);
self
}
pub fn with_bootstrap_provider(
mut self,
provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
) -> Self {
self.bootstrap_provider = Some(Box::new(provider));
self
}
pub fn with_auto_start(mut self, auto_start: bool) -> Self {
self.auto_start = auto_start;
self
}
pub fn build(self) -> Result<MockSource> {
let config = MockSourceConfig {
data_type: self.data_type,
interval_ms: self.interval_ms,
};
config.validate()?;
let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
if let Some(mode) = self.dispatch_mode {
params = params.with_dispatch_mode(mode);
}
if let Some(capacity) = self.dispatch_buffer_capacity {
params = params.with_dispatch_buffer_capacity(capacity);
}
if let Some(provider) = self.bootstrap_provider {
params = params.with_bootstrap_provider(provider);
}
Ok(MockSource {
base: SourceBase::new(params)?,
config,
seen_sensors: Arc::new(RwLock::new(HashSet::new())),
})
}
}
impl MockSource {
pub fn builder(id: impl Into<String>) -> MockSourceBuilder {
MockSourceBuilder::new(id)
}
}