Skip to main content

aetheris_server/auth/
mod.rs

1#![allow(clippy::missing_errors_doc)]
2#![allow(clippy::result_large_err)]
3#![allow(clippy::cast_sign_loss)]
4
5use aetheris_protocol::auth::v1::{
6    ConnectTokenRequest, ConnectTokenResponse, GoogleLoginNonceRequest, GoogleLoginNonceResponse,
7    LoginMethod, LoginRequest, LoginResponse, LogoutRequest, LogoutResponse, OtpRequest,
8    OtpRequestAck, QuicConnectToken, RefreshRequest, RefreshResponse,
9    auth_service_server::AuthService, login_request::Method,
10};
11use async_trait::async_trait;
12use base64::Engine;
13use blake2::{Blake2b, Digest, digest::consts::U32};
14use chrono::{DateTime, Duration, Utc};
15use dashmap::DashMap;
16use rand::RngExt;
17use rusty_paseto::prelude::{
18    CustomClaim, ExpirationClaim, IssuedAtClaim, Key, Local, PasetoBuilder, PasetoParser,
19    PasetoSymmetricKey, SubjectClaim, TokenIdentifierClaim, V4,
20};
21use std::sync::Arc;
22use subtle::ConstantTimeEq;
23use tonic::{Request, Response, Status};
24use ulid::Ulid;
25
26pub mod email;
27pub mod google;
28
29use email::EmailSender;
30use google::GoogleOidcClient;
31
32pub struct OtpRecord {
33    pub email: String,
34    pub code_hash: Vec<u8>,
35    pub google_nonce: Option<String>,
36    pub expires_at: DateTime<Utc>,
37    pub attempts: u8,
38}
39
40#[derive(Clone)]
41pub struct AuthServiceImpl {
42    otp_store: Arc<DashMap<String, OtpRecord>>,
43    /// Maps Session JTI -> Last Activity Unix Timestamp
44    session_activity: Arc<DashMap<String, i64>>,
45    /// Maps Player ID -> () (existence check for P1)
46    player_registry: Arc<DashMap<String, ()>>,
47    email_sender: Arc<dyn EmailSender>,
48    google_client: Arc<Option<GoogleOidcClient>>,
49    pub(crate) session_key: Arc<PasetoSymmetricKey<V4, Local>>,
50    transport_key: Arc<PasetoSymmetricKey<V4, Local>>,
51    bypass_enabled: bool,
52}
53
54impl std::fmt::Debug for AuthServiceImpl {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("AuthServiceImpl").finish_non_exhaustive()
57    }
58}
59
60impl AuthServiceImpl {
61    /// Creates a new `AuthServiceImpl` with the provided `email_sender`.
62    ///
63    /// # Panics
64    ///
65    /// - If `AETHERIS_ENV` is set to "production" but `AETHERIS_AUTH_BYPASS` is enabled.
66    /// - If `AETHERIS_ENV` is set to "production" but `SESSION_PASETO_KEY` or `TRANSPORT_PASETO_KEY` are missing.
67    /// - If the provided PASETO keys are not exactly 32 bytes long.
68    pub async fn new(email_sender: Arc<dyn EmailSender>) -> Self {
69        let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
70
71        let session_key_str =
72            std::env::var("SESSION_PASETO_KEY").map_err(|_| "SESSION_PASETO_KEY missing");
73        let transport_key_str =
74            std::env::var("TRANSPORT_PASETO_KEY").map_err(|_| "TRANSPORT_PASETO_KEY missing");
75
76        let bypass_enabled = std::env::var("AETHERIS_AUTH_BYPASS").is_ok_and(|v| {
77            let v = v.to_lowercase();
78            v == "1" || v == "true" || v == "yes" || v == "on"
79        });
80
81        if env == "production" {
82            assert!(
83                !bypass_enabled,
84                "AETHERIS_AUTH_BYPASS=1 is forbidden in production"
85            );
86            assert!(
87                !(session_key_str.is_err() || transport_key_str.is_err()),
88                "PASETO keys must be explicitly set in production"
89            );
90        }
91
92        let session_key_val = session_key_str.unwrap_or_else(|_| {
93            assert!(env != "production", "Missing SESSION_PASETO_KEY");
94            "01234567890123456789012345678901".to_string()
95        });
96        let transport_key_val = transport_key_str.unwrap_or_else(|_| {
97            assert!(env != "production", "Missing TRANSPORT_PASETO_KEY");
98            "01234567890123456789012345678901".to_string()
99        });
100
101        assert!(
102            !(session_key_val.len() != 32 || transport_key_val.len() != 32),
103            "PASETO keys must be exactly 32 bytes"
104        );
105
106        let session_key =
107            PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(session_key_val.as_bytes()));
108        let transport_key =
109            PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(transport_key_val.as_bytes()));
110
111        let google_client = GoogleOidcClient::new().await.ok();
112
113        if bypass_enabled {
114            tracing::warn!(
115                "Authentication bypass is ENABLED (DEV ONLY) — 000001 code will work for smoke-test@aetheris.dev"
116            );
117        } else {
118            tracing::info!("Authentication bypass is disabled");
119        }
120
121        Self {
122            otp_store: Arc::new(DashMap::new()),
123            session_activity: Arc::new(DashMap::new()),
124            player_registry: Arc::new(DashMap::new()),
125            email_sender,
126            google_client: Arc::new(google_client),
127            session_key: Arc::new(session_key),
128            transport_key: Arc::new(transport_key),
129            bypass_enabled,
130        }
131    }
132
133    /// Normalizes email according to M1005 spec: trim whitespace and lowercase the entire address.
134    fn normalize_email(email: &str) -> String {
135        email.trim().to_lowercase()
136    }
137
138    fn derive_player_id(method: &str, identifier: &str) -> String {
139        use sha2::{Digest, Sha256};
140        let mut hasher = Sha256::new();
141        hasher.update(format!("aetheris:{method}:{identifier}").as_bytes());
142        let hash = hasher.finalize();
143        let mut buf = [0u8; 16];
144        buf.copy_from_slice(&hash[0..16]);
145        Ulid::from(u128::from_be_bytes(buf)).to_string()
146    }
147
148    #[must_use]
149    pub fn is_authorized(&self, token: &str) -> bool {
150        self.validate_and_get_jti(token, None).is_some()
151    }
152
153    /// Validates a session token and returns the JTI if authorized.
154    #[must_use]
155    pub fn validate_and_get_jti(&self, token: &str, tick: Option<u64>) -> Option<String> {
156        let claims = PasetoParser::<V4, Local>::default()
157            .parse(token, &self.session_key)
158            .ok()?;
159
160        let jti = claims.get("jti").and_then(|v| v.as_str())?;
161
162        if self.is_session_authorized(jti, tick) {
163            Some(jti.to_string())
164        } else {
165            None
166        }
167    }
168
169    /// Validates a session by JTI, checking for revocation and enforcing 1h sliding idle window.
170    ///
171    /// # Performance
172    /// if `tick` is provided, activity updates are coalesced to once per 60 ticks (~1s)
173    /// to reduce write-lock contention on the session map.
174    #[must_use]
175    pub fn is_session_authorized(&self, jti: &str, tick: Option<u64>) -> bool {
176        // Optimistic Read: Check for existence and idle timeout without write lock first.
177        let (needs_update, now_ts) = if let Some(activity) = self.session_activity.get(jti) {
178            let now = Utc::now().timestamp();
179            // Idle timeout: 1 hour (3600 seconds)
180            if now - *activity > 3600 {
181                return false;
182            }
183
184            // Coalescing: Only update if tick is unavailable or it's a multiple of 60 (with jitter).
185            // We use a simple summation of jti bytes to stagger updates across the 60-tick window
186            // and avoid synchronized lock contention from 1000s of clients simultaneously.
187            let jitter = jti
188                .as_bytes()
189                .iter()
190                .fold(0u64, |acc, &x| acc.wrapping_add(u64::from(x)));
191            let needs_update = tick.is_none_or(|t| (t.wrapping_add(jitter)) % 60 == 0);
192            (needs_update, now)
193        } else {
194            // Not in activity map -> revoked or expired
195            return false;
196        };
197
198        if needs_update && let Some(mut activity) = self.session_activity.get_mut(jti) {
199            *activity = now_ts;
200        }
201
202        true
203    }
204
205    pub fn mint_session_token_for_test(&self, player_id: &str) -> Result<(String, u64), Status> {
206        self.mint_session_token(player_id)
207    }
208
209    fn mint_session_token(&self, player_id: &str) -> Result<(String, u64), Status> {
210        let jti = Ulid::new().to_string();
211        let iat = Utc::now();
212        let exp = iat + Duration::hours(24);
213
214        let token = PasetoBuilder::<V4, Local>::default()
215            .set_claim(SubjectClaim::from(player_id))
216            .set_claim(TokenIdentifierClaim::from(jti.as_str()))
217            .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
218            .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
219            .build(&self.session_key)
220            .map_err(|e| Status::internal(format!("{e:?}")))?;
221
222        // Initialize session activity
223        self.session_activity.insert(jti, iat.timestamp_millis());
224
225        Ok((token, exp.timestamp_millis() as u64))
226    }
227}
228
229#[async_trait]
230impl AuthService for AuthServiceImpl {
231    async fn request_otp(
232        &self,
233        request: Request<OtpRequest>,
234    ) -> Result<Response<OtpRequestAck>, Status> {
235        let req = request.into_inner();
236        let email = Self::normalize_email(&req.email);
237
238        // TODO: Implement per-email 5/h and per-IP 30/h rate limits (M1005 §3.4.2)
239        // Link to spec: docs/roadmap/phase-1-playable-mvp/specs/M1005_control_plane_services.md
240
241        let mut rng = rand::rng();
242        let code = format!("{:06}", rng.random_range(0..1_000_000));
243        let request_id = Ulid::new().to_string();
244        let expires_at = Utc::now() + Duration::minutes(10);
245
246        let mut hasher = Blake2b::<U32>::new();
247        hasher.update(code.as_bytes());
248        hasher.update(request_id.as_bytes());
249        let code_hash = hasher.finalize().to_vec();
250
251        self.otp_store.insert(
252            request_id.clone(),
253            OtpRecord {
254                email: email.clone(),
255                code_hash,
256                google_nonce: None,
257                expires_at,
258                attempts: 0,
259            },
260        );
261
262        let sender = self.email_sender.clone();
263        let code_clone = code.clone();
264        let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
265        if env == "production" {
266            tracing::info!(request_id = %request_id, "Generated OTP");
267        } else {
268            tracing::info!(request_id = %request_id, email = %email, code = %code, "Generated OTP (DEV ONLY)");
269        }
270        tokio::spawn(async move {
271            let _ = sender
272                .send(
273                    &email,
274                    "Your Aetheris OTP",
275                    &format!("Code: {code_clone}"),
276                    &format!("<h1>Code: {code_clone}</h1>"),
277                )
278                .await;
279        });
280
281        Ok(Response::new(OtpRequestAck {
282            request_id,
283            expires_at_unix_ms: expires_at.timestamp_millis() as u64,
284            retry_after_seconds: Some(0), // 0 on normal path per spec
285        }))
286    }
287
288    #[allow(clippy::too_many_lines)]
289    async fn login(
290        &self,
291        request: Request<LoginRequest>,
292    ) -> Result<Response<LoginResponse>, Status> {
293        let req = request.into_inner();
294        let metadata = req.metadata.unwrap_or_default();
295        let method = req
296            .method
297            .ok_or_else(|| Status::invalid_argument("Missing login method"))?;
298
299        tracing::info!(
300            version = metadata.client_version,
301            platform = metadata.platform,
302            "Processing login request"
303        );
304
305        match method {
306            Method::Otp(otp_req) => {
307                let (request_id, code) = (otp_req.request_id, otp_req.code);
308
309                // T100.20 — decisional delay before first store access to mitigate timing side-channels
310                tokio::time::sleep(std::time::Duration::from_millis(15)).await;
311
312                let mut entry = self
313                    .otp_store
314                    .get_mut(&request_id)
315                    .ok_or_else(|| Status::unauthenticated("Invalid credentials"))?;
316
317                if self.bypass_enabled && entry.email == "smoke-test@aetheris.dev" {
318                    if code == "000000" {
319                        entry.attempts += 1;
320                        if entry.attempts >= 3 {
321                            drop(entry);
322                            self.otp_store.remove(&request_id);
323                        }
324                        return Err(Status::unauthenticated("Bypass: Forced failure for 000000"));
325                    }
326
327                    if Utc::now() > entry.expires_at {
328                        drop(entry);
329                        self.otp_store.remove(&request_id);
330                        return Err(Status::deadline_exceeded("OTP expired"));
331                    }
332
333                    tracing::warn!(email = entry.email, "Bypass authentication successful");
334                    let player_id = Self::derive_player_id("email", &entry.email);
335
336                    // Check if new player
337                    let is_new_player =
338                        self.player_registry.insert(player_id.clone(), ()).is_none();
339
340                    let (token, exp) = self.mint_session_token(&player_id)?;
341                    drop(entry);
342                    self.otp_store.remove(&request_id);
343
344                    return Ok(Response::new(LoginResponse {
345                        session_token: token,
346                        expires_at_unix_ms: exp,
347                        player_id,
348                        is_new_player,
349                        login_method: LoginMethod::EmailOtp as i32,
350                    }));
351                }
352
353                if Utc::now() > entry.expires_at {
354                    drop(entry);
355                    self.otp_store.remove(&request_id);
356                    return Err(Status::deadline_exceeded("OTP expired"));
357                }
358
359                let mut hasher = Blake2b::<U32>::new();
360                hasher.update(code.as_bytes());
361                hasher.update(request_id.as_bytes());
362                let hash = hasher.finalize();
363
364                if hash.as_slice().ct_eq(&entry.code_hash).into() {
365                    let player_id = Self::derive_player_id("email", &entry.email);
366
367                    // Check if new player
368                    let is_new_player =
369                        self.player_registry.insert(player_id.clone(), ()).is_none();
370
371                    let (token, exp) = self.mint_session_token(&player_id)?;
372                    drop(entry);
373                    self.otp_store.remove(&request_id);
374
375                    Ok(Response::new(LoginResponse {
376                        session_token: token,
377                        expires_at_unix_ms: exp,
378                        player_id,
379                        is_new_player,
380                        login_method: LoginMethod::EmailOtp as i32,
381                    }))
382                } else {
383                    entry.attempts += 1;
384                    if entry.attempts >= 3 {
385                        drop(entry);
386                        self.otp_store.remove(&request_id);
387                    }
388                    Err(Status::unauthenticated("Invalid code"))
389                }
390            }
391            Method::Google(google_req) => {
392                let google_client = self
393                    .google_client
394                    .as_ref()
395                    .as_ref()
396                    .ok_or_else(|| Status::internal("Google OIDC not configured"))?;
397
398                let nonce = self
399                    .otp_store
400                    .get(&google_req.nonce_handle)
401                    .and_then(|e| e.google_nonce.clone())
402                    .ok_or_else(|| Status::unauthenticated("Invalid nonce_handle"))?;
403
404                let claims = google_client.verify_token(&google_req.google_id_token, &nonce)?;
405                self.otp_store.remove(&google_req.nonce_handle);
406
407                // Use 'sub' (Subject) as the stable identifier for the player instead of email.
408                let identity = claims.subject().to_string();
409                let player_id = Self::derive_player_id("google", &identity);
410
411                // Check if new player
412                let is_new_player = self.player_registry.insert(player_id.clone(), ()).is_none();
413
414                let (token, exp) = self.mint_session_token(&player_id)?;
415
416                Ok(Response::new(LoginResponse {
417                    session_token: token,
418                    expires_at_unix_ms: exp,
419                    player_id,
420                    is_new_player,
421                    login_method: LoginMethod::GoogleOidc as i32,
422                }))
423            }
424        }
425    }
426
427    async fn logout(
428        &self,
429        request: Request<LogoutRequest>,
430    ) -> Result<Response<LogoutResponse>, Status> {
431        let mut jti_to_revoke = None;
432
433        let token_from_metadata = request.metadata().get("authorization").and_then(|t| {
434            t.to_str()
435                .ok()
436                .map(|s| s.trim_start_matches("Bearer ").to_string())
437        });
438
439        let token_str = token_from_metadata.or_else(|| {
440            let body = request.get_ref();
441            if body.session_token.is_empty() {
442                None
443            } else {
444                Some(body.session_token.clone())
445            }
446        });
447
448        if let Some(token_clean) = token_str
449            && let Ok(claims) =
450                PasetoParser::<V4, Local>::default().parse(&token_clean, &self.session_key)
451            && let Some(jti) = claims.get("jti").and_then(|v| v.as_str())
452        {
453            jti_to_revoke = Some(jti.to_string());
454        }
455
456        if let Some(jti) = jti_to_revoke {
457            self.session_activity.remove(&jti);
458        }
459
460        Ok(Response::new(LogoutResponse { revoked: true }))
461    }
462
463    async fn refresh_token(
464        &self,
465        request: Request<RefreshRequest>,
466    ) -> Result<Response<RefreshResponse>, Status> {
467        let req = request.into_inner();
468        let token = req.session_token;
469
470        let Ok(claims) = PasetoParser::<V4, Local>::default().parse(&token, &self.session_key)
471        else {
472            return Err(Status::unauthenticated("Invalid session token"));
473        };
474
475        let Some(jti) = claims.get("jti").and_then(|v| v.as_str()) else {
476            return Err(Status::unauthenticated("Token missing jti"));
477        };
478
479        let Some(sub) = claims.get("sub").and_then(|v| v.as_str()) else {
480            return Err(Status::unauthenticated("Token missing sub"));
481        };
482
483        if !self.is_session_authorized(jti, None) {
484            return Err(Status::unauthenticated("Session revoked or expired"));
485        }
486
487        // Revoke old token
488        self.session_activity.remove(jti);
489
490        // Mint new token
491        let (new_token, exp) = self.mint_session_token(sub)?;
492
493        Ok(Response::new(RefreshResponse {
494            session_token: new_token,
495            expires_at_unix_ms: exp,
496        }))
497    }
498
499    async fn issue_connect_token(
500        &self,
501        request: Request<ConnectTokenRequest>,
502    ) -> Result<Response<ConnectTokenResponse>, Status> {
503        let req = request.into_inner();
504
505        let client_id = rand::random::<u64>();
506        let mut rng = rand::rng();
507        let mut nonce = [0u8; 24];
508        rng.fill(&mut nonce);
509        let server_nonce = base64::engine::general_purpose::STANDARD.encode(nonce);
510
511        let iat = Utc::now();
512        let exp = iat + Duration::minutes(5);
513
514        let token = PasetoBuilder::<V4, Local>::default()
515            .set_claim(CustomClaim::try_from(("client_id", serde_json::json!(client_id))).unwrap())
516            .set_claim(
517                CustomClaim::try_from(("server", serde_json::json!(req.server_address))).unwrap(),
518            )
519            .set_claim(
520                CustomClaim::try_from(("server_nonce", serde_json::json!(server_nonce))).unwrap(),
521            )
522            .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
523            .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
524            .build(&self.transport_key)
525            .map_err(|e| Status::internal(format!("{e:?}")))?;
526
527        Ok(Response::new(ConnectTokenResponse {
528            token: Some(QuicConnectToken {
529                paseto: token,
530                server_address: req.server_address,
531                expires_at_unix_ms: exp.timestamp_millis() as u64,
532                client_id,
533            }),
534        }))
535    }
536
537    async fn create_google_login_nonce(
538        &self,
539        _request: Request<GoogleLoginNonceRequest>,
540    ) -> Result<Response<GoogleLoginNonceResponse>, Status> {
541        let mut nonce_bytes = [0u8; 16];
542        rand::rng().fill(&mut nonce_bytes);
543        let nonce = hex::encode(nonce_bytes);
544
545        let nonce_handle = Ulid::new().to_string();
546        let expires_at = Utc::now() + Duration::minutes(10);
547
548        self.otp_store.insert(
549            nonce_handle.clone(),
550            OtpRecord {
551                email: String::new(),
552                code_hash: Vec::new(),
553                google_nonce: Some(nonce.clone()),
554                expires_at,
555                attempts: 0,
556            },
557        );
558
559        Ok(Response::new(GoogleLoginNonceResponse {
560            nonce_handle,
561            nonce,
562            expires_at_unix_ms: expires_at.timestamp_millis() as u64,
563        }))
564    }
565}