actrpc-orchestrator 0.1.0

Configuration and orchestration utilities for ActRPC.
Documentation
use crate::{
    action::{ActionRegistry, build_builtin_action_registry},
    error::{ActionError, InterceptorError, MethodCallError, OrchestratorError},
    interceptor::InterceptorCatalogEntry,
    method::{MethodName, ProviderName},
    runtime::{CallExecutionFactory, CallRuntime, PhaseRuntime},
};
use actrpc_core::{
    action::{RequestedActionRecord, ResolvedActionRecord},
    interception::{InterceptionPhase, InterceptionRequest},
    json_rpc::{
        JsonRpcErrorResponse, JsonRpcMessage, JsonRpcResponse, JsonRpcSingleMessage, JsonRpcVersion,
    },
    participant::{Participant, ParticipantType},
};
use std::sync::Arc;

pub struct CallExecution {
    factory: Arc<CallExecutionFactory>,
    call: Arc<CallRuntime>,
    provider: ProviderName,
    method: MethodName,
}

impl CallExecution {
    pub fn new(
        factory: Arc<CallExecutionFactory>,
        call: Arc<CallRuntime>,
        provider: ProviderName,
        method: MethodName,
    ) -> Self {
        Self {
            factory,
            call,
            provider,
            method,
        }
    }

    pub async fn run(&self) -> Result<JsonRpcMessage, OrchestratorError> {
        let resources = self.factory.resources();

        let outbound = PhaseRuntime::new(
            InterceptionPhase::Outbound,
            self.call.clone(),
            resources.interceptor_catalog.outbound_pipeline_snapshot(),
        );

        let outbound_actions =
            build_builtin_action_registry(self.factory.clone(), resources, &outbound)?;

        self.run_interceptor_phase(&outbound, &outbound_actions)
            .await?;

        if self.call.rejection.is_rejected() {
            return self.rejection_response();
        }

        let outbound_message = self.snapshot_message()?;

        let downstream_response = resources
            .method_catalog
            .send_message(&self.provider, &self.method, outbound_message)
            .await
            .map_err(map_method_call_error)?;

        if !self
            .call
            .in_flight_message
            .replace_message(downstream_response)
        {
            return Err(OrchestratorError::Internal {
                message: "failed to replace in-flight message after downstream call".to_owned(),
            });
        }

        let inbound = PhaseRuntime::new(
            InterceptionPhase::Inbound,
            self.call.clone(),
            resources.interceptor_catalog.inbound_pipeline_snapshot(),
        );

        let inbound_actions =
            build_builtin_action_registry(self.factory.clone(), resources, &inbound)?;

        self.run_interceptor_phase(&inbound, &inbound_actions)
            .await?;

        if self.call.rejection.is_rejected() {
            return self.rejection_response();
        }

        self.snapshot_message()
    }

    async fn run_interceptor_phase(
        &self,
        phase: &PhaseRuntime,
        action_registry: &ActionRegistry,
    ) -> Result<(), OrchestratorError> {
        let resources = self.factory.resources();

        for interceptor_name in phase.pipeline.snapshot() {
            if !phase.pipeline.contains(&interceptor_name) {
                continue;
            }

            let entry = resources
                .interceptor_catalog
                .get_entry(&interceptor_name)
                .map_err(|source| OrchestratorError::Internal {
                    message: source.to_string(),
                })?;

            let mut resolved_action_history: Vec<Vec<ResolvedActionRecord>> = Vec::new();

            loop {
                let request = self.build_interception_request(&resolved_action_history)?;

                let response = entry
                    .interceptor
                    .intercept(&request)
                    .await
                    .map_err(|source| {
                        OrchestratorError::Interceptor(InterceptorError::InvocationFailed {
                            name: entry.name.clone(),
                            source,
                        })
                    })?;

                let should_reinvoke = response.should_reinvoke();

                self.validate_policy(phase.phase, &entry, &response.actions)?;

                let mut round_actions = Vec::new();

                for requested_action in response.actions {
                    let action_kind = requested_action.kind.clone();

                    let handler = action_registry.get(&action_kind).ok_or_else(|| {
                        OrchestratorError::Action(ActionError::HandlerNotFound {
                            action: action_kind.clone(),
                        })
                    })?;

                    let resolved =
                        handler
                            .handle(&request, requested_action)
                            .await
                            .map_err(|source| {
                                OrchestratorError::Action(ActionError::HandlerFailed {
                                    interceptor: entry.name.clone(),
                                    action: action_kind,
                                    source,
                                })
                            })?;

                    round_actions.push(resolved);

                    if self.call.rejection.is_rejected() {
                        return Ok(());
                    }
                }

                if !round_actions.is_empty() {
                    resolved_action_history.push(round_actions);
                }

                if !should_reinvoke {
                    break;
                }
            }
        }

        Ok(())
    }

    fn build_interception_request(
        &self,
        resolved_action_history: &[Vec<ResolvedActionRecord>],
    ) -> Result<InterceptionRequest, OrchestratorError> {
        Ok(InterceptionRequest {
            origin: Participant {
                kind: ParticipantType::Orchestrator,
                id: "orchestrator".to_owned(),
            },
            message: self.snapshot_message()?,
            resolved_action_history: resolved_action_history.to_vec(),
        })
    }

    fn validate_policy(
        &self,
        phase: InterceptionPhase,
        entry: &InterceptorCatalogEntry,
        actions: &[RequestedActionRecord],
    ) -> Result<(), OrchestratorError> {
        let conflicts = entry.policy.conflicting_actions(phase, actions);

        let Some(conflict) = conflicts.first() else {
            return Ok(());
        };

        Err(OrchestratorError::Action(ActionError::ForbiddenByPolicy {
            interceptor: entry.name.clone(),
            action: conflict.kind.clone(),
            phase,
        }))
    }

    fn snapshot_message(&self) -> Result<JsonRpcMessage, OrchestratorError> {
        self.call
            .in_flight_message
            .snapshot()
            .ok_or_else(|| OrchestratorError::Internal {
                message: "no in-flight message is currently set".to_owned(),
            })
    }

    fn rejection_response(&self) -> Result<JsonRpcMessage, OrchestratorError> {
        let error = self
            .call
            .rejection
            .snapshot()
            .ok_or_else(|| OrchestratorError::Internal {
                message: "call rejection was set without an error".to_owned(),
            })?;

        let id = match self.snapshot_message()? {
            JsonRpcMessage::Single(JsonRpcSingleMessage::Request(request)) => request.id,

            JsonRpcMessage::Single(JsonRpcSingleMessage::Response(JsonRpcResponse::Success(
                success,
            ))) => success.id,

            JsonRpcMessage::Single(JsonRpcSingleMessage::Response(JsonRpcResponse::Error(
                error_response,
            ))) => error_response.id,

            JsonRpcMessage::Single(JsonRpcSingleMessage::Notification(_)) => {
                return Err(OrchestratorError::Internal {
                    message: "reject_call cannot produce a JSON-RPC response for a notification"
                        .to_owned(),
                });
            }

            JsonRpcMessage::Batch(_) => {
                return Err(OrchestratorError::Internal {
                    message: "reject_call does not support batched JSON-RPC messages yet"
                        .to_owned(),
                });
            }
        };

        Ok(JsonRpcMessage::Single(JsonRpcSingleMessage::Response(
            JsonRpcResponse::Error(JsonRpcErrorResponse {
                jsonrpc: JsonRpcVersion::V2_0,
                id,
                error,
            }),
        )))
    }
}

fn map_method_call_error(error: MethodCallError) -> OrchestratorError {
    OrchestratorError::MethodCall(error)
}