mockforge-registry-server 0.3.129

Plugin registry server for MockForge
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
//! Database connection and models

use anyhow::Result;
use sqlx::{postgres::PgPoolOptions, PgPool};

/// Handle to the registry's PostgreSQL connection pool.
///
/// Construct it with [`Database::connect`]; the query helpers on this type
/// all execute against the wrapped pool.
#[derive(Clone, Debug)]
pub struct Database {
    /// sqlx Postgres pool that every query in this module runs on.
    pool: PgPool,
}

impl Database {
    /// Open a PostgreSQL connection pool for `database_url`.
    ///
    /// The pool size is read from the `DATABASE_MAX_CONNECTIONS` environment
    /// variable. When the variable is unset, unreadable, or does not parse as
    /// a `u32`, the default of 20 is used.
    ///
    /// # Errors
    /// Returns the underlying sqlx error if the pool cannot be established.
    pub async fn connect(database_url: &str) -> Result<Self> {
        const DEFAULT_MAX_CONNECTIONS: u32 = 20;

        let pool_size = match std::env::var("DATABASE_MAX_CONNECTIONS") {
            Ok(raw) => raw.parse::<u32>().unwrap_or(DEFAULT_MAX_CONNECTIONS),
            Err(_) => DEFAULT_MAX_CONNECTIONS,
        };

        let options = PgPoolOptions::new().max_connections(pool_size);
        let pool = options.connect(database_url).await?;

        Ok(Self { pool })
    }

    pub async fn migrate(&self) -> Result<()> {
        // Acquire a PostgreSQL advisory lock to prevent concurrent migration runs
        // across multiple replicas. Lock ID 8675309 is an arbitrary but stable identifier.
        const MIGRATION_LOCK_ID: i64 = 8675309;

        tracing::info!("Acquiring advisory lock for database migrations...");
        sqlx::query("SELECT pg_advisory_lock($1)")
            .bind(MIGRATION_LOCK_ID)
            .execute(&self.pool)
            .await?;
        tracing::info!("Advisory lock acquired, running migrations...");

        // Run migrations. Any error aborts boot — `sqlx::migrate!().run()` bails
        // on the *first* inconsistency without applying subsequent pending
        // migrations, so historically-permissive "warn and continue" handling
        // silently skipped everything past the gap and left the DB multiple
        // versions behind without surfacing a single error to operators. We'd
        // rather refuse to boot and have someone repair the `_sqlx_migrations`
        // table than crash-loop a worker that depends on a table that never
        // got created.
        let result =
            sqlx::migrate!("./migrations")
                .run(&self.pool)
                .await
                .map_err(|e| -> anyhow::Error {
                    if e.to_string().contains("previously applied but is missing") {
                        tracing::error!(
                            "sqlx refused to migrate: the DB's `_sqlx_migrations` table has an \
                         applied row whose matching file is missing from the repo. This \
                         blocks ALL subsequent migrations from running. To fix, either \
                         restore the missing file or remove the orphaned tracking row \
                         manually (psql: `DELETE FROM _sqlx_migrations WHERE version = …`). \
                         Full error: {:?}",
                            e
                        );
                    }
                    e.into()
                });

        // Release the advisory lock regardless of migration outcome
        if let Err(unlock_err) = sqlx::query("SELECT pg_advisory_unlock($1)")
            .bind(MIGRATION_LOCK_ID)
            .execute(&self.pool)
            .await
        {
            tracing::error!("Failed to release migration advisory lock: {}", unlock_err);
        } else {
            tracing::info!("Migration advisory lock released");
        }

        result
    }

    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Count the rows in the `plugins` table.
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the query.
    pub async fn get_total_plugins(&self) -> Result<i64> {
        let query = sqlx::query_as("SELECT COUNT(*) FROM plugins");
        let (plugin_count,): (i64,) = query.fetch_one(&self.pool).await?;
        Ok(plugin_count)
    }

    /// Sum `downloads_total` over every row of the `plugins` table.
    ///
    /// Yields 0 when the query produces no sum.
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the query.
    pub async fn get_total_downloads(&self) -> Result<i64> {
        // downloads_total is NUMERIC in the database, hence the explicit BIGINT cast.
        let query = sqlx::query_as("SELECT COALESCE(SUM(downloads_total)::BIGINT, 0) FROM plugins");
        let (download_sum,): (Option<i64>,) = query.fetch_one(&self.pool).await?;

        match download_sum {
            Some(sum) => Ok(sum),
            None => Ok(0),
        }
    }

