Skip to main content

systemprompt_logging/repository/analytics/
mod.rs

1use chrono::Utc;
2use serde_json::Value;
3use sqlx::PgPool;
4use std::sync::Arc;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{AgentId, ContextId, SessionId, TaskId, UserId};
7
8use crate::models::LoggingError;
9
10#[derive(Debug, Clone)]
11pub struct AnalyticsRepository {
12    write_pool: Arc<PgPool>,
13}
14
15impl AnalyticsRepository {
16    pub fn new(db: &DbPool) -> Result<Self, LoggingError> {
17        let write_pool = db.write_pool_arc()?;
18        Ok(Self { write_pool })
19    }
20
21    pub async fn log_event(&self, event: &AnalyticsEvent) -> Result<i64, LoggingError> {
22        let result = execute_insert(&self.write_pool, event).await?;
23        Ok(i64::try_from(result).unwrap_or(i64::MAX))
24    }
25}
26
27async fn execute_insert(pool: &PgPool, event: &AnalyticsEvent) -> Result<u64, LoggingError> {
28    let params = EventParams::from(event);
29    run_insert_query(pool, params).await
30}
31
32async fn run_insert_query(pool: &PgPool, p: EventParams<'_>) -> Result<u64, LoggingError> {
33    let result = sqlx::query!(
34        r"
35        INSERT INTO analytics_events
36        (user_id, session_id, context_id, event_type, event_category, severity,
37         endpoint, error_code, response_time_ms, agent_id, task_id, message, metadata, timestamp)
38        VALUES
39        ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
40        ",
41        p.user_id,
42        p.session_id,
43        p.context_id,
44        p.event_type,
45        p.event_category,
46        p.severity,
47        p.endpoint,
48        p.error_code,
49        p.response_time_ms,
50        p.agent_id,
51        p.task_id,
52        p.message,
53        p.metadata,
54        p.timestamp
55    )
56    .execute(pool)
57    .await?;
58    Ok(result.rows_affected())
59}
60
61struct EventParams<'a> {
62    user_id: &'a str,
63    session_id: &'a str,
64    context_id: &'a str,
65    event_type: &'a str,
66    event_category: &'a str,
67    severity: &'a str,
68    agent_id: Option<&'a str>,
69    task_id: Option<&'a str>,
70    endpoint: Option<&'a str>,
71    message: Option<&'a str>,
72    error_code: Option<i32>,
73    response_time_ms: Option<i32>,
74    metadata: String,
75    timestamp: chrono::DateTime<Utc>,
76}
77
78impl<'a> From<&'a AnalyticsEvent> for EventParams<'a> {
79    fn from(event: &'a AnalyticsEvent) -> Self {
80        Self {
81            user_id: event.user_id.as_str(),
82            session_id: event.session_id.as_str(),
83            context_id: event.context_id.as_str(),
84            event_type: &event.event_type,
85            event_category: &event.event_category,
86            severity: &event.severity,
87            agent_id: event.agent_id.as_ref().map(AgentId::as_str),
88            task_id: event.task_id.as_ref().map(TaskId::as_str),
89            endpoint: event.endpoint.as_deref(),
90            message: event.message.as_deref(),
91            error_code: event.error_code,
92            response_time_ms: event.response_time_ms,
93            metadata: event.metadata.to_string(),
94            timestamp: Utc::now(),
95        }
96    }
97}
98
99#[derive(Debug, Clone)]
100pub struct AnalyticsEvent {
101    pub user_id: UserId,
102    pub session_id: SessionId,
103    pub context_id: ContextId,
104    pub event_type: String,
105    pub event_category: String,
106    pub severity: String,
107    pub endpoint: Option<String>,
108    pub error_code: Option<i32>,
109    pub response_time_ms: Option<i32>,
110    pub agent_id: Option<AgentId>,
111    pub task_id: Option<TaskId>,
112    pub message: Option<String>,
113    pub metadata: Value,
114}