systemprompt_logging/repository/analytics/
mod.rs1use anyhow::Context;
2use chrono::Utc;
3use serde_json::Value;
4use systemprompt_database::DbPool;
5use systemprompt_identifiers::{AgentId, ContextId, SessionId, TaskId, UserId};
6use systemprompt_traits::{Repository as RepositoryTrait, RepositoryError};
7
8#[derive(Debug, Clone)]
9pub struct AnalyticsRepository {
10 db_pool: DbPool,
11}
12
13impl RepositoryTrait for AnalyticsRepository {
14 type Pool = DbPool;
15 type Error = RepositoryError;
16
17 fn pool(&self) -> &Self::Pool {
18 &self.db_pool
19 }
20}
21
22impl AnalyticsRepository {
23 pub const fn new(db_pool: DbPool) -> Self {
24 Self { db_pool }
25 }
26
27 pub async fn log_event(&self, event: &AnalyticsEvent) -> anyhow::Result<i64> {
28 let pool = self
29 .db_pool
30 .pool_arc()
31 .context("Failed to get database pool")?;
32
33 let result = execute_insert(&pool, event).await?;
34 Ok(i64::try_from(result).unwrap_or(i64::MAX))
35 }
36}
37
38async fn execute_insert(
39 pool: &std::sync::Arc<sqlx::PgPool>,
40 event: &AnalyticsEvent,
41) -> anyhow::Result<u64> {
42 let params = EventParams::from(event);
43 run_insert_query(pool, params).await
44}
45
46#[allow(clippy::cognitive_complexity)]
47async fn run_insert_query(
48 pool: &std::sync::Arc<sqlx::PgPool>,
49 p: EventParams<'_>,
50) -> anyhow::Result<u64> {
51 sqlx::query!(
52 r"
53 INSERT INTO analytics_events
54 (user_id, session_id, context_id, event_type, event_category, severity,
55 endpoint, error_code, response_time_ms, agent_id, task_id, message, metadata, timestamp)
56 VALUES
57 ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
58 ",
59 p.user_id,
60 p.session_id,
61 p.context_id,
62 p.event_type,
63 p.event_category,
64 p.severity,
65 p.endpoint,
66 p.error_code,
67 p.response_time_ms,
68 p.agent_id,
69 p.task_id,
70 p.message,
71 p.metadata,
72 p.timestamp
73 )
74 .execute(pool.as_ref())
75 .await
76 .map(|r| r.rows_affected())
77 .context("Failed to log analytics event")
78}
79
80struct EventParams<'a> {
81 user_id: &'a str,
82 session_id: &'a str,
83 context_id: &'a str,
84 event_type: &'a str,
85 event_category: &'a str,
86 severity: &'a str,
87 agent_id: Option<&'a str>,
88 task_id: Option<&'a str>,
89 endpoint: Option<&'a str>,
90 message: Option<&'a str>,
91 error_code: Option<i32>,
92 response_time_ms: Option<i32>,
93 metadata: String,
94 timestamp: chrono::DateTime<Utc>,
95}
96
97impl<'a> From<&'a AnalyticsEvent> for EventParams<'a> {
98 fn from(event: &'a AnalyticsEvent) -> Self {
99 Self {
100 user_id: event.user_id.as_str(),
101 session_id: event.session_id.as_str(),
102 context_id: event.context_id.as_str(),
103 event_type: &event.event_type,
104 event_category: &event.event_category,
105 severity: &event.severity,
106 agent_id: event.agent_id.as_ref().map(AgentId::as_str),
107 task_id: event.task_id.as_ref().map(TaskId::as_str),
108 endpoint: event.endpoint.as_deref(),
109 message: event.message.as_deref(),
110 error_code: event.error_code,
111 response_time_ms: event.response_time_ms,
112 metadata: event.metadata.to_string(),
113 timestamp: Utc::now(),
114 }
115 }
116}
117
118#[derive(Debug, Clone)]
119pub struct AnalyticsEvent {
120 pub user_id: UserId,
121 pub session_id: SessionId,
122 pub context_id: ContextId,
123 pub event_type: String,
124 pub event_category: String,
125 pub severity: String,
126 pub endpoint: Option<String>,
127 pub error_code: Option<i32>,
128 pub response_time_ms: Option<i32>,
129 pub agent_id: Option<AgentId>,
130 pub task_id: Option<TaskId>,
131 pub message: Option<String>,
132 pub metadata: Value,
133}