drasi_core/middleware/
mod.rs1use std::{collections::HashMap, sync::Arc};
16
17use crate::{
18 interface::{
19 ElementIndex, MiddlewareError, MiddlewareSetupError, SourceMiddleware,
20 SourceMiddlewareFactory,
21 },
22 models::{SourceChange, SourceMiddlewareConfig},
23};
24
25pub struct MiddlewareTypeRegistry {
26 source_middleware_types: HashMap<String, Arc<dyn SourceMiddlewareFactory>>,
27}
28
29impl Default for MiddlewareTypeRegistry {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl MiddlewareTypeRegistry {
36 pub fn new() -> Self {
37 MiddlewareTypeRegistry {
38 source_middleware_types: HashMap::new(),
39 }
40 }
41
42 pub fn register(&mut self, factory: Arc<dyn SourceMiddlewareFactory>) {
43 self.source_middleware_types.insert(factory.name(), factory);
44 }
45
46 pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddlewareFactory>> {
47 self.source_middleware_types.get(name).cloned()
48 }
49}
50
51pub struct MiddlewareContainer {
52 source_instances: HashMap<Arc<str>, Arc<dyn SourceMiddleware>>,
53}
54
55impl MiddlewareContainer {
56 pub fn new(
57 registry: &MiddlewareTypeRegistry,
58 source_configs: Vec<Arc<SourceMiddlewareConfig>>,
59 ) -> Result<Self, MiddlewareSetupError> {
60 let mut source_instances = HashMap::new();
61 for config in source_configs {
62 let factory =
63 registry
64 .get(&config.kind)
65 .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
66 "Unknown middleware kind: {}",
67 config.kind
68 )))?;
69 let instance = factory.create(&config)?;
70 source_instances.insert(config.name.clone(), instance);
71 }
72
73 Ok(MiddlewareContainer { source_instances })
74 }
75
76 pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddleware>> {
77 self.source_instances.get(name).cloned()
78 }
79}
80
81pub struct SourceMiddlewarePipeline {
82 pipeline: Vec<Arc<dyn SourceMiddleware>>,
83}
84
85impl SourceMiddlewarePipeline {
86 pub fn new(
87 container: &MiddlewareContainer,
88 pipeline_keys: Vec<Arc<str>>,
89 ) -> Result<Self, MiddlewareSetupError> {
90 let pipeline = pipeline_keys
91 .iter()
92 .map(|name| {
93 container
94 .get(name)
95 .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
96 "Unknown middleware: {name}"
97 )))
98 })
99 .collect::<Result<Vec<Arc<dyn SourceMiddleware>>, MiddlewareSetupError>>()?;
100
101 Ok(SourceMiddlewarePipeline { pipeline })
102 }
103
104 pub async fn process(
105 &self,
106 source_change: SourceChange,
107 element_index: Arc<dyn ElementIndex>,
108 ) -> Result<Vec<SourceChange>, MiddlewareError> {
109 let mut source_changes = vec![source_change];
110
111 for middleware in &self.pipeline {
112 let mut new_source_changes = Vec::new();
113 for source_change in source_changes {
114 new_source_changes.append(
115 &mut middleware
116 .process(source_change, element_index.as_ref())
117 .await?,
118 );
119 }
120
121 source_changes = new_source_changes;
122 }
123
124 Ok(source_changes)
125 }
126}
127
128pub struct SourceMiddlewarePipelineCollection {
129 items: HashMap<Arc<str>, Arc<SourceMiddlewarePipeline>>,
130}
131
132impl Default for SourceMiddlewarePipelineCollection {
133 fn default() -> Self {
134 Self::new()
135 }
136}
137
138impl SourceMiddlewarePipelineCollection {
139 pub fn new() -> Self {
140 SourceMiddlewarePipelineCollection {
141 items: HashMap::new(),
142 }
143 }
144
145 pub fn insert(&mut self, source_id: Arc<str>, pipeline: SourceMiddlewarePipeline) {
146 self.items.insert(source_id, Arc::new(pipeline));
147 }
148
149 pub fn get(&self, source_id: Arc<str>) -> Option<Arc<SourceMiddlewarePipeline>> {
150 self.items.get(&source_id).cloned()
151 }
152}