reliability_toolkit/
audit_stream.rs1use std::env;
26use std::time::Duration;
27
28use serde_json::json;
29
30use crate::circuit_breaker::{CircuitBreaker, CircuitState};
31use crate::error::ToolkitError;
32
/// Request timeout, in seconds, that [`timeout`] falls back to when
/// `AUDIT_STREAM_TIMEOUT_S` is unset or does not parse as a number.
pub const DEFAULT_TIMEOUT_S: f64 = 2.5;
35
36#[must_use]
38pub fn is_enabled() -> bool {
39 base_url().is_some()
40}
41
/// Returns the audit-stream base URL read from `AUDIT_STREAM_URL`.
///
/// The value is stripped of surrounding whitespace and of any trailing `/`
/// so that callers can append a path such as `/events` directly.
///
/// Returns `None` when the variable is unset, is not valid Unicode, or is
/// empty once normalised. The last case includes values made up solely of
/// slashes (for example `"/"`), which would otherwise produce an empty base
/// URL and a request to a relative path.
#[must_use]
pub fn base_url() -> Option<String> {
    let raw = env::var("AUDIT_STREAM_URL").ok()?;
    let normalized = raw.trim().trim_end_matches('/');
    // Check emptiness only after *all* normalisation so "/" or "///" count as unset.
    (!normalized.is_empty()).then(|| normalized.to_owned())
}
52
53#[must_use]
55pub fn timeout() -> Duration {
56 let secs = env::var("AUDIT_STREAM_TIMEOUT_S")
57 .ok()
58 .and_then(|raw| raw.trim().parse::<f64>().ok())
59 .map_or(DEFAULT_TIMEOUT_S, |v| v.max(0.1));
60 Duration::from_secs_f64(secs)
61}
62
63pub async fn emit(client: &reqwest::Client, kind: &str, payload: serde_json::Value) {
65 let Some(url) = base_url() else {
66 return;
67 };
68 let body = json!({
69 "kind": kind,
70 "source": "reliability-toolkit",
71 "payload": payload,
72 });
73 let endpoint = format!("{url}/events");
74 let result = client
75 .post(&endpoint)
76 .json(&body)
77 .timeout(timeout())
78 .send()
79 .await;
80 match result {
81 Ok(resp) if resp.status().is_success() => {}
82 Ok(resp) => {
83 eprintln!(
84 "audit-stream emit failed (kind={kind}): HTTP {}",
85 resp.status()
86 );
87 }
88 Err(err) => {
89 eprintln!("audit-stream emit failed (kind={kind}): {err}");
90 }
91 }
92}
93
94#[derive(Clone)]
100pub struct AuditingBreaker {
101 inner: CircuitBreaker,
102 name: String,
103 client: reqwest::Client,
104}
105
106impl AuditingBreaker {
107 pub fn new(breaker: CircuitBreaker, name: impl Into<String>, client: reqwest::Client) -> Self {
110 Self {
111 inner: breaker,
112 name: name.into(),
113 client,
114 }
115 }
116
117 #[must_use]
120 pub fn inner(&self) -> &CircuitBreaker {
121 &self.inner
122 }
123
124 #[must_use]
126 pub fn name(&self) -> &str {
127 &self.name
128 }
129
130 pub async fn call<F, T, E>(&self, fut: F) -> Result<Result<T, E>, ToolkitError>
133 where
134 F: std::future::Future<Output = Result<T, E>>,
135 {
136 let before = self.inner.state().await;
137 let out = self.inner.call(fut).await;
138 let after = self.inner.state().await;
139 self.emit_transition(before, after, "call").await;
140 out
141 }
142
143 pub async fn trip(&self) {
146 let before = self.inner.state().await;
147 self.inner.trip().await;
148 let after = self.inner.state().await;
149 self.emit_transition(before, after, "trip").await;
150 }
151
152 pub async fn reset(&self) {
155 let before = self.inner.state().await;
156 self.inner.reset().await;
157 let after = self.inner.state().await;
158 self.emit_transition(before, after, "reset").await;
159 }
160
161 async fn emit_transition(&self, before: CircuitState, after: CircuitState, cause: &str) {
162 if before == after {
163 return;
164 }
165 if after == CircuitState::Open {
167 emit(
168 &self.client,
169 "breaker_opened",
170 json!({
171 "name": self.name,
172 "previous_state": state_label(before),
173 "cause": cause,
174 }),
175 )
176 .await;
177 }
178 else if after == CircuitState::Closed && before != CircuitState::Closed {
180 emit(
181 &self.client,
182 "breaker_recovered",
183 json!({
184 "name": self.name,
185 "previous_state": state_label(before),
186 "cause": cause,
187 }),
188 )
189 .await;
190 }
191 }
196}
197
198fn state_label(s: CircuitState) -> &'static str {
199 match s {
200 CircuitState::Closed => "closed",
201 CircuitState::Open => "open",
202 CircuitState::HalfOpen => "half_open",
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serialises tests that mutate process-wide environment variables.
    static ENV_GUARD: Mutex<()> = Mutex::new(());

    /// Takes the environment lock (recovering from poisoning left by a failed
    /// test) and clears both audit-stream variables before handing it out.
    fn scrubbed_env() -> MutexGuard<'static, ()> {
        let guard = ENV_GUARD.lock().unwrap_or_else(PoisonError::into_inner);
        for key in ["AUDIT_STREAM_URL", "AUDIT_STREAM_TIMEOUT_S"] {
            env::remove_var(key);
        }
        guard
    }

    #[test]
    fn disabled_when_unset() {
        let _env = scrubbed_env();
        assert!(!is_enabled());
    }

    #[test]
    fn enabled_with_value() {
        let _env = scrubbed_env();
        env::set_var("AUDIT_STREAM_URL", "http://audit.local:8093/");
        assert!(is_enabled());
        // The trailing slash must be stripped.
        assert_eq!(base_url().as_deref(), Some("http://audit.local:8093"));
        env::remove_var("AUDIT_STREAM_URL");
    }

    #[test]
    fn timeout_default() {
        let _env = scrubbed_env();
        let expected = Duration::from_secs_f64(DEFAULT_TIMEOUT_S);
        assert_eq!(timeout(), expected);
    }

    #[test]
    fn timeout_override() {
        let _env = scrubbed_env();
        env::set_var("AUDIT_STREAM_TIMEOUT_S", "0.75");
        let expected = Duration::from_secs_f64(0.75);
        assert_eq!(timeout(), expected);
        env::remove_var("AUDIT_STREAM_TIMEOUT_S");
    }
}
258}