    /// Count the rows in the `users` table.
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the query.
    pub async fn get_total_users(&self) -> Result<i64> {
        let query = sqlx::query_as("SELECT COUNT(*) FROM users");
        let (user_count,): (i64,) = query.fetch_one(&self.pool).await?;
        Ok(user_count)
    }

    // ==================== Token Revocation Functions ====================

    /// Record a refresh token's JTI in `token_revocations` (called on token creation).
    ///
    /// Inserting a JTI that already exists is a no-op (`ON CONFLICT (jti) DO NOTHING`).
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the insert.
    pub async fn store_refresh_token_jti(
        &self,
        jti: &str,
        user_id: uuid::Uuid,
        expires_at: chrono::DateTime<chrono::Utc>,
    ) -> Result<()> {
        const INSERT_JTI_SQL: &str = r#"
            INSERT INTO token_revocations (jti, user_id, expires_at)
            VALUES ($1, $2, $3)
            ON CONFLICT (jti) DO NOTHING
            "#;

        let insert = sqlx::query(INSERT_JTI_SQL).bind(jti).bind(user_id).bind(expires_at);
        insert.execute(&self.pool).await?;

        Ok(())
    }

    /// Report whether the refresh token identified by `jti` must be rejected.
    ///
    /// Returns `true` when the row has a `revoked_at` timestamp, and also when
    /// no row exists for `jti` (unknown tokens are treated as revoked). Returns
    /// `false` only for a tracked token whose `revoked_at` is NULL.
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the lookup.
    pub async fn is_token_revoked(&self, jti: &str) -> Result<bool> {
        const LOOKUP_SQL: &str = r#"
            SELECT revoked_at FROM token_revocations WHERE jti = $1
            "#;

        let row: Option<(Option<chrono::DateTime<chrono::Utc>>,)> =
            sqlx::query_as(LOOKUP_SQL).bind(jti).fetch_optional(&self.pool).await?;

        // Fail closed: a missing row counts as revoked; a present row is
        // revoked exactly when its timestamp is set.
        Ok(row.map_or(true, |(revoked_at,)| revoked_at.is_some()))
    }

    /// Mark the refresh token `jti` as revoked (called on logout or token refresh).
    ///
    /// Sets `revoked_at` to `NOW()` and stores `reason`. Rows that are already
    /// revoked are left untouched because the update only matches
    /// `revoked_at IS NULL`.
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the update.
    pub async fn revoke_token(&self, jti: &str, reason: &str) -> Result<()> {
        const REVOKE_SQL: &str = r#"
            UPDATE token_revocations
            SET revoked_at = NOW(), revocation_reason = $2
            WHERE jti = $1 AND revoked_at IS NULL
            "#;

        let update = sqlx::query(REVOKE_SQL).bind(jti).bind(reason);
        update.execute(&self.pool).await?;

        Ok(())
    }

    /// Revoke every still-active refresh token belonging to `user_id`
    /// (called on password change and other security events).
    ///
    /// Returns the number of rows the update affected.
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the update.
    pub async fn revoke_all_user_tokens(&self, user_id: uuid::Uuid, reason: &str) -> Result<u64> {
        const REVOKE_ALL_SQL: &str = r#"
            UPDATE token_revocations
            SET revoked_at = NOW(), revocation_reason = $2
            WHERE user_id = $1 AND revoked_at IS NULL
            "#;

        let update = sqlx::query(REVOKE_ALL_SQL).bind(user_id).bind(reason);
        let outcome = update.execute(&self.pool).await?;

        Ok(outcome.rows_affected())
    }

