use anyhow::Result;
use async_trait::async_trait;
use drasi_lib::channels::{
ComponentStatus, DispatchMode, SourceEvent, SourceEventWrapper, SubscriptionResponse,
};
use drasi_lib::config::SourceSubscriptionSettings;
use drasi_lib::context::SourceRuntimeContext;
use drasi_lib::profiling::ProfilingMetadata;
use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
use drasi_lib::Source;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use drasi_core::models::{
Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
};
use ordered_float::OrderedFloat;
#[derive(Clone)]
pub struct MockSourceHandle {
tx: mpsc::Sender<SourceChange>,
source_id: String,
}
impl MockSourceHandle {
pub async fn send(&self, change: SourceChange) -> Result<()> {
self.tx
.send(change)
.await
.map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
Ok(())
}
pub async fn send_node_insert(
&self,
element_id: impl Into<Arc<str>>,
labels: Vec<impl Into<Arc<str>>>,
properties: ElementPropertyMap,
) -> Result<()> {
let effective_from = chrono::Utc::now().timestamp_millis() as u64;
let element = Element::Node {
metadata: ElementMetadata {
reference: ElementReference {
source_id: Arc::from(self.source_id.as_str()),
element_id: element_id.into(),
},
labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
effective_from,
},
properties,
};
self.send(SourceChange::Insert { element }).await
}
#[allow(dead_code)]
pub fn source_id(&self) -> &str {
&self.source_id
}
}
pub struct MockSource {
base: SourceBase,
app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
#[allow(dead_code)]
app_tx: mpsc::Sender<SourceChange>,
}
impl MockSource {
pub fn new(id: impl Into<String>) -> Result<(Self, MockSourceHandle)> {
let id = id.into();
let params = SourceBaseParams::new(id.clone());
let (app_tx, app_rx) = mpsc::channel(1000);
let handle = MockSourceHandle {
tx: app_tx.clone(),
source_id: id.clone(),
};
let source = Self {
base: SourceBase::new(params)?,
app_rx: Arc::new(RwLock::new(Some(app_rx))),
app_tx,
};
Ok((source, handle))
}
async fn process_events(&self) -> Result<()> {
let mut rx = self
.app_rx
.write()
.await
.take()
.ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
let source_name = self.base.id.clone();
let base_dispatchers = self.base.dispatchers.clone();
let status_handle = self.base.status_handle();
let handle = tokio::spawn(async move {
status_handle
.set_status(ComponentStatus::Running, Some("Mock source running".into()))
.await;
while let Some(change) = rx.recv().await {
let mut profiling = ProfilingMetadata::new();
profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
let wrapper = SourceEventWrapper::with_profiling(
source_name.clone(),
SourceEvent::Change(change),
chrono::Utc::now(),
profiling,
);
let _ =
SourceBase::dispatch_from_task(base_dispatchers.clone(), wrapper, &source_name)
.await;
}
});
*self.base.task_handle.write().await = Some(handle);
Ok(())
}
}
#[async_trait]
impl Source for MockSource {
fn id(&self) -> &str {
&self.base.id
}
fn type_name(&self) -> &str {
"mock"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
HashMap::new()
}
fn dispatch_mode(&self) -> DispatchMode {
DispatchMode::Channel
}
fn auto_start(&self) -> bool {
true
}
async fn start(&self) -> Result<()> {
self.base
.set_status(
ComponentStatus::Starting,
Some("Starting mock source".into()),
)
.await;
self.process_events().await?;
Ok(())
}
async fn stop(&self) -> Result<()> {
self.base
.set_status(
ComponentStatus::Stopping,
Some("Stopping mock source".into()),
)
.await;
if let Some(handle) = self.base.task_handle.write().await.take() {
handle.abort();
}
self.base
.set_status(ComponentStatus::Stopped, Some("Mock source stopped".into()))
.await;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.get_status().await
}
async fn subscribe(
&self,
settings: SourceSubscriptionSettings,
) -> Result<SubscriptionResponse> {
self.base.subscribe_with_bootstrap(&settings, "Mock").await
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: SourceRuntimeContext) {
self.base.initialize(context).await;
}
async fn set_bootstrap_provider(
&self,
provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
) {
self.base.set_bootstrap_provider(provider).await;
}
}
pub struct PropertyMapBuilder {
properties: ElementPropertyMap,
}
impl PropertyMapBuilder {
pub fn new() -> Self {
Self {
properties: ElementPropertyMap::new(),
}
}
pub fn with_string(mut self, key: &str, value: &str) -> Self {
self.properties
.insert(key, ElementValue::String(Arc::from(value)));
self
}
pub fn with_integer(mut self, key: &str, value: i64) -> Self {
self.properties.insert(key, ElementValue::Integer(value));
self
}
#[allow(dead_code)]
pub fn with_float(mut self, key: &str, value: f64) -> Self {
self.properties
.insert(key, ElementValue::Float(OrderedFloat(value)));
self
}
#[allow(dead_code)]
pub fn with_bool(mut self, key: &str, value: bool) -> Self {
self.properties.insert(key, ElementValue::Bool(value));
self
}
pub fn build(self) -> ElementPropertyMap {
self.properties
}
}
impl Default for PropertyMapBuilder {
fn default() -> Self {
Self::new()
}
}