use actrpc_core::{
InterceptorInitialization,
action::{ActionDescriptor, ActionKind},
};
use actrpc_transport::{JsonRpcClient, JsonRpcClientProvider, TransportError};
use crate::{
error::{ActionExecutionError, InterceptorError, OrchestratorError},
interceptor::{
ImmutableInterceptorPipeline, Interceptor, InterceptorConfig, InterceptorPolicy,
JsonRpcBackedInterceptor, WorkingInterceptorPipeline,
initialization::validate_interceptor_registration,
},
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
#[derive(Clone)]
pub struct InterceptorCatalogEntry {
pub name: String,
pub policy: InterceptorPolicy,
pub interceptor: Arc<dyn Interceptor>,
}
impl core::fmt::Debug for InterceptorCatalogEntry {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("InterceptorCatalogEntry")
.field("name", &self.name)
.field("policy", &self.policy)
.finish_non_exhaustive()
}
}
#[derive(Debug)]
pub struct InterceptorCatalog {
entries: HashMap<String, InterceptorCatalogEntry>,
outbound_pipeline: ImmutableInterceptorPipeline,
inbound_pipeline: ImmutableInterceptorPipeline,
}
impl InterceptorCatalog {
pub fn new(
entries: HashMap<String, InterceptorCatalogEntry>,
outbound_pipeline: ImmutableInterceptorPipeline,
inbound_pipeline: ImmutableInterceptorPipeline,
) -> Self {
Self {
entries,
outbound_pipeline,
inbound_pipeline,
}
}
pub async fn build(
interceptors: Vec<(InterceptorConfig, Arc<dyn Interceptor>)>,
outbound_pipeline: Vec<String>,
inbound_pipeline: Vec<String>,
available_actions: &HashMap<ActionKind, ActionDescriptor>,
) -> Result<Self, OrchestratorError> {
let mut entries = HashMap::new();
let mut initializations = HashMap::new();
for (config, interceptor) in interceptors {
let initialization = interceptor.initialize().await.map_err(|source| {
OrchestratorError::Interceptor(InterceptorError::InitializationFailed {
name: config.name.clone(),
source,
})
})?;
validate_interceptor_registration(
&config.name,
&config.policy,
&initialization,
available_actions,
)?;
if entries.contains_key(&config.name) {
return Err(OrchestratorError::Interceptor(
InterceptorError::DuplicateRegistration { name: config.name },
));
}
initializations.insert(config.name.clone(), initialization);
entries.insert(
config.name.clone(),
InterceptorCatalogEntry {
name: config.name,
policy: config.policy,
interceptor,
},
);
}
validate_pipeline("outbound", &outbound_pipeline, &entries, &initializations)?;
validate_pipeline("inbound", &inbound_pipeline, &entries, &initializations)?;
Ok(Self::new(
entries,
ImmutableInterceptorPipeline::new(outbound_pipeline),
ImmutableInterceptorPipeline::new(inbound_pipeline),
))
}
pub async fn build_from_configs<P>(
configs: Vec<InterceptorConfig>,
outbound_pipeline: Vec<String>,
inbound_pipeline: Vec<String>,
available_actions: &HashMap<ActionKind, ActionDescriptor>,
client_provider: &P,
) -> Result<Self, OrchestratorError>
where
P: JsonRpcClientProvider<
Client = Arc<dyn JsonRpcClient<Error = TransportError>>,
Error = TransportError,
> + Send
+ Sync,
{
let mut interceptors = Vec::with_capacity(configs.len());
for config in configs {
let client = client_provider
.get_client(&config.target)
.await
.map_err(OrchestratorError::Transport)?;
let interceptor: Arc<dyn Interceptor> = Arc::new(JsonRpcBackedInterceptor::new(client));
interceptors.push((config, interceptor));
}
Self::build(
interceptors,
outbound_pipeline,
inbound_pipeline,
available_actions,
)
.await
}
pub fn get_entry(&self, name: &str) -> Result<InterceptorCatalogEntry, ActionExecutionError> {
let Some(entry) = self.entries.get(name) else {
return Err(ActionExecutionError::NotFound {
target: name.to_owned(),
});
};
Ok(entry.clone())
}
pub fn entries(&self) -> Vec<InterceptorCatalogEntry> {
self.entries.values().cloned().collect()
}
pub fn entries_for_names(
&self,
names: &[String],
) -> Result<Vec<InterceptorCatalogEntry>, ActionExecutionError> {
let mut result = Vec::with_capacity(names.len());
for name in names {
let Some(entry) = self.entries.get(name) else {
return Err(ActionExecutionError::NotFound {
target: name.clone(),
});
};
result.push(entry.clone());
}
Ok(result)
}
pub fn outbound_pipeline_snapshot(&self) -> WorkingInterceptorPipeline {
self.outbound_pipeline.snapshot()
}
pub fn inbound_pipeline_snapshot(&self) -> WorkingInterceptorPipeline {
self.inbound_pipeline.snapshot()
}
}
fn validate_pipeline(
phase: &str,
pipeline: &[String],
entries: &HashMap<String, InterceptorCatalogEntry>,
initializations: &HashMap<String, InterceptorInitialization>,
) -> Result<(), OrchestratorError> {
let mut seen = HashSet::new();
for name in pipeline {
if !seen.insert(name.clone()) {
return Err(OrchestratorError::Interceptor(
InterceptorError::DuplicatePipelineEntry {
phase: phase.to_owned(),
name: name.clone(),
},
));
}
let Some(initialization) = initializations.get(name) else {
return Err(OrchestratorError::Interceptor(
InterceptorError::UnknownPipelineInterceptor {
phase: phase.to_owned(),
name: name.clone(),
},
));
};
if !entries.contains_key(name) {
return Err(OrchestratorError::Interceptor(
InterceptorError::UnknownPipelineInterceptor {
phase: phase.to_owned(),
name: name.clone(),
},
));
}
match phase {
"outbound" if !initialization.supports_outbound => {
return Err(OrchestratorError::Interceptor(
InterceptorError::InvalidInitialization {
interceptor: name.clone(),
message: "interceptor is listed in outbound pipeline but does not support outbound"
.to_owned(),
},
));
}
"inbound" if !initialization.supports_inbound => {
return Err(OrchestratorError::Interceptor(
InterceptorError::InvalidInitialization {
interceptor: name.clone(),
message:
"interceptor is listed in inbound pipeline but does not support inbound"
.to_owned(),
},
));
}
_ => {}
}
}
Ok(())
}