drasi_core/middleware/
mod.rs1use std::{collections::HashMap, sync::Arc};
16
17use crate::{
18 interface::{MiddlewareError, MiddlewareSetupError, SourceMiddleware, SourceMiddlewareFactory},
19 models::{SourceChange, SourceMiddlewareConfig},
20};
21
22pub struct MiddlewareTypeRegistry {
23 source_middleware_types: HashMap<String, Arc<dyn SourceMiddlewareFactory>>,
24}
25
26impl Default for MiddlewareTypeRegistry {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl MiddlewareTypeRegistry {
33 pub fn new() -> Self {
34 MiddlewareTypeRegistry {
35 source_middleware_types: HashMap::new(),
36 }
37 }
38
39 pub fn register(&mut self, factory: Arc<dyn SourceMiddlewareFactory>) {
40 self.source_middleware_types.insert(factory.name(), factory);
41 }
42
43 pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddlewareFactory>> {
44 self.source_middleware_types.get(name).cloned()
45 }
46}
47
48pub struct MiddlewareContainer {
49 source_instances: HashMap<Arc<str>, Arc<dyn SourceMiddleware>>,
50}
51
52impl MiddlewareContainer {
53 pub fn new(
54 registry: &MiddlewareTypeRegistry,
55 source_configs: Vec<Arc<SourceMiddlewareConfig>>,
56 ) -> Result<Self, MiddlewareSetupError> {
57 let mut source_instances = HashMap::new();
58 for config in source_configs {
59 let factory =
60 registry
61 .get(&config.kind)
62 .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
63 "Unknown middleware kind: {}",
64 config.kind
65 )))?;
66 let instance = factory.create(&config)?;
67 source_instances.insert(config.name.clone(), instance);
68 }
69
70 Ok(MiddlewareContainer { source_instances })
71 }
72
73 pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddleware>> {
74 self.source_instances.get(name).cloned()
75 }
76}
77
78pub struct SourceMiddlewarePipeline {
79 pipeline: Vec<Arc<dyn SourceMiddleware>>,
80}
81
82impl SourceMiddlewarePipeline {
83 pub fn new(
84 container: &MiddlewareContainer,
85 pipeline_keys: Vec<Arc<str>>,
86 ) -> Result<Self, MiddlewareSetupError> {
87 let pipeline = pipeline_keys
88 .iter()
89 .map(|name| {
90 container
91 .get(name)
92 .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
93 "Unknown middleware: {}",
94 name
95 )))
96 })
97 .collect::<Result<Vec<Arc<dyn SourceMiddleware>>, MiddlewareSetupError>>()?;
98
99 Ok(SourceMiddlewarePipeline { pipeline })
100 }
101
102 pub async fn process(
103 &self,
104 source_change: SourceChange,
105 ) -> Result<Vec<SourceChange>, MiddlewareError> {
106 let mut source_changes = vec![source_change];
107
108 for middleware in &self.pipeline {
109 let mut new_source_changes = Vec::new();
110 for source_change in source_changes {
111 new_source_changes.append(&mut middleware.process(source_change).await?);
112 }
113
114 source_changes = new_source_changes;
115 }
116
117 Ok(source_changes)
118 }
119}
120
121pub struct SourceMiddlewarePipelineCollection {
122 items: HashMap<Arc<str>, Arc<SourceMiddlewarePipeline>>,
123}
124
125impl Default for SourceMiddlewarePipelineCollection {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131impl SourceMiddlewarePipelineCollection {
132 pub fn new() -> Self {
133 SourceMiddlewarePipelineCollection {
134 items: HashMap::new(),
135 }
136 }
137
138 pub fn insert(&mut self, source_id: Arc<str>, pipeline: SourceMiddlewarePipeline) {
139 self.items.insert(source_id, Arc::new(pipeline));
140 }
141
142 pub fn get(&self, source_id: Arc<str>) -> Option<Arc<SourceMiddlewarePipeline>> {
143 self.items.get(&source_id).cloned()
144 }
145}