drasi_core/middleware/
mod.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use crate::{
18    interface::{
19        ElementIndex, MiddlewareError, MiddlewareSetupError, SourceMiddleware,
20        SourceMiddlewareFactory,
21    },
22    models::{SourceChange, SourceMiddlewareConfig},
23};
24
25pub struct MiddlewareTypeRegistry {
26    source_middleware_types: HashMap<String, Arc<dyn SourceMiddlewareFactory>>,
27}
28
29impl Default for MiddlewareTypeRegistry {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35impl MiddlewareTypeRegistry {
36    pub fn new() -> Self {
37        MiddlewareTypeRegistry {
38            source_middleware_types: HashMap::new(),
39        }
40    }
41
42    pub fn register(&mut self, factory: Arc<dyn SourceMiddlewareFactory>) {
43        self.source_middleware_types.insert(factory.name(), factory);
44    }
45
46    pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddlewareFactory>> {
47        self.source_middleware_types.get(name).cloned()
48    }
49}
50
51pub struct MiddlewareContainer {
52    source_instances: HashMap<Arc<str>, Arc<dyn SourceMiddleware>>,
53}
54
55impl MiddlewareContainer {
56    pub fn new(
57        registry: &MiddlewareTypeRegistry,
58        source_configs: Vec<Arc<SourceMiddlewareConfig>>,
59    ) -> Result<Self, MiddlewareSetupError> {
60        let mut source_instances = HashMap::new();
61        for config in source_configs {
62            let factory =
63                registry
64                    .get(&config.kind)
65                    .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
66                        "Unknown middleware kind: {}",
67                        config.kind
68                    )))?;
69            let instance = factory.create(&config)?;
70            source_instances.insert(config.name.clone(), instance);
71        }
72
73        Ok(MiddlewareContainer { source_instances })
74    }
75
76    pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddleware>> {
77        self.source_instances.get(name).cloned()
78    }
79}
80
81pub struct SourceMiddlewarePipeline {
82    pipeline: Vec<Arc<dyn SourceMiddleware>>,
83}
84
85impl SourceMiddlewarePipeline {
86    pub fn new(
87        container: &MiddlewareContainer,
88        pipeline_keys: Vec<Arc<str>>,
89    ) -> Result<Self, MiddlewareSetupError> {
90        let pipeline = pipeline_keys
91            .iter()
92            .map(|name| {
93                container
94                    .get(name)
95                    .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
96                        "Unknown middleware: {name}"
97                    )))
98            })
99            .collect::<Result<Vec<Arc<dyn SourceMiddleware>>, MiddlewareSetupError>>()?;
100
101        Ok(SourceMiddlewarePipeline { pipeline })
102    }
103
104    pub async fn process(
105        &self,
106        source_change: SourceChange,
107        element_index: Arc<dyn ElementIndex>,
108    ) -> Result<Vec<SourceChange>, MiddlewareError> {
109        let mut source_changes = vec![source_change];
110
111        for middleware in &self.pipeline {
112            let mut new_source_changes = Vec::new();
113            for source_change in source_changes {
114                new_source_changes.append(
115                    &mut middleware
116                        .process(source_change, element_index.as_ref())
117                        .await?,
118                );
119            }
120
121            source_changes = new_source_changes;
122        }
123
124        Ok(source_changes)
125    }
126}
127
128pub struct SourceMiddlewarePipelineCollection {
129    items: HashMap<Arc<str>, Arc<SourceMiddlewarePipeline>>,
130}
131
132impl Default for SourceMiddlewarePipelineCollection {
133    fn default() -> Self {
134        Self::new()
135    }
136}
137
138impl SourceMiddlewarePipelineCollection {
139    pub fn new() -> Self {
140        SourceMiddlewarePipelineCollection {
141            items: HashMap::new(),
142        }
143    }
144
145    pub fn insert(&mut self, source_id: Arc<str>, pipeline: SourceMiddlewarePipeline) {
146        self.items.insert(source_id, Arc::new(pipeline));
147    }
148
149    pub fn get(&self, source_id: Arc<str>) -> Option<Arc<SourceMiddlewarePipeline>> {
150        self.items.get(&source_id).cloned()
151    }
152}