drasi_core/middleware/
mod.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use crate::{
18    interface::{MiddlewareError, MiddlewareSetupError, SourceMiddleware, SourceMiddlewareFactory},
19    models::{SourceChange, SourceMiddlewareConfig},
20};
21
/// Registry of source middleware factories, looked up by name.
pub struct MiddlewareTypeRegistry {
    /// Registered factories, keyed by the `String` each factory returns from `name()`.
    source_middleware_types: HashMap<String, Arc<dyn SourceMiddlewareFactory>>,
}
25
26impl Default for MiddlewareTypeRegistry {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl MiddlewareTypeRegistry {
33    pub fn new() -> Self {
34        MiddlewareTypeRegistry {
35            source_middleware_types: HashMap::new(),
36        }
37    }
38
39    pub fn register(&mut self, factory: Arc<dyn SourceMiddlewareFactory>) {
40        self.source_middleware_types.insert(factory.name(), factory);
41    }
42
43    pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddlewareFactory>> {
44        self.source_middleware_types.get(name).cloned()
45    }
46}
47
/// Holds the source middleware instances created from configuration, looked up by name.
pub struct MiddlewareContainer {
    /// Created middleware instances, keyed by the `name` of the config each was built from.
    source_instances: HashMap<Arc<str>, Arc<dyn SourceMiddleware>>,
}
51
52impl MiddlewareContainer {
53    pub fn new(
54        registry: &MiddlewareTypeRegistry,
55        source_configs: Vec<Arc<SourceMiddlewareConfig>>,
56    ) -> Result<Self, MiddlewareSetupError> {
57        let mut source_instances = HashMap::new();
58        for config in source_configs {
59            let factory =
60                registry
61                    .get(&config.kind)
62                    .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
63                        "Unknown middleware kind: {}",
64                        config.kind
65                    )))?;
66            let instance = factory.create(&config)?;
67            source_instances.insert(config.name.clone(), instance);
68        }
69
70        Ok(MiddlewareContainer { source_instances })
71    }
72
73    pub fn get(&self, name: &str) -> Option<Arc<dyn SourceMiddleware>> {
74        self.source_instances.get(name).cloned()
75    }
76}
77
/// An ordered sequence of source middleware applied to a `SourceChange`.
pub struct SourceMiddlewarePipeline {
    /// Middleware stages, in the order they are run by `process`.
    pipeline: Vec<Arc<dyn SourceMiddleware>>,
}
81
82impl SourceMiddlewarePipeline {
83    pub fn new(
84        container: &MiddlewareContainer,
85        pipeline_keys: Vec<Arc<str>>,
86    ) -> Result<Self, MiddlewareSetupError> {
87        let pipeline = pipeline_keys
88            .iter()
89            .map(|name| {
90                container
91                    .get(name)
92                    .ok_or(MiddlewareSetupError::InvalidConfiguration(format!(
93                        "Unknown middleware: {}",
94                        name
95                    )))
96            })
97            .collect::<Result<Vec<Arc<dyn SourceMiddleware>>, MiddlewareSetupError>>()?;
98
99        Ok(SourceMiddlewarePipeline { pipeline })
100    }
101
102    pub async fn process(
103        &self,
104        source_change: SourceChange,
105    ) -> Result<Vec<SourceChange>, MiddlewareError> {
106        let mut source_changes = vec![source_change];
107
108        for middleware in &self.pipeline {
109            let mut new_source_changes = Vec::new();
110            for source_change in source_changes {
111                new_source_changes.append(&mut middleware.process(source_change).await?);
112            }
113
114            source_changes = new_source_changes;
115        }
116
117        Ok(source_changes)
118    }
119}
120
/// A set of source middleware pipelines, looked up by source id.
pub struct SourceMiddlewarePipelineCollection {
    /// Pipelines keyed by the source id they were inserted under.
    items: HashMap<Arc<str>, Arc<SourceMiddlewarePipeline>>,
}
124
125impl Default for SourceMiddlewarePipelineCollection {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131impl SourceMiddlewarePipelineCollection {
132    pub fn new() -> Self {
133        SourceMiddlewarePipelineCollection {
134            items: HashMap::new(),
135        }
136    }
137
138    pub fn insert(&mut self, source_id: Arc<str>, pipeline: SourceMiddlewarePipeline) {
139        self.items.insert(source_id, Arc::new(pipeline));
140    }
141
142    pub fn get(&self, source_id: Arc<str>) -> Option<Arc<SourceMiddlewarePipeline>> {
143        self.items.get(&source_id).cloned()
144    }
145}