use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use super::schema::QueryConfig;
use crate::channels::ComponentStatus;
use crate::identity::IdentityProvider;
use crate::indexes::IndexBackendPlugin;
use crate::indexes::IndexFactory;
use crate::state_store::{MemoryStateStoreProvider, StateStoreProvider};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceRuntime {
pub id: String,
pub source_type: String,
pub status: ComponentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
pub properties: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRuntime {
pub id: String,
pub query: String,
pub status: ComponentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
pub source_subscriptions: Vec<super::schema::SourceSubscriptionConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub joins: Option<Vec<super::schema::QueryJoinConfig>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReactionRuntime {
pub id: String,
pub reaction_type: String,
pub status: ComponentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
pub queries: Vec<String>,
pub properties: HashMap<String, serde_json::Value>,
}
impl From<QueryConfig> for QueryRuntime {
fn from(config: QueryConfig) -> Self {
Self {
id: config.id,
query: config.query,
status: ComponentStatus::Stopped,
error_message: None,
source_subscriptions: config.sources,
joins: config.joins,
}
}
}
#[derive(Clone)]
pub struct RuntimeConfig {
pub id: String,
pub index_factory: Arc<IndexFactory>,
pub state_store_provider: Arc<dyn StateStoreProvider>,
pub identity_provider: Option<Arc<dyn IdentityProvider>>,
pub queries: Vec<QueryConfig>,
pub global_priority_queue_capacity: Option<usize>,
pub global_dispatch_buffer_capacity: Option<usize>,
pub storage_backends: Vec<crate::indexes::StorageBackendConfig>,
}
impl std::fmt::Debug for RuntimeConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RuntimeConfig")
.field("id", &self.id)
.field("index_factory", &self.index_factory)
.field("state_store_provider", &"<dyn StateStoreProvider>")
.field(
"identity_provider",
&self
.identity_provider
.as_ref()
.map(|_| "<dyn IdentityProvider>"),
)
.field("queries", &self.queries)
.field(
"global_priority_queue_capacity",
&self.global_priority_queue_capacity,
)
.field(
"global_dispatch_buffer_capacity",
&self.global_dispatch_buffer_capacity,
)
.field("storage_backends", &self.storage_backends)
.finish()
}
}
impl RuntimeConfig {
pub fn new(
config: super::schema::DrasiLibConfig,
index_provider: Option<Arc<dyn IndexBackendPlugin>>,
state_store_provider: Option<Arc<dyn StateStoreProvider>>,
identity_provider: Option<Arc<dyn IdentityProvider>>,
) -> Self {
let global_priority_queue_capacity = config.priority_queue_capacity;
let global_dispatch_buffer_capacity = config.dispatch_buffer_capacity;
let global_priority_queue = global_priority_queue_capacity.unwrap_or(10000);
let global_dispatch_capacity = global_dispatch_buffer_capacity.unwrap_or(1000);
let storage_backends = config.storage_backends.clone();
let index_factory = Arc::new(IndexFactory::new(config.storage_backends, index_provider));
let state_store_provider: Arc<dyn StateStoreProvider> =
state_store_provider.unwrap_or_else(|| Arc::new(MemoryStateStoreProvider::new()));
let queries = config
.queries
.into_iter()
.map(|mut q| {
if q.priority_queue_capacity.is_none() {
q.priority_queue_capacity = Some(global_priority_queue);
}
if q.dispatch_buffer_capacity.is_none() {
q.dispatch_buffer_capacity = Some(global_dispatch_capacity);
}
q
})
.collect();
Self {
id: config.id,
index_factory,
state_store_provider,
identity_provider,
queries,
global_priority_queue_capacity,
global_dispatch_buffer_capacity,
storage_backends,
}
}
}
impl From<super::schema::DrasiLibConfig> for RuntimeConfig {
fn from(config: super::schema::DrasiLibConfig) -> Self {
Self::new(config, None, None, None)
}
}