    /// Delete `token_revocations` rows whose `expires_at` is more than one day
    /// in the past (maintenance task).
    ///
    /// Returns the number of rows deleted.
    ///
    /// # Errors
    /// Propagates any sqlx error raised while running the delete.
    pub async fn cleanup_expired_tokens(&self) -> Result<u64> {
        const CLEANUP_SQL: &str = r#"
            DELETE FROM token_revocations
            WHERE expires_at < NOW() - INTERVAL '1 day'
            "#;

        let outcome = sqlx::query(CLEANUP_SQL).execute(&self.pool).await?;

        Ok(outcome.rows_affected())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that `Database` implements `Clone`.
    #[test]
    fn test_database_clone() {
        fn assert_clone<T>()
        where
            T: Clone,
        {
        }

        assert_clone::<Database>();
    }

    /// Connecting to a URL with no server behind it must yield an error.
    #[tokio::test]
    async fn test_database_connect() {
        // No Postgres instance is expected at this address, so the attempt
        // should fail; this still exercises the `connect` signature.
        let result = Database::connect("postgresql://test:test@localhost/test_db").await;

        assert!(
            result.is_err(),
            "expected connection to fail without a running database, but got: {result:?}"
        );
    }

    /// Compile-time check that `Database::pool` borrows a `PgPool` from `&Database`.
    #[test]
    fn test_database_pool_type() {
        // Thin wrapper so the accessor's signature is pinned through a plain fn.
        fn borrow_pool(db: &Database) -> &PgPool {
            db.pool()
        }

        // Coercing to an explicit fn-pointer type fails to compile if the
        // signature ever drifts.
        let _signature_check: fn(&Database) -> &PgPool = borrow_pool;
    }

    // Mock test to verify query structures
    /// Checks a local copy of the plugin-count SQL for its expected fragments.
    ///
    /// This inspects a string literal only; it does not run `get_total_plugins`.
    #[test]
    fn test_total_plugins_query_structure() {
        let sql = "SELECT COUNT(*) FROM plugins";

        for fragment in ["SELECT", "COUNT(*)", "FROM plugins"] {
            assert!(sql.contains(fragment));
        }
    }

    /// Checks a local copy of the download-sum SQL for its expected fragments.
    ///
    /// This inspects a string literal only; it does not run `get_total_downloads`.
    #[test]
    fn test_total_downloads_query_structure() {
        let sql = "SELECT COALESCE(SUM(downloads_total)::BIGINT, 0) FROM plugins";

        let expected_fragments =
            ["SELECT", "COALESCE", "SUM(downloads_total)", "FROM plugins", "::BIGINT"];
        for fragment in expected_fragments {
            assert!(sql.contains(fragment));
        }
    }

    /// Checks a local copy of the user-count SQL for its expected fragments.
    ///
    /// This inspects a string literal only; it does not run `get_total_users`.
    #[test]
    fn test_total_users_query_structure() {
        let sql = "SELECT COUNT(*) FROM users";

        for fragment in ["SELECT", "COUNT(*)", "FROM users"] {
            assert!(sql.contains(fragment));
        }
    }

    /// Checks the substring that `migrate` looks for in migration errors.
    ///
    /// This inspects a string literal only; it does not trigger a real migration error.
    #[test]
    fn test_migration_error_handling() {
        let pattern = "previously applied but is missing";

        for fragment in ["previously applied", "missing"] {
            assert!(pattern.contains(fragment));
        }
    }

    // Integration-style tests (require database, so we make them conditional)
    // These will be skipped in normal test runs but can be run with a test database

    /// Runs the migrations against a real Postgres instance and requires them to succeed.
    ///
    /// Uses `TEST_DATABASE_URL` when set, otherwise a local default URL. If the
    /// database cannot be reached the test body is skipped.
    #[tokio::test]
    #[ignore] // Requires database
    async fn test_database_migration() {
        // This test requires a real Postgres database
        let database_url = std::env::var("TEST_DATABASE_URL")
            .unwrap_or_else(|_| "postgresql://test:test@localhost/test_db".to_string());

        if let Ok(db) = Database::connect(&database_url).await {
            let result = db.migrate().await;
            // The previous `is_ok() || is_err()` check was always true and
            // could never fail; require the migration to actually succeed.
            assert!(result.is_ok(), "migrations failed to apply: {result:?}");
        }
    }

    /// With a reachable database, the plugin count (when the query succeeds) is non-negative.
    #[tokio::test]
    #[ignore] // Requires database
    async fn test_get_total_plugins() {
        let database_url = match std::env::var("TEST_DATABASE_URL") {
            Ok(url) => url,
            Err(_) => "postgresql://test:test@localhost/test_db".to_string(),
        };

        // Skip silently when no database is reachable.
        let db = match Database::connect(&database_url).await {
            Ok(db) => db,
            Err(_) => return,
        };

        let _ = db.migrate().await;

        if let Ok(count) = db.get_total_plugins().await {
            assert!(count >= 0);
        }
    }

    /// With a reachable database, the download total (when the query succeeds) is non-negative.
    #[tokio::test]
    #[ignore] // Requires database
    async fn test_get_total_downloads() {
        let database_url = match std::env::var("TEST_DATABASE_URL") {
            Ok(url) => url,
            Err(_) => "postgresql://test:test@localhost/test_db".to_string(),
        };

        // Skip silently when no database is reachable.
        let db = match Database::connect(&database_url).await {
            Ok(db) => db,
            Err(_) => return,
        };

        let _ = db.migrate().await;

        if let Ok(count) = db.get_total_downloads().await {
            assert!(count >= 0);
        }
    }

    /// With a reachable database, the user count (when the query succeeds) is non-negative.
    #[tokio::test]
    #[ignore] // Requires database
    async fn test_get_total_users() {
        let database_url = match std::env::var("TEST_DATABASE_URL") {
            Ok(url) => url,
            Err(_) => "postgresql://test:test@localhost/test_db".to_string(),
        };

        // Skip silently when no database is reachable.
        let db = match Database::connect(&database_url).await {
            Ok(db) => db,
            Err(_) => return,
        };

        let _ = db.migrate().await;

        if let Ok(count) = db.get_total_users().await {
            assert!(count >= 0);
        }
    }

    /// Two calls to `pool()` on the same `Database` return the same reference.
    #[tokio::test]
    #[ignore] // Requires database
    async fn test_pool_reuse() {
        let database_url = match std::env::var("TEST_DATABASE_URL") {
            Ok(url) => url,
            Err(_) => "postgresql://test:test@localhost/test_db".to_string(),
        };

        let connected = Database::connect(&database_url).await;
        if let Ok(db) = connected {
            // Both borrows must point at the very same pool field.
            let first = db.pool();
            let second = db.pool();
            assert!(std::ptr::eq(first, second));
        }
    }

    /// Sample Postgres URLs all start with `postgres` and contain `://`.
    ///
    /// This checks literals only; it does not call into the connection code.
    #[test]
    fn test_database_connection_string_validation() {
        const SAMPLE_URLS: [&str; 4] = [
            "postgresql://user:pass@localhost/db",
            "postgresql://user:pass@localhost:5432/db",
            "postgresql://localhost/db",
            "postgres://user:pass@host:5432/database?sslmode=require",
        ];

        for candidate in SAMPLE_URLS.iter() {
            assert!(candidate.starts_with("postgres"));
            assert!(candidate.contains("://"));
        }
    }

    /// The documented default pool size (20) lies in a sane range.
    ///
    /// This checks a local literal; it does not read `DATABASE_MAX_CONNECTIONS`.
    #[test]
    fn test_max_connections_config() {
        // Mirrors the fallback used when DATABASE_MAX_CONNECTIONS is absent.
        let default_pool_size = 20;

        // Strictly positive, and no larger than a reasonable upper bound of 100.
        assert!(default_pool_size > 0);
        assert!(default_pool_size <= 100);
    }

    /// Running the migrations twice against the same database must succeed both times.
    ///
    /// Uses `TEST_DATABASE_URL` when set, otherwise a local default URL. If the
    /// database cannot be reached the test body is skipped.
    #[tokio::test]
    #[ignore] // Requires database
    async fn test_migration_idempotency() {
        let database_url = std::env::var("TEST_DATABASE_URL")
            .unwrap_or_else(|_| "postgresql://test:test@localhost/test_db".to_string());

        if let Ok(db) = Database::connect(&database_url).await {
            // Run migrations twice
            let result1 = db.migrate().await;
            let result2 = db.migrate().await;

            // The previous `is_ok() || is_err()` checks were always true.
            // Idempotency means the second run finds nothing left to apply
            // and still succeeds, so require success from both runs.
            assert!(result1.is_ok(), "first migration run failed: {result1:?}");
            assert!(result2.is_ok(), "second migration run failed: {result2:?}");
        }
    }

    /// Compile-time check that three local `fn(i64)` helpers coerce to `fn(i64)`.
    ///
    /// This pins only the local helpers' types; it does not call the query methods.
    #[test]
    fn test_query_return_types() {
        fn accepts_plugin_total(_: i64) {}
        fn accepts_download_total(_: i64) {}
        fn accepts_user_total(_: i64) {}

        // Each coercion fails to compile if a helper stops taking an i64.
        let _plugins: fn(i64) = accepts_plugin_total;
        let _downloads: fn(i64) = accepts_download_total;
        let _users: fn(i64) = accepts_user_total;
    }

    /// A function returning `anyhow::Result<()>` with `Ok(())` reports success.
    #[test]
    fn test_database_error_types() {
        use anyhow::Result;

        // Minimal function using the module's error alias.
        fn returns_result() -> Result<()> {
            Ok(())
        }

        assert!(returns_result().is_ok());
    }
}