Skip to main content

aetheris_server/auth/
mod.rs

1#![allow(clippy::missing_errors_doc)]
2#![allow(clippy::result_large_err)]
3#![allow(clippy::cast_sign_loss)]
4
5use aetheris_protocol::auth::v1::{
6    ConnectTokenRequest, ConnectTokenResponse, GoogleLoginNonceRequest, GoogleLoginNonceResponse,
7    LoginMethod, LoginRequest, LoginResponse, LogoutRequest, LogoutResponse, OtpRequest,
8    OtpRequestAck, QuicConnectToken, RefreshRequest, RefreshResponse,
9    auth_service_server::AuthService, login_request::Method,
10};
11use async_trait::async_trait;
12use base64::Engine;
13use blake2::{Blake2b, Digest, digest::consts::U32};
14use chrono::{DateTime, Duration, Utc};
15use dashmap::DashMap;
16use rand::RngExt;
17use rusty_paseto::prelude::{
18    CustomClaim, ExpirationClaim, IssuedAtClaim, Key, Local, PasetoBuilder, PasetoParser,
19    PasetoSymmetricKey, SubjectClaim, TokenIdentifierClaim, V4,
20};
21use std::sync::Arc;
22use subtle::ConstantTimeEq;
23use tonic::{Request, Response, Status};
24use tracing::warn;
25use ulid::Ulid;
26
27pub mod email;
28pub mod google;
29pub mod rate_limit;
30
31use email::EmailSender;
32use google::GoogleOidcClient;
33use rate_limit::{InMemoryRateLimiter, RateLimitType};
34
/// A pending single-use credential kept in the in-memory store.
///
/// The same record type backs both flows in this module: e-mail OTP requests fill in
/// `email` and `code_hash` and leave `google_nonce` as `None`, while Google login nonce
/// requests store only `google_nonce` (with an empty `email` and `code_hash`).
pub struct OtpRecord {
    /// Normalized e-mail address the code was issued for (empty for Google nonce records).
    pub email: String,
    /// BLAKE2b-256 digest of the OTP code followed by the request id (empty for Google nonce records).
    pub code_hash: Vec<u8>,
    /// Nonce expected in the Google ID token; `None` for e-mail OTP records.
    pub google_nonce: Option<String>,
    /// Expiry instant assigned when the record was created (ten minutes after creation).
    pub expires_at: DateTime<Utc>,
    /// Number of failed verification attempts recorded so far.
    pub attempts: u8,
}
42
/// gRPC authentication service whose state lives entirely in process memory.
///
/// The stores and keys are held behind `Arc`, so clones of the service share them.
#[derive(Clone)]
pub struct AuthServiceImpl {
    /// Pending e-mail OTP requests and Google login nonces, keyed by request id / nonce handle.
    otp_store: Arc<DashMap<String, OtpRecord>>,
    /// Maps Session JTI -> Last Activity Unix Timestamp
    session_activity: Arc<DashMap<String, i64>>,
    /// Maps Player ID -> () (existence check for P1)
    player_registry: Arc<DashMap<String, ()>>,
    /// Outbound mailer used to deliver OTP codes.
    email_sender: Arc<dyn EmailSender>,
    /// Google ID-token verifier; `None` when the client could not be constructed at startup.
    google_client: Arc<Option<GoogleOidcClient>>,
    /// Symmetric PASETO v4.local key used to encrypt and parse session tokens.
    pub(crate) session_key: Arc<PasetoSymmetricKey<V4, Local>>,
    /// Symmetric PASETO v4.local key used to encrypt connect tokens.
    transport_key: Arc<PasetoSymmetricKey<V4, Local>>,
    /// Limiter consulted by `request_otp` (per IP and per e-mail).
    rate_limiter: InMemoryRateLimiter,
    /// Whether the smoke-test login bypass (`AETHERIS_AUTH_BYPASS`) is active.
    bypass_enabled: bool,
}
57
58impl std::fmt::Debug for AuthServiceImpl {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("AuthServiceImpl").finish_non_exhaustive()
61    }
62}
63
64impl AuthServiceImpl {
65    /// Creates a new `AuthServiceImpl` with the provided `email_sender`.
66    ///
67    /// # Panics
68    ///
69    /// - If `AETHERIS_ENV` is set to "production" but `AETHERIS_AUTH_BYPASS` is enabled.
70    /// - If `AETHERIS_ENV` is set to "production" but `SESSION_PASETO_KEY` or `TRANSPORT_PASETO_KEY` are missing.
71    /// - If the provided PASETO keys are not exactly 32 bytes long.
72    pub async fn new(email_sender: Arc<dyn EmailSender>) -> Self {
73        let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
74
75        let session_key_str =
76            std::env::var("SESSION_PASETO_KEY").map_err(|_| "SESSION_PASETO_KEY missing");
77        let transport_key_str =
78            std::env::var("TRANSPORT_PASETO_KEY").map_err(|_| "TRANSPORT_PASETO_KEY missing");
79
80        let bypass_enabled = std::env::var("AETHERIS_AUTH_BYPASS").is_ok_and(|v| {
81            let v = v.to_lowercase();
82            v == "1" || v == "true" || v == "yes" || v == "on"
83        });
84
85        if env == "production" {
86            assert!(
87                !bypass_enabled,
88                "AETHERIS_AUTH_BYPASS=1 is forbidden in production"
89            );
90            assert!(
91                !(session_key_str.is_err() || transport_key_str.is_err()),
92                "PASETO keys must be explicitly set in production"
93            );
94        }
95
96        let session_key_val = session_key_str.unwrap_or_else(|_| {
97            assert!(env != "production", "Missing SESSION_PASETO_KEY");
98            "01234567890123456789012345678901".to_string()
99        });
100        let transport_key_val = transport_key_str.unwrap_or_else(|_| {
101            assert!(env != "production", "Missing TRANSPORT_PASETO_KEY");
102            "01234567890123456789012345678901".to_string()
103        });
104
105        assert!(
106            !(session_key_val.len() != 32 || transport_key_val.len() != 32),
107            "PASETO keys must be exactly 32 bytes"
108        );
109
110        let session_key =
111            PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(session_key_val.as_bytes()));
112        let transport_key =
113            PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(transport_key_val.as_bytes()));
114
115        let google_client = GoogleOidcClient::new().await.ok();
116
117        if bypass_enabled {
118            tracing::warn!(
119                "Authentication bypass is ENABLED (DEV ONLY) — 000001 code will work for smoke-test@aetheris.dev"
120            );
121        } else {
122            tracing::info!("Authentication bypass is disabled");
123        }
124
125        Self {
126            otp_store: Arc::new(DashMap::new()),
127            session_activity: Arc::new(DashMap::new()),
128            player_registry: Arc::new(DashMap::new()),
129            email_sender,
130            google_client: Arc::new(google_client),
131            session_key: Arc::new(session_key),
132            transport_key: Arc::new(transport_key),
133            rate_limiter: InMemoryRateLimiter::new(),
134            bypass_enabled,
135        }
136    }
137
138    /// Normalizes email according to M1005 spec: trim whitespace and lowercase the entire address.
139    fn normalize_email(email: &str) -> String {
140        email.trim().to_lowercase()
141    }
142
143    fn derive_player_id(method: &str, identifier: &str) -> String {
144        use sha2::{Digest, Sha256};
145        let mut hasher = Sha256::new();
146        hasher.update(format!("aetheris:{method}:{identifier}").as_bytes());
147        let hash = hasher.finalize();
148        let mut buf = [0u8; 16];
149        buf.copy_from_slice(&hash[0..16]);
150        Ulid::from(u128::from_be_bytes(buf)).to_string()
151    }
152
153    #[must_use]
154    pub fn is_authorized(&self, token: &str) -> bool {
155        self.validate_and_get_jti(token, None).is_some()
156    }
157
158    /// Validates a session token and returns the JTI if authorized.
159    #[must_use]
160    pub fn validate_and_get_jti(&self, token: &str, tick: Option<u64>) -> Option<String> {
161        let claims = PasetoParser::<V4, Local>::default()
162            .parse(token, &self.session_key)
163            .ok()?;
164
165        let jti = claims.get("jti").and_then(|v| v.as_str())?;
166
167        if self.is_session_authorized(jti, tick) {
168            Some(jti.to_string())
169        } else {
170            None
171        }
172    }
173
174    /// Validates a session by JTI, checking for revocation and enforcing 1h sliding idle window.
175    ///
176    /// # Performance
177    /// if `tick` is provided, activity updates are coalesced to once per 60 ticks (~1s)
178    /// to reduce write-lock contention on the session map.
179    #[must_use]
180    pub fn is_session_authorized(&self, jti: &str, tick: Option<u64>) -> bool {
181        // Optimistic Read: Check for existence and idle timeout without write lock first.
182        let (needs_update, now_ts) = if let Some(activity) = self.session_activity.get(jti) {
183            let now = Utc::now().timestamp();
184            // Idle timeout: 1 hour (3600 seconds)
185            if now - *activity > 3600 {
186                return false;
187            }
188
189            // Coalescing: Only update if tick is unavailable or it's a multiple of 60 (with jitter).
190            // We use a simple summation of jti bytes to stagger updates across the 60-tick window
191            // and avoid synchronized lock contention from 1000s of clients simultaneously.
192            let jitter = jti
193                .as_bytes()
194                .iter()
195                .fold(0u64, |acc, &x| acc.wrapping_add(u64::from(x)));
196            let needs_update = tick.is_none_or(|t| (t.wrapping_add(jitter)) % 60 == 0);
197            (needs_update, now)
198        } else {
199            // Not in activity map -> revoked or expired
200            return false;
201        };
202
203        if needs_update && let Some(mut activity) = self.session_activity.get_mut(jti) {
204            *activity = now_ts;
205        }
206
207        true
208    }
209
210    pub fn mint_session_token_for_test(&self, player_id: &str) -> Result<(String, u64), Status> {
211        self.mint_session_token(player_id)
212    }
213
214    fn mint_session_token(&self, player_id: &str) -> Result<(String, u64), Status> {
215        let jti = Ulid::new().to_string();
216        let iat = Utc::now();
217        let exp = iat + Duration::hours(24);
218
219        let token = PasetoBuilder::<V4, Local>::default()
220            .set_claim(SubjectClaim::from(player_id))
221            .set_claim(TokenIdentifierClaim::from(jti.as_str()))
222            .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
223            .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
224            .build(&self.session_key)
225            .map_err(|e| Status::internal(format!("{e:?}")))?;
226
227        // Initialize session activity
228        self.session_activity.insert(jti, iat.timestamp_millis());
229
230        Ok((token, exp.timestamp_millis() as u64))
231    }
232}
233
234#[async_trait]
235impl AuthService for AuthServiceImpl {
236    async fn request_otp(
237        &self,
238        request: Request<OtpRequest>,
239    ) -> Result<Response<OtpRequestAck>, Status> {
240        // M10146 — Rate Limiting (M1005 §3.4.2)
241        // 1. IP-based limit (30/h)
242        if let Some(addr) = request.remote_addr() {
243            let ip = addr.ip().to_string();
244            self.rate_limiter.check_limit(RateLimitType::Ip, &ip)?;
245        } else {
246            warn!("Request missing remote_addr; IP rate limiting skipped (check proxy config)");
247        }
248
249        let req = request.into_inner();
250        let email = Self::normalize_email(&req.email);
251
252        // 2. Email-based limit (5/h)
253        self.rate_limiter
254            .check_limit(RateLimitType::Email, &email)?;
255
256        let mut rng = rand::rng();
257        let code = format!("{:06}", rng.random_range(0..1_000_000));
258        let request_id = Ulid::new().to_string();
259        let expires_at = Utc::now() + Duration::minutes(10);
260
261        let mut hasher = Blake2b::<U32>::new();
262        hasher.update(code.as_bytes());
263        hasher.update(request_id.as_bytes());
264        let code_hash = hasher.finalize().to_vec();
265
266        self.otp_store.insert(
267            request_id.clone(),
268            OtpRecord {
269                email: email.clone(),
270                code_hash,
271                google_nonce: None,
272                expires_at,
273                attempts: 0,
274            },
275        );
276
277        let sender = self.email_sender.clone();
278        let code_clone = code.clone();
279        let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
280        if env == "production" {
281            tracing::info!(request_id = %request_id, "Generated OTP");
282        } else {
283            tracing::info!(request_id = %request_id, email = %email, code = %code, "Generated OTP (DEV ONLY)");
284        }
285        tokio::spawn(async move {
286            let _ = sender
287                .send(
288                    &email,
289                    "Your Aetheris OTP",
290                    &format!("Code: {code_clone}"),
291                    &format!("<h1>Code: {code_clone}</h1>"),
292                )
293                .await;
294        });
295
296        Ok(Response::new(OtpRequestAck {
297            request_id,
298            expires_at_unix_ms: expires_at.timestamp_millis() as u64,
299            retry_after_seconds: Some(0), // 0 on normal path per spec
300        }))
301    }
302
303    #[allow(clippy::too_many_lines)]
304    async fn login(
305        &self,
306        request: Request<LoginRequest>,
307    ) -> Result<Response<LoginResponse>, Status> {
308        let req = request.into_inner();
309        let metadata = req.metadata.unwrap_or_default();
310        let method = req
311            .method
312            .ok_or_else(|| Status::invalid_argument("Missing login method"))?;
313
314        tracing::info!(
315            version = metadata.client_version,
316            platform = metadata.platform,
317            "Processing login request"
318        );
319
320        match method {
321            Method::Otp(otp_req) => {
322                let (request_id, code) = (otp_req.request_id, otp_req.code);
323
324                // T100.20 — decisional delay before first store access to mitigate timing side-channels
325                tokio::time::sleep(std::time::Duration::from_millis(15)).await;
326
327                let mut entry = self
328                    .otp_store
329                    .get_mut(&request_id)
330                    .ok_or_else(|| Status::unauthenticated("Invalid credentials"))?;
331
332                // M1005 §3.4.1 — Authentication Bypass for Automated Integration Tests
333                // This block allows 'smoke-test@aetheris.dev' to login with pre-defined codes
334                // when AETHERIS_AUTH_BYPASS is enabled (strictly DEV/TEST environments only).
335                // TODO: In Phase 4, consider moving this logic to a dedicated Auth Sidecar.
336                if self.bypass_enabled && entry.email == "smoke-test@aetheris.dev" {
337                    // "000001" is the canonical 'Success' code for automated smoke tests and playground.
338                    if code == "000001" {
339                        tracing::warn!(email = entry.email, "Bypass authentication successful");
340                        let player_id = Self::derive_player_id("email", &entry.email);
341
342                        // Check if new player
343                        let is_new_player =
344                            self.player_registry.insert(player_id.clone(), ()).is_none();
345
346                        let (token, exp) = self.mint_session_token(&player_id)?;
347                        drop(entry);
348                        self.otp_store.remove(&request_id);
349
350                        return Ok(Response::new(LoginResponse {
351                            session_token: token,
352                            expires_at_unix_ms: exp,
353                            player_id,
354                            is_new_player,
355                            login_method: LoginMethod::EmailOtp as i32,
356                        }));
357                    }
358
359                    // "000000" is the canonical 'Failure' code used to validate client-side error handling.
360                    if code == "000000" {
361                        entry.attempts += 1;
362                        if entry.attempts >= 3 {
363                            drop(entry);
364                            self.otp_store.remove(&request_id);
365                        }
366                        return Err(Status::unauthenticated("Bypass: Forced failure for 000000"));
367                    }
368                }
369
370                if Utc::now() > entry.expires_at {
371                    drop(entry);
372                    self.otp_store.remove(&request_id);
373                    return Err(Status::deadline_exceeded("OTP expired"));
374                }
375
376                let mut hasher = Blake2b::<U32>::new();
377                hasher.update(code.as_bytes());
378                hasher.update(request_id.as_bytes());
379                let hash = hasher.finalize();
380
381                if hash.as_slice().ct_eq(&entry.code_hash).into() {
382                    let player_id = Self::derive_player_id("email", &entry.email);
383
384                    // Check if new player
385                    let is_new_player =
386                        self.player_registry.insert(player_id.clone(), ()).is_none();
387
388                    let (token, exp) = self.mint_session_token(&player_id)?;
389                    drop(entry);
390                    self.otp_store.remove(&request_id);
391
392                    Ok(Response::new(LoginResponse {
393                        session_token: token,
394                        expires_at_unix_ms: exp,
395                        player_id,
396                        is_new_player,
397                        login_method: LoginMethod::EmailOtp as i32,
398                    }))
399                } else {
400                    entry.attempts += 1;
401                    if entry.attempts >= 3 {
402                        drop(entry);
403                        self.otp_store.remove(&request_id);
404                    }
405                    Err(Status::unauthenticated("Invalid code"))
406                }
407            }
408            Method::Google(google_req) => {
409                let google_client = self
410                    .google_client
411                    .as_ref()
412                    .as_ref()
413                    .ok_or_else(|| Status::internal("Google OIDC not configured"))?;
414
415                let nonce = self
416                    .otp_store
417                    .get(&google_req.nonce_handle)
418                    .and_then(|e| e.google_nonce.clone())
419                    .ok_or_else(|| Status::unauthenticated("Invalid nonce_handle"))?;
420
421                let claims = google_client.verify_token(&google_req.google_id_token, &nonce)?;
422                self.otp_store.remove(&google_req.nonce_handle);
423
424                // Use 'sub' (Subject) as the stable identifier for the player instead of email.
425                let identity = claims.subject().to_string();
426                let player_id = Self::derive_player_id("google", &identity);
427
428                // Check if new player
429                let is_new_player = self.player_registry.insert(player_id.clone(), ()).is_none();
430
431                let (token, exp) = self.mint_session_token(&player_id)?;
432
433                Ok(Response::new(LoginResponse {
434                    session_token: token,
435                    expires_at_unix_ms: exp,
436                    player_id,
437                    is_new_player,
438                    login_method: LoginMethod::GoogleOidc as i32,
439                }))
440            }
441        }
442    }
443
444    async fn logout(
445        &self,
446        request: Request<LogoutRequest>,
447    ) -> Result<Response<LogoutResponse>, Status> {
448        let mut jti_to_revoke = None;
449
450        let token_from_metadata = request.metadata().get("authorization").and_then(|t| {
451            t.to_str()
452                .ok()
453                .map(|s| s.trim_start_matches("Bearer ").to_string())
454        });
455
456        let token_str = token_from_metadata.or_else(|| {
457            let body = request.get_ref();
458            if body.session_token.is_empty() {
459                None
460            } else {
461                Some(body.session_token.clone())
462            }
463        });
464
465        if let Some(token_clean) = token_str
466            && let Ok(claims) =
467                PasetoParser::<V4, Local>::default().parse(&token_clean, &self.session_key)
468            && let Some(jti) = claims.get("jti").and_then(|v| v.as_str())
469        {
470            jti_to_revoke = Some(jti.to_string());
471        }
472
473        if let Some(jti) = jti_to_revoke {
474            self.session_activity.remove(&jti);
475        }
476
477        Ok(Response::new(LogoutResponse { revoked: true }))
478    }
479
480    async fn refresh_token(
481        &self,
482        request: Request<RefreshRequest>,
483    ) -> Result<Response<RefreshResponse>, Status> {
484        let req = request.into_inner();
485        let token = req.session_token;
486
487        let Ok(claims) = PasetoParser::<V4, Local>::default().parse(&token, &self.session_key)
488        else {
489            return Err(Status::unauthenticated("Invalid session token"));
490        };
491
492        let Some(jti) = claims.get("jti").and_then(|v| v.as_str()) else {
493            return Err(Status::unauthenticated("Token missing jti"));
494        };
495
496        let Some(sub) = claims.get("sub").and_then(|v| v.as_str()) else {
497            return Err(Status::unauthenticated("Token missing sub"));
498        };
499
500        if !self.is_session_authorized(jti, None) {
501            return Err(Status::unauthenticated("Session revoked or expired"));
502        }
503
504        // Revoke old token
505        self.session_activity.remove(jti);
506
507        // Mint new token
508        let (new_token, exp) = self.mint_session_token(sub)?;
509
510        Ok(Response::new(RefreshResponse {
511            session_token: new_token,
512            expires_at_unix_ms: exp,
513        }))
514    }
515
516    async fn issue_connect_token(
517        &self,
518        request: Request<ConnectTokenRequest>,
519    ) -> Result<Response<ConnectTokenResponse>, Status> {
520        let req = request.into_inner();
521
522        let client_id = rand::random::<u64>();
523        let mut rng = rand::rng();
524        let mut nonce = [0u8; 24];
525        rng.fill(&mut nonce);
526        let server_nonce = base64::engine::general_purpose::STANDARD.encode(nonce);
527
528        let iat = Utc::now();
529        let exp = iat + Duration::minutes(5);
530
531        let token = PasetoBuilder::<V4, Local>::default()
532            .set_claim(CustomClaim::try_from(("client_id", serde_json::json!(client_id))).unwrap())
533            .set_claim(
534                CustomClaim::try_from(("server", serde_json::json!(req.server_address))).unwrap(),
535            )
536            .set_claim(
537                CustomClaim::try_from(("server_nonce", serde_json::json!(server_nonce))).unwrap(),
538            )
539            .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
540            .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
541            .build(&self.transport_key)
542            .map_err(|e| Status::internal(format!("{e:?}")))?;
543
544        Ok(Response::new(ConnectTokenResponse {
545            token: Some(QuicConnectToken {
546                paseto: token,
547                server_address: req.server_address,
548                expires_at_unix_ms: exp.timestamp_millis() as u64,
549                client_id,
550            }),
551        }))
552    }
553
554    async fn create_google_login_nonce(
555        &self,
556        _request: Request<GoogleLoginNonceRequest>,
557    ) -> Result<Response<GoogleLoginNonceResponse>, Status> {
558        let mut nonce_bytes = [0u8; 16];
559        rand::rng().fill(&mut nonce_bytes);
560        let nonce = hex::encode(nonce_bytes);
561
562        let nonce_handle = Ulid::new().to_string();
563        let expires_at = Utc::now() + Duration::minutes(10);
564
565        self.otp_store.insert(
566            nonce_handle.clone(),
567            OtpRecord {
568                email: String::new(),
569                code_hash: Vec::new(),
570                google_nonce: Some(nonce.clone()),
571                expires_at,
572                attempts: 0,
573            },
574        );
575
576        Ok(Response::new(GoogleLoginNonceResponse {
577            nonce_handle,
578            nonce,
579            expires_at_unix_ms: expires_at.timestamp_millis() as u64,
580        }))
581    }
582}