Skip to main content

systemprompt_logging/repository/analytics/
mod.rs

1use anyhow::Context;
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use systemprompt_database::DbPool;
7use systemprompt_identifiers::{AgentId, ContextId, SessionId, TaskId, UserId};
8#[derive(Debug, Clone)]
9pub struct AnalyticsRepository {
10    write_pool: Arc<PgPool>,
11}
12
13impl AnalyticsRepository {
14    pub fn new(db: &DbPool) -> anyhow::Result<Self> {
15        let write_pool = db.write_pool_arc()?;
16        Ok(Self { write_pool })
17    }
18
19    pub async fn log_event(&self, event: &AnalyticsEvent) -> anyhow::Result<i64> {
20        let result = execute_insert(&self.write_pool, event).await?;
21        Ok(i64::try_from(result).unwrap_or(i64::MAX))
22    }
23}
24
25async fn execute_insert(pool: &PgPool, event: &AnalyticsEvent) -> anyhow::Result<u64> {
26    let params = EventParams::from(event);
27    run_insert_query(pool, params).await
28}
29
30async fn run_insert_query(pool: &PgPool, p: EventParams<'_>) -> anyhow::Result<u64> {
31    sqlx::query!(
32        r"
33        INSERT INTO analytics_events
34        (user_id, session_id, context_id, event_type, event_category, severity,
35         endpoint, error_code, response_time_ms, agent_id, task_id, message, metadata, timestamp)
36        VALUES
37        ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
38        ",
39        p.user_id,
40        p.session_id,
41        p.context_id,
42        p.event_type,
43        p.event_category,
44        p.severity,
45        p.endpoint,
46        p.error_code,
47        p.response_time_ms,
48        p.agent_id,
49        p.task_id,
50        p.message,
51        p.metadata,
52        p.timestamp
53    )
54    .execute(pool)
55    .await
56    .map(|r| r.rows_affected())
57    .context("Failed to log analytics event")
58}
59
60struct EventParams<'a> {
61    user_id: &'a str,
62    session_id: &'a str,
63    context_id: &'a str,
64    event_type: &'a str,
65    event_category: &'a str,
66    severity: &'a str,
67    agent_id: Option<&'a str>,
68    task_id: Option<&'a str>,
69    endpoint: Option<&'a str>,
70    message: Option<&'a str>,
71    error_code: Option<i32>,
72    response_time_ms: Option<i32>,
73    metadata: String,
74    timestamp: chrono::DateTime<Utc>,
75}
76
77impl<'a> From<&'a AnalyticsEvent> for EventParams<'a> {
78    fn from(event: &'a AnalyticsEvent) -> Self {
79        Self {
80            user_id: event.user_id.as_str(),
81            session_id: event.session_id.as_str(),
82            context_id: event.context_id.as_str(),
83            event_type: &event.event_type,
84            event_category: &event.event_category,
85            severity: &event.severity,
86            agent_id: event.agent_id.as_ref().map(AgentId::as_str),
87            task_id: event.task_id.as_ref().map(TaskId::as_str),
88            endpoint: event.endpoint.as_deref(),
89            message: event.message.as_deref(),
90            error_code: event.error_code,
91            response_time_ms: event.response_time_ms,
92            metadata: event.metadata.to_string(),
93            timestamp: Utc::now(),
94        }
95    }
96}
97
98#[derive(Debug, Clone)]
99pub struct AnalyticsEvent {
100    pub user_id: UserId,
101    pub session_id: SessionId,
102    pub context_id: ContextId,
103    pub event_type: String,
104    pub event_category: String,
105    pub severity: String,
106    pub endpoint: Option<String>,
107    pub error_code: Option<i32>,
108    pub response_time_ms: Option<i32>,
109    pub agent_id: Option<AgentId>,
110    pub task_id: Option<TaskId>,
111    pub message: Option<String>,
112    pub metadata: Value,
113}