systemprompt-content 0.1.21

Content module for systemprompt.io with content management, analytics, and event tracking
Documentation
use crate::error::ContentError;
use crate::models::{
    CampaignPerformance, ContentJourneyNode, LinkClick, LinkPerformance, RecordClickParams,
};
use sqlx::PgPool;
use std::sync::Arc;
use systemprompt_database::DbPool;
use systemprompt_identifiers::{
    CampaignId, ContentId, ContextId, LinkClickId, LinkId, SessionId, TaskId, UserId,
};

#[derive(Debug)]
pub struct LinkAnalyticsRepository {
    pool: Arc<PgPool>,
    write_pool: Arc<PgPool>,
}

impl LinkAnalyticsRepository {
    pub fn new(db: &DbPool) -> Result<Self, ContentError> {
        let pool = db
            .pool_arc()
            .map_err(|e| ContentError::InvalidRequest(format!("Database pool error: {e}")))?;
        let write_pool = db
            .write_pool_arc()
            .map_err(|e| ContentError::InvalidRequest(format!("Database write pool error: {e}")))?;
        Ok(Self { pool, write_pool })
    }

    pub async fn get_link_performance(
        &self,
        link_id: &LinkId,
    ) -> Result<Option<LinkPerformance>, sqlx::Error> {
        sqlx::query_as!(
            LinkPerformance,
            r#"
            SELECT
                l.id as "link_id: LinkId",
                COALESCE(l.click_count, 0)::bigint as "click_count!",
                COALESCE(l.unique_click_count, 0)::bigint as "unique_click_count!",
                COALESCE(l.conversion_count, 0)::bigint as "conversion_count!",
                CASE
                    WHEN COALESCE(l.click_count, 0) > 0 THEN
                        COALESCE(l.conversion_count, 0)::float / l.click_count
                    ELSE 0.0
                END as conversion_rate
            FROM campaign_links l
            WHERE l.id = $1
            "#,
            link_id.as_str()
        )
        .fetch_optional(&*self.pool)
        .await
    }

    pub async fn check_session_clicked_link(
        &self,
        link_id: &LinkId,
        session_id: &SessionId,
    ) -> Result<bool, sqlx::Error> {
        let result = sqlx::query!(
            r#"SELECT COALESCE(COUNT(*), 0)::bigint as "count!" FROM link_clicks WHERE link_id = $1 AND session_id = $2"#,
            link_id.as_str(),
            session_id.as_str()
        )
        .fetch_one(&*self.pool)
        .await?;

        Ok(result.count > 0)
    }

    pub async fn increment_link_clicks(
        &self,
        link_id: &LinkId,
        is_first_click: bool,
    ) -> Result<(), sqlx::Error> {
        if is_first_click {
            sqlx::query!(
                "UPDATE campaign_links SET click_count = click_count + 1, unique_click_count = \
                 unique_click_count + 1 WHERE id = $1",
                link_id.as_str()
            )
            .execute(&*self.write_pool)
            .await?;
        } else {
            sqlx::query!(
                "UPDATE campaign_links SET click_count = click_count + 1 WHERE id = $1",
                link_id.as_str()
            )
            .execute(&*self.write_pool)
            .await?;
        }
        Ok(())
    }

    pub async fn get_clicks_by_link(
        &self,
        link_id: &LinkId,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<LinkClick>, sqlx::Error> {
        sqlx::query_as!(
            LinkClick,
            r#"
            SELECT id as "id: LinkClickId", link_id as "link_id: LinkId",
                   session_id as "session_id: SessionId", user_id as "user_id: UserId",
                   context_id as "context_id: ContextId", task_id as "task_id: TaskId",
                   referrer_page, referrer_url, clicked_at, user_agent, ip_address,
                   device_type, country, is_first_click, is_conversion, conversion_at,
                   time_on_page_seconds, scroll_depth_percent
            FROM link_clicks
            WHERE link_id = $1
            ORDER BY clicked_at DESC
            LIMIT $2 OFFSET $3
            "#,
            link_id.as_str(),
            limit,
            offset
        )
        .fetch_all(&*self.pool)
        .await
    }

    pub async fn get_content_journey_map(
        &self,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<ContentJourneyNode>, sqlx::Error> {
        let rows = sqlx::query!(
            r#"
            SELECT source_content_id, target_url, COALESCE(click_count, 0) as "click_count!"
            FROM campaign_links
            WHERE source_content_id IS NOT NULL AND click_count > 0
            ORDER BY click_count DESC
            LIMIT $1 OFFSET $2
            "#,
            limit,
            offset
        )
        .fetch_all(&*self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .filter_map(|r| {
                Some(ContentJourneyNode {
                    source_content_id: ContentId::new(r.source_content_id?),
                    target_url: r.target_url,
                    click_count: r.click_count,
                })
            })
            .collect())
    }

    pub async fn get_campaign_performance(
        &self,
        campaign_id: &CampaignId,
    ) -> Result<Option<CampaignPerformance>, sqlx::Error> {
        sqlx::query_as!(
            CampaignPerformance,
            r#"
            SELECT
                campaign_id as "campaign_id!: CampaignId",
                COALESCE(SUM(click_count), 0)::bigint as "total_clicks!",
                COUNT(*)::bigint as "link_count!",
                COUNT(DISTINCT source_content_id) as unique_visitors,
                COALESCE(SUM(conversion_count), 0)::bigint as conversion_count
            FROM campaign_links
            WHERE campaign_id = $1
            GROUP BY campaign_id
            "#,
            campaign_id.as_str()
        )
        .fetch_optional(&*self.pool)
        .await
    }

    pub async fn record_click(&self, params: &RecordClickParams) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            INSERT INTO link_clicks (
                id, link_id, session_id, user_id, context_id, task_id,
                referrer_page, referrer_url, clicked_at, user_agent, ip_address,
                device_type, country, is_first_click, is_conversion
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
            "#,
            params.click_id.as_str(),
            params.link_id.as_str(),
            params.session_id.as_str(),
            params.user_id.as_ref().map(UserId::as_str),
            params.context_id.as_ref().map(ContextId::as_str),
            params.task_id.as_ref().map(TaskId::as_str),
            params.referrer_page,
            params.referrer_url,
            params.clicked_at,
            params.user_agent,
            params.ip_address,
            params.device_type,
            params.country,
            params.is_first_click,
            params.is_conversion
        )
        .execute(&*self.write_pool)
        .await?;
        Ok(())
    }
}