use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use crate::computation_graph::stream_backend::{
StreamBackendFactory, StreamBackendFuture, StreamConfig,
};
use crate::computation_graph::triggerless::TriggerlessGraphRegistration;
use crate::task::{Task, TaskNamespace};
use crate::trigger::Trigger;
use crate::workflow::Workflow;
use cloacina_computation_graph::{
ComputationGraphConstructor, ComputationGraphRegistration, ReactorConstructor,
ReactorRegistration,
};
pub(crate) type TriggerlessGraphConstructor =
Box<dyn Fn() -> TriggerlessGraphRegistration + Send + Sync>;
pub(crate) type TaskConstructorFn = Box<dyn Fn() -> Arc<dyn Task> + Send + Sync>;
pub(crate) type WorkflowConstructorFn = Box<dyn Fn() -> Workflow + Send + Sync>;
pub(crate) type TriggerConstructorFn = Box<dyn Fn() -> Arc<dyn Trigger> + Send + Sync>;
#[derive(Clone)]
pub struct Runtime {
inner: Arc<RuntimeInner>,
}
struct RuntimeInner {
tasks: RwLock<HashMap<TaskNamespace, TaskConstructorFn>>,
workflows: RwLock<HashMap<String, WorkflowConstructorFn>>,
triggers: RwLock<HashMap<String, TriggerConstructorFn>>,
computation_graphs: RwLock<HashMap<String, ComputationGraphConstructor>>,
triggerless_graphs: RwLock<HashMap<String, TriggerlessGraphConstructor>>,
reactors: RwLock<HashMap<String, ReactorConstructor>>,
stream_backends: RwLock<HashMap<String, StreamBackendFactory>>,
}
impl Runtime {
pub fn new() -> Self {
let rt = Self::empty();
rt.seed_from_inventory();
rt
}
pub fn empty() -> Self {
Self {
inner: Arc::new(RuntimeInner {
tasks: RwLock::new(HashMap::new()),
workflows: RwLock::new(HashMap::new()),
triggers: RwLock::new(HashMap::new()),
computation_graphs: RwLock::new(HashMap::new()),
triggerless_graphs: RwLock::new(HashMap::new()),
reactors: RwLock::new(HashMap::new()),
stream_backends: RwLock::new(HashMap::new()),
}),
}
}
pub fn seed_from_inventory(&self) {
use crate::inventory_entries::{
ComputationGraphEntry, ReactorEntry, StreamBackendEntry, TaskEntry, TriggerEntry,
TriggerlessGraphEntry, WorkflowEntry,
};
for entry in inventory::iter::<TaskEntry> {
let ns = (entry.namespace)();
let ctor = entry.constructor;
self.register_task(ns, move || ctor());
}
for entry in inventory::iter::<WorkflowEntry> {
self.register_workflow(entry.name.to_string(), entry.constructor);
}
for entry in inventory::iter::<TriggerEntry> {
self.register_trigger(entry.name.to_string(), entry.constructor);
}
for entry in inventory::iter::<ComputationGraphEntry> {
self.register_computation_graph(entry.name.to_string(), entry.constructor);
}
for entry in inventory::iter::<TriggerlessGraphEntry> {
self.register_triggerless_graph(entry.name.to_string(), entry.constructor);
}
for entry in inventory::iter::<ReactorEntry> {
self.register_reactor(entry.name.to_string(), entry.constructor);
}
for entry in inventory::iter::<StreamBackendEntry> {
let factory = entry.factory;
self.register_stream_backend(
entry.type_name.to_string(),
Box::new(move |config| factory(config)),
);
}
}
pub fn register_task<F>(&self, namespace: TaskNamespace, constructor: F)
where
F: Fn() -> Arc<dyn Task> + Send + Sync + 'static,
{
self.inner
.tasks
.write()
.insert(namespace, Box::new(constructor));
}
pub fn unregister_task(&self, namespace: &TaskNamespace) -> bool {
self.inner.tasks.write().remove(namespace).is_some()
}
pub fn get_task(&self, namespace: &TaskNamespace) -> Option<Arc<dyn Task>> {
self.inner.tasks.read().get(namespace).map(|ctor| ctor())
}
#[cfg(test)]
pub(crate) fn has_task(&self, namespace: &TaskNamespace) -> bool {
self.inner.tasks.read().contains_key(namespace)
}
pub fn task_namespaces(&self) -> Vec<TaskNamespace> {
self.inner.tasks.read().keys().cloned().collect()
}
pub fn register_workflow<F>(&self, name: String, constructor: F)
where
F: Fn() -> Workflow + Send + Sync + 'static,
{
self.inner
.workflows
.write()
.insert(name, Box::new(constructor));
}
pub fn unregister_workflow(&self, name: &str) -> bool {
self.inner.workflows.write().remove(name).is_some()
}
pub fn get_workflow(&self, name: &str) -> Option<Workflow> {
self.inner.workflows.read().get(name).map(|ctor| ctor())
}
pub fn workflow_names(&self) -> Vec<String> {
self.inner.workflows.read().keys().cloned().collect()
}
pub fn register_trigger<F>(&self, name: String, constructor: F)
where
F: Fn() -> Arc<dyn Trigger> + Send + Sync + 'static,
{
self.inner
.triggers
.write()
.insert(name, Box::new(constructor));
}
pub fn unregister_trigger(&self, name: &str) -> bool {
self.inner.triggers.write().remove(name).is_some()
}
pub fn get_trigger(&self, name: &str) -> Option<Arc<dyn Trigger>> {
self.inner.triggers.read().get(name).map(|ctor| ctor())
}
pub fn trigger_names(&self) -> Vec<String> {
self.inner.triggers.read().keys().cloned().collect()
}
pub fn register_computation_graph<F>(&self, name: String, constructor: F)
where
F: Fn() -> ComputationGraphRegistration + Send + Sync + 'static,
{
self.inner
.computation_graphs
.write()
.insert(name, Box::new(constructor));
}
pub fn unregister_computation_graph(&self, name: &str) -> bool {
self.inner.computation_graphs.write().remove(name).is_some()
}
pub fn get_computation_graph(&self, name: &str) -> Option<ComputationGraphRegistration> {
self.inner
.computation_graphs
.read()
.get(name)
.map(|ctor| ctor())
}
pub fn computation_graph_names(&self) -> Vec<String> {
self.inner
.computation_graphs
.read()
.keys()
.cloned()
.collect()
}
pub fn register_triggerless_graph<F>(&self, name: String, constructor: F)
where
F: Fn() -> TriggerlessGraphRegistration + Send + Sync + 'static,
{
self.inner
.triggerless_graphs
.write()
.insert(name, Box::new(constructor));
}
pub fn unregister_triggerless_graph(&self, name: &str) -> bool {
self.inner.triggerless_graphs.write().remove(name).is_some()
}
pub fn get_triggerless_graph(&self, name: &str) -> Option<TriggerlessGraphRegistration> {
self.inner
.triggerless_graphs
.read()
.get(name)
.map(|ctor| ctor())
}
pub fn triggerless_graph_names(&self) -> Vec<String> {
self.inner
.triggerless_graphs
.read()
.keys()
.cloned()
.collect()
}
pub fn register_reactor<F>(&self, name: String, constructor: F)
where
F: Fn() -> ReactorRegistration + Send + Sync + 'static,
{
self.inner
.reactors
.write()
.insert(name, Box::new(constructor));
}
pub fn unregister_reactor(&self, name: &str) -> bool {
self.inner.reactors.write().remove(name).is_some()
}
pub fn get_reactor(&self, name: &str) -> Option<ReactorRegistration> {
self.inner.reactors.read().get(name).map(|ctor| ctor())
}
pub fn reactor_names(&self) -> Vec<String> {
self.inner.reactors.read().keys().cloned().collect()
}
pub fn register_stream_backend(&self, type_name: String, factory: StreamBackendFactory) {
self.inner
.stream_backends
.write()
.insert(type_name, factory);
}
pub fn unregister_stream_backend(&self, type_name: &str) -> bool {
self.inner
.stream_backends
.write()
.remove(type_name)
.is_some()
}
#[cfg(test)]
pub(crate) fn has_stream_backend(&self, type_name: &str) -> bool {
self.inner.stream_backends.read().contains_key(type_name)
}
pub fn create_stream_backend(
&self,
type_name: &str,
config: StreamConfig,
) -> Option<StreamBackendFuture> {
let guard = self.inner.stream_backends.read();
let factory = guard.get(type_name)?;
Some(factory(config))
}
#[cfg(test)]
pub(crate) fn stream_backend_names(&self) -> Vec<String> {
self.inner.stream_backends.read().keys().cloned().collect()
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for Runtime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let tasks = self.inner.tasks.read().len();
let workflows = self.inner.workflows.read().len();
let triggers = self.inner.triggers.read().len();
let cgs = self.inner.computation_graphs.read().len();
let sbs = self.inner.stream_backends.read().len();
f.debug_struct("Runtime")
.field("tasks", &tasks)
.field("workflows", &workflows)
.field("triggers", &triggers)
.field("computation_graphs", &cgs)
.field("stream_backends", &sbs)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::task::TaskNamespace;
#[test]
fn register_and_unregister_workflow() {
let rt = Runtime::empty();
assert!(!rt.unregister_workflow("nope"));
let wf = crate::workflow::Workflow::new("unit-test-wf");
rt.register_workflow("unit-test-wf".to_string(), move || wf.clone());
assert!(rt.get_workflow("unit-test-wf").is_some());
assert_eq!(rt.workflow_names(), vec!["unit-test-wf".to_string()]);
assert!(rt.unregister_workflow("unit-test-wf"));
assert!(rt.get_workflow("unit-test-wf").is_none());
assert!(rt.workflow_names().is_empty());
}
#[test]
fn register_and_unregister_trigger_by_name() {
let rt = Runtime::empty();
assert!(!rt.unregister_trigger("missing"));
assert!(rt.get_trigger("missing").is_none());
assert!(rt.trigger_names().is_empty());
}
#[test]
fn register_and_unregister_task() {
let rt = Runtime::empty();
let ns = TaskNamespace::new("t", "p", "w", "task_a");
assert!(!rt.unregister_task(&ns));
assert!(!rt.has_task(&ns));
}
#[test]
fn stream_backend_roundtrip_names_only() {
let rt = Runtime::empty();
assert!(!rt.has_stream_backend("mock"));
assert!(rt.stream_backend_names().is_empty());
assert!(!rt.unregister_stream_backend("mock"));
}
#[test]
fn runtimes_are_independent() {
let rt1 = Runtime::empty();
let rt2 = Runtime::empty();
let wf = crate::workflow::Workflow::new("iso");
rt1.register_workflow("iso".to_string(), move || wf.clone());
assert!(rt1.get_workflow("iso").is_some());
assert!(rt2.get_workflow("iso").is_none());
}
#[test]
fn debug_format_reports_sizes() {
let rt = Runtime::empty();
let debug = format!("{:?}", rt);
assert!(debug.contains("computation_graphs: 0"));
assert!(debug.contains("stream_backends: 0"));
}
}