Skip to main content

systemprompt_api/routes/analytics/
events.rs

1use axum::extract::{Extension, State};
2use axum::http::StatusCode;
3use axum::response::IntoResponse;
4use axum::Json;
5use std::sync::Arc;
6
7use systemprompt_analytics::{
8    AnalyticsEventBatchResponse, AnalyticsEventType, AnalyticsEventsRepository,
9    CreateAnalyticsEventBatchInput, CreateAnalyticsEventInput, CreateEngagementEventInput,
10    EngagementOptionalMetrics, EngagementRepository,
11};
12use systemprompt_content::ContentRepository;
13use systemprompt_identifiers::ContentId;
14use systemprompt_models::api::ApiError;
15use systemprompt_models::execution::context::RequestContext;
16use systemprompt_models::ContentRouting;
17
18#[derive(Clone)]
19pub struct AnalyticsState {
20    pub events: Arc<AnalyticsEventsRepository>,
21    pub content: Arc<ContentRepository>,
22    pub engagement: Arc<EngagementRepository>,
23    pub content_routing: Option<Arc<dyn ContentRouting>>,
24}
25
26impl std::fmt::Debug for AnalyticsState {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("AnalyticsState")
29            .field("content_routing", &self.content_routing.is_some())
30            .finish()
31    }
32}
33
34async fn resolve_content_id(
35    content_repo: &ContentRepository,
36    content_routing: Option<&dyn ContentRouting>,
37    page_url: &str,
38    slug: Option<&str>,
39) -> Option<ContentId> {
40    let resolved_slug;
41    let slug_to_use = if let Some(s) = slug {
42        s
43    } else {
44        resolved_slug = content_routing.and_then(|r| r.resolve_slug(page_url))?;
45        &resolved_slug
46    };
47
48    content_repo
49        .get_by_slug(slug_to_use)
50        .await
51        .map_err(|e| {
52            tracing::warn!(error = %e, slug = %slug_to_use, "Failed to lookup content by slug");
53            e
54        })
55        .ok()
56        .flatten()
57        .map(|c| c.id)
58}
59
60pub async fn record_event(
61    State(state): State<AnalyticsState>,
62    Extension(req_ctx): Extension<RequestContext>,
63    Json(mut input): Json<CreateAnalyticsEventInput>,
64) -> Result<impl IntoResponse, ApiError> {
65    if input.content_id.is_none() {
66        input.content_id =
67            resolve_content_id(
68                &state.content,
69                state.content_routing.as_deref(),
70                &input.page_url,
71                input.slug.as_deref(),
72            )
73            .await;
74    }
75
76    let created = state
77        .events
78        .create_event(
79            req_ctx.session_id().as_str(),
80            req_ctx.user_id().as_str(),
81            &input,
82        )
83        .await
84        .map_err(|e| {
85            tracing::error!(error = %e, "Failed to record analytics event");
86            ApiError::internal_error("Failed to record analytics event")
87        })?;
88
89    if input.event_type == AnalyticsEventType::PageExit {
90        fan_out_engagement(&state, &req_ctx, &input).await;
91    }
92
93    Ok((StatusCode::CREATED, Json(created)))
94}
95
96pub async fn record_events_batch(
97    State(state): State<AnalyticsState>,
98    Extension(req_ctx): Extension<RequestContext>,
99    Json(mut input): Json<CreateAnalyticsEventBatchInput>,
100) -> Result<impl IntoResponse, ApiError> {
101    for event in &mut input.events {
102        if event.content_id.is_none() {
103            event.content_id =
104                resolve_content_id(&state.content, state.content_routing.as_deref(), &event.page_url, event.slug.as_deref()).await;
105        }
106    }
107
108    let created = state
109        .events
110        .create_events_batch(
111            req_ctx.session_id().as_str(),
112            req_ctx.user_id().as_str(),
113            &input.events,
114        )
115        .await
116        .map_err(|e| {
117            tracing::error!(error = %e, "Failed to record analytics events batch");
118            ApiError::internal_error("Failed to record analytics events")
119        })?;
120
121    for event in &input.events {
122        if event.event_type == AnalyticsEventType::PageExit {
123            fan_out_engagement(&state, &req_ctx, event).await;
124        }
125    }
126
127    Ok((
128        StatusCode::CREATED,
129        Json(AnalyticsEventBatchResponse {
130            recorded: created.len(),
131            events: created,
132        }),
133    ))
134}
135
136async fn fan_out_engagement(
137    state: &AnalyticsState,
138    req_ctx: &RequestContext,
139    input: &CreateAnalyticsEventInput,
140) {
141    let Some(ref data) = input.data else { return };
142
143    let get_i32 =
144        |key: &str| -> Option<i32> { data.get(key).and_then(|v| v.as_i64()).map(|v| v as i32) };
145    let get_f32 =
146        |key: &str| -> Option<f32> { data.get(key).and_then(|v| v.as_f64()).map(|v| v as f32) };
147    let get_bool = |key: &str| -> Option<bool> { data.get(key).and_then(|v| v.as_bool()) };
148    let get_string =
149        |key: &str| -> Option<String> { data.get(key).and_then(|v| v.as_str()).map(String::from) };
150
151    let engagement_input = CreateEngagementEventInput {
152        page_url: input.page_url.clone(),
153        event_type: input.event_type.as_str().to_string(),
154        time_on_page_ms: get_i32("time_on_page_ms").unwrap_or(0),
155        max_scroll_depth: get_i32("max_scroll_depth").unwrap_or(0),
156        click_count: get_i32("click_count").unwrap_or(0),
157        optional_metrics: EngagementOptionalMetrics {
158            time_to_first_interaction_ms: get_i32("time_to_first_interaction_ms"),
159            time_to_first_scroll_ms: get_i32("time_to_first_scroll_ms"),
160            scroll_velocity_avg: get_f32("scroll_velocity_avg"),
161            scroll_direction_changes: get_i32("scroll_direction_changes"),
162            mouse_move_distance_px: get_i32("mouse_move_distance_px"),
163            keyboard_events: get_i32("keyboard_events"),
164            copy_events: get_i32("copy_events"),
165            focus_time_ms: get_i32("focus_time_ms"),
166            blur_count: get_i32("blur_count"),
167            tab_switches: get_i32("tab_switches"),
168            visible_time_ms: get_i32("visible_time_ms"),
169            hidden_time_ms: get_i32("hidden_time_ms"),
170            is_rage_click: get_bool("is_rage_click"),
171            is_dead_click: get_bool("is_dead_click"),
172            reading_pattern: get_string("reading_pattern"),
173        },
174    };
175
176    let content_id = resolve_content_id(
177        &state.content,
178        state.content_routing.as_deref(),
179        &input.page_url,
180        input.slug.as_deref(),
181    )
182    .await;
183
184    if let Err(e) = state
185        .engagement
186        .create_engagement(
187            req_ctx.session_id().as_str(),
188            req_ctx.user_id().as_str(),
189            content_id.as_ref(),
190            &engagement_input,
191        )
192        .await
193    {
194        tracing::warn!(error = %e, "Failed to fan out engagement data from page_exit event");
195    }
196}