pub mod adapters;
pub mod base;
pub mod circuit_breaker;
pub mod context_namespace;
pub mod logging;
pub mod manager;
pub mod otel_tracing;
pub mod retry;
pub mod yaml_config;
pub use adapters::{AfterAdapter, AfterMiddleware, BeforeAdapter, BeforeMiddleware};
pub use base::{Middleware, OnErrorOutcome, RetrySignal};
pub use circuit_breaker::{
CircuitBreakerBuilder, CircuitBreakerConfig, CircuitBreakerMiddleware, CircuitBreakerState,
DEFAULT_MIN_SAMPLES as CB_DEFAULT_MIN_SAMPLES,
DEFAULT_OPEN_THRESHOLD as CB_DEFAULT_OPEN_THRESHOLD,
DEFAULT_RECOVERY_WINDOW_MS as CB_DEFAULT_RECOVERY_WINDOW_MS,
DEFAULT_WINDOW_SIZE as CB_DEFAULT_WINDOW_SIZE,
};
pub use context_namespace::{
enforce_context_key, namespace_keys, validate_context_key, ContextWriter, NamespaceCheck,
APCORE_KEY_PREFIX, EXT_KEY_PREFIX,
};
pub use logging::LoggingMiddleware;
pub use manager::MiddlewareManager;
pub use otel_tracing::{
TracingBuilder as OtelTracingBuilder, TracingConfig as OtelTracingConfig,
TracingMiddleware as OtelTracingMiddleware, TRACING_ATTRIBUTES_KEY, TRACING_SPAN_NAME_KEY,
TRACING_SPAN_STATUS_KEY,
};
pub use retry::{RetryConfig, RetryMiddleware};
pub use yaml_config::{
CircuitBreakerMiddlewareConfig, CustomMiddlewareConfig, CustomMiddlewareFactory,
LoggingMiddlewareConfig, MiddlewareChainConfig, MiddlewareConfig, MiddlewareFactory,
TracingMiddlewareConfig,
};
use std::collections::{HashMap, HashSet};
use parking_lot::Mutex;
use async_trait::async_trait;
use crate::context::Context;
use crate::errors::ModuleError;
use crate::events::emitter::{ApCoreEvent, EventEmitter};
use crate::observability::metrics::{estimate_p99_from_histogram, MetricsCollector};
#[derive(Debug)]
pub struct PlatformNotifyMiddleware {
emitter: EventEmitter,
metrics_collector: Option<MetricsCollector>,
error_rate_threshold: f64,
latency_p99_threshold_ms: f64,
alerted: Mutex<HashMap<String, HashSet<String>>>,
}
impl PlatformNotifyMiddleware {
#[must_use]
pub fn new(
emitter: EventEmitter,
metrics_collector: Option<MetricsCollector>,
error_rate_threshold: f64,
latency_p99_threshold_ms: f64,
) -> Self {
Self {
emitter,
metrics_collector,
error_rate_threshold,
latency_p99_threshold_ms,
alerted: Mutex::new(HashMap::new()),
}
}
#[must_use]
pub fn with_defaults(
emitter: EventEmitter,
metrics_collector: Option<MetricsCollector>,
) -> Self {
Self::new(emitter, metrics_collector, 0.1, 5000.0)
}
fn compute_error_rate(&self, module_id: &str) -> f64 {
if self.metrics_collector.is_none() {
return 0.0;
}
let collector = self.metrics_collector.as_ref().unwrap();
let snap = collector.snapshot();
let Some(counters) = snap.get("counters") else {
return 0.0;
};
let total_key = format!("apcore_module_calls_total|module={module_id},status=success");
let error_key = format!("apcore_module_calls_total|module={module_id},status=error");
let success = counters
.get(&total_key)
.and_then(serde_json::Value::as_f64)
.unwrap_or(0.0);
let errors = counters
.get(&error_key)
.and_then(serde_json::Value::as_f64)
.unwrap_or(0.0);
let total = success + errors;
if total == 0.0 {
return 0.0;
}
errors / total
}
fn check_error_rate_threshold(&self, module_id: &str) -> Vec<ApCoreEvent> {
let error_rate = self.compute_error_rate(module_id);
let mut alerted = self.alerted.lock();
let module_alerts = alerted.entry(module_id.to_string()).or_default();
if error_rate >= self.error_rate_threshold && !module_alerts.contains("error_rate") {
let canonical = ApCoreEvent::with_module(
"apcore.health.error_threshold_exceeded",
serde_json::json!({
"error_rate": error_rate,
"threshold": self.error_rate_threshold,
}),
module_id,
"error",
);
let legacy = ApCoreEvent::with_module(
"error_threshold_exceeded",
serde_json::json!({
"error_rate": error_rate,
"threshold": self.error_rate_threshold,
"deprecated": true,
"canonical_event": "apcore.health.error_threshold_exceeded",
}),
module_id,
"error",
);
module_alerts.insert("error_rate".to_string());
vec![canonical, legacy]
} else {
vec![]
}
}
fn check_latency_threshold(&self, module_id: &str) -> Vec<ApCoreEvent> {
let Some(collector) = self.metrics_collector.as_ref() else {
return vec![];
};
let snap = collector.snapshot();
let Some(histograms) = snap.get("histograms").and_then(|v| v.as_object()) else {
return vec![];
};
let hist_key = format!("apcore_module_duration_seconds|module_id={module_id}");
let Some(data) = histograms.get(&hist_key) else {
return vec![];
};
let count = data
.get("count")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);
if count == 0 {
return vec![];
}
let Some(buckets) = data.get("buckets").and_then(|v| v.as_array()) else {
return vec![];
};
let p99_ms = estimate_p99_from_histogram(buckets, count);
let mut alerted = self.alerted.lock();
let module_alerts = alerted.entry(module_id.to_string()).or_default();
if p99_ms >= self.latency_p99_threshold_ms && !module_alerts.contains("latency") {
let canonical = ApCoreEvent::with_module(
"apcore.health.latency_threshold_exceeded",
serde_json::json!({
"p99_latency_ms": p99_ms,
"threshold": self.latency_p99_threshold_ms,
}),
module_id,
"warn",
);
let legacy = ApCoreEvent::with_module(
"latency_threshold_exceeded",
serde_json::json!({
"p99_latency_ms": p99_ms,
"threshold": self.latency_p99_threshold_ms,
"deprecated": true,
"canonical_event": "apcore.health.latency_threshold_exceeded",
}),
module_id,
"warn",
);
module_alerts.insert("latency".to_string());
return vec![canonical, legacy];
}
vec![]
}
fn check_error_recovery(&self, module_id: &str) -> Option<ApCoreEvent> {
let error_rate = self.compute_error_rate(module_id);
let mut alerted = self.alerted.lock();
let has_alert = alerted
.get(module_id)
.is_some_and(|s| s.contains("error_rate"));
if !has_alert {
return None;
}
if error_rate < self.error_rate_threshold * 0.5 {
let event = ApCoreEvent::with_module(
"apcore.health.recovered",
serde_json::json!({
"status": "recovered",
"error_rate": error_rate,
}),
module_id,
"info",
);
if let Some(module_alerts) = alerted.get_mut(module_id) {
module_alerts.remove("error_rate");
}
Some(event)
} else {
None
}
}
}
#[async_trait]
impl Middleware for PlatformNotifyMiddleware {
fn name(&self) -> &'static str {
"platform_notify"
}
async fn before(
&self,
_module_id: &str,
_inputs: serde_json::Value,
_ctx: &Context<serde_json::Value>,
) -> Result<Option<serde_json::Value>, ModuleError> {
Ok(None)
}
async fn after(
&self,
module_id: &str,
_inputs: serde_json::Value,
_output: serde_json::Value,
_ctx: &Context<serde_json::Value>,
) -> Result<Option<serde_json::Value>, ModuleError> {
for event in self.check_latency_threshold(module_id) {
self.emitter.emit(&event).await;
}
if let Some(event) = self.check_error_recovery(module_id) {
self.emitter.emit(&event).await;
}
Ok(None)
}
async fn on_error(
&self,
module_id: &str,
_inputs: serde_json::Value,
_error: &ModuleError,
_ctx: &Context<serde_json::Value>,
) -> Result<Option<serde_json::Value>, ModuleError> {
for event in self.check_error_rate_threshold(module_id) {
self.emitter.emit(&event).await;
}
Ok(None)
}
}