Skip to main content

schema_registry_observability/
logging.rs

1//! Structured logging with correlation IDs and contextual fields
2//!
3//! This module provides:
4//! - JSON-formatted structured logging
5//! - Correlation ID propagation
6//! - Contextual fields (request_id, schema_id, user_id)
7//! - Log level configuration per module
8//! - Log sampling for high-volume paths
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use tracing::Level;
13
/// Log sampling configuration.
///
/// Maps request paths to the fraction of their log records that should be
/// kept. Rates are expected to lie in `0.0..=1.0`; nothing here validates them.
#[derive(Clone, Debug)]
pub struct LogSamplingConfig {
    /// Path pattern -> sample rate (`0.0..=1.0`).
    pub sampled_paths: HashMap<String, f64>,
    /// Sample rate for any path that has no entry in `sampled_paths`.
    pub default_sample_rate: f64,
}

impl Default for LogSamplingConfig {
    /// Samples `/health`, `/ready` and `/metrics` at 1% and keeps everything else.
    fn default() -> Self {
        // Health, readiness and metrics endpoints are sampled down to 1%.
        let sampled_paths: HashMap<String, f64> = ["/health", "/ready", "/metrics"]
            .iter()
            .map(|path| (path.to_string(), 0.01))
            .collect();

        Self {
            default_sample_rate: 1.0, // Log everything by default
            sampled_paths,
        }
    }
}
37
38/// Contextual logging fields
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct LogContext {
41    /// Correlation ID for request tracking
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub correlation_id: Option<String>,
44
45    /// Request ID (unique per request)
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub request_id: Option<String>,
48
49    /// Schema ID being operated on
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub schema_id: Option<String>,
52
53    /// User ID making the request
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub user_id: Option<String>,
56
57    /// Tenant ID for multi-tenancy
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub tenant_id: Option<String>,
60
61    /// API version
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub api_version: Option<String>,
64
65    /// Client IP address
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub client_ip: Option<String>,
68
69    /// User agent
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub user_agent: Option<String>,
72
73    /// Additional custom fields
74    #[serde(skip_serializing_if = "HashMap::is_empty")]
75    pub custom_fields: HashMap<String, String>,
76}
77
78impl Default for LogContext {
79    fn default() -> Self {
80        Self {
81            correlation_id: None,
82            request_id: None,
83            schema_id: None,
84            user_id: None,
85            tenant_id: None,
86            api_version: None,
87            client_ip: None,
88            user_agent: None,
89            custom_fields: HashMap::new(),
90        }
91    }
92}
93
94impl LogContext {
95    /// Creates a new log context from HTTP headers
96    pub fn from_headers(headers: &axum::http::HeaderMap) -> Self {
97        let mut ctx = Self::default();
98
99        // Extract correlation ID
100        ctx.correlation_id = headers
101            .get("x-correlation-id")
102            .and_then(|v| v.to_str().ok())
103            .map(|s| s.to_string());
104
105        // Extract request ID
106        ctx.request_id = headers
107            .get("x-request-id")
108            .and_then(|v| v.to_str().ok())
109            .map(|s| s.to_string());
110
111        // Extract user ID (from auth header or custom header)
112        ctx.user_id = headers
113            .get("x-user-id")
114            .and_then(|v| v.to_str().ok())
115            .map(|s| s.to_string());
116
117        // Extract tenant ID
118        ctx.tenant_id = headers
119            .get("x-tenant-id")
120            .and_then(|v| v.to_str().ok())
121            .map(|s| s.to_string());
122
123        // Extract API version
124        ctx.api_version = headers
125            .get("x-api-version")
126            .and_then(|v| v.to_str().ok())
127            .map(|s| s.to_string());
128
129        // Extract client IP
130        ctx.client_ip = headers
131            .get("x-forwarded-for")
132            .and_then(|v| v.to_str().ok())
133            .map(|s| s.split(',').next().unwrap_or(s).trim().to_string())
134            .or_else(|| {
135                headers
136                    .get("x-real-ip")
137                    .and_then(|v| v.to_str().ok())
138                    .map(|s| s.to_string())
139            });
140
141        // Extract user agent
142        ctx.user_agent = headers
143            .get("user-agent")
144            .and_then(|v| v.to_str().ok())
145            .map(|s| s.to_string());
146
147        ctx
148    }
149
150    /// Adds a custom field to the context
151    pub fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
152        self.custom_fields.insert(key.into(), value.into());
153        self
154    }
155
156    /// Sets correlation ID
157    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
158        self.correlation_id = Some(id.into());
159        self
160    }
161
162    /// Sets request ID
163    pub fn with_request_id(mut self, id: impl Into<String>) -> Self {
164        self.request_id = Some(id.into());
165        self
166    }
167
168    /// Sets schema ID
169    pub fn with_schema_id(mut self, id: impl Into<String>) -> Self {
170        self.schema_id = Some(id.into());
171        self
172    }
173
174    /// Sets user ID
175    pub fn with_user_id(mut self, id: impl Into<String>) -> Self {
176        self.user_id = Some(id.into());
177        self
178    }
179}
180
181/// Module-specific log level configuration
182#[derive(Debug, Clone)]
183pub struct ModuleLogLevels {
184    levels: HashMap<String, Level>,
185    default_level: Level,
186}
187
188impl Default for ModuleLogLevels {
189    fn default() -> Self {
190        let mut levels = HashMap::new();
191
192        // Set specific log levels for different modules
193        levels.insert("schema_registry_api".to_string(), Level::INFO);
194        levels.insert("schema_registry_storage".to_string(), Level::INFO);
195        levels.insert("schema_registry_validation".to_string(), Level::DEBUG);
196        levels.insert("schema_registry_compatibility".to_string(), Level::DEBUG);
197        levels.insert("sqlx".to_string(), Level::WARN); // Reduce DB query noise
198        levels.insert("tower_http".to_string(), Level::INFO);
199        levels.insert("hyper".to_string(), Level::WARN);
200
201        Self {
202            levels,
203            default_level: Level::INFO,
204        }
205    }
206}
207
208impl ModuleLogLevels {
209    /// Gets the log level for a specific module
210    pub fn get_level(&self, module: &str) -> Level {
211        // Try exact match first
212        if let Some(&level) = self.levels.get(module) {
213            return level;
214        }
215
216        // Try prefix match
217        for (prefix, &level) in &self.levels {
218            if module.starts_with(prefix) {
219                return level;
220            }
221        }
222
223        self.default_level
224    }
225
226    /// Sets log level for a module
227    pub fn set_level(&mut self, module: impl Into<String>, level: Level) {
228        self.levels.insert(module.into(), level);
229    }
230}
231
/// Structured log entry.
///
/// Serializes (via serde) to a single object. The named fields come first, in
/// declaration order, and `None` fields are omitted. The entries of `fields`
/// are then merged into the same top-level object (`#[serde(flatten)]`). Only
/// `Serialize` is derived.
///
/// NOTE(review): serde's `flatten` does not deduplicate keys. An entry in
/// `fields` named like a fixed field (e.g. `"message"`) would presumably be
/// emitted as a duplicate JSON key — confirm producers avoid these names.
#[derive(Debug, Serialize)]
pub struct StructuredLogEntry {
    /// Event time; written as an RFC 3339 string by `timestamp_format`.
    #[serde(with = "timestamp_format")]
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Level as text (exact spelling is up to the producer).
    pub level: String,
    /// Human-readable log message.
    pub message: String,
    /// Event target (typically the emitting module path — confirm with producers).
    pub target: String,
    /// Source file of the event, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub file: Option<String>,
    /// Source line of the event, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub line: Option<u32>,
    /// Correlation ID for cross-service request tracking.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    /// Per-request ID.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Schema being operated on.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_id: Option<String>,
    /// User making the request.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
    /// Extra key/value pairs, flattened into the top-level JSON object.
    #[serde(flatten)]
    pub fields: HashMap<String, serde_json::Value>,
}
255
256mod timestamp_format {
257    use chrono::{DateTime, Utc};
258    use serde::{Deserialize, Deserializer, Serializer};
259
260    pub fn serialize<S>(date: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
261    where
262        S: Serializer,
263    {
264        serializer.serialize_str(&date.to_rfc3339())
265    }
266
267    pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
268    where
269        D: Deserializer<'de>,
270    {
271        let s = String::deserialize(deserializer)?;
272        DateTime::parse_from_rfc3339(&s)
273            .map(|dt| dt.with_timezone(&Utc))
274            .map_err(serde::de::Error::custom)
275    }
276}
277
/// Emits an `INFO` event via `tracing::info!` with a `correlation_id` field.
///
/// Usage: `log_info!(correlation_id, "message", key => value, ...)`. Each extra
/// pair is recorded with its `Debug` representation (`?value`).
///
/// Keys are captured as single token trees, so identifiers such as `schema_id`
/// work, but dotted names do not. A `tt` capture stays transparent when
/// forwarded to `tracing`'s own matchers. An `expr` capture would be an opaque
/// fragment that cannot be used as a field name. A trailing comma is accepted.
///
/// ```ignore
/// log_info!("corr-123", "schema registered", schema_id => 42);
/// ```
#[macro_export]
macro_rules! log_info {
    ($correlation_id:expr, $msg:expr $(, $key:tt => $value:expr)* $(,)?) => {
        tracing::info!(
            correlation_id = $correlation_id,
            $($key = ?$value,)*
            $msg
        );
    };
}

/// Emits a `WARN` event via `tracing::warn!` with a `correlation_id` field.
///
/// Same argument syntax as `log_info!`: `log_warn!(correlation_id, "message",
/// key => value, ...)`, with single-token keys and `Debug`-recorded values.
#[macro_export]
macro_rules! log_warn {
    ($correlation_id:expr, $msg:expr $(, $key:tt => $value:expr)* $(,)?) => {
        tracing::warn!(
            correlation_id = $correlation_id,
            $($key = ?$value,)*
            $msg
        );
    };
}

/// Emits an `ERROR` event via `tracing::error!` with a `correlation_id` field.
///
/// Same argument syntax as `log_info!`: `log_error!(correlation_id, "message",
/// key => value, ...)`, with single-token keys and `Debug`-recorded values.
#[macro_export]
macro_rules! log_error {
    ($correlation_id:expr, $msg:expr $(, $key:tt => $value:expr)* $(,)?) => {
        tracing::error!(
            correlation_id = $correlation_id,
            $($key = ?$value,)*
            $msg
        );
    };
}
311
#[cfg(test)]
mod tests {
    use super::*;
    use axum::http::HeaderMap;

    #[test]
    fn test_log_context_from_headers() {
        let mut headers = HeaderMap::new();
        headers.insert("x-correlation-id", "test-corr-id".parse().unwrap());
        headers.insert("x-request-id", "test-req-id".parse().unwrap());
        headers.insert("x-user-id", "user-123".parse().unwrap());

        let ctx = LogContext::from_headers(&headers);
        assert_eq!(ctx.correlation_id.as_deref(), Some("test-corr-id"));
        assert_eq!(ctx.request_id.as_deref(), Some("test-req-id"));
        assert_eq!(ctx.user_id.as_deref(), Some("user-123"));
        // Headers that were not sent stay unset.
        assert_eq!(ctx.tenant_id, None);
        assert_eq!(ctx.client_ip, None);
    }

    #[test]
    fn test_client_ip_uses_first_forwarded_for_entry() {
        let mut headers = HeaderMap::new();
        headers.insert("x-forwarded-for", "203.0.113.7, 10.0.0.1".parse().unwrap());
        headers.insert("x-real-ip", "198.51.100.2".parse().unwrap());

        let ctx = LogContext::from_headers(&headers);
        assert_eq!(ctx.client_ip.as_deref(), Some("203.0.113.7"));
    }

    #[test]
    fn test_client_ip_falls_back_to_real_ip() {
        let mut headers = HeaderMap::new();
        headers.insert("x-real-ip", "198.51.100.2".parse().unwrap());

        let ctx = LogContext::from_headers(&headers);
        assert_eq!(ctx.client_ip.as_deref(), Some("198.51.100.2"));
    }

    #[test]
    fn test_log_context_builder() {
        let ctx = LogContext::default()
            .with_correlation_id("corr-123")
            .with_schema_id("schema-456")
            .with_field("custom", "value");

        assert_eq!(ctx.correlation_id.as_deref(), Some("corr-123"));
        assert_eq!(ctx.schema_id.as_deref(), Some("schema-456"));
        assert_eq!(ctx.custom_fields.get("custom").map(String::as_str), Some("value"));
    }

    #[test]
    fn test_module_log_levels() {
        let levels = ModuleLogLevels::default();
        assert_eq!(levels.get_level("schema_registry_api"), Level::INFO);
        assert_eq!(levels.get_level("sqlx"), Level::WARN);
        assert_eq!(levels.get_level("unknown_module"), Level::INFO);
        // Sub-modules resolve through their configured prefix.
        assert_eq!(levels.get_level("sqlx::query"), Level::WARN);
        assert_eq!(
            levels.get_level("schema_registry_validation::rules"),
            Level::DEBUG
        );
    }

    #[test]
    fn test_set_level_overrides_default_entry() {
        let mut levels = ModuleLogLevels::default();
        levels.set_level("sqlx", Level::ERROR);
        assert_eq!(levels.get_level("sqlx"), Level::ERROR);
    }

    #[test]
    fn test_default_sampling_config() {
        let cfg = LogSamplingConfig::default();
        assert_eq!(cfg.sampled_paths.get("/health"), Some(&0.01));
        assert_eq!(cfg.sampled_paths.get("/ready"), Some(&0.01));
        assert_eq!(cfg.sampled_paths.get("/metrics"), Some(&0.01));
        assert_eq!(cfg.default_sample_rate, 1.0);
    }
}
345        assert_eq!(levels.get_level("schema_registry_api"), Level::INFO);
346        assert_eq!(levels.get_level("sqlx"), Level::WARN);
347        assert_eq!(levels.get_level("unknown_module"), Level::INFO);
348    }
349}