use anyhow::Result;
use log::{info, warn};
use std::sync::Arc;
use tokio::sync::RwLock;
use drasi_core::models::{ElementPropertyMap, ElementValue};
use ordered_float::OrderedFloat;
use serde_json::Value;
use std::collections::BTreeMap;
use crate::channels::*;
use crate::component_graph::{ComponentGraph, ComponentKind, ComponentUpdateSender};
use crate::config::SourceRuntime;
use crate::context::SourceRuntimeContext;
use crate::identity::IdentityProvider;
use crate::managers::{ComponentLogKey, ComponentLogRegistry};
use crate::sources::Source;
use crate::state_store::StateStoreProvider;
/// Converts a single JSON value into the matching [`ElementValue`].
///
/// * `null`, booleans and strings map to their direct counterparts.
/// * Numbers become `Integer` when they fit in an `i64`, otherwise `Float`
///   when serde_json can expose them as `f64`; if neither view is available
///   the number's textual form is stored as a `String`.
/// * Arrays and objects are not converted structurally: they are stored as
///   a `String` holding the value's `to_string()` rendering.
pub fn convert_json_to_element_value(value: &Value) -> ElementValue {
    match value {
        Value::Null => ElementValue::Null,
        Value::Bool(flag) => ElementValue::Bool(*flag),
        Value::String(text) => ElementValue::String(Arc::from(text.as_str())),
        Value::Number(number) => number
            .as_i64()
            .map(ElementValue::Integer)
            // Only consult the float view when the integer view is unavailable.
            .or_else(|| {
                number
                    .as_f64()
                    .map(|float| ElementValue::Float(OrderedFloat(float)))
            })
            .unwrap_or_else(|| ElementValue::String(Arc::from(number.to_string()))),
        Value::Array(_) | Value::Object(_) => {
            let rendered = value.to_string();
            ElementValue::String(Arc::from(rendered))
        }
    }
}
pub fn convert_json_to_element_properties(
json_props: &serde_json::Map<String, Value>,
) -> ElementPropertyMap {
let mut properties = BTreeMap::new();
for (key, value) in json_props {
let element_value = convert_json_to_element_value(value);
properties.insert(Arc::from(key.as_str()), element_value);
}
let mut property_map = ElementPropertyMap::new();
for (key, value) in properties {
property_map.insert(&key, value);
}
property_map
}
/// Manages the source components of one instance: provisioning, lifecycle
/// transitions, reconfiguration, teardown, and event/log subscriptions.
///
/// Runtime source objects and their status live in the shared
/// [`ComponentGraph`]; this manager reads and updates that graph.
pub struct SourceManager {
/// Identifier of the owning instance; passed into every source runtime
/// context and used to build component log keys.
instance_id: String,
/// Optional state store, `None` until `inject_state_store` is called.
/// Its current value is cloned into each newly created runtime context.
state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
/// Optional identity provider, `None` until `inject_identity_provider`
/// is called.
identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
/// Registry used for log subscriptions and handed to the teardown helper.
log_registry: Arc<ComponentLogRegistry>,
/// Shared component graph holding source runtimes, statuses and events.
graph: Arc<RwLock<ComponentGraph>>,
/// Sender cloned into each source runtime context.
update_tx: ComponentUpdateSender,
}
impl SourceManager {
/// Creates a manager bound to `instance_id`, the shared log registry, the
/// shared component graph and the component update sender.
///
/// The state store and identity provider start out unset (`None`) and can
/// be supplied later through the `inject_*` methods.
pub fn new(
    instance_id: impl Into<String>,
    log_registry: Arc<ComponentLogRegistry>,
    graph: Arc<RwLock<ComponentGraph>>,
    update_tx: ComponentUpdateSender,
) -> Self {
    let instance_id = instance_id.into();
    let state_store = Arc::new(RwLock::new(None));
    let identity_provider = Arc::new(RwLock::new(None));
    Self {
        instance_id,
        state_store,
        identity_provider,
        log_registry,
        graph,
        update_tx,
    }
}
/// Installs (or replaces) the state store that is cloned into source
/// runtime contexts created after this call.
pub async fn inject_state_store(&self, state_store: Arc<dyn StateStoreProvider>) {
    let mut slot = self.state_store.write().await;
    *slot = Some(state_store);
}
/// Installs (or replaces) the identity provider stored on this manager.
pub async fn inject_identity_provider(&self, identity_provider: Arc<dyn IdentityProvider>) {
    let mut slot = self.identity_provider.write().await;
    *slot = Some(identity_provider);
}
/// Returns a clone of the runtime handle registered in the graph under
/// `id`, or `None` when no source runtime is stored for that id.
pub async fn get_source_instance(&self, id: &str) -> Option<Arc<dyn Source>> {
    self.graph
        .read()
        .await
        .get_runtime::<Arc<dyn Source>>(id)
        .cloned()
}
/// Initializes `source` with a fresh runtime context and registers it as
/// the runtime for its id in the component graph.
///
/// The context carries the instance id, the source id, the currently
/// injected state store and identity provider (if any) and a clone of the
/// update sender.
///
/// # Errors
/// Propagates the error from `ComponentGraph::set_runtime`. Note that the
/// source has already been initialized by the time that call is made.
pub async fn provision_source(&self, source: impl Source + 'static) -> Result<()> {
    let source: Arc<dyn Source> = Arc::new(source);
    let source_id = source.id().to_string();

    let state_store = self.state_store.read().await.clone();
    let mut context = SourceRuntimeContext::new(
        &self.instance_id,
        &source_id,
        state_store,
        self.update_tx.clone(),
        None,
    );
    context.identity_provider = self.identity_provider.read().await.clone();
    source.initialize(context).await;

    // The write guard is a temporary and is released at the end of this statement.
    self.graph
        .write()
        .await
        .set_runtime(&source_id, Box::new(source))?;

    info!("Provisioned source: {source_id}");
    Ok(())
}
pub async fn start_source(&self, id: String) -> Result<()> {
let source =
crate::managers::lifecycle_helpers::get_runtime::<Arc<dyn Source>>(&self.graph, &id)
.await
.ok_or_else(|| {
anyhow::Error::new(crate::managers::ComponentNotFoundError::new("source", &id))
})?;
crate::managers::lifecycle_helpers::start_component(&self.graph, &id, "source", &source)
.await
}
pub async fn stop_source(&self, id: String) -> Result<()> {
let source =
crate::managers::lifecycle_helpers::get_runtime::<Arc<dyn Source>>(&self.graph, &id)
.await
.ok_or_else(|| {
anyhow::Error::new(crate::managers::ComponentNotFoundError::new("source", &id))
})?;
crate::managers::lifecycle_helpers::stop_component(&self.graph, &id, "source", &source)
.await
}
/// Looks up the current status of the source `id` in the component graph.
///
/// Delegates entirely to `lifecycle_helpers::get_component_status`, which
/// also determines the error returned for an unknown id.
pub async fn get_source_status(&self, id: String) -> Result<ComponentStatus> {
    let graph = &self.graph;
    crate::managers::lifecycle_helpers::get_component_status(graph, &id, "Source").await
}
/// Lists every component of kind `Source` in the graph together with its
/// status, as produced by `lifecycle_helpers::list_components`.
pub async fn list_sources(&self) -> Vec<(String, ComponentStatus)> {
    let kind = ComponentKind::Source;
    crate::managers::lifecycle_helpers::list_components(&self.graph, &kind).await
}
pub async fn get_source(&self, id: String) -> Result<SourceRuntime> {
let graph = self.graph.read().await;
let source = graph.get_runtime::<Arc<dyn Source>>(&id).cloned();
if let Some(source) = source {
let status = graph
.get_component(&id)
.map(|n| n.status)
.unwrap_or(ComponentStatus::Stopped);
let error_message = match &status {
ComponentStatus::Error => graph.get_last_error(&id),
_ => None,
};
drop(graph);
let runtime = SourceRuntime {
id: source.id().to_string(),
source_type: source.type_name().to_string(),
status,
error_message,
properties: source.properties(),
};
Ok(runtime)
} else {
Err(crate::managers::ComponentNotFoundError::new("source", &id).into())
}
}
/// Tears down the source `id` through `lifecycle_helpers::teardown_component`.
///
/// `cleanup` is forwarded unchanged to the helper. The manager has no
/// source-specific work of its own to contribute, so the extra hook passed
/// to the helper is a no-op.
pub async fn teardown_source(&self, id: String, cleanup: bool) -> Result<()> {
    let no_extra_step = || async {};
    crate::managers::lifecycle_helpers::teardown_component::<Arc<dyn Source>, _, _>(
        &self.graph,
        &id,
        "source",
        ComponentType::Source,
        &self.instance_id,
        &self.log_registry,
        cleanup,
        no_extra_step,
    )
    .await
}
/// Replaces the runtime of the existing source `id` with `new_source`.
///
/// The replacement is driven by `lifecycle_helpers::reconfigure_component`,
/// which receives the old runtime plus three closures: a no-op hook, the
/// replace step below, and a step that starts the source again through
/// [`Self::start_source`].
///
/// The replace step initializes `new_source` with a fresh runtime context
/// that carries the injected state store and identity provider, exactly
/// like [`Self::provision_source`] does, so a reconfigured source keeps its
/// identity provider. It then stores the new runtime in the graph.
///
/// # Errors
/// * `ComponentNotFoundError` when no source runtime exists for `id`.
/// * An error when `new_source.id()` differs from `id`.
/// * An error when the runtime disappeared from the graph while the new
///   source was being initialized.
/// * Any error returned by `reconfigure_component` or `set_runtime`.
pub async fn update_source(&self, id: String, new_source: impl Source + 'static) -> Result<()> {
    let old_source = {
        let graph = self.graph.read().await;
        graph.get_runtime::<Arc<dyn Source>>(&id).cloned()
    };
    // Existence is checked before the id comparison so an unknown id is
    // always reported as "not found".
    let old_source = match old_source {
        Some(existing) => existing,
        None => return Err(crate::managers::ComponentNotFoundError::new("source", &id).into()),
    };

    if new_source.id() != id {
        return Err(anyhow::anyhow!(
            "New source ID '{}' does not match existing source ID '{}'",
            new_source.id(),
            id
        ));
    }

    let graph = &self.graph;
    let instance_id = &self.instance_id;
    let state_store = &self.state_store;
    let identity_provider = &self.identity_provider;
    let update_tx = &self.update_tx;

    crate::managers::lifecycle_helpers::reconfigure_component::<Arc<dyn Source>, _, _, _>(
        graph,
        &id,
        "source",
        &old_source,
        || async {},
        || async {
            let new_source: Arc<dyn Source> = Arc::new(new_source);
            let mut context = SourceRuntimeContext::new(
                instance_id,
                &id,
                state_store.read().await.clone(),
                update_tx.clone(),
                None,
            );
            // Mirror provision_source: without this the replacement source
            // would be initialized without the injected identity provider.
            context.identity_provider = identity_provider.read().await.clone();
            new_source.initialize(context).await;

            let mut g = graph.write().await;
            // The graph lock was not held during initialization, so the
            // source may have been removed in the meantime.
            if !g.has_runtime(&id) {
                return Err(anyhow::anyhow!(
                    "Source '{id}' was concurrently deleted during reconfiguration"
                ));
            }
            g.set_runtime(&id, Box::new(new_source))?;
            Ok(())
        },
        || self.start_source(id.clone()),
    )
    .await
}
/// Starts every source selected by `lifecycle_helpers::start_all_components`
/// using the `auto_start()` predicate.
///
/// For each selected source the graph is first transitioned to `Starting`;
/// the source's `start()` is then awaited. If `start()` fails, a transition
/// to `Error` is attempted (its own result is deliberately ignored) and the
/// start error is returned to the helper.
pub async fn start_all(&self) -> Result<()> {
    crate::managers::lifecycle_helpers::start_all_components::<Arc<dyn Source>, _, _>(
        &self.graph,
        &ComponentKind::Source,
        "source",
        |candidate| candidate.auto_start(),
        |component_id, runtime| async move {
            // Temporary write guard: released at the end of this statement,
            // i.e. before the source's start() is awaited.
            self.graph.write().await.validate_and_transition(
                &component_id,
                ComponentStatus::Starting,
                Some("Starting source".to_string()),
            )?;

            match runtime.start().await {
                Ok(_) => Ok(()),
                Err(e) => {
                    // Best effort: record the failure, ignore a rejected transition.
                    let _ = self.graph.write().await.validate_and_transition(
                        &component_id,
                        ComponentStatus::Error,
                        Some(format!("Start failed: {e}")),
                    );
                    Err(e)
                }
            }
        },
    )
    .await
}
/// Stops all sources by handing `lifecycle_helpers::stop_all_components` a
/// callback that calls [`Self::stop_source`] for each id it is given.
pub async fn stop_all(&self) -> Result<()> {
    let kind = ComponentKind::Source;
    crate::managers::lifecycle_helpers::stop_all_components(
        &self.graph,
        &kind,
        "Source",
        |source_id| self.stop_source(source_id),
    )
    .await
}
/// Records `event` in the component graph (takes the graph write lock).
pub async fn record_event(&self, event: ComponentEvent) {
    self.graph.write().await.record_event(event);
}
/// Returns the events the component graph holds for the component `id`.
pub async fn get_source_events(&self, id: &str) -> Vec<ComponentEvent> {
    let graph = self.graph.read().await;
    graph.get_events(id)
}
/// Returns all events in the component graph whose `component_type` is
/// `ComponentType::Source`, in the order the graph yields them.
pub async fn get_all_events(&self) -> Vec<ComponentEvent> {
    let graph = self.graph.read().await;
    let mut source_events = Vec::new();
    for event in graph.get_all_events() {
        if event.component_type == ComponentType::Source {
            source_events.push(event);
        }
    }
    source_events
}
/// Subscribes to the logs of the source `id`.
///
/// Returns `None` when the graph has no runtime for `id`. Otherwise returns
/// the pair produced by `ComponentLogRegistry::subscribe_by_key` for this
/// instance/source log key: a vector of log messages and a broadcast
/// receiver.
pub async fn subscribe_logs(
    &self,
    id: &str,
) -> Option<(
    Vec<crate::managers::LogMessage>,
    tokio::sync::broadcast::Receiver<crate::managers::LogMessage>,
)> {
    // The read guard is a temporary, so the graph lock is released before
    // the log registry is awaited.
    let is_registered = self.graph.read().await.has_runtime(id);
    if !is_registered {
        return None;
    }

    let log_key = ComponentLogKey::new(&self.instance_id, ComponentType::Source, id);
    let subscription = self.log_registry.subscribe_by_key(&log_key).await;
    Some(subscription)
}
/// Subscribes to the events of the source `id`.
///
/// Returns `None` when the graph has no runtime for `id`; otherwise returns
/// whatever `ComponentGraph::subscribe_events` yields for that id. The
/// graph read lock is held for the whole call.
pub async fn subscribe_events(
    &self,
    id: &str,
) -> Option<(
    Vec<ComponentEvent>,
    tokio::sync::broadcast::Receiver<ComponentEvent>,
)> {
    let graph = self.graph.read().await;
    if graph.has_runtime(id) {
        graph.subscribe_events(id)
    } else {
        None
    }
}
}