1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
//! Feature usage tracking model
//!
//! Tracks when specific features are used by organizations for analytics
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
/// Feature types that can be tracked.
///
/// Stored in the database as the enum type `feature_type`. Because of
/// `rename_all = "snake_case"`, sqlx maps every variant to its snake_case
/// label (e.g. `PluginInvokeMs` <-> `plugin_invoke_ms`, the literal used by
/// `FeatureUsage::aggregate_plugin_invoke_ms_by_deployment`).
///
/// The serde derives carry no rename attribute, so serialized output uses the
/// variant name exactly as written (e.g. `"PluginInvokeMs"`), which differs
/// from the database label.
///
/// NOTE(review): a new variant presumably needs a matching label added to the
/// `feature_type` database enum by a migration before it can be stored —
/// confirm against the migrations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "feature_type", rename_all = "snake_case")]
pub enum FeatureType {
// Hosted mocks
HostedMockDeploy,
HostedMockRequest,
// Plugins (registry)
PluginPublish,
PluginInstall,
// Templates
TemplatePublish,
TemplateInstall,
// Scenarios
ScenarioPublish,
ScenarioInstall,
// API tokens
ApiTokenCreate,
ApiTokenUse,
// Billing
BillingCheckout,
BillingUpgrade,
BillingDowngrade,
// Organizations
OrgCreate,
OrgInvite,
// Marketplace
MarketplaceSearch,
MarketplaceDownload,
// Federation
FederationCreate,
FederationUpdate,
FederationDelete,
FederationScenarioActivate,
FederationScenarioDeactivate,
// Workspaces
WorkspaceCreate,
WorkspaceUpdate,
WorkspaceDelete,
// Services
ServiceCreate,
ServiceUpdate,
ServiceDelete,
// Fixtures
FixtureCreate,
FixtureUpdate,
FixtureDelete,
// Cloud Plugins (Phase 1) — see migration
// 20250101000074_cloud_plugin_attachments.sql.
PluginAttach,
PluginDetach,
/// Wall-time accumulator populated by the OTLP pipeline from the
/// `mockforge-plugin-loader` invocation metrics bus.
///
/// Rows of this type are the input of
/// `FeatureUsage::aggregate_plugin_invoke_ms_by_deployment`, which reads
/// its figures from the row's `metadata` JSON.
PluginInvokeMs,
}
/// Feature usage event: one row of the `feature_usage` table.
///
/// Derives `FromRow`, so when a row is decoded into this struct the result
/// columns are matched to the fields by name.
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct FeatureUsage {
/// Row identifier. `FeatureUsage::record` does not supply it in its
/// INSERT, so it is presumably generated by a column default — confirm
/// against the schema.
pub id: Uuid,
/// Organization the event is attributed to.
pub org_id: Uuid,
/// User associated with the event, if any.
pub user_id: Option<Uuid>,
/// Which feature was used.
pub feature: FeatureType,
/// Optional free-form JSON attached to the event. For
/// `FeatureType::PluginInvokeMs` rows the aggregate query reads the keys
/// `deployment_id`, `attachment_id`, `plugin_name`, `plugin_version`,
/// `invoke_ms` and `memory_peak_mb` from it.
pub metadata: Option<serde_json::Value>, // Additional context (e.g., plugin name, deployment ID)
/// Event timestamp. Like `id`, it is not supplied by
/// `FeatureUsage::record`, so it is presumably set by a column default —
/// confirm against the schema.
pub created_at: DateTime<Utc>,
}
#[cfg(feature = "postgres")]
impl FeatureUsage {
    /// Widest window, in days, honored by the day-based helpers (about 1000
    /// years in either direction).
    ///
    /// `chrono::Duration::days` panics when its argument is out of bounds and
    /// `DateTime - Duration` panics when the result is out of range, so
    /// caller-supplied day counts are clamped to
    /// `-MAX_WINDOW_DAYS..=MAX_WINDOW_DAYS` before any date arithmetic.
    const MAX_WINDOW_DAYS: i64 = 365_000;

    /// Returns the instant `days` days before now.
    ///
    /// `days` is clamped to `±MAX_WINDOW_DAYS` so this never panics, whatever
    /// value the caller passes. A negative `days` yields an instant in the
    /// future, exactly as the unclamped arithmetic would.
    fn days_ago(days: i64) -> DateTime<Utc> {
        let clamped = days.clamp(-Self::MAX_WINDOW_DAYS, Self::MAX_WINDOW_DAYS);
        Utc::now() - chrono::Duration::days(clamped)
    }

