Skip to main content

reliability_toolkit/
audit_stream.rs

//! Optional audit-stream-py producer.
//!
//! When the `audit-stream` Cargo feature is enabled **and** the
//! `AUDIT_STREAM_URL` env var is set, an [`AuditingBreaker`] fires
//! governance events on every circuit-breaker state transition worth
//! recording:
//!
//! - `breaker_opened`     — closed/half-open → open. The "we just stopped
//!                          trusting a downstream" signal.
//! - `breaker_recovered`  — half-open → closed. The "downstream looks
//!                          healthy again" signal.
//!
//! Manual [`AuditingBreaker::trip`] / [`AuditingBreaker::reset`] also
//! fire when they cause a real state change.
//!
//! Same opt-in pattern as the other Rust producers (hash-attestation,
//! incident-correlation, aeo-graph-explorer). Identical env-var contract:
//!
//! - `AUDIT_STREAM_URL`        — base URL, e.g. `http://audit.local:8093`
//! - `AUDIT_STREAM_TIMEOUT_S`  — per-call timeout in seconds, default 2.5
//!
//! Best-effort. Failures are logged to stderr and swallowed — an
//! audit-stream outage must never block a call gated by the breaker.
24
25use std::env;
26use std::time::Duration;
27
28use serde_json::json;
29
30use crate::circuit_breaker::{CircuitBreaker, CircuitState};
31use crate::error::ToolkitError;
32
/// Default per-call timeout, in seconds, used when
/// `AUDIT_STREAM_TIMEOUT_S` is unset or does not parse as a number.
pub const DEFAULT_TIMEOUT_S: f64 = 2.5;
35
36/// True when `AUDIT_STREAM_URL` is set to a non-empty value.
37#[must_use]
38pub fn is_enabled() -> bool {
39    base_url().is_some()
40}
41
/// Normalised audit-stream base URL, or `None` when emission is disabled.
///
/// Reads `AUDIT_STREAM_URL`, trims surrounding whitespace and then any
/// trailing `/` so callers can append a path such as `/events` without
/// producing a double slash.
///
/// Returns `None` when the variable is unset, not valid Unicode, blank,
/// or consists solely of slashes. The last case matters: a lone `/`
/// would otherwise normalise to an empty string and enable emission
/// against a relative endpoint.
#[must_use]
pub fn base_url() -> Option<String> {
    let raw = env::var("AUDIT_STREAM_URL").ok()?;
    let url = raw.trim().trim_end_matches('/');
    // Check emptiness *after* stripping slashes so "/" counts as disabled.
    (!url.is_empty()).then(|| url.to_string())
}
52
53/// Configured per-call timeout. Defaults to 2.5 seconds.
54#[must_use]
55pub fn timeout() -> Duration {
56    let secs = env::var("AUDIT_STREAM_TIMEOUT_S")
57        .ok()
58        .and_then(|raw| raw.trim().parse::<f64>().ok())
59        .map_or(DEFAULT_TIMEOUT_S, |v| v.max(0.1));
60    Duration::from_secs_f64(secs)
61}
62
63/// Fire one event. Silent no-op when `AUDIT_STREAM_URL` is unset.
64pub async fn emit(client: &reqwest::Client, kind: &str, payload: serde_json::Value) {
65    let Some(url) = base_url() else {
66        return;
67    };
68    let body = json!({
69        "kind": kind,
70        "source": "reliability-toolkit",
71        "payload": payload,
72    });
73    let endpoint = format!("{url}/events");
74    let result = client
75        .post(&endpoint)
76        .json(&body)
77        .timeout(timeout())
78        .send()
79        .await;
80    match result {
81        Ok(resp) if resp.status().is_success() => {}
82        Ok(resp) => {
83            eprintln!(
84                "audit-stream emit failed (kind={kind}): HTTP {}",
85                resp.status()
86            );
87        }
88        Err(err) => {
89            eprintln!("audit-stream emit failed (kind={kind}): {err}");
90        }
91    }
92}
93
94/// Wraps a [`CircuitBreaker`] with a name + audit-stream client so that
95/// state transitions fan out as governance events.
96///
97/// The wrapper is cheap to clone — both the inner breaker and the
98/// reqwest client are reference-counted.
99#[derive(Clone)]
100pub struct AuditingBreaker {
101    inner: CircuitBreaker,
102    name: String,
103    client: reqwest::Client,
104}
105
106impl AuditingBreaker {
107    /// Wrap a breaker with a name (identifies which breaker in the audit
108    /// log — e.g. `"downstream-billing"`) and an HTTP client.
109    pub fn new(breaker: CircuitBreaker, name: impl Into<String>, client: reqwest::Client) -> Self {
110        Self {
111            inner: breaker,
112            name: name.into(),
113            client,
114        }
115    }
116
117    /// The wrapped breaker. Use this for non-audited calls or to read
118    /// state directly.
119    #[must_use]
120    pub fn inner(&self) -> &CircuitBreaker {
121        &self.inner
122    }
123
124    /// Name this breaker emits under.
125    #[must_use]
126    pub fn name(&self) -> &str {
127        &self.name
128    }
129
130    /// Execute `fut` through the breaker, firing transition events on
131    /// open/recover. Semantics match [`CircuitBreaker::call`].
132    pub async fn call<F, T, E>(&self, fut: F) -> Result<Result<T, E>, ToolkitError>
133    where
134        F: std::future::Future<Output = Result<T, E>>,
135    {
136        let before = self.inner.state().await;
137        let out = self.inner.call(fut).await;
138        let after = self.inner.state().await;
139        self.emit_transition(before, after, "call").await;
140        out
141    }
142
143    /// Manually trip the breaker, emitting `breaker_opened` if the
144    /// transition was real (was-not-already-open → open).
145    pub async fn trip(&self) {
146        let before = self.inner.state().await;
147        self.inner.trip().await;
148        let after = self.inner.state().await;
149        self.emit_transition(before, after, "trip").await;
150    }
151
152    /// Manually reset the breaker, emitting `breaker_recovered` if the
153    /// transition was real (was-not-already-closed → closed).
154    pub async fn reset(&self) {
155        let before = self.inner.state().await;
156        self.inner.reset().await;
157        let after = self.inner.state().await;
158        self.emit_transition(before, after, "reset").await;
159    }
160
161    async fn emit_transition(&self, before: CircuitState, after: CircuitState, cause: &str) {
162        if before == after {
163            return;
164        }
165        // Opened: anything → Open
166        if after == CircuitState::Open {
167            emit(
168                &self.client,
169                "breaker_opened",
170                json!({
171                    "name": self.name,
172                    "previous_state": state_label(before),
173                    "cause": cause,
174                }),
175            )
176            .await;
177        }
178        // Recovered: HalfOpen → Closed, or manual reset Open/HalfOpen → Closed
179        else if after == CircuitState::Closed && before != CircuitState::Closed {
180            emit(
181                &self.client,
182                "breaker_recovered",
183                json!({
184                    "name": self.name,
185                    "previous_state": state_label(before),
186                    "cause": cause,
187                }),
188            )
189            .await;
190        }
191        // Closed → HalfOpen and HalfOpen ↔ HalfOpen are not audit-worthy on
192        // their own — they're internal cool-down transitions. The "real"
193        // governance events are opened (we lost trust) and recovered (we
194        // re-gained it).
195    }
196}
197
198fn state_label(s: CircuitState) -> &'static str {
199    match s {
200        CircuitState::Closed => "closed",
201        CircuitState::Open => "open",
202        CircuitState::HalfOpen => "half_open",
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serialises tests that touch process-wide environment variables;
    /// the test harness runs tests on multiple threads by default.
    static ENV_GUARD: Mutex<()> = Mutex::new(());

    /// Remove both audit-stream variables so each test starts clean.
    fn reset_env() {
        env::remove_var("AUDIT_STREAM_URL");
        env::remove_var("AUDIT_STREAM_TIMEOUT_S");
    }

    /// Take the environment lock and clear the audit-stream variables.
    ///
    /// A poisoned lock is recovered rather than propagated: the guarded
    /// value is `()`, so a panic in another test cannot have left it in
    /// a bad state.
    fn lock_clean_env() -> MutexGuard<'static, ()> {
        let guard = ENV_GUARD.lock().unwrap_or_else(PoisonError::into_inner);
        reset_env();
        guard
    }

    #[test]
    fn disabled_when_unset() {
        let _env = lock_clean_env();
        assert!(!is_enabled());
    }

    #[test]
    fn enabled_with_value() {
        let _env = lock_clean_env();
        env::set_var("AUDIT_STREAM_URL", "http://audit.local:8093/");
        assert!(is_enabled());
        // The trailing slash must be stripped.
        assert_eq!(base_url().unwrap(), "http://audit.local:8093");
        env::remove_var("AUDIT_STREAM_URL");
    }

    #[test]
    fn timeout_default() {
        let _env = lock_clean_env();
        assert_eq!(timeout(), Duration::from_secs_f64(DEFAULT_TIMEOUT_S));
    }

    #[test]
    fn timeout_override() {
        let _env = lock_clean_env();
        env::set_var("AUDIT_STREAM_TIMEOUT_S", "0.75");
        assert_eq!(timeout(), Duration::from_secs_f64(0.75));
        env::remove_var("AUDIT_STREAM_TIMEOUT_S");
    }
}