systemprompt_logging/repository/analytics/
mod.rs1use anyhow::Context;
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use systemprompt_database::DbPool;
7use systemprompt_identifiers::{AgentId, ContextId, SessionId, TaskId, UserId};
8#[derive(Debug, Clone)]
9pub struct AnalyticsRepository {
10 write_pool: Arc<PgPool>,
11}
12
13impl AnalyticsRepository {
14 pub fn new(db: &DbPool) -> anyhow::Result<Self> {
15 let write_pool = db.write_pool_arc()?;
16 Ok(Self { write_pool })
17 }
18
19 pub async fn log_event(&self, event: &AnalyticsEvent) -> anyhow::Result<i64> {
20 let result = execute_insert(&self.write_pool, event).await?;
21 Ok(i64::try_from(result).unwrap_or(i64::MAX))
22 }
23}
24
25async fn execute_insert(pool: &PgPool, event: &AnalyticsEvent) -> anyhow::Result<u64> {
26 let params = EventParams::from(event);
27 run_insert_query(pool, params).await
28}
29
30#[allow(clippy::cognitive_complexity)]
31async fn run_insert_query(pool: &PgPool, p: EventParams<'_>) -> anyhow::Result<u64> {
32 sqlx::query!(
33 r"
34 INSERT INTO analytics_events
35 (user_id, session_id, context_id, event_type, event_category, severity,
36 endpoint, error_code, response_time_ms, agent_id, task_id, message, metadata, timestamp)
37 VALUES
38 ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
39 ",
40 p.user_id,
41 p.session_id,
42 p.context_id,
43 p.event_type,
44 p.event_category,
45 p.severity,
46 p.endpoint,
47 p.error_code,
48 p.response_time_ms,
49 p.agent_id,
50 p.task_id,
51 p.message,
52 p.metadata,
53 p.timestamp
54 )
55 .execute(pool)
56 .await
57 .map(|r| r.rows_affected())
58 .context("Failed to log analytics event")
59}
60
61struct EventParams<'a> {
62 user_id: &'a str,
63 session_id: &'a str,
64 context_id: &'a str,
65 event_type: &'a str,
66 event_category: &'a str,
67 severity: &'a str,
68 agent_id: Option<&'a str>,
69 task_id: Option<&'a str>,
70 endpoint: Option<&'a str>,
71 message: Option<&'a str>,
72 error_code: Option<i32>,
73 response_time_ms: Option<i32>,
74 metadata: String,
75 timestamp: chrono::DateTime<Utc>,
76}
77
78impl<'a> From<&'a AnalyticsEvent> for EventParams<'a> {
79 fn from(event: &'a AnalyticsEvent) -> Self {
80 Self {
81 user_id: event.user_id.as_str(),
82 session_id: event.session_id.as_str(),
83 context_id: event.context_id.as_str(),
84 event_type: &event.event_type,
85 event_category: &event.event_category,
86 severity: &event.severity,
87 agent_id: event.agent_id.as_ref().map(AgentId::as_str),
88 task_id: event.task_id.as_ref().map(TaskId::as_str),
89 endpoint: event.endpoint.as_deref(),
90 message: event.message.as_deref(),
91 error_code: event.error_code,
92 response_time_ms: event.response_time_ms,
93 metadata: event.metadata.to_string(),
94 timestamp: Utc::now(),
95 }
96 }
97}
98
99#[derive(Debug, Clone)]
100pub struct AnalyticsEvent {
101 pub user_id: UserId,
102 pub session_id: SessionId,
103 pub context_id: ContextId,
104 pub event_type: String,
105 pub event_category: String,
106 pub severity: String,
107 pub endpoint: Option<String>,
108 pub error_code: Option<i32>,
109 pub response_time_ms: Option<i32>,
110 pub agent_id: Option<AgentId>,
111 pub task_id: Option<TaskId>,
112 pub message: Option<String>,
113 pub metadata: Value,
114}