mockforge-registry-core 0.3.128

Shared domain models, storage abstractions, and OSS-safe handlers for MockForge's registry backends (SaaS Postgres + OSS SQLite admin UI).
Documentation
//! Feature usage tracking model
//!
//! Tracks when specific features are used by organizations for analytics

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;

/// Feature types that can be tracked
///
/// Each variant identifies one kind of usage event; it is the value bound to
/// the `feature` column when a [`FeatureUsage`] row is written or queried.
///
/// The `sqlx` attributes below map this enum to a database type named
/// `feature_type` and rename variants to snake_case, so for example
/// `PluginInvokeMs` is stored as `plugin_invoke_ms`. No `serde` rename is
/// applied, so the serde representation uses the variant identifiers exactly
/// as written here (e.g. `"PluginInvokeMs"`).
///
/// NOTE(review): adding a variant presumably also requires adding the
/// matching label to the database-side `feature_type` type via a migration —
/// confirm against the migrations directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "feature_type", rename_all = "snake_case")]
pub enum FeatureType {
    // Hosted mocks
    HostedMockDeploy,
    HostedMockRequest,
    // Plugin registry
    PluginPublish,
    PluginInstall,
    // Templates
    TemplatePublish,
    TemplateInstall,
    // Scenarios
    ScenarioPublish,
    ScenarioInstall,
    // API tokens
    ApiTokenCreate,
    ApiTokenUse,
    // Billing
    BillingCheckout,
    BillingUpgrade,
    BillingDowngrade,
    // Organizations
    OrgCreate,
    OrgInvite,
    // Marketplace
    MarketplaceSearch,
    MarketplaceDownload,
    // Federation
    FederationCreate,
    FederationUpdate,
    FederationDelete,
    FederationScenarioActivate,
    FederationScenarioDeactivate,
    // Workspaces
    WorkspaceCreate,
    WorkspaceUpdate,
    WorkspaceDelete,
    // Services
    ServiceCreate,
    ServiceUpdate,
    ServiceDelete,
    // Fixtures
    FixtureCreate,
    FixtureUpdate,
    FixtureDelete,
    // Cloud Plugins (Phase 1) — see migration
    // 20250101000074_cloud_plugin_attachments.sql.
    PluginAttach,
    PluginDetach,
    /// Wall-time accumulator populated by the OTLP pipeline from the
    /// `mockforge-plugin-loader` invocation metrics bus.
    PluginInvokeMs,
}

/// Feature usage event
///
/// One row of the `feature_usage` table. The struct derives `FromRow`, so
/// the selected column names are expected to match the field names below.
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct FeatureUsage {
    /// Identifier of this event row. The INSERT issued by `record` does not
    /// supply it, so it is presumably generated by the database — confirm
    /// against the table definition.
    pub id: Uuid,
    /// Organization the event is attributed to.
    pub org_id: Uuid,
    /// User associated with the event, if any.
    pub user_id: Option<Uuid>,
    /// Which feature was used.
    pub feature: FeatureType,
    /// Additional context (e.g., plugin name, deployment ID).
    pub metadata: Option<serde_json::Value>,
    /// When the event was recorded. Not supplied by `record`'s INSERT
    /// either, so presumably filled by a column default — confirm against
    /// the table definition.
    pub created_at: DateTime<Utc>,
}

#[cfg(feature = "postgres")]
impl FeatureUsage {
    /// Largest look-back window, in days, honoured by the `days` parameters
    /// of the methods in this impl (roughly 100 years).
    ///
    /// The bound keeps the computed timestamp well inside the ranges that
    /// `chrono` and Postgres can represent.
    const MAX_WINDOW_DAYS: i64 = 36_525;

    /// Returns the instant `days` days before now.
    ///
    /// `days` is clamped to `0..=MAX_WINDOW_DAYS` first. Without the clamp an
    /// extreme caller-supplied value would panic inside
    /// `chrono::Duration::days` (out of bounds) or in the `DateTime`
    /// subtraction (overflow). Negative values are treated as 0, i.e. "now".
    fn lookback_start(days: i64) -> DateTime<Utc> {
        let days = days.clamp(0, Self::MAX_WINDOW_DAYS);
        Utc::now() - chrono::Duration::days(days)
    }