    /// Record a feature usage event.
    ///
    /// Inserts one row into `feature_usage` with the given organization,
    /// optional user, feature and optional JSON metadata. The remaining
    /// columns are not supplied by this statement.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by executing the INSERT.
    pub async fn record(
        pool: &sqlx::PgPool,
        org_id: Uuid,
        user_id: Option<Uuid>,
        feature: FeatureType,
        metadata: Option<serde_json::Value>,
    ) -> sqlx::Result<()> {
        sqlx::query(
            r#"
INSERT INTO feature_usage (org_id, user_id, feature, metadata)
VALUES ($1, $2, $3, $4)
"#,
        )
        .bind(org_id)
        .bind(user_id)
        .bind(feature)
        .bind(metadata)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// Count feature usage for an org in a time period.
    ///
    /// Counts the `feature` events of `org_id` whose `created_at` is strictly
    /// later than `days` days ago. `days` beyond `±MAX_WINDOW_DAYS` is
    /// clamped instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by running the query.
    pub async fn count_by_org(
        pool: &sqlx::PgPool,
        org_id: Uuid,
        feature: FeatureType,
        days: i64,
    ) -> sqlx::Result<i64> {
        let since = Self::days_ago(days);
        let count: (i64,) = sqlx::query_as(
            r#"
SELECT COUNT(*) FROM feature_usage
WHERE org_id = $1 AND feature = $2 AND created_at > $3
"#,
        )
        .bind(org_id)
        .bind(feature)
        .bind(since)
        .fetch_one(pool)
        .await?;
        Ok(count.0)
    }

    /// Get feature usage stats across all orgs.
    ///
    /// Returns `(total, unique_orgs)`: the number of `feature` events created
    /// strictly later than `days` days ago, and the number of distinct
    /// organizations among them. `days` beyond `±MAX_WINDOW_DAYS` is clamped
    /// instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by running the query.
    pub async fn get_global_stats(
        pool: &sqlx::PgPool,
        feature: FeatureType,
        days: i64,
    ) -> sqlx::Result<(i64, i64)> {
        let since = Self::days_ago(days);
        let stats: (i64, i64) = sqlx::query_as(
            r#"
SELECT
COUNT(*) as total,
COUNT(DISTINCT org_id) as unique_orgs
FROM feature_usage
WHERE feature = $1 AND created_at > $2
"#,
        )
        .bind(feature)
        .bind(since)
        .fetch_one(pool)
        .await?;
        Ok(stats)
    }

    /// Get feature adoption timeline (daily counts).
    ///
    /// Returns one `(date, count)` pair per calendar date that has at least
    /// one `feature` event created strictly later than `days` days ago,
    /// ordered by date ascending. Dates without events are absent from the
    /// result rather than reported as zero. `days` beyond `±MAX_WINDOW_DAYS`
    /// is clamped instead of panicking.
    ///
    /// NOTE(review): the date is computed by the database via
    /// `DATE(created_at)`; which time zone that uses depends on the column
    /// type and session settings — confirm if day boundaries matter.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by running the query.
    pub async fn get_adoption_timeline(
        pool: &sqlx::PgPool,
        feature: FeatureType,
        days: i64,
    ) -> sqlx::Result<Vec<(chrono::NaiveDate, i64)>> {
        let since = Self::days_ago(days);
        let timeline = sqlx::query_as::<_, (chrono::NaiveDate, i64)>(
            r#"
SELECT
DATE(created_at) as date,
COUNT(*) as count
FROM feature_usage
WHERE feature = $1 AND created_at > $2
GROUP BY DATE(created_at)
ORDER BY date ASC
"#,
        )
        .bind(feature)
        .bind(since)
        .fetch_all(pool)
        .await?;
        Ok(timeline)
    }

    /// Clean up old feature usage events (older than N days).
    ///
    /// Deletes every row whose `created_at` is strictly earlier than `days`
    /// days ago and returns the number of rows removed. `days` beyond
    /// `±MAX_WINDOW_DAYS` is clamped instead of panicking.
    ///
    /// Note that a zero or negative `days` places the cutoff at or after the
    /// current instant, so every row created before that cutoff is deleted.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by executing the DELETE.
    pub async fn cleanup_old(pool: &sqlx::PgPool, days: i64) -> sqlx::Result<u64> {
        let cutoff = Self::days_ago(days);
        let result = sqlx::query("DELETE FROM feature_usage WHERE created_at < $1")
            .bind(cutoff)
            .execute(pool)
            .await?;
        Ok(result.rows_affected())
    }

    /// Aggregate `plugin_invoke_ms` rows for a deployment within a time
    /// window, grouped by attachment.
    ///
    /// Each `plugin_invoke_ms` row is a bucket emitted by the OTLP
    /// aggregator (see migration `20250101000074`); its `metadata` JSONB
    /// is expected to carry:
    ///
    /// ```json
    /// {
    ///   "deployment_id": "uuid",
    ///   "attachment_id": "uuid",
    ///   "plugin_id": "uuid",
    ///   "plugin_name": "string",
    ///   "plugin_version": "string",
    ///   "invoke_ms": 12345,
    ///   "memory_peak_mb": 42          // optional
    /// }
    /// ```
    ///
    /// SUMs `invoke_ms`, MAXes `memory_peak_mb` (peak across buckets),
    /// and ORDERs by total invoke_ms descending so the heaviest plugin
    /// surfaces first in the UI. Returns an empty Vec when the OTLP
    /// pipeline hasn't populated any rows yet.
    ///
    /// Only rows of `org_id` whose metadata `deployment_id` equals
    /// `deployment_id` (compared as text), whose `created_at` is at or after
    /// `since`, and whose metadata contains an `attachment_id` key are
    /// considered. `plugin_name` and `plugin_version` are the MAX of the
    /// text values within each attachment group.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by running the query. In
    /// particular, `invoke_ms` and `memory_peak_mb` are cast to `bigint` in
    /// SQL, so a matching row whose value is not an integer literal makes
    /// the whole query fail; only an empty-string `memory_peak_mb` is
    /// tolerated (it is turned into NULL first).
    pub async fn aggregate_plugin_invoke_ms_by_deployment(
        pool: &sqlx::PgPool,
        org_id: Uuid,
        deployment_id: Uuid,
        since: DateTime<Utc>,
    ) -> sqlx::Result<Vec<PluginInvokeAggregateRow>> {
        sqlx::query_as::<_, PluginInvokeAggregateRow>(
            r#"
SELECT
metadata->>'attachment_id' AS attachment_id,
MAX(metadata->>'plugin_name') AS plugin_name,
MAX(metadata->>'plugin_version') AS plugin_version,
COALESCE(SUM((metadata->>'invoke_ms')::bigint), 0)::bigint AS invoke_ms,
MAX(NULLIF(metadata->>'memory_peak_mb', '')::bigint) AS memory_peak_mb
FROM feature_usage
WHERE feature = 'plugin_invoke_ms'
AND org_id = $1
AND metadata->>'deployment_id' = $2::text
AND created_at >= $3
AND metadata ? 'attachment_id'
GROUP BY metadata->>'attachment_id'
ORDER BY invoke_ms DESC
"#,
        )
        .bind(org_id)
        .bind(deployment_id)
        .bind(since)
        .fetch_all(pool)
        .await
    }
}
/// One row of the per-attachment plugin-invoke aggregate, as produced by
/// `FeatureUsage::aggregate_plugin_invoke_ms_by_deployment`. Strings are
/// `Option<String>` because the metadata fields are text-extracted from
/// JSONB (`->>`), which yields NULL when the key is absent rather than
/// erroring — we'd rather degrade gracefully than reject the whole
/// query if a single row was misshapen.
///
/// Field names match the column aliases of that query, which is what the
/// derived `FromRow` relies on.
#[cfg(feature = "postgres")]
#[derive(Debug, Clone, FromRow)]
pub struct PluginInvokeAggregateRow {
/// Grouping key: the text value of `metadata->>'attachment_id'`. The query
/// only keeps rows whose metadata contains the key, so this is `None` only
/// when the key is present with a JSON `null` value.
pub attachment_id: Option<String>,
/// `MAX` of the `plugin_name` text values within the group; `None` when no
/// row in the group carries one.
pub plugin_name: Option<String>,
/// `MAX` of the `plugin_version` text values within the group (a text
/// comparison, not a version-aware one); `None` when no row carries one.
pub plugin_version: Option<String>,
/// Sum of `invoke_ms` over the group's rows; 0 when every row lacks the
/// key (the SQL wraps the sum in `COALESCE(..., 0)`).
pub invoke_ms: i64,
/// Largest `memory_peak_mb` within the group; `None` when no row carries a
/// non-empty value.
pub memory_peak_mb: Option<i64>,
}