Skip to main content

systemprompt_api/services/middleware/analytics/
events.rs

1use serde_json::json;
2use std::sync::Arc;
3
4use systemprompt_logging::{AnalyticsEvent, AnalyticsRepository};
5use systemprompt_models::{RequestContext, RouteClassifier};
6
7#[derive(Debug)]
8pub struct AnalyticsEventParams {
9    pub req_ctx: RequestContext,
10    pub endpoint: String,
11    pub path: String,
12    pub method: String,
13    pub uri: http::Uri,
14    pub status_code: u16,
15    pub response_time_ms: u64,
16    pub user_agent: Option<String>,
17    pub referer: Option<String>,
18}
19
20pub fn spawn_analytics_event_task(
21    analytics_repo: Arc<AnalyticsRepository>,
22    route_classifier: Arc<RouteClassifier>,
23    params: AnalyticsEventParams,
24) {
25    let sanitized_uri = sanitize_uri(&params.uri);
26
27    tokio::spawn(async move {
28        let message = format!(
29            "HTTP {} - {} {}",
30            params.status_code, params.method, sanitized_uri
31        );
32        let metadata = json!({
33            "status_code": params.status_code,
34            "method": params.method,
35            "uri": sanitized_uri,
36            "endpoint": params.endpoint,
37            "trace_id": params.req_ctx.trace_id(),
38            "user_agent": params.user_agent,
39            "referer": params.referer
40        });
41
42        let event_metadata = route_classifier.get_event_metadata(&params.path, &params.method);
43
44        let severity = if params.status_code >= 500 {
45            "error"
46        } else if params.status_code >= 400 {
47            "warning"
48        } else {
49            "info"
50        };
51
52        let event = AnalyticsEvent {
53            user_id: params.req_ctx.auth.user_id.clone(),
54            session_id: params.req_ctx.request.session_id.clone(),
55            context_id: params.req_ctx.execution.context_id.clone(),
56            event_type: event_metadata.event_type.to_string(),
57            event_category: event_metadata.event_category.to_string(),
58            severity: severity.to_string(),
59            endpoint: Some(params.endpoint),
60            error_code: if params.status_code >= 400 {
61                Some(i32::from(params.status_code))
62            } else {
63                None
64            },
65            response_time_ms: Some(params.response_time_ms as i32),
66            agent_id: None,
67            task_id: params.req_ctx.task_id().cloned(),
68            message: Some(message.clone()),
69            metadata: metadata.clone(),
70        };
71
72        if let Err(e) = analytics_repo.log_event(&event).await {
73            tracing::error!(error = %e, "Failed to log analytics event");
74        }
75
76        if params.status_code >= 500 {
77            tracing::error!(module = event_metadata.log_module, message = %message, metadata = ?metadata, "HTTP error");
78        }
79    });
80}
81
82fn sanitize_uri(uri: &http::Uri) -> String {
83    let path = uri.path();
84
85    uri.query().map_or_else(
86        || path.to_string(),
87        |query| {
88            let sanitized_params: Vec<String> = query
89                .split('&')
90                .map(|param| {
91                    param.split_once('=').map_or_else(
92                        || param.to_string(),
93                        |(key, value)| {
94                            let key_lower = key.to_lowercase();
95                            if is_sensitive_key(&key_lower) {
96                                format!("{key}=[REDACTED]")
97                            } else {
98                                format!("{key}={value}")
99                            }
100                        },
101                    )
102                })
103                .collect();
104
105            format!("{path}?{}", sanitized_params.join("&"))
106        },
107    )
108}
109
110fn is_sensitive_key(key: &str) -> bool {
111    matches!(
112        key,
113        "token" | "password" | "api_key" | "apikey" | "secret" | "authorization" | "auth"
114    )
115}