    /// Record a feature usage event
    ///
    /// Inserts one `feature_usage` row carrying `org_id`, `user_id`,
    /// `feature` and `metadata`; all other columns are left to the database.
    ///
    /// # Errors
    ///
    /// Returns the underlying `sqlx` error if the INSERT fails.
    pub async fn record(
        pool: &sqlx::PgPool,
        org_id: Uuid,
        user_id: Option<Uuid>,
        feature: FeatureType,
        metadata: Option<serde_json::Value>,
    ) -> sqlx::Result<()> {
        sqlx::query(
            r#"
            INSERT INTO feature_usage (org_id, user_id, feature, metadata)
            VALUES ($1, $2, $3, $4)
            "#,
        )
        .bind(org_id)
        .bind(user_id)
        .bind(feature)
        .bind(metadata)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// Count feature usage for an org in a time period
    ///
    /// Counts rows for `org_id` and `feature` whose `created_at` is strictly
    /// later than `days` days ago. `days` is clamped to
    /// `0..=MAX_WINDOW_DAYS`.
    ///
    /// # Errors
    ///
    /// Returns the underlying `sqlx` error if the query fails.
    pub async fn count_by_org(
        pool: &sqlx::PgPool,
        org_id: Uuid,
        feature: FeatureType,
        days: i64,
    ) -> sqlx::Result<i64> {
        let since = Self::lookback_start(days);
        let (count,): (i64,) = sqlx::query_as(
            r#"
            SELECT COUNT(*) FROM feature_usage
            WHERE org_id = $1 AND feature = $2 AND created_at > $3
            "#,
        )
        .bind(org_id)
        .bind(feature)
        .bind(since)
        .fetch_one(pool)
        .await?;
        Ok(count)
    }

    /// Get feature usage stats across all orgs
    ///
    /// Returns `(total_events, unique_orgs)` for `feature` over the last
    /// `days` days (`created_at` strictly later than the window start).
    /// `days` is clamped to `0..=MAX_WINDOW_DAYS`.
    ///
    /// # Errors
    ///
    /// Returns the underlying `sqlx` error if the query fails.
    pub async fn get_global_stats(
        pool: &sqlx::PgPool,
        feature: FeatureType,
        days: i64,
    ) -> sqlx::Result<(i64, i64)> {
        let since = Self::lookback_start(days);
        let stats: (i64, i64) = sqlx::query_as(
            r#"
            SELECT
                COUNT(*) as total,
                COUNT(DISTINCT org_id) as unique_orgs
            FROM feature_usage
            WHERE feature = $1 AND created_at > $2
            "#,
        )
        .bind(feature)
        .bind(since)
        .fetch_one(pool)
        .await?;
        Ok(stats)
    }

    /// Get feature adoption timeline (daily counts)
    ///
    /// Returns one `(date, count)` pair per calendar day that has at least
    /// one event for `feature` in the last `days` days, ordered by date
    /// ascending. Days without events are absent from the result. `days` is
    /// clamped to `0..=MAX_WINDOW_DAYS`.
    ///
    /// NOTE(review): `DATE(created_at)` buckets by the database session's
    /// time zone when the column is `timestamptz`, while the window start is
    /// computed in UTC — confirm sessions run in UTC.
    ///
    /// # Errors
    ///
    /// Returns the underlying `sqlx` error if the query fails.
    pub async fn get_adoption_timeline(
        pool: &sqlx::PgPool,
        feature: FeatureType,
        days: i64,
    ) -> sqlx::Result<Vec<(chrono::NaiveDate, i64)>> {
        let since = Self::lookback_start(days);
        let timeline = sqlx::query_as::<_, (chrono::NaiveDate, i64)>(
            r#"
            SELECT
                DATE(created_at) as date,
                COUNT(*) as count
            FROM feature_usage
            WHERE feature = $1 AND created_at > $2
            GROUP BY DATE(created_at)
            ORDER BY date ASC
            "#,
        )
        .bind(feature)
        .bind(since)
        .fetch_all(pool)
        .await?;
        Ok(timeline)
    }

    /// Clean up old feature usage events (older than N days)
    ///
    /// Deletes every row whose `created_at` is strictly earlier than `days`
    /// days ago and returns the number of rows removed. `days` is clamped to
    /// `0..=MAX_WINDOW_DAYS`, so `0` (or any negative value) removes
    /// everything recorded before the current instant.
    ///
    /// # Errors
    ///
    /// Returns the underlying `sqlx` error if the DELETE fails.
    pub async fn cleanup_old(pool: &sqlx::PgPool, days: i64) -> sqlx::Result<u64> {
        let cutoff = Self::lookback_start(days);
        let result = sqlx::query("DELETE FROM feature_usage WHERE created_at < $1")
            .bind(cutoff)
            .execute(pool)
            .await?;
        Ok(result.rows_affected())
    }

    /// Aggregate `plugin_invoke_ms` rows for a deployment within a time
    /// window, grouped by attachment.
    ///
    /// Each `plugin_invoke_ms` row is a bucket emitted by the OTLP
    /// aggregator (see migration `20250101000074`); its `metadata` JSONB
    /// is expected to carry:
    ///
    /// ```json
    /// {
    ///   "deployment_id":   "uuid",
    ///   "attachment_id":   "uuid",
    ///   "plugin_id":       "uuid",
    ///   "plugin_name":     "string",
    ///   "plugin_version":  "string",
    ///   "invoke_ms":       12345,
    ///   "memory_peak_mb":  42        // optional
    /// }
    /// ```
    ///
    /// SUMs `invoke_ms`, MAXes `memory_peak_mb` (peak across buckets),
    /// and ORDERs by total invoke_ms descending so the heaviest plugin
    /// surfaces first in the UI. Returns an empty Vec when the OTLP
    /// pipeline hasn't populated any rows yet.
    ///
    /// Only rows for `org_id` with `created_at >= since` whose metadata has
    /// an `attachment_id` key and a `deployment_id` equal to the text form of
    /// `deployment_id` are considered.
    ///
    /// NOTE(review): the `::bigint` casts raise a database error, failing the
    /// whole query, if `invoke_ms` or a non-empty `memory_peak_mb` holds text
    /// that is not an integer — confirm the aggregator only ever writes
    /// integers.
    ///
    /// # Errors
    ///
    /// Returns the underlying `sqlx` error if the query or row decoding
    /// fails.
    pub async fn aggregate_plugin_invoke_ms_by_deployment(
        pool: &sqlx::PgPool,
        org_id: Uuid,
        deployment_id: Uuid,
        since: DateTime<Utc>,
    ) -> sqlx::Result<Vec<PluginInvokeAggregateRow>> {
        sqlx::query_as::<_, PluginInvokeAggregateRow>(
            r#"
            SELECT
                metadata->>'attachment_id'                                AS attachment_id,
                MAX(metadata->>'plugin_name')                             AS plugin_name,
                MAX(metadata->>'plugin_version')                          AS plugin_version,
                COALESCE(SUM((metadata->>'invoke_ms')::bigint), 0)::bigint AS invoke_ms,
                MAX(NULLIF(metadata->>'memory_peak_mb', '')::bigint)      AS memory_peak_mb
            FROM feature_usage
            WHERE feature = 'plugin_invoke_ms'
              AND org_id = $1
              AND metadata->>'deployment_id' = $2::text
              AND created_at >= $3
              AND metadata ? 'attachment_id'
            GROUP BY metadata->>'attachment_id'
            ORDER BY invoke_ms DESC
            "#,
        )
        .bind(org_id)
        .bind(deployment_id)
        .bind(since)
        .fetch_all(pool)
        .await
    }
}

/// One row of the per-attachment plugin-invoke aggregate. Strings are
/// `Option<String>` because the metadata fields are text-extracted from
/// JSONB (`->>`), which yields NULL when the key is absent rather than
/// erroring — we'd rather degrade gracefully than reject the whole
/// query if a single row was misshapen.
///
/// Field names match the column aliases selected by
/// `FeatureUsage::aggregate_plugin_invoke_ms_by_deployment`, which is what
/// the derived `FromRow` relies on.
#[cfg(feature = "postgres")]
#[derive(Debug, Clone, FromRow)]
pub struct PluginInvokeAggregateRow {
    /// Grouping key: the `attachment_id` text taken from the row metadata.
    pub attachment_id: Option<String>,
    /// `MAX` of the `plugin_name` text across the group's rows.
    pub plugin_name: Option<String>,
    /// `MAX` of the `plugin_version` text across the group's rows; this is
    /// a text comparison, not a version-aware one.
    pub plugin_version: Option<String>,
    /// Sum of `invoke_ms` across the group's rows; the query coalesces a
    /// NULL sum to 0.
    pub invoke_ms: i64,
    /// Largest `memory_peak_mb` seen across the group's rows, or `None`
    /// when every row lacks the key or carries an empty string.
    pub memory_peak_mb: Option<i64>,
}