#![allow(unexpected_cfgs)]
pub mod config;
pub use config::ApplicationSourceConfig;
mod property_builder;
mod time;
#[cfg(test)]
mod tests;
pub use property_builder::PropertyMapBuilder;
use anyhow::Result;
use async_trait::async_trait;
use log::{debug, info, warn};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
use drasi_lib::channels::{ComponentStatus, ComponentType, *};
use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
use drasi_lib::Source;
use tracing::Instrument;
#[derive(Clone)]
pub struct ApplicationSourceHandle {
tx: mpsc::Sender<SourceChange>,
source_id: String,
}
impl ApplicationSourceHandle {
pub async fn send(&self, change: SourceChange) -> Result<()> {
self.tx
.send(change)
.await
.map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
Ok(())
}
pub async fn send_node_insert(
&self,
element_id: impl Into<Arc<str>>,
labels: Vec<impl Into<Arc<str>>>,
properties: drasi_core::models::ElementPropertyMap,
) -> Result<()> {
let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
warn!("Failed to get timestamp for node insert: {e}, using fallback");
chrono::Utc::now().timestamp_millis() as u64
});
let element = Element::Node {
metadata: ElementMetadata {
reference: ElementReference {
source_id: Arc::from(self.source_id.as_str()),
element_id: element_id.into(),
},
labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
effective_from,
},
properties,
};
self.send(SourceChange::Insert { element }).await
}
pub async fn send_node_update(
&self,
element_id: impl Into<Arc<str>>,
labels: Vec<impl Into<Arc<str>>>,
properties: drasi_core::models::ElementPropertyMap,
) -> Result<()> {
let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
warn!("Failed to get timestamp for node update: {e}, using fallback");
chrono::Utc::now().timestamp_millis() as u64
});
let element = Element::Node {
metadata: ElementMetadata {
reference: ElementReference {
source_id: Arc::from(self.source_id.as_str()),
element_id: element_id.into(),
},
labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
effective_from,
},
properties,
};
self.send(SourceChange::Update { element }).await
}
pub async fn send_delete(
&self,
element_id: impl Into<Arc<str>>,
labels: Vec<impl Into<Arc<str>>>,
) -> Result<()> {
let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
warn!("Failed to get timestamp for delete: {e}, using fallback");
chrono::Utc::now().timestamp_millis() as u64
});
let metadata = ElementMetadata {
reference: ElementReference {
source_id: Arc::from(self.source_id.as_str()),
element_id: element_id.into(),
},
labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
effective_from,
};
self.send(SourceChange::Delete { metadata }).await
}
pub async fn send_relation_insert(
&self,
element_id: impl Into<Arc<str>>,
labels: Vec<impl Into<Arc<str>>>,
properties: drasi_core::models::ElementPropertyMap,
start_node_id: impl Into<Arc<str>>,
end_node_id: impl Into<Arc<str>>,
) -> Result<()> {
let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
warn!("Failed to get timestamp for relation insert: {e}, using fallback");
chrono::Utc::now().timestamp_millis() as u64
});
let element = Element::Relation {
metadata: ElementMetadata {
reference: ElementReference {
source_id: Arc::from(self.source_id.as_str()),
element_id: element_id.into(),
},
labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
effective_from,
},
properties,
in_node: ElementReference {
source_id: Arc::from(self.source_id.as_str()),
element_id: end_node_id.into(),
},
out_node: ElementReference {
source_id: Arc::from(self.source_id.as_str()),
element_id: start_node_id.into(),
},
};
self.send(SourceChange::Insert { element }).await
}
pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
for change in changes {
self.send(change).await?;
}
Ok(())
}
pub fn source_id(&self) -> &str {
&self.source_id
}
}
pub struct ApplicationSource {
base: SourceBase,
config: ApplicationSourceConfig,
app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
app_tx: mpsc::Sender<SourceChange>,
}
impl ApplicationSource {
pub fn new(
id: impl Into<String>,
config: ApplicationSourceConfig,
) -> Result<(Self, ApplicationSourceHandle)> {
let id = id.into();
let params = SourceBaseParams::new(id.clone());
let (app_tx, app_rx) = mpsc::channel(1000);
let handle = ApplicationSourceHandle {
tx: app_tx.clone(),
source_id: id.clone(),
};
let source = Self {
base: SourceBase::new(params)?,
config,
app_rx: Arc::new(RwLock::new(Some(app_rx))),
app_tx,
};
Ok((source, handle))
}
pub fn get_handle(&self) -> ApplicationSourceHandle {
ApplicationSourceHandle {
tx: self.app_tx.clone(),
source_id: self.base.id.clone(),
}
}
async fn process_events(&self) -> Result<()> {
let mut rx = self
.app_rx
.write()
.await
.take()
.ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
let source_name = self.base.id.clone();
let base_dispatchers = self.base.dispatchers.clone();
let status_tx = self.base.status_tx();
let status = self.base.status.clone();
let source_id = self.base.id.clone();
let instance_id = self
.base
.context()
.await
.map(|c| c.instance_id)
.unwrap_or_default();
let span = tracing::info_span!(
"application_source_processor",
instance_id = %instance_id,
component_id = %source_id,
component_type = "source"
);
let handle = tokio::spawn(
async move {
info!("ApplicationSource '{source_name}' event processor started");
if let Some(ref tx) = *status_tx.read().await {
let _ = tx
.send(ComponentEvent {
component_id: source_name.clone(),
component_type: ComponentType::Source,
status: ComponentStatus::Running,
timestamp: chrono::Utc::now(),
message: Some("Processing events".to_string()),
})
.await;
}
*status.write().await = ComponentStatus::Running;
while let Some(change) = rx.recv().await {
debug!("ApplicationSource '{source_name}' received event: {change:?}");
let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
let wrapper = SourceEventWrapper::with_profiling(
source_name.clone(),
SourceEvent::Change(change),
chrono::Utc::now(),
profiling,
);
if let Err(e) = SourceBase::dispatch_from_task(
base_dispatchers.clone(),
wrapper,
&source_name,
)
.await
{
debug!("Failed to dispatch change (no subscribers): {e}");
}
}
info!("ApplicationSource '{source_name}' event processor stopped");
}
.instrument(span),
);
*self.base.task_handle.write().await = Some(handle);
Ok(())
}
}
#[async_trait]
impl Source for ApplicationSource {
fn id(&self) -> &str {
&self.base.id
}
fn type_name(&self) -> &str {
"application"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
self.config.properties.clone()
}
fn auto_start(&self) -> bool {
self.base.get_auto_start()
}
async fn start(&self) -> Result<()> {
info!("Starting ApplicationSource '{}'", self.base.id);
self.base
.set_status_with_event(
ComponentStatus::Starting,
Some("Starting application source".to_string()),
)
.await?;
self.process_events().await?;
Ok(())
}
async fn stop(&self) -> Result<()> {
info!("Stopping ApplicationSource '{}'", self.base.id);
self.base
.set_status_with_event(
ComponentStatus::Stopping,
Some("Stopping application source".to_string()),
)
.await?;
if let Some(handle) = self.base.task_handle.write().await.take() {
handle.abort();
}
self.base
.set_status_with_event(
ComponentStatus::Stopped,
Some("Application source stopped".to_string()),
)
.await?;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.status.read().await.clone()
}
async fn subscribe(
&self,
settings: drasi_lib::config::SourceSubscriptionSettings,
) -> Result<SubscriptionResponse> {
self.base
.subscribe_with_bootstrap(&settings, "Application")
.await
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
self.base.initialize(context).await;
}
async fn set_bootstrap_provider(
&self,
provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
) {
self.base.set_bootstrap_provider(provider).await;
}
}