schema_registry_observability/
logging.rs1use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use tracing::Level;
13
/// Sampling rates for log output, keyed by request path.
///
/// This type only stores the numbers; nothing in this block validates or
/// applies them.
#[derive(Debug, Clone)]
pub struct LogSamplingConfig {
    /// Sample rate per path (the defaults register `/health`, `/ready` and
    /// `/metrics`).
    pub sampled_paths: HashMap<String, f64>,
    /// Fallback rate.
    // NOTE(review): presumably applied to paths missing from `sampled_paths`
    // and expressed as a 0.0..=1.0 fraction — confirm against the consumer.
    pub default_sample_rate: f64,
}
22
23impl Default for LogSamplingConfig {
24 fn default() -> Self {
25 let mut sampled_paths = HashMap::new();
26 sampled_paths.insert("/health".to_string(), 0.01);
28 sampled_paths.insert("/ready".to_string(), 0.01);
29 sampled_paths.insert("/metrics".to_string(), 0.01);
30
31 Self {
32 sampled_paths,
33 default_sample_rate: 1.0, }
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct LogContext {
41 #[serde(skip_serializing_if = "Option::is_none")]
43 pub correlation_id: Option<String>,
44
45 #[serde(skip_serializing_if = "Option::is_none")]
47 pub request_id: Option<String>,
48
49 #[serde(skip_serializing_if = "Option::is_none")]
51 pub schema_id: Option<String>,
52
53 #[serde(skip_serializing_if = "Option::is_none")]
55 pub user_id: Option<String>,
56
57 #[serde(skip_serializing_if = "Option::is_none")]
59 pub tenant_id: Option<String>,
60
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub api_version: Option<String>,
64
65 #[serde(skip_serializing_if = "Option::is_none")]
67 pub client_ip: Option<String>,
68
69 #[serde(skip_serializing_if = "Option::is_none")]
71 pub user_agent: Option<String>,
72
73 #[serde(skip_serializing_if = "HashMap::is_empty")]
75 pub custom_fields: HashMap<String, String>,
76}
77
78impl Default for LogContext {
79 fn default() -> Self {
80 Self {
81 correlation_id: None,
82 request_id: None,
83 schema_id: None,
84 user_id: None,
85 tenant_id: None,
86 api_version: None,
87 client_ip: None,
88 user_agent: None,
89 custom_fields: HashMap::new(),
90 }
91 }
92}
93
94impl LogContext {
95 pub fn from_headers(headers: &axum::http::HeaderMap) -> Self {
97 let mut ctx = Self::default();
98
99 ctx.correlation_id = headers
101 .get("x-correlation-id")
102 .and_then(|v| v.to_str().ok())
103 .map(|s| s.to_string());
104
105 ctx.request_id = headers
107 .get("x-request-id")
108 .and_then(|v| v.to_str().ok())
109 .map(|s| s.to_string());
110
111 ctx.user_id = headers
113 .get("x-user-id")
114 .and_then(|v| v.to_str().ok())
115 .map(|s| s.to_string());
116
117 ctx.tenant_id = headers
119 .get("x-tenant-id")
120 .and_then(|v| v.to_str().ok())
121 .map(|s| s.to_string());
122
123 ctx.api_version = headers
125 .get("x-api-version")
126 .and_then(|v| v.to_str().ok())
127 .map(|s| s.to_string());
128
129 ctx.client_ip = headers
131 .get("x-forwarded-for")
132 .and_then(|v| v.to_str().ok())
133 .map(|s| s.split(',').next().unwrap_or(s).trim().to_string())
134 .or_else(|| {
135 headers
136 .get("x-real-ip")
137 .and_then(|v| v.to_str().ok())
138 .map(|s| s.to_string())
139 });
140
141 ctx.user_agent = headers
143 .get("user-agent")
144 .and_then(|v| v.to_str().ok())
145 .map(|s| s.to_string());
146
147 ctx
148 }
149
150 pub fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
152 self.custom_fields.insert(key.into(), value.into());
153 self
154 }
155
156 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
158 self.correlation_id = Some(id.into());
159 self
160 }
161
162 pub fn with_request_id(mut self, id: impl Into<String>) -> Self {
164 self.request_id = Some(id.into());
165 self
166 }
167
168 pub fn with_schema_id(mut self, id: impl Into<String>) -> Self {
170 self.schema_id = Some(id.into());
171 self
172 }
173
174 pub fn with_user_id(mut self, id: impl Into<String>) -> Self {
176 self.user_id = Some(id.into());
177 self
178 }
179}
180
181#[derive(Debug, Clone)]
183pub struct ModuleLogLevels {
184 levels: HashMap<String, Level>,
185 default_level: Level,
186}
187
188impl Default for ModuleLogLevels {
189 fn default() -> Self {
190 let mut levels = HashMap::new();
191
192 levels.insert("schema_registry_api".to_string(), Level::INFO);
194 levels.insert("schema_registry_storage".to_string(), Level::INFO);
195 levels.insert("schema_registry_validation".to_string(), Level::DEBUG);
196 levels.insert("schema_registry_compatibility".to_string(), Level::DEBUG);
197 levels.insert("sqlx".to_string(), Level::WARN); levels.insert("tower_http".to_string(), Level::INFO);
199 levels.insert("hyper".to_string(), Level::WARN);
200
201 Self {
202 levels,
203 default_level: Level::INFO,
204 }
205 }
206}
207
208impl ModuleLogLevels {
209 pub fn get_level(&self, module: &str) -> Level {
211 if let Some(&level) = self.levels.get(module) {
213 return level;
214 }
215
216 for (prefix, &level) in &self.levels {
218 if module.starts_with(prefix) {
219 return level;
220 }
221 }
222
223 self.default_level
224 }
225
226 pub fn set_level(&mut self, module: impl Into<String>, level: Level) {
228 self.levels.insert(module.into(), level);
229 }
230}
231
232#[derive(Debug, Serialize)]
234pub struct StructuredLogEntry {
235 #[serde(with = "timestamp_format")]
236 pub timestamp: chrono::DateTime<chrono::Utc>,
237 pub level: String,
238 pub message: String,
239 pub target: String,
240 #[serde(skip_serializing_if = "Option::is_none")]
241 pub file: Option<String>,
242 #[serde(skip_serializing_if = "Option::is_none")]
243 pub line: Option<u32>,
244 #[serde(skip_serializing_if = "Option::is_none")]
245 pub correlation_id: Option<String>,
246 #[serde(skip_serializing_if = "Option::is_none")]
247 pub request_id: Option<String>,
248 #[serde(skip_serializing_if = "Option::is_none")]
249 pub schema_id: Option<String>,
250 #[serde(skip_serializing_if = "Option::is_none")]
251 pub user_id: Option<String>,
252 #[serde(flatten)]
253 pub fields: HashMap<String, serde_json::Value>,
254}
255
256mod timestamp_format {
257 use chrono::{DateTime, Utc};
258 use serde::{Deserialize, Deserializer, Serializer};
259
260 pub fn serialize<S>(date: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
261 where
262 S: Serializer,
263 {
264 serializer.serialize_str(&date.to_rfc3339())
265 }
266
267 pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
268 where
269 D: Deserializer<'de>,
270 {
271 let s = String::deserialize(deserializer)?;
272 DateTime::parse_from_rfc3339(&s)
273 .map(|dt| dt.with_timezone(&Utc))
274 .map_err(serde::de::Error::custom)
275 }
276}
277
/// Emits an `INFO`-level `tracing` event tagged with a correlation id.
///
/// Form: `log_info!(correlation_id, message $(, key => value)*)`.
///
/// * `correlation_id` is recorded as the `correlation_id` field.
/// * Each `value` is recorded under `key` using its `Debug` output (`?`).
/// * `message` is passed to `tracing` as the event message.
///
/// Keys are captured as single token trees rather than expressions. An
/// expression fragment is forwarded as an opaque node that `tracing`'s
/// identifier-based field matcher cannot accept, so `user_id => id` would not
/// compile; as a token tree, both a bare identifier and a string literal key
/// reach `tracing` unchanged.
#[macro_export]
macro_rules! log_info {
    ($correlation_id:expr, $msg:expr $(, $key:tt => $value:expr)*) => {
        tracing::info!(
            correlation_id = $correlation_id,
            $($key = ?$value,)*
            $msg
        );
    };
}
289
/// Emits a `WARN`-level `tracing` event tagged with a correlation id.
///
/// Form: `log_warn!(correlation_id, message $(, key => value)*)`.
///
/// * `correlation_id` is recorded as the `correlation_id` field.
/// * Each `value` is recorded under `key` using its `Debug` output (`?`).
/// * `message` is passed to `tracing` as the event message.
///
/// Keys are captured as single token trees rather than expressions. An
/// expression fragment is forwarded as an opaque node that `tracing`'s
/// identifier-based field matcher cannot accept, so `user_id => id` would not
/// compile; as a token tree, both a bare identifier and a string literal key
/// reach `tracing` unchanged.
#[macro_export]
macro_rules! log_warn {
    ($correlation_id:expr, $msg:expr $(, $key:tt => $value:expr)*) => {
        tracing::warn!(
            correlation_id = $correlation_id,
            $($key = ?$value,)*
            $msg
        );
    };
}
300
/// Emits an `ERROR`-level `tracing` event tagged with a correlation id.
///
/// Form: `log_error!(correlation_id, message $(, key => value)*)`.
///
/// * `correlation_id` is recorded as the `correlation_id` field.
/// * Each `value` is recorded under `key` using its `Debug` output (`?`).
/// * `message` is passed to `tracing` as the event message.
///
/// Keys are captured as single token trees rather than expressions. An
/// expression fragment is forwarded as an opaque node that `tracing`'s
/// identifier-based field matcher cannot accept, so `user_id => id` would not
/// compile; as a token tree, both a bare identifier and a string literal key
/// reach `tracing` unchanged.
#[macro_export]
macro_rules! log_error {
    ($correlation_id:expr, $msg:expr $(, $key:tt => $value:expr)*) => {
        tracing::error!(
            correlation_id = $correlation_id,
            $($key = ?$value,)*
            $msg
        );
    };
}
311
#[cfg(test)]
mod tests {
    use super::*;
    use axum::http::{HeaderMap, HeaderValue};

    /// Identifier headers are copied into the matching context fields.
    #[test]
    fn test_log_context_from_headers() {
        let mut request_headers = HeaderMap::new();
        request_headers.insert("x-correlation-id", HeaderValue::from_static("test-corr-id"));
        request_headers.insert("x-request-id", HeaderValue::from_static("test-req-id"));
        request_headers.insert("x-user-id", HeaderValue::from_static("user-123"));

        let context = LogContext::from_headers(&request_headers);

        assert_eq!(context.correlation_id.as_deref(), Some("test-corr-id"));
        assert_eq!(context.request_id.as_deref(), Some("test-req-id"));
        assert_eq!(context.user_id.as_deref(), Some("user-123"));
    }

    /// Builder methods set the fields they are named after.
    #[test]
    fn test_log_context_builder() {
        let context = LogContext::default()
            .with_correlation_id("corr-123")
            .with_schema_id("schema-456")
            .with_field("custom", "value");

        assert_eq!(context.correlation_id.as_deref(), Some("corr-123"));
        assert_eq!(context.schema_id.as_deref(), Some("schema-456"));
        assert_eq!(
            context.custom_fields.get("custom").map(String::as_str),
            Some("value")
        );
    }

    /// Configured modules report their own level; unknown ones get the default.
    #[test]
    fn test_module_log_levels() {
        let table = ModuleLogLevels::default();
        let expectations = [
            ("schema_registry_api", Level::INFO),
            ("sqlx", Level::WARN),
            ("unknown_module", Level::INFO),
        ];

        for (module, expected) in expectations.iter() {
            assert_eq!(table.get_level(module), *expected);
        }
    }
}