Skip to main content

nostr_bbs_relay_worker/
lib.rs

1//! nostr-bbs Relay Worker (Rust)
2//!
3//! Cloudflare Workers-based private Nostr relay with:
4//! - WebSocket NIP-01 protocol via Durable Objects
5//! - D1-backed event storage + whitelist
6//! - NIP-98 authenticated admin endpoints
7//! - Whitelist/cohort management API
8//! - NIP-11 relay information document
9//! - NIP-16/33 replaceable events
10//!
11//! ## Architecture
12//!
13//! - `lib.rs` -- HTTP router, CORS, entry point
14//! - `relay_do.rs` -- Durable Object: WebSocket relay, NIP-01 message handling
15//! - `nip11.rs` -- NIP-11 relay information document
16//! - `whitelist.rs` -- Whitelist management HTTP handlers
17//! - `auth.rs` -- NIP-98 admin verification wrapper
18
19// Worker entry points are invoked via wasm-bindgen and appear unused in native builds.
20#![allow(dead_code)]
21
22mod audit;
23mod auth;
24mod cron;
25mod moderation;
26mod nip11;
27mod profiles;
28mod relay_do;
29mod trust;
30mod whitelist;
31
32/// Re-export so the `worker` crate runtime can discover the Durable Object.
33pub use relay_do::NostrRelayDO;
34
// Integration-test surface (Sprint v9 Stream-E2).
// Only compiled when the crate is built with `--features test-exports`, so
// that tests living in `tests/` can reach items from private modules.
#[cfg(feature = "test-exports")]
pub mod test_exports {
    pub use crate::relay_do::test_exports::*;
    pub use crate::trust::compute_trust_level;
    pub use crate::trust::TrustLevel;
    pub use crate::trust::TrustThresholds;
}
42
43use worker::*;
44
45// ---------------------------------------------------------------------------
46// CORS
47// ---------------------------------------------------------------------------
48
49/// Build allowed origins list from `ALLOWED_ORIGINS` env var (comma-separated)
50/// or fall back to the production domain.
51fn allowed_origins(env: &Env) -> Vec<String> {
52    env.var("ALLOWED_ORIGINS")
53        .map(|v| v.to_string())
54        .unwrap_or_else(|_| "https://example.com".to_string())
55        .split(',')
56        .map(|s| s.trim().to_string())
57        .collect()
58}
59
60/// Determine the allowed CORS origin for a request.
61///
62/// If the request's `Origin` header matches one of the allowed origins, that
63/// origin is returned. Otherwise falls back to the first allowed origin.
64fn cors_origin(req: &Request, env: &Env) -> String {
65    let origins = allowed_origins(env);
66    let origin = req
67        .headers()
68        .get("Origin")
69        .ok()
70        .flatten()
71        .unwrap_or_default();
72    if origins.iter().any(|o| o == &origin) {
73        origin
74    } else {
75        origins
76            .into_iter()
77            .next()
78            .unwrap_or_else(|| "https://example.com".to_string())
79    }
80}
81
82/// Build CORS response headers.
83fn cors_headers(req: &Request, env: &Env) -> Headers {
84    let headers = Headers::new();
85    headers
86        .set("Access-Control-Allow-Origin", &cors_origin(req, env))
87        .ok();
88    headers
89        .set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
90        .ok();
91    headers
92        .set(
93            "Access-Control-Allow-Headers",
94            "Content-Type, Authorization, Accept",
95        )
96        .ok();
97    headers.set("Access-Control-Max-Age", "86400").ok();
98    headers.set("Vary", "Origin").ok();
99    headers
100}
101
102/// Return the default allowed origin from the env or the production domain.
103fn default_origin(env: &Env) -> String {
104    allowed_origins(env)
105        .into_iter()
106        .next()
107        .unwrap_or_else(|| "https://example.com".to_string())
108}
109
110/// CORS utilities for submodules that lack direct access to the request.
111pub(crate) mod cors {
112    use worker::*;
113
114    /// Create a JSON response with CORS headers attached.
115    ///
116    /// Used by whitelist handlers that receive `&Env` but not the original
117    /// `&Request`. The origin is resolved from the env-based allowed origins.
118    pub fn json_response(env: &Env, body: &serde_json::Value, status: u16) -> Result<Response> {
119        let json_str = serde_json::to_string(body).map_err(|e| Error::RustError(e.to_string()))?;
120        let headers = Headers::new();
121        headers.set("Content-Type", "application/json").ok();
122
123        let origin = super::default_origin(env);
124        headers.set("Access-Control-Allow-Origin", &origin).ok();
125        headers
126            .set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
127            .ok();
128        headers
129            .set(
130                "Access-Control-Allow-Headers",
131                "Content-Type, Authorization, Accept",
132            )
133            .ok();
134        headers.set("Access-Control-Max-Age", "86400").ok();
135        headers.set("Vary", "Origin").ok();
136
137        Ok(Response::ok(json_str)?
138            .with_status(status)
139            .with_headers(headers))
140    }
141}
142
143/// Create a JSON response with CORS from the request's Origin header.
144fn json_response(
145    req: &Request,
146    env: &Env,
147    body: &serde_json::Value,
148    status: u16,
149) -> Result<Response> {
150    let json_str = serde_json::to_string(body).map_err(|e| Error::RustError(e.to_string()))?;
151    let headers = cors_headers(req, env);
152    headers.set("Content-Type", "application/json").ok();
153    Ok(Response::ok(json_str)?
154        .with_status(status)
155        .with_headers(headers))
156}
157
158// ---------------------------------------------------------------------------
159// Entry point
160// ---------------------------------------------------------------------------
161
162#[event(fetch)]
163async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
164    // Idempotent schema migrations (trust columns, new tables, etc.)
165    ensure_schema(&env).await;
166    nostr_bbs_rate_limit::ensure_replay_schema(&env, "REPLAY_DB").await;
167
168    // CORS preflight
169    if req.method() == Method::Options {
170        return Ok(Response::empty()?
171            .with_status(204)
172            .with_headers(cors_headers(&req, &env)));
173    }
174
175    // WebSocket upgrade -> Durable Object
176    if req.headers().get("Upgrade")?.as_deref() == Some("websocket") {
177        let stub = env.durable_object("RELAY")?.get_by_name("main")?;
178        return stub.fetch_with_request(req).await;
179    }
180
181    let url = req.url()?;
182    let path = url.path();
183
184    // NIP-11 relay info document
185    if path == "/" && accepts_nostr_json(&req) {
186        let info = nip11::relay_info(&env);
187        let json_str = serde_json::to_string(&info).map_err(|e| Error::RustError(e.to_string()))?;
188        let headers = Headers::new();
189        headers.set("Content-Type", "application/nostr+json").ok();
190        headers
191            .set("Access-Control-Allow-Origin", &cors_origin(&req, &env))
192            .ok();
193        headers.set("Vary", "Origin").ok();
194        return Ok(Response::ok(json_str)?.with_headers(headers));
195    }
196
197    // Route to handlers with error wrapping
198    let result = route(req, &env, path).await;
199    match result {
200        Ok(resp) => Ok(resp),
201        Err(e) => {
202            console_error!("Relay worker error: {e}");
203            let msg = e.to_string();
204            let fallback_origin = default_origin(&env);
205            if msg.contains("JSON") || msg.contains("json") || msg.contains("Syntax") {
206                let headers = Headers::new();
207                headers.set("Content-Type", "application/json").ok();
208                headers
209                    .set("Access-Control-Allow-Origin", &fallback_origin)
210                    .ok();
211                headers.set("Vary", "Origin").ok();
212                Ok(Response::ok(r#"{"error":"Invalid JSON"}"#)?
213                    .with_status(400)
214                    .with_headers(headers))
215            } else {
216                let headers = Headers::new();
217                headers.set("Content-Type", "application/json").ok();
218                headers
219                    .set("Access-Control-Allow-Origin", &fallback_origin)
220                    .ok();
221                headers.set("Vary", "Origin").ok();
222                Ok(Response::ok(r#"{"error":"Internal error"}"#)?
223                    .with_status(500)
224                    .with_headers(headers))
225            }
226        }
227    }
228}
229
230/// Route incoming requests to the appropriate handler.
231async fn route(req: Request, env: &Env, path: &str) -> Result<Response> {
232    let method = req.method();
233
234    // Health check
235    if path == "/health" || path == "/" {
236        return json_response(
237            &req,
238            env,
239            &serde_json::json!({
240                "status": "healthy",
241                "version": "3.0.0",
242                "runtime": "workers-rs",
243                "nips": [1, 9, 11, 16, 17, 29, 33, 40, 42, 45, 50, 59, 65, 90, 98],
244            }),
245            200,
246        );
247    }
248
249    // Setup status check (public -- returns whether initial admin setup is needed)
250    if path == "/api/setup-status" && method == Method::Get {
251        return whitelist::handle_setup_status(&req, env).await;
252    }
253
254    // Whitelist check (public)
255    if path == "/api/check-whitelist" && method == Method::Get {
256        return whitelist::handle_check_whitelist(&req, env).await;
257    }
258
259    // Whitelist list (public)
260    if path == "/api/whitelist/list" && method == Method::Get {
261        return whitelist::handle_whitelist_list(&req, env).await;
262    }
263
264    // Whitelist add (NIP-98 admin only)
265    if path == "/api/whitelist/add" && method == Method::Post {
266        return whitelist::handle_whitelist_add(req, env).await;
267    }
268
269    // Whitelist update cohorts (NIP-98 admin only)
270    if path == "/api/whitelist/update-cohorts" && method == Method::Post {
271        return whitelist::handle_whitelist_update_cohorts(req, env).await;
272    }
273
274    // Set admin status (NIP-98 admin only)
275    if path == "/api/whitelist/set-admin" && method == Method::Post {
276        return whitelist::handle_set_admin(req, env).await;
277    }
278
279    // Reset database (NIP-98 admin only)
280    if path == "/api/admin/reset-db" && method == Method::Post {
281        return whitelist::handle_reset_db(req, env).await;
282    }
283
284    // --- Moderation endpoints (NIP-98 admin only) ---
285
286    // List reports
287    if path == "/api/reports" && method == Method::Get {
288        return moderation::handle_list_reports(&req, env).await;
289    }
290
291    // Resolve a report
292    if path == "/api/reports/resolve" && method == Method::Post {
293        return moderation::handle_resolve_report(req, env).await;
294    }
295
296    // --- Audit log endpoint (NIP-98 admin only) ---
297
298    if path == "/api/admin/audit-log" && method == Method::Get {
299        return audit::handle_audit_log_list(&req, env).await;
300    }
301
302    // --- Sprint v10: profiles (public, no auth) ---
303
304    if path == "/api/profiles/batch" && method == Method::Post {
305        let mut req = req;
306        let body_bytes = req.bytes().await.unwrap_or_default();
307        return profiles::handle_batch(&req, &body_bytes, env).await;
308    }
309
310    if path == "/api/profiles/search" && method == Method::Get {
311        return profiles::handle_search(&req, env).await;
312    }
313
314    // --- Sprint v11: profiles backfill (NIP-98 admin only, one-shot) ---
315    //
316    // Manually-triggered replay of historic kind-0 events into the `profiles`
317    // projection. Idempotent — the upsert's `last_kind0_at` guard means
318    // re-running is always safe.
319    if path == "/api/admin/profiles/backfill" && method == Method::Post {
320        return handle_profiles_backfill(req, env).await;
321    }
322
323    json_response(&req, env, &serde_json::json!({ "error": "Not found" }), 404)
324}
325
326// ---------------------------------------------------------------------------
327// Sprint v11: profiles backfill admin endpoint
328// ---------------------------------------------------------------------------
329
330/// `POST /api/admin/profiles/backfill` — NIP-98 admin only.
331///
332/// Replays every stored kind-0 event through the `profiles` projection upsert
333/// (Sprint v10). Returns `{ scanned, backfilled, skipped, truncated }`.
334async fn handle_profiles_backfill(mut req: Request, env: &Env) -> Result<Response> {
335    let url = req.url()?;
336    let request_url = format!("{}{}", url.origin().ascii_serialization(), url.path());
337    let auth_header = req.headers().get("Authorization").ok().flatten();
338    let body_bytes = req.bytes().await.unwrap_or_default();
339    // Treat empty body as no body for NIP-98 payload-hash semantics; non-empty
340    // is hashed and verified.
341    let body_for_auth: Option<&[u8]> = if body_bytes.is_empty() {
342        None
343    } else {
344        Some(&body_bytes)
345    };
346
347    let _admin_pubkey = match auth::require_nip98_admin(
348        auth_header.as_deref(),
349        &request_url,
350        "POST",
351        body_for_auth,
352        env,
353    )
354    .await
355    {
356        Ok(pk) => pk,
357        Err((body, status)) => return json_response(&req, env, &body, status),
358    };
359
360    match cron::backfill_profiles(env).await {
361        Ok(result) => {
362            let body = serde_json::to_value(result).unwrap_or_else(|_| serde_json::json!({}));
363            json_response(&req, env, &body, 200)
364        }
365        Err(e) => {
366            console_error!("backfill_profiles failed: {e}");
367            json_response(
368                &req,
369                env,
370                &serde_json::json!({ "error": "backfill failed", "detail": e }),
371                500,
372            )
373        }
374    }
375}
376
377/// Idempotent schema migrations.
378///
379/// All statements use `IF NOT EXISTS` for tables or silently ignore errors
380/// for `ALTER TABLE ADD COLUMN` (D1/SQLite raises an error if the column
381/// already exists, which we swallow).
382async fn ensure_schema(env: &Env) {
383    let db = match env.d1("DB") {
384        Ok(db) => db,
385        Err(_) => return,
386    };
387
388    // --- Whitelist columns (idempotent: errors ignored if column exists) ---
389    let alter_stmts = [
390        "ALTER TABLE whitelist ADD COLUMN is_admin INTEGER DEFAULT 0",
391        "ALTER TABLE whitelist ADD COLUMN trust_level INTEGER NOT NULL DEFAULT 0",
392        "ALTER TABLE whitelist ADD COLUMN days_active INTEGER NOT NULL DEFAULT 0",
393        "ALTER TABLE whitelist ADD COLUMN posts_read INTEGER NOT NULL DEFAULT 0",
394        "ALTER TABLE whitelist ADD COLUMN posts_created INTEGER NOT NULL DEFAULT 0",
395        "ALTER TABLE whitelist ADD COLUMN mod_actions_against INTEGER NOT NULL DEFAULT 0",
396        "ALTER TABLE whitelist ADD COLUMN last_active_at INTEGER",
397        "ALTER TABLE whitelist ADD COLUMN trust_level_updated_at INTEGER",
398        "ALTER TABLE whitelist ADD COLUMN suspended_until INTEGER",
399        "ALTER TABLE whitelist ADD COLUMN silenced INTEGER NOT NULL DEFAULT 0",
400        "ALTER TABLE whitelist ADD COLUMN user_notes TEXT",
401    ];
402    for stmt in alter_stmts {
403        let _ = db.prepare(stmt).run().await;
404    }
405
406    // --- New tables (idempotent via IF NOT EXISTS) ---
407    let create_stmts = [
408        "CREATE TABLE IF NOT EXISTS channel_zones (\
409            channel_id TEXT PRIMARY KEY, \
410            zone TEXT NOT NULL DEFAULT 'home', \
411            archived INTEGER NOT NULL DEFAULT 0\
412        )",
413        "CREATE TABLE IF NOT EXISTS admin_log (\
414            id INTEGER PRIMARY KEY AUTOINCREMENT, \
415            actor_pubkey TEXT NOT NULL, \
416            action TEXT NOT NULL, \
417            target_pubkey TEXT, \
418            target_id TEXT, \
419            previous_value TEXT, \
420            new_value TEXT, \
421            reason TEXT, \
422            created_at INTEGER NOT NULL\
423        )",
424        "CREATE TABLE IF NOT EXISTS settings (\
425            key TEXT PRIMARY KEY, \
426            value TEXT NOT NULL, \
427            type TEXT NOT NULL DEFAULT 'string', \
428            category TEXT NOT NULL DEFAULT 'general'\
429        )",
430        "CREATE TABLE IF NOT EXISTS reports (\
431            id INTEGER PRIMARY KEY AUTOINCREMENT, \
432            report_event_id TEXT NOT NULL UNIQUE, \
433            reporter_pubkey TEXT NOT NULL, \
434            reporter_trust_level INTEGER NOT NULL DEFAULT 0, \
435            reported_event_id TEXT NOT NULL, \
436            reported_pubkey TEXT NOT NULL, \
437            reason TEXT NOT NULL, \
438            reason_text TEXT, \
439            status TEXT NOT NULL DEFAULT 'pending', \
440            resolved_by TEXT, \
441            resolution TEXT, \
442            created_at INTEGER NOT NULL, \
443            resolved_at INTEGER\
444        )",
445        "CREATE TABLE IF NOT EXISTS hidden_events (\
446            event_id TEXT PRIMARY KEY, \
447            hidden_by TEXT NOT NULL, \
448            reason TEXT, \
449            created_at INTEGER NOT NULL\
450        )",
451        // WI-2: mirror of auth-worker's moderation_actions. Populated when
452        // kind-30910/30911 Nostr events signed by an admin are saved here.
453        // Consumed by the relay's ingress gate to block muted/banned authors.
454        "CREATE TABLE IF NOT EXISTS moderation_actions (\
455            id TEXT PRIMARY KEY, \
456            action TEXT NOT NULL, \
457            target_pubkey TEXT NOT NULL, \
458            performed_by TEXT NOT NULL, \
459            reason TEXT, \
460            expires_at INTEGER, \
461            event_id TEXT NOT NULL, \
462            created_at INTEGER NOT NULL\
463        )",
464        // Sprint v10: projection of the most-recent kind-0 per pubkey, with
465        // the JSON content fields parsed into typed columns. Maintained by
466        // the kind-0 ingest hook in `relay_do::storage::save_event`.
467        "CREATE TABLE IF NOT EXISTS profiles (\
468            pubkey TEXT PRIMARY KEY NOT NULL, \
469            name TEXT, \
470            display_name TEXT, \
471            picture TEXT, \
472            banner TEXT, \
473            about TEXT, \
474            nip05 TEXT, \
475            lud16 TEXT, \
476            last_kind0_at INTEGER NOT NULL, \
477            raw_event TEXT NOT NULL\
478        )",
479        // Agent Control Surface Protocol: registry of agents allowed to publish
480        // governance events (kinds 31400-31405).
481        "CREATE TABLE IF NOT EXISTS agent_registry (\
482            pubkey TEXT PRIMARY KEY NOT NULL, \
483            name TEXT NOT NULL, \
484            description TEXT NOT NULL DEFAULT '', \
485            registered_by TEXT NOT NULL, \
486            registered_at INTEGER NOT NULL, \
487            rate_limit_per_min INTEGER NOT NULL DEFAULT 60, \
488            active INTEGER NOT NULL DEFAULT 1\
489        )",
490        // Broker case aggregate — human-in-the-loop governance decisions.
491        "CREATE TABLE IF NOT EXISTS broker_cases (\
492            id TEXT PRIMARY KEY NOT NULL, \
493            category TEXT NOT NULL, \
494            subject_kind TEXT NOT NULL, \
495            subject_id TEXT NOT NULL, \
496            title TEXT NOT NULL, \
497            summary TEXT NOT NULL DEFAULT '', \
498            state TEXT NOT NULL DEFAULT 'open', \
499            priority INTEGER NOT NULL DEFAULT 50, \
500            from_share_state TEXT, \
501            to_share_state TEXT, \
502            created_by TEXT NOT NULL, \
503            assigned_to TEXT, \
504            nostr_event_id TEXT, \
505            created_at INTEGER NOT NULL, \
506            updated_at INTEGER NOT NULL\
507        )",
508        // Individual decisions on broker cases (append-only audit trail).
509        "CREATE TABLE IF NOT EXISTS broker_decisions (\
510            decision_id TEXT PRIMARY KEY NOT NULL, \
511            case_id TEXT NOT NULL REFERENCES broker_cases(id), \
512            outcome TEXT NOT NULL, \
513            outcome_detail TEXT, \
514            broker_pubkey TEXT NOT NULL, \
515            reasoning TEXT NOT NULL DEFAULT '', \
516            prior_decision_id TEXT, \
517            decided_at INTEGER NOT NULL\
518        )",
519        // Role assignments for broker governance (which pubkeys can claim cases).
520        "CREATE TABLE IF NOT EXISTS broker_roles (\
521            pubkey TEXT NOT NULL, \
522            role TEXT NOT NULL, \
523            granted_by TEXT NOT NULL, \
524            granted_at INTEGER NOT NULL, \
525            PRIMARY KEY (pubkey, role)\
526        )",
527    ];
528    for stmt in create_stmts {
529        let _ = db.prepare(stmt).run().await;
530    }
531
532    // --- Indexes (idempotent via IF NOT EXISTS) ---
533    let index_stmts = [
534        "CREATE INDEX IF NOT EXISTS idx_reports_status ON reports(status)",
535        "CREATE INDEX IF NOT EXISTS idx_reports_reported_event ON reports(reported_event_id)",
536        "CREATE INDEX IF NOT EXISTS idx_reports_reported_pubkey ON reports(reported_pubkey)",
537        "CREATE INDEX IF NOT EXISTS idx_admin_log_action ON admin_log(action)",
538        "CREATE INDEX IF NOT EXISTS idx_admin_log_actor ON admin_log(actor_pubkey)",
539        "CREATE INDEX IF NOT EXISTS idx_admin_log_target ON admin_log(target_pubkey)",
540        "CREATE INDEX IF NOT EXISTS idx_admin_log_created ON admin_log(created_at)",
541        "CREATE INDEX IF NOT EXISTS idx_mod_actions_target ON moderation_actions(target_pubkey)",
542        "CREATE INDEX IF NOT EXISTS idx_mod_actions_active ON moderation_actions(action, expires_at)",
543        // NIP-59 (Sealed DMs, kind-1059): index on kind for efficient recipient delivery.
544        // p-tag recipient filtering is applied via the tags LIKE pattern at query time;
545        // this index narrows the scan to kind-1059 rows first.
546        "CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind)",
547        // Sprint v10: profiles indexes for batch lookup and prefix typeahead.
548        "CREATE INDEX IF NOT EXISTS idx_profiles_name ON profiles(name)",
549        "CREATE INDEX IF NOT EXISTS idx_profiles_display_name ON profiles(display_name)",
550        "CREATE INDEX IF NOT EXISTS idx_profiles_last_kind0 ON profiles(last_kind0_at DESC)",
551        // Agent Control Surface Protocol indexes.
552        "CREATE INDEX IF NOT EXISTS idx_agent_registry_active ON agent_registry(active)",
553        "CREATE INDEX IF NOT EXISTS idx_broker_cases_state ON broker_cases(state)",
554        "CREATE INDEX IF NOT EXISTS idx_broker_cases_category ON broker_cases(category)",
555        "CREATE INDEX IF NOT EXISTS idx_broker_cases_assigned ON broker_cases(assigned_to)",
556        "CREATE INDEX IF NOT EXISTS idx_broker_decisions_case ON broker_decisions(case_id)",
557        "CREATE INDEX IF NOT EXISTS idx_broker_roles_pubkey ON broker_roles(pubkey)",
558    ];
559    for stmt in index_stmts {
560        let _ = db.prepare(stmt).run().await;
561    }
562}
563
564/// Check whether the request's Accept header includes `application/nostr+json`.
565fn accepts_nostr_json(req: &Request) -> bool {
566    req.headers()
567        .get("Accept")
568        .ok()
569        .flatten()
570        .map(|v| v.contains("application/nostr+json"))
571        .unwrap_or(false)
572}
573
574// ---------------------------------------------------------------------------
575// Cron keep-warm
576// ---------------------------------------------------------------------------
577
578/// Cron handler: touch D1 to keep the connection pool warm and prevent cold starts.
579#[event(scheduled)]
580async fn scheduled(_event: ScheduledEvent, env: Env, _ctx: ScheduleContext) {
581    let db = match env.d1("DB") {
582        Ok(db) => db,
583        Err(_) => return,
584    };
585    let _ = db
586        .prepare("SELECT 1")
587        .first::<serde_json::Value>(None)
588        .await;
589}