1#![allow(clippy::missing_errors_doc)]
2#![allow(clippy::result_large_err)]
3#![allow(clippy::cast_sign_loss)]
4
5use aetheris_protocol::auth::v1::{
6 ConnectTokenRequest, ConnectTokenResponse, GoogleLoginNonceRequest, GoogleLoginNonceResponse,
7 LoginMethod, LoginRequest, LoginResponse, LogoutRequest, LogoutResponse, OtpRequest,
8 OtpRequestAck, QuicConnectToken, RefreshRequest, RefreshResponse,
9 auth_service_server::AuthService, login_request::Method,
10};
11use async_trait::async_trait;
12use base64::Engine;
13use blake2::{Blake2b, Digest, digest::consts::U32};
14use chrono::{DateTime, Duration, Utc};
15use dashmap::DashMap;
16use rand::RngExt;
17use rusty_paseto::prelude::{
18 CustomClaim, ExpirationClaim, IssuedAtClaim, Key, Local, PasetoBuilder, PasetoParser,
19 PasetoSymmetricKey, SubjectClaim, TokenIdentifierClaim, V4,
20};
21use std::sync::Arc;
22use subtle::ConstantTimeEq;
23use tonic::{Request, Response, Status};
24use tracing::warn;
25use ulid::Ulid;
26
27pub mod email;
28pub mod google;
29pub mod rate_limit;
30
31use email::EmailSender;
32use google::GoogleOidcClient;
33use rate_limit::{InMemoryRateLimiter, RateLimitType};
34
35pub struct OtpRecord {
36 pub email: String,
37 pub code_hash: Vec<u8>,
38 pub google_nonce: Option<String>,
39 pub expires_at: DateTime<Utc>,
40 pub attempts: u8,
41}
42
43#[derive(Clone)]
44pub struct AuthServiceImpl {
45 otp_store: Arc<DashMap<String, OtpRecord>>,
46 session_activity: Arc<DashMap<String, i64>>,
48 player_registry: Arc<DashMap<String, ()>>,
50 email_sender: Arc<dyn EmailSender>,
51 google_client: Arc<Option<GoogleOidcClient>>,
52 pub(crate) session_key: Arc<PasetoSymmetricKey<V4, Local>>,
53 transport_key: Arc<PasetoSymmetricKey<V4, Local>>,
54 rate_limiter: InMemoryRateLimiter,
55 bypass_enabled: bool,
56}
57
58impl std::fmt::Debug for AuthServiceImpl {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("AuthServiceImpl").finish_non_exhaustive()
61 }
62}
63
64impl AuthServiceImpl {
65 pub async fn new(email_sender: Arc<dyn EmailSender>) -> Self {
73 let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
74
75 let session_key_str =
76 std::env::var("SESSION_PASETO_KEY").map_err(|_| "SESSION_PASETO_KEY missing");
77 let transport_key_str =
78 std::env::var("TRANSPORT_PASETO_KEY").map_err(|_| "TRANSPORT_PASETO_KEY missing");
79
80 let bypass_enabled = std::env::var("AETHERIS_AUTH_BYPASS").is_ok_and(|v| {
81 let v = v.to_lowercase();
82 v == "1" || v == "true" || v == "yes" || v == "on"
83 });
84
85 if env == "production" {
86 assert!(
87 !bypass_enabled,
88 "AETHERIS_AUTH_BYPASS=1 is forbidden in production"
89 );
90 assert!(
91 !(session_key_str.is_err() || transport_key_str.is_err()),
92 "PASETO keys must be explicitly set in production"
93 );
94 }
95
96 let session_key_val = session_key_str.unwrap_or_else(|_| {
97 assert!(env != "production", "Missing SESSION_PASETO_KEY");
98 "01234567890123456789012345678901".to_string()
99 });
100 let transport_key_val = transport_key_str.unwrap_or_else(|_| {
101 assert!(env != "production", "Missing TRANSPORT_PASETO_KEY");
102 "01234567890123456789012345678901".to_string()
103 });
104
105 assert!(
106 !(session_key_val.len() != 32 || transport_key_val.len() != 32),
107 "PASETO keys must be exactly 32 bytes"
108 );
109
110 let session_key =
111 PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(session_key_val.as_bytes()));
112 let transport_key =
113 PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(transport_key_val.as_bytes()));
114
115 let google_client = GoogleOidcClient::new().await.ok();
116
117 if bypass_enabled {
118 tracing::warn!(
119 "Authentication bypass is ENABLED (DEV ONLY) — 000001 code will work for smoke-test@aetheris.dev"
120 );
121 } else {
122 tracing::info!("Authentication bypass is disabled");
123 }
124
125 Self {
126 otp_store: Arc::new(DashMap::new()),
127 session_activity: Arc::new(DashMap::new()),
128 player_registry: Arc::new(DashMap::new()),
129 email_sender,
130 google_client: Arc::new(google_client),
131 session_key: Arc::new(session_key),
132 transport_key: Arc::new(transport_key),
133 rate_limiter: InMemoryRateLimiter::new(),
134 bypass_enabled,
135 }
136 }
137
138 fn normalize_email(email: &str) -> String {
140 email.trim().to_lowercase()
141 }
142
143 fn derive_player_id(method: &str, identifier: &str) -> String {
144 use sha2::{Digest, Sha256};
145 let mut hasher = Sha256::new();
146 hasher.update(format!("aetheris:{method}:{identifier}").as_bytes());
147 let hash = hasher.finalize();
148 let mut buf = [0u8; 16];
149 buf.copy_from_slice(&hash[0..16]);
150 Ulid::from(u128::from_be_bytes(buf)).to_string()
151 }
152
153 #[must_use]
154 pub fn is_authorized(&self, token: &str) -> bool {
155 self.validate_and_get_jti(token, None).is_some()
156 }
157
158 #[must_use]
160 pub fn validate_and_get_jti(&self, token: &str, tick: Option<u64>) -> Option<String> {
161 let claims = PasetoParser::<V4, Local>::default()
162 .parse(token, &self.session_key)
163 .ok()?;
164
165 let jti = claims.get("jti").and_then(|v| v.as_str())?;
166
167 if self.is_session_authorized(jti, tick) {
168 Some(jti.to_string())
169 } else {
170 None
171 }
172 }
173
174 #[must_use]
180 pub fn is_session_authorized(&self, jti: &str, tick: Option<u64>) -> bool {
181 let (needs_update, now_ts) = if let Some(activity) = self.session_activity.get(jti) {
183 let now = Utc::now().timestamp();
184 if now - *activity > 3600 {
186 return false;
187 }
188
189 let jitter = jti
193 .as_bytes()
194 .iter()
195 .fold(0u64, |acc, &x| acc.wrapping_add(u64::from(x)));
196 let needs_update = tick.is_none_or(|t| (t.wrapping_add(jitter)) % 60 == 0);
197 (needs_update, now)
198 } else {
199 return false;
201 };
202
203 if needs_update && let Some(mut activity) = self.session_activity.get_mut(jti) {
204 *activity = now_ts;
205 }
206
207 true
208 }
209
210 pub fn mint_session_token_for_test(&self, player_id: &str) -> Result<(String, u64), Status> {
211 self.mint_session_token(player_id, None)
212 }
213
214 fn mint_session_token(
215 &self,
216 player_id: &str,
217 jti: Option<String>,
218 ) -> Result<(String, u64), Status> {
219 let jti = jti.unwrap_or_else(|| Ulid::new().to_string());
220 let iat = Utc::now();
221 let exp = iat + Duration::hours(24);
222
223 let token = PasetoBuilder::<V4, Local>::default()
224 .set_claim(SubjectClaim::from(player_id))
225 .set_claim(TokenIdentifierClaim::from(jti.as_str()))
226 .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
227 .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
228 .build(&self.session_key)
229 .map_err(|e| Status::internal(format!("{e:?}")))?;
230
231 self.session_activity.insert(jti, iat.timestamp());
233
234 Ok((token, exp.timestamp_millis() as u64))
235 }
236}
237
238#[async_trait]
239impl AuthService for AuthServiceImpl {
240 async fn request_otp(
241 &self,
242 request: Request<OtpRequest>,
243 ) -> Result<Response<OtpRequestAck>, Status> {
244 if let Some(addr) = request.remote_addr() {
247 let ip = addr.ip().to_string();
248 self.rate_limiter.check_limit(RateLimitType::Ip, &ip)?;
249 } else {
250 warn!("Request missing remote_addr; IP rate limiting skipped (check proxy config)");
251 }
252
253 let req = request.into_inner();
254 let email = Self::normalize_email(&req.email);
255
256 self.rate_limiter
258 .check_limit(RateLimitType::Email, &email)?;
259
260 let mut rng = rand::rng();
261 let code = format!("{:06}", rng.random_range(0..1_000_000));
262 let request_id = Ulid::new().to_string();
263 let expires_at = Utc::now() + Duration::minutes(10);
264
265 let mut hasher = Blake2b::<U32>::new();
266 hasher.update(code.as_bytes());
267 hasher.update(request_id.as_bytes());
268 let code_hash = hasher.finalize().to_vec();
269
270 self.otp_store.insert(
271 request_id.clone(),
272 OtpRecord {
273 email: email.clone(),
274 code_hash,
275 google_nonce: None,
276 expires_at,
277 attempts: 0,
278 },
279 );
280
281 let sender = self.email_sender.clone();
282 let code_clone = code.clone();
283 let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
284 if env == "production" {
285 tracing::info!(request_id = %request_id, "Generated OTP");
286 } else {
287 tracing::info!(request_id = %request_id, email = %email, code = %code, "Generated OTP (DEV ONLY)");
288 }
289 tokio::spawn(async move {
290 let _ = sender
291 .send(
292 &email,
293 "Your Aetheris OTP",
294 &format!("Code: {code_clone}"),
295 &format!("<h1>Code: {code_clone}</h1>"),
296 )
297 .await;
298 });
299
300 Ok(Response::new(OtpRequestAck {
301 request_id,
302 expires_at_unix_ms: expires_at.timestamp_millis() as u64,
303 retry_after_seconds: Some(0), }))
305 }
306
307 #[allow(clippy::too_many_lines)]
308 async fn login(
309 &self,
310 request: Request<LoginRequest>,
311 ) -> Result<Response<LoginResponse>, Status> {
312 let req = request.into_inner();
313 let metadata = req.metadata.unwrap_or_default();
314 let method = req
315 .method
316 .ok_or_else(|| Status::invalid_argument("Missing login method"))?;
317
318 tracing::info!(
319 version = metadata.client_version,
320 platform = metadata.platform,
321 "Processing login request"
322 );
323
324 match method {
325 Method::Otp(otp_req) => {
326 let (request_id, code) = (otp_req.request_id, otp_req.code);
327
328 tokio::time::sleep(std::time::Duration::from_millis(15)).await;
330
331 let mut entry = self
332 .otp_store
333 .get_mut(&request_id)
334 .ok_or_else(|| Status::unauthenticated("Invalid credentials"))?;
335
336 if self.bypass_enabled && entry.email == "smoke-test@aetheris.dev" {
341 if code == "000001" {
343 tracing::warn!(email = entry.email, "Bypass authentication successful");
344 let player_id = Self::derive_player_id("email", &entry.email);
345
346 let is_new_player =
348 self.player_registry.insert(player_id.clone(), ()).is_none();
349
350 let (token, exp) =
351 self.mint_session_token(&player_id, Some("admin".to_string()))?;
352 drop(entry);
353 self.otp_store.remove(&request_id);
354
355 return Ok(Response::new(LoginResponse {
356 session_token: token,
357 expires_at_unix_ms: exp,
358 player_id,
359 is_new_player,
360 login_method: LoginMethod::EmailOtp as i32,
361 }));
362 }
363
364 if code == "000000" {
366 entry.attempts += 1;
367 if entry.attempts >= 3 {
368 drop(entry);
369 self.otp_store.remove(&request_id);
370 }
371 return Err(Status::unauthenticated("Bypass: Forced failure for 000000"));
372 }
373 }
374
375 if Utc::now() > entry.expires_at {
376 drop(entry);
377 self.otp_store.remove(&request_id);
378 return Err(Status::deadline_exceeded("OTP expired"));
379 }
380
381 let mut hasher = Blake2b::<U32>::new();
382 hasher.update(code.as_bytes());
383 hasher.update(request_id.as_bytes());
384 let hash = hasher.finalize();
385
386 if hash.as_slice().ct_eq(&entry.code_hash).into() {
387 let player_id = Self::derive_player_id("email", &entry.email);
388
389 let is_new_player =
391 self.player_registry.insert(player_id.clone(), ()).is_none();
392
393 let (token, exp) = self.mint_session_token(&player_id, None)?;
394 drop(entry);
395 self.otp_store.remove(&request_id);
396
397 Ok(Response::new(LoginResponse {
398 session_token: token,
399 expires_at_unix_ms: exp,
400 player_id,
401 is_new_player,
402 login_method: LoginMethod::EmailOtp as i32,
403 }))
404 } else {
405 entry.attempts += 1;
406 if entry.attempts >= 3 {
407 drop(entry);
408 self.otp_store.remove(&request_id);
409 }
410 Err(Status::unauthenticated("Invalid code"))
411 }
412 }
413 Method::Google(google_req) => {
414 let google_client = self
415 .google_client
416 .as_ref()
417 .as_ref()
418 .ok_or_else(|| Status::internal("Google OIDC not configured"))?;
419
420 let nonce = self
421 .otp_store
422 .get(&google_req.nonce_handle)
423 .and_then(|e| e.google_nonce.clone())
424 .ok_or_else(|| Status::unauthenticated("Invalid nonce_handle"))?;
425
426 let claims = google_client.verify_token(&google_req.google_id_token, &nonce)?;
427 self.otp_store.remove(&google_req.nonce_handle);
428
429 let identity = claims.subject().to_string();
431 let player_id = Self::derive_player_id("google", &identity);
432
433 let is_new_player = self.player_registry.insert(player_id.clone(), ()).is_none();
435
436 let (token, exp) = self.mint_session_token(&player_id, None)?;
437
438 Ok(Response::new(LoginResponse {
439 session_token: token,
440 expires_at_unix_ms: exp,
441 player_id,
442 is_new_player,
443 login_method: LoginMethod::GoogleOidc as i32,
444 }))
445 }
446 }
447 }
448
449 async fn logout(
450 &self,
451 request: Request<LogoutRequest>,
452 ) -> Result<Response<LogoutResponse>, Status> {
453 let mut jti_to_revoke = None;
454
455 let token_from_metadata = request.metadata().get("authorization").and_then(|t| {
456 t.to_str()
457 .ok()
458 .map(|s| s.trim_start_matches("Bearer ").to_string())
459 });
460
461 let token_str = token_from_metadata.or_else(|| {
462 let body = request.get_ref();
463 if body.session_token.is_empty() {
464 None
465 } else {
466 Some(body.session_token.clone())
467 }
468 });
469
470 if let Some(token_clean) = token_str
471 && let Ok(claims) =
472 PasetoParser::<V4, Local>::default().parse(&token_clean, &self.session_key)
473 && let Some(jti) = claims.get("jti").and_then(|v| v.as_str())
474 {
475 jti_to_revoke = Some(jti.to_string());
476 }
477
478 if let Some(jti) = jti_to_revoke {
479 self.session_activity.remove(&jti);
480 }
481
482 Ok(Response::new(LogoutResponse { revoked: true }))
483 }
484
485 async fn refresh_token(
486 &self,
487 request: Request<RefreshRequest>,
488 ) -> Result<Response<RefreshResponse>, Status> {
489 let req = request.into_inner();
490 let token = req.session_token;
491
492 let Ok(claims) = PasetoParser::<V4, Local>::default().parse(&token, &self.session_key)
493 else {
494 return Err(Status::unauthenticated("Invalid session token"));
495 };
496
497 let Some(jti) = claims.get("jti").and_then(|v| v.as_str()) else {
498 return Err(Status::unauthenticated("Token missing jti"));
499 };
500
501 let Some(sub) = claims.get("sub").and_then(|v| v.as_str()) else {
502 return Err(Status::unauthenticated("Token missing sub"));
503 };
504
505 if !self.is_session_authorized(jti, None) {
506 return Err(Status::unauthenticated("Session revoked or expired"));
507 }
508
509 self.session_activity.remove(jti);
511
512 let new_jti = if jti == "admin" {
514 Some("admin".to_string())
515 } else {
516 None
517 };
518 let (new_token, exp) = self.mint_session_token(sub, new_jti)?;
519
520 Ok(Response::new(RefreshResponse {
521 session_token: new_token,
522 expires_at_unix_ms: exp,
523 }))
524 }
525
526 async fn issue_connect_token(
527 &self,
528 request: Request<ConnectTokenRequest>,
529 ) -> Result<Response<ConnectTokenResponse>, Status> {
530 let req = request.into_inner();
531
532 let client_id = rand::random::<u64>();
533 let mut rng = rand::rng();
534 let mut nonce = [0u8; 24];
535 rng.fill(&mut nonce);
536 let server_nonce = base64::engine::general_purpose::STANDARD.encode(nonce);
537
538 let iat = Utc::now();
539 let exp = iat + Duration::minutes(5);
540
541 let token = PasetoBuilder::<V4, Local>::default()
542 .set_claim(CustomClaim::try_from(("client_id", serde_json::json!(client_id))).unwrap())
543 .set_claim(
544 CustomClaim::try_from(("server", serde_json::json!(req.server_address))).unwrap(),
545 )
546 .set_claim(
547 CustomClaim::try_from(("server_nonce", serde_json::json!(server_nonce))).unwrap(),
548 )
549 .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
550 .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
551 .build(&self.transport_key)
552 .map_err(|e| Status::internal(format!("{e:?}")))?;
553
554 Ok(Response::new(ConnectTokenResponse {
555 token: Some(QuicConnectToken {
556 paseto: token,
557 server_address: req.server_address,
558 expires_at_unix_ms: exp.timestamp_millis() as u64,
559 client_id,
560 }),
561 }))
562 }
563
564 async fn create_google_login_nonce(
565 &self,
566 _request: Request<GoogleLoginNonceRequest>,
567 ) -> Result<Response<GoogleLoginNonceResponse>, Status> {
568 let mut nonce_bytes = [0u8; 16];
569 rand::rng().fill(&mut nonce_bytes);
570 let nonce = hex::encode(nonce_bytes);
571
572 let nonce_handle = Ulid::new().to_string();
573 let expires_at = Utc::now() + Duration::minutes(10);
574
575 self.otp_store.insert(
576 nonce_handle.clone(),
577 OtpRecord {
578 email: String::new(),
579 code_hash: Vec::new(),
580 google_nonce: Some(nonce.clone()),
581 expires_at,
582 attempts: 0,
583 },
584 );
585
586 Ok(Response::new(GoogleLoginNonceResponse {
587 nonce_handle,
588 nonce,
589 expires_at_unix_ms: expires_at.timestamp_millis() as u64,
590 }))
591 }
592}