// aetheris_server/auth/mod.rs

1#![allow(clippy::missing_errors_doc)]
2#![allow(clippy::result_large_err)]
3#![allow(clippy::cast_sign_loss)]
4
5use aetheris_protocol::auth::v1::{
6    ConnectTokenRequest, ConnectTokenResponse, GoogleLoginNonceRequest, GoogleLoginNonceResponse,
7    LoginMethod, LoginRequest, LoginResponse, LogoutRequest, LogoutResponse, OtpRequest,
8    OtpRequestAck, QuicConnectToken, RefreshRequest, RefreshResponse,
9    auth_service_server::AuthService, login_request::Method,
10};
11use async_trait::async_trait;
12use base64::Engine;
13use blake2::{Blake2b, Digest, digest::consts::U32};
14use chrono::{DateTime, Duration, Utc};
15use dashmap::DashMap;
16use rand::RngExt;
17use rusty_paseto::prelude::{
18    CustomClaim, ExpirationClaim, IssuedAtClaim, Key, Local, PasetoBuilder, PasetoParser,
19    PasetoSymmetricKey, SubjectClaim, TokenIdentifierClaim, V4,
20};
21use std::sync::Arc;
22use subtle::ConstantTimeEq;
23use tonic::{Request, Response, Status};
24use tracing::warn;
25use ulid::Ulid;
26
27pub mod email;
28pub mod google;
29pub mod rate_limit;
30
31use email::EmailSender;
32use google::GoogleOidcClient;
33use rate_limit::{InMemoryRateLimiter, RateLimitType};
34
35pub struct OtpRecord {
36    pub email: String,
37    pub code_hash: Vec<u8>,
38    pub google_nonce: Option<String>,
39    pub expires_at: DateTime<Utc>,
40    pub attempts: u8,
41}
42
43#[derive(Clone)]
44pub struct AuthServiceImpl {
45    otp_store: Arc<DashMap<String, OtpRecord>>,
46    /// Maps Session JTI -> Last Activity Unix Timestamp
47    session_activity: Arc<DashMap<String, i64>>,
48    /// Maps Player ID -> () (existence check for P1)
49    player_registry: Arc<DashMap<String, ()>>,
50    email_sender: Arc<dyn EmailSender>,
51    google_client: Arc<Option<GoogleOidcClient>>,
52    pub(crate) session_key: Arc<PasetoSymmetricKey<V4, Local>>,
53    transport_key: Arc<PasetoSymmetricKey<V4, Local>>,
54    rate_limiter: InMemoryRateLimiter,
55    bypass_enabled: bool,
56}
57
58impl std::fmt::Debug for AuthServiceImpl {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("AuthServiceImpl").finish_non_exhaustive()
61    }
62}
63
64impl AuthServiceImpl {
65    /// Creates a new `AuthServiceImpl` with the provided `email_sender`.
66    ///
67    /// # Panics
68    ///
69    /// - If `AETHERIS_ENV` is set to "production" but `AETHERIS_AUTH_BYPASS` is enabled.
70    /// - If `AETHERIS_ENV` is set to "production" but `SESSION_PASETO_KEY` or `TRANSPORT_PASETO_KEY` are missing.
71    /// - If the provided PASETO keys are not exactly 32 bytes long.
72    pub async fn new(email_sender: Arc<dyn EmailSender>) -> Self {
73        let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
74
75        let session_key_str =
76            std::env::var("SESSION_PASETO_KEY").map_err(|_| "SESSION_PASETO_KEY missing");
77        let transport_key_str =
78            std::env::var("TRANSPORT_PASETO_KEY").map_err(|_| "TRANSPORT_PASETO_KEY missing");
79
80        let bypass_enabled = std::env::var("AETHERIS_AUTH_BYPASS").is_ok_and(|v| {
81            let v = v.to_lowercase();
82            v == "1" || v == "true" || v == "yes" || v == "on"
83        });
84
85        if env == "production" {
86            assert!(
87                !bypass_enabled,
88                "AETHERIS_AUTH_BYPASS=1 is forbidden in production"
89            );
90            assert!(
91                !(session_key_str.is_err() || transport_key_str.is_err()),
92                "PASETO keys must be explicitly set in production"
93            );
94        }
95
96        let session_key_val = session_key_str.unwrap_or_else(|_| {
97            assert!(env != "production", "Missing SESSION_PASETO_KEY");
98            "01234567890123456789012345678901".to_string()
99        });
100        let transport_key_val = transport_key_str.unwrap_or_else(|_| {
101            assert!(env != "production", "Missing TRANSPORT_PASETO_KEY");
102            "01234567890123456789012345678901".to_string()
103        });
104
105        assert!(
106            !(session_key_val.len() != 32 || transport_key_val.len() != 32),
107            "PASETO keys must be exactly 32 bytes"
108        );
109
110        let session_key =
111            PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(session_key_val.as_bytes()));
112        let transport_key =
113            PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(transport_key_val.as_bytes()));
114
115        let google_client = GoogleOidcClient::new().await.ok();
116
117        if bypass_enabled {
118            tracing::warn!(
119                "Authentication bypass is ENABLED (DEV ONLY) — 000001 code will work for smoke-test@aetheris.dev"
120            );
121        } else {
122            tracing::info!("Authentication bypass is disabled");
123        }
124
125        Self {
126            otp_store: Arc::new(DashMap::new()),
127            session_activity: Arc::new(DashMap::new()),
128            player_registry: Arc::new(DashMap::new()),
129            email_sender,
130            google_client: Arc::new(google_client),
131            session_key: Arc::new(session_key),
132            transport_key: Arc::new(transport_key),
133            rate_limiter: InMemoryRateLimiter::new(),
134            bypass_enabled,
135        }
136    }
137
138    /// Normalizes email according to M1005 spec: trim whitespace and lowercase the entire address.
139    fn normalize_email(email: &str) -> String {
140        email.trim().to_lowercase()
141    }
142
143    fn derive_player_id(method: &str, identifier: &str) -> String {
144        use sha2::{Digest, Sha256};
145        let mut hasher = Sha256::new();
146        hasher.update(format!("aetheris:{method}:{identifier}").as_bytes());
147        let hash = hasher.finalize();
148        let mut buf = [0u8; 16];
149        buf.copy_from_slice(&hash[0..16]);
150        Ulid::from(u128::from_be_bytes(buf)).to_string()
151    }
152
153    #[must_use]
154    pub fn is_authorized(&self, token: &str) -> bool {
155        self.validate_and_get_jti(token, None).is_some()
156    }
157
158    /// Validates a session token and returns the JTI if authorized.
159    #[must_use]
160    pub fn validate_and_get_jti(&self, token: &str, tick: Option<u64>) -> Option<String> {
161        let claims = PasetoParser::<V4, Local>::default()
162            .parse(token, &self.session_key)
163            .ok()?;
164
165        let jti = claims.get("jti").and_then(|v| v.as_str())?;
166
167        if self.is_session_authorized(jti, tick) {
168            Some(jti.to_string())
169        } else {
170            None
171        }
172    }
173
174    /// Validates a session by JTI, checking for revocation and enforcing 1h sliding idle window.
175    ///
176    /// # Performance
177    /// if `tick` is provided, activity updates are coalesced to once per 60 ticks (~1s)
178    /// to reduce write-lock contention on the session map.
179    #[must_use]
180    pub fn is_session_authorized(&self, jti: &str, tick: Option<u64>) -> bool {
181        // Optimistic Read: Check for existence and idle timeout without write lock first.
182        let (needs_update, now_ts) = if let Some(activity) = self.session_activity.get(jti) {
183            let now = Utc::now().timestamp();
184            // Idle timeout: 1 hour (3600 seconds)
185            if now - *activity > 3600 {
186                return false;
187            }
188
189            // Coalescing: Only update if tick is unavailable or it's a multiple of 60 (with jitter).
190            // We use a simple summation of jti bytes to stagger updates across the 60-tick window
191            // and avoid synchronized lock contention from 1000s of clients simultaneously.
192            let jitter = jti
193                .as_bytes()
194                .iter()
195                .fold(0u64, |acc, &x| acc.wrapping_add(u64::from(x)));
196            let needs_update = tick.is_none_or(|t| (t.wrapping_add(jitter)) % 60 == 0);
197            (needs_update, now)
198        } else {
199            // Not in activity map -> revoked or expired
200            return false;
201        };
202
203        if needs_update && let Some(mut activity) = self.session_activity.get_mut(jti) {
204            *activity = now_ts;
205        }
206
207        true
208    }
209
210    pub fn mint_session_token_for_test(&self, player_id: &str) -> Result<(String, u64), Status> {
211        self.mint_session_token(player_id, None)
212    }
213
214    fn mint_session_token(
215        &self,
216        player_id: &str,
217        jti: Option<String>,
218    ) -> Result<(String, u64), Status> {
219        let jti = jti.unwrap_or_else(|| Ulid::new().to_string());
220        let iat = Utc::now();
221        let exp = iat + Duration::hours(24);
222
223        let token = PasetoBuilder::<V4, Local>::default()
224            .set_claim(SubjectClaim::from(player_id))
225            .set_claim(TokenIdentifierClaim::from(jti.as_str()))
226            .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
227            .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
228            .build(&self.session_key)
229            .map_err(|e| Status::internal(format!("{e:?}")))?;
230
231        // Initialize session activity (store seconds, matched by is_session_authorized)
232        self.session_activity.insert(jti, iat.timestamp());
233
234        Ok((token, exp.timestamp_millis() as u64))
235    }
236}
237
238#[async_trait]
239impl AuthService for AuthServiceImpl {
240    async fn request_otp(
241        &self,
242        request: Request<OtpRequest>,
243    ) -> Result<Response<OtpRequestAck>, Status> {
244        // M10146 — Rate Limiting (M1005 §3.4.2)
245        // 1. IP-based limit (30/h)
246        if let Some(addr) = request.remote_addr() {
247            let ip = addr.ip().to_string();
248            self.rate_limiter.check_limit(RateLimitType::Ip, &ip)?;
249        } else {
250            warn!("Request missing remote_addr; IP rate limiting skipped (check proxy config)");
251        }
252
253        let req = request.into_inner();
254        let email = Self::normalize_email(&req.email);
255
256        // 2. Email-based limit (5/h)
257        self.rate_limiter
258            .check_limit(RateLimitType::Email, &email)?;
259
260        let mut rng = rand::rng();
261        let code = format!("{:06}", rng.random_range(0..1_000_000));
262        let request_id = Ulid::new().to_string();
263        let expires_at = Utc::now() + Duration::minutes(10);
264
265        let mut hasher = Blake2b::<U32>::new();
266        hasher.update(code.as_bytes());
267        hasher.update(request_id.as_bytes());
268        let code_hash = hasher.finalize().to_vec();
269
270        self.otp_store.insert(
271            request_id.clone(),
272            OtpRecord {
273                email: email.clone(),
274                code_hash,
275                google_nonce: None,
276                expires_at,
277                attempts: 0,
278            },
279        );
280
281        let sender = self.email_sender.clone();
282        let code_clone = code.clone();
283        let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
284        if env == "production" {
285            tracing::info!(request_id = %request_id, "Generated OTP");
286        } else {
287            tracing::info!(request_id = %request_id, email = %email, code = %code, "Generated OTP (DEV ONLY)");
288        }
289        tokio::spawn(async move {
290            let _ = sender
291                .send(
292                    &email,
293                    "Your Aetheris OTP",
294                    &format!("Code: {code_clone}"),
295                    &format!("<h1>Code: {code_clone}</h1>"),
296                )
297                .await;
298        });
299
300        Ok(Response::new(OtpRequestAck {
301            request_id,
302            expires_at_unix_ms: expires_at.timestamp_millis() as u64,
303            retry_after_seconds: Some(0), // 0 on normal path per spec
304        }))
305    }
306
307    #[allow(clippy::too_many_lines)]
308    async fn login(
309        &self,
310        request: Request<LoginRequest>,
311    ) -> Result<Response<LoginResponse>, Status> {
312        let req = request.into_inner();
313        let metadata = req.metadata.unwrap_or_default();
314        let method = req
315            .method
316            .ok_or_else(|| Status::invalid_argument("Missing login method"))?;
317
318        tracing::info!(
319            version = metadata.client_version,
320            platform = metadata.platform,
321            "Processing login request"
322        );
323
324        match method {
325            Method::Otp(otp_req) => {
326                let (request_id, code) = (otp_req.request_id, otp_req.code);
327
328                // T100.20 — decisional delay before first store access to mitigate timing side-channels
329                tokio::time::sleep(std::time::Duration::from_millis(15)).await;
330
331                let mut entry = self
332                    .otp_store
333                    .get_mut(&request_id)
334                    .ok_or_else(|| Status::unauthenticated("Invalid credentials"))?;
335
336                // M1005 §3.4.1 — Authentication Bypass for Automated Integration Tests
337                // This block allows 'smoke-test@aetheris.dev' to login with pre-defined codes
338                // when AETHERIS_AUTH_BYPASS is enabled (strictly DEV/TEST environments only).
339                // TODO: In Phase 4, consider moving this logic to a dedicated Auth Sidecar.
340                if self.bypass_enabled && entry.email == "smoke-test@aetheris.dev" {
341                    // "000001" is the canonical 'Success' code for automated smoke tests and playground.
342                    if code == "000001" {
343                        tracing::warn!(email = entry.email, "Bypass authentication successful");
344                        let player_id = Self::derive_player_id("email", &entry.email);
345
346                        // Check if new player
347                        let is_new_player =
348                            self.player_registry.insert(player_id.clone(), ()).is_none();
349
350                        let (token, exp) =
351                            self.mint_session_token(&player_id, Some("admin".to_string()))?;
352                        drop(entry);
353                        self.otp_store.remove(&request_id);
354
355                        return Ok(Response::new(LoginResponse {
356                            session_token: token,
357                            expires_at_unix_ms: exp,
358                            player_id,
359                            is_new_player,
360                            login_method: LoginMethod::EmailOtp as i32,
361                        }));
362                    }
363
364                    // "000000" is the canonical 'Failure' code used to validate client-side error handling.
365                    if code == "000000" {
366                        entry.attempts += 1;
367                        if entry.attempts >= 3 {
368                            drop(entry);
369                            self.otp_store.remove(&request_id);
370                        }
371                        return Err(Status::unauthenticated("Bypass: Forced failure for 000000"));
372                    }
373                }
374
375                if Utc::now() > entry.expires_at {
376                    drop(entry);
377                    self.otp_store.remove(&request_id);
378                    return Err(Status::deadline_exceeded("OTP expired"));
379                }
380
381                let mut hasher = Blake2b::<U32>::new();
382                hasher.update(code.as_bytes());
383                hasher.update(request_id.as_bytes());
384                let hash = hasher.finalize();
385
386                if hash.as_slice().ct_eq(&entry.code_hash).into() {
387                    let player_id = Self::derive_player_id("email", &entry.email);
388
389                    // Check if new player
390                    let is_new_player =
391                        self.player_registry.insert(player_id.clone(), ()).is_none();
392
393                    let (token, exp) = self.mint_session_token(&player_id, None)?;
394                    drop(entry);
395                    self.otp_store.remove(&request_id);
396
397                    Ok(Response::new(LoginResponse {
398                        session_token: token,
399                        expires_at_unix_ms: exp,
400                        player_id,
401                        is_new_player,
402                        login_method: LoginMethod::EmailOtp as i32,
403                    }))
404                } else {
405                    entry.attempts += 1;
406                    if entry.attempts >= 3 {
407                        drop(entry);
408                        self.otp_store.remove(&request_id);
409                    }
410                    Err(Status::unauthenticated("Invalid code"))
411                }
412            }
413            Method::Google(google_req) => {
414                let google_client = self
415                    .google_client
416                    .as_ref()
417                    .as_ref()
418                    .ok_or_else(|| Status::internal("Google OIDC not configured"))?;
419
420                let nonce = self
421                    .otp_store
422                    .get(&google_req.nonce_handle)
423                    .and_then(|e| e.google_nonce.clone())
424                    .ok_or_else(|| Status::unauthenticated("Invalid nonce_handle"))?;
425
426                let claims = google_client.verify_token(&google_req.google_id_token, &nonce)?;
427                self.otp_store.remove(&google_req.nonce_handle);
428
429                // Use 'sub' (Subject) as the stable identifier for the player instead of email.
430                let identity = claims.subject().to_string();
431                let player_id = Self::derive_player_id("google", &identity);
432
433                // Check if new player
434                let is_new_player = self.player_registry.insert(player_id.clone(), ()).is_none();
435
436                let (token, exp) = self.mint_session_token(&player_id, None)?;
437
438                Ok(Response::new(LoginResponse {
439                    session_token: token,
440                    expires_at_unix_ms: exp,
441                    player_id,
442                    is_new_player,
443                    login_method: LoginMethod::GoogleOidc as i32,
444                }))
445            }
446        }
447    }
448
449    async fn logout(
450        &self,
451        request: Request<LogoutRequest>,
452    ) -> Result<Response<LogoutResponse>, Status> {
453        let mut jti_to_revoke = None;
454
455        let token_from_metadata = request.metadata().get("authorization").and_then(|t| {
456            t.to_str()
457                .ok()
458                .map(|s| s.trim_start_matches("Bearer ").to_string())
459        });
460
461        let token_str = token_from_metadata.or_else(|| {
462            let body = request.get_ref();
463            if body.session_token.is_empty() {
464                None
465            } else {
466                Some(body.session_token.clone())
467            }
468        });
469
470        if let Some(token_clean) = token_str
471            && let Ok(claims) =
472                PasetoParser::<V4, Local>::default().parse(&token_clean, &self.session_key)
473            && let Some(jti) = claims.get("jti").and_then(|v| v.as_str())
474        {
475            jti_to_revoke = Some(jti.to_string());
476        }
477
478        if let Some(jti) = jti_to_revoke {
479            self.session_activity.remove(&jti);
480        }
481
482        Ok(Response::new(LogoutResponse { revoked: true }))
483    }
484
485    async fn refresh_token(
486        &self,
487        request: Request<RefreshRequest>,
488    ) -> Result<Response<RefreshResponse>, Status> {
489        let req = request.into_inner();
490        let token = req.session_token;
491
492        let Ok(claims) = PasetoParser::<V4, Local>::default().parse(&token, &self.session_key)
493        else {
494            return Err(Status::unauthenticated("Invalid session token"));
495        };
496
497        let Some(jti) = claims.get("jti").and_then(|v| v.as_str()) else {
498            return Err(Status::unauthenticated("Token missing jti"));
499        };
500
501        let Some(sub) = claims.get("sub").and_then(|v| v.as_str()) else {
502            return Err(Status::unauthenticated("Token missing sub"));
503        };
504
505        if !self.is_session_authorized(jti, None) {
506            return Err(Status::unauthenticated("Session revoked or expired"));
507        }
508
509        // Revoke old token
510        self.session_activity.remove(jti);
511
512        // Mint new token (preserve admin status if old token was admin)
513        let new_jti = if jti == "admin" {
514            Some("admin".to_string())
515        } else {
516            None
517        };
518        let (new_token, exp) = self.mint_session_token(sub, new_jti)?;
519
520        Ok(Response::new(RefreshResponse {
521            session_token: new_token,
522            expires_at_unix_ms: exp,
523        }))
524    }
525
526    async fn issue_connect_token(
527        &self,
528        request: Request<ConnectTokenRequest>,
529    ) -> Result<Response<ConnectTokenResponse>, Status> {
530        let req = request.into_inner();
531
532        let client_id = rand::random::<u64>();
533        let mut rng = rand::rng();
534        let mut nonce = [0u8; 24];
535        rng.fill(&mut nonce);
536        let server_nonce = base64::engine::general_purpose::STANDARD.encode(nonce);
537
538        let iat = Utc::now();
539        let exp = iat + Duration::minutes(5);
540
541        let token = PasetoBuilder::<V4, Local>::default()
542            .set_claim(CustomClaim::try_from(("client_id", serde_json::json!(client_id))).unwrap())
543            .set_claim(
544                CustomClaim::try_from(("server", serde_json::json!(req.server_address))).unwrap(),
545            )
546            .set_claim(
547                CustomClaim::try_from(("server_nonce", serde_json::json!(server_nonce))).unwrap(),
548            )
549            .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
550            .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
551            .build(&self.transport_key)
552            .map_err(|e| Status::internal(format!("{e:?}")))?;
553
554        Ok(Response::new(ConnectTokenResponse {
555            token: Some(QuicConnectToken {
556                paseto: token,
557                server_address: req.server_address,
558                expires_at_unix_ms: exp.timestamp_millis() as u64,
559                client_id,
560            }),
561        }))
562    }
563
564    async fn create_google_login_nonce(
565        &self,
566        _request: Request<GoogleLoginNonceRequest>,
567    ) -> Result<Response<GoogleLoginNonceResponse>, Status> {
568        let mut nonce_bytes = [0u8; 16];
569        rand::rng().fill(&mut nonce_bytes);
570        let nonce = hex::encode(nonce_bytes);
571
572        let nonce_handle = Ulid::new().to_string();
573        let expires_at = Utc::now() + Duration::minutes(10);
574
575        self.otp_store.insert(
576            nonce_handle.clone(),
577            OtpRecord {
578                email: String::new(),
579                code_hash: Vec::new(),
580                google_nonce: Some(nonce.clone()),
581                expires_at,
582                attempts: 0,
583            },
584        );
585
586        Ok(Response::new(GoogleLoginNonceResponse {
587            nonce_handle,
588            nonce,
589            expires_at_unix_ms: expires_at.timestamp_millis() as u64,
590        }))
591    }
592}