1#![allow(clippy::missing_errors_doc)]
2#![allow(clippy::result_large_err)]
3#![allow(clippy::cast_sign_loss)]
4
5use aetheris_protocol::auth::v1::{
6 ConnectTokenRequest, ConnectTokenResponse, GoogleLoginNonceRequest, GoogleLoginNonceResponse,
7 LoginMethod, LoginRequest, LoginResponse, LogoutRequest, LogoutResponse, OtpRequest,
8 OtpRequestAck, QuicConnectToken, RefreshRequest, RefreshResponse,
9 auth_service_server::AuthService, login_request::Method,
10};
11use async_trait::async_trait;
12use base64::Engine;
13use blake2::{Blake2b, Digest, digest::consts::U32};
14use chrono::{DateTime, Duration, Utc};
15use dashmap::DashMap;
16use rand::RngExt;
17use rusty_paseto::prelude::{
18 CustomClaim, ExpirationClaim, IssuedAtClaim, Key, Local, PasetoBuilder, PasetoParser,
19 PasetoSymmetricKey, SubjectClaim, TokenIdentifierClaim, V4,
20};
21use std::sync::Arc;
22use subtle::ConstantTimeEq;
23use tonic::{Request, Response, Status};
24use tracing::warn;
25use ulid::Ulid;
26
27pub mod email;
28pub mod google;
29pub mod rate_limit;
30
31use email::EmailSender;
32use google::GoogleOidcClient;
33use rate_limit::{InMemoryRateLimiter, RateLimitType};
34
35pub struct OtpRecord {
36 pub email: String,
37 pub code_hash: Vec<u8>,
38 pub google_nonce: Option<String>,
39 pub expires_at: DateTime<Utc>,
40 pub attempts: u8,
41}
42
43#[derive(Clone)]
44pub struct AuthServiceImpl {
45 otp_store: Arc<DashMap<String, OtpRecord>>,
46 session_activity: Arc<DashMap<String, i64>>,
48 player_registry: Arc<DashMap<String, ()>>,
50 email_sender: Arc<dyn EmailSender>,
51 google_client: Arc<Option<GoogleOidcClient>>,
52 pub(crate) session_key: Arc<PasetoSymmetricKey<V4, Local>>,
53 transport_key: Arc<PasetoSymmetricKey<V4, Local>>,
54 rate_limiter: InMemoryRateLimiter,
55 bypass_enabled: bool,
56}
57
58impl std::fmt::Debug for AuthServiceImpl {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("AuthServiceImpl").finish_non_exhaustive()
61 }
62}
63
64impl AuthServiceImpl {
65 pub async fn new(email_sender: Arc<dyn EmailSender>) -> Self {
73 let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
74
75 let session_key_str =
76 std::env::var("SESSION_PASETO_KEY").map_err(|_| "SESSION_PASETO_KEY missing");
77 let transport_key_str =
78 std::env::var("TRANSPORT_PASETO_KEY").map_err(|_| "TRANSPORT_PASETO_KEY missing");
79
80 let bypass_enabled = std::env::var("AETHERIS_AUTH_BYPASS").is_ok_and(|v| {
81 let v = v.to_lowercase();
82 v == "1" || v == "true" || v == "yes" || v == "on"
83 });
84
85 if env == "production" {
86 assert!(
87 !bypass_enabled,
88 "AETHERIS_AUTH_BYPASS=1 is forbidden in production"
89 );
90 assert!(
91 !(session_key_str.is_err() || transport_key_str.is_err()),
92 "PASETO keys must be explicitly set in production"
93 );
94 }
95
96 let session_key_val = session_key_str.unwrap_or_else(|_| {
97 assert!(env != "production", "Missing SESSION_PASETO_KEY");
98 "01234567890123456789012345678901".to_string()
99 });
100 let transport_key_val = transport_key_str.unwrap_or_else(|_| {
101 assert!(env != "production", "Missing TRANSPORT_PASETO_KEY");
102 "01234567890123456789012345678901".to_string()
103 });
104
105 assert!(
106 !(session_key_val.len() != 32 || transport_key_val.len() != 32),
107 "PASETO keys must be exactly 32 bytes"
108 );
109
110 let session_key =
111 PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(session_key_val.as_bytes()));
112 let transport_key =
113 PasetoSymmetricKey::<V4, Local>::from(Key::<32>::from(transport_key_val.as_bytes()));
114
115 let google_client = GoogleOidcClient::new().await.ok();
116
117 if bypass_enabled {
118 tracing::warn!(
119 "Authentication bypass is ENABLED (DEV ONLY) โ 000001 code will work for smoke-test@aetheris.dev"
120 );
121 } else {
122 tracing::info!("Authentication bypass is disabled");
123 }
124
125 Self {
126 otp_store: Arc::new(DashMap::new()),
127 session_activity: Arc::new(DashMap::new()),
128 player_registry: Arc::new(DashMap::new()),
129 email_sender,
130 google_client: Arc::new(google_client),
131 session_key: Arc::new(session_key),
132 transport_key: Arc::new(transport_key),
133 rate_limiter: InMemoryRateLimiter::new(),
134 bypass_enabled,
135 }
136 }
137
/// Canonical form of an email address used as a lookup / rate-limit key:
/// surrounding whitespace removed, then lowercased.
fn normalize_email(email: &str) -> String {
    let trimmed = email.trim();
    trimmed.to_lowercase()
}
142
143 fn derive_player_id(method: &str, identifier: &str) -> String {
144 use sha2::{Digest, Sha256};
145 let mut hasher = Sha256::new();
146 hasher.update(format!("aetheris:{method}:{identifier}").as_bytes());
147 let hash = hasher.finalize();
148 let mut buf = [0u8; 16];
149 buf.copy_from_slice(&hash[0..16]);
150 Ulid::from(u128::from_be_bytes(buf)).to_string()
151 }
152
153 #[must_use]
154 pub fn is_authorized(&self, token: &str) -> bool {
155 self.validate_and_get_jti(token, None).is_some()
156 }
157
158 #[must_use]
160 pub fn validate_and_get_jti(&self, token: &str, tick: Option<u64>) -> Option<String> {
161 let claims = PasetoParser::<V4, Local>::default()
162 .parse(token, &self.session_key)
163 .ok()?;
164
165 let jti = claims.get("jti").and_then(|v| v.as_str())?;
166
167 if self.is_session_authorized(jti, tick) {
168 Some(jti.to_string())
169 } else {
170 None
171 }
172 }
173
174 #[must_use]
180 pub fn is_session_authorized(&self, jti: &str, tick: Option<u64>) -> bool {
181 let (needs_update, now_ts) = if let Some(activity) = self.session_activity.get(jti) {
183 let now = Utc::now().timestamp();
184 if now - *activity > 3600 {
186 return false;
187 }
188
189 let jitter = jti
193 .as_bytes()
194 .iter()
195 .fold(0u64, |acc, &x| acc.wrapping_add(u64::from(x)));
196 let needs_update = tick.is_none_or(|t| (t.wrapping_add(jitter)) % 60 == 0);
197 (needs_update, now)
198 } else {
199 return false;
201 };
202
203 if needs_update && let Some(mut activity) = self.session_activity.get_mut(jti) {
204 *activity = now_ts;
205 }
206
207 true
208 }
209
210 pub fn mint_session_token_for_test(&self, player_id: &str) -> Result<(String, u64), Status> {
211 self.mint_session_token(player_id)
212 }
213
214 fn mint_session_token(&self, player_id: &str) -> Result<(String, u64), Status> {
215 let jti = Ulid::new().to_string();
216 let iat = Utc::now();
217 let exp = iat + Duration::hours(24);
218
219 let token = PasetoBuilder::<V4, Local>::default()
220 .set_claim(SubjectClaim::from(player_id))
221 .set_claim(TokenIdentifierClaim::from(jti.as_str()))
222 .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
223 .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
224 .build(&self.session_key)
225 .map_err(|e| Status::internal(format!("{e:?}")))?;
226
227 self.session_activity.insert(jti, iat.timestamp_millis());
229
230 Ok((token, exp.timestamp_millis() as u64))
231 }
232}
233
234#[async_trait]
235impl AuthService for AuthServiceImpl {
236 async fn request_otp(
237 &self,
238 request: Request<OtpRequest>,
239 ) -> Result<Response<OtpRequestAck>, Status> {
240 if let Some(addr) = request.remote_addr() {
243 let ip = addr.ip().to_string();
244 self.rate_limiter.check_limit(RateLimitType::Ip, &ip)?;
245 } else {
246 warn!("Request missing remote_addr; IP rate limiting skipped (check proxy config)");
247 }
248
249 let req = request.into_inner();
250 let email = Self::normalize_email(&req.email);
251
252 self.rate_limiter
254 .check_limit(RateLimitType::Email, &email)?;
255
256 let mut rng = rand::rng();
257 let code = format!("{:06}", rng.random_range(0..1_000_000));
258 let request_id = Ulid::new().to_string();
259 let expires_at = Utc::now() + Duration::minutes(10);
260
261 let mut hasher = Blake2b::<U32>::new();
262 hasher.update(code.as_bytes());
263 hasher.update(request_id.as_bytes());
264 let code_hash = hasher.finalize().to_vec();
265
266 self.otp_store.insert(
267 request_id.clone(),
268 OtpRecord {
269 email: email.clone(),
270 code_hash,
271 google_nonce: None,
272 expires_at,
273 attempts: 0,
274 },
275 );
276
277 let sender = self.email_sender.clone();
278 let code_clone = code.clone();
279 let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
280 if env == "production" {
281 tracing::info!(request_id = %request_id, "Generated OTP");
282 } else {
283 tracing::info!(request_id = %request_id, email = %email, code = %code, "Generated OTP (DEV ONLY)");
284 }
285 tokio::spawn(async move {
286 let _ = sender
287 .send(
288 &email,
289 "Your Aetheris OTP",
290 &format!("Code: {code_clone}"),
291 &format!("<h1>Code: {code_clone}</h1>"),
292 )
293 .await;
294 });
295
296 Ok(Response::new(OtpRequestAck {
297 request_id,
298 expires_at_unix_ms: expires_at.timestamp_millis() as u64,
299 retry_after_seconds: Some(0), }))
301 }
302
303 #[allow(clippy::too_many_lines)]
304 async fn login(
305 &self,
306 request: Request<LoginRequest>,
307 ) -> Result<Response<LoginResponse>, Status> {
308 let req = request.into_inner();
309 let metadata = req.metadata.unwrap_or_default();
310 let method = req
311 .method
312 .ok_or_else(|| Status::invalid_argument("Missing login method"))?;
313
314 tracing::info!(
315 version = metadata.client_version,
316 platform = metadata.platform,
317 "Processing login request"
318 );
319
320 match method {
321 Method::Otp(otp_req) => {
322 let (request_id, code) = (otp_req.request_id, otp_req.code);
323
324 tokio::time::sleep(std::time::Duration::from_millis(15)).await;
326
327 let mut entry = self
328 .otp_store
329 .get_mut(&request_id)
330 .ok_or_else(|| Status::unauthenticated("Invalid credentials"))?;
331
332 if self.bypass_enabled && entry.email == "smoke-test@aetheris.dev" {
333 if code == "000000" {
334 entry.attempts += 1;
335 if entry.attempts >= 3 {
336 drop(entry);
337 self.otp_store.remove(&request_id);
338 }
339 return Err(Status::unauthenticated("Bypass: Forced failure for 000000"));
340 }
341
342 if Utc::now() > entry.expires_at {
343 drop(entry);
344 self.otp_store.remove(&request_id);
345 return Err(Status::deadline_exceeded("OTP expired"));
346 }
347
348 tracing::warn!(email = entry.email, "Bypass authentication successful");
349 let player_id = Self::derive_player_id("email", &entry.email);
350
351 let is_new_player =
353 self.player_registry.insert(player_id.clone(), ()).is_none();
354
355 let (token, exp) = self.mint_session_token(&player_id)?;
356 drop(entry);
357 self.otp_store.remove(&request_id);
358
359 return Ok(Response::new(LoginResponse {
360 session_token: token,
361 expires_at_unix_ms: exp,
362 player_id,
363 is_new_player,
364 login_method: LoginMethod::EmailOtp as i32,
365 }));
366 }
367
368 if Utc::now() > entry.expires_at {
369 drop(entry);
370 self.otp_store.remove(&request_id);
371 return Err(Status::deadline_exceeded("OTP expired"));
372 }
373
374 let mut hasher = Blake2b::<U32>::new();
375 hasher.update(code.as_bytes());
376 hasher.update(request_id.as_bytes());
377 let hash = hasher.finalize();
378
379 if hash.as_slice().ct_eq(&entry.code_hash).into() {
380 let player_id = Self::derive_player_id("email", &entry.email);
381
382 let is_new_player =
384 self.player_registry.insert(player_id.clone(), ()).is_none();
385
386 let (token, exp) = self.mint_session_token(&player_id)?;
387 drop(entry);
388 self.otp_store.remove(&request_id);
389
390 Ok(Response::new(LoginResponse {
391 session_token: token,
392 expires_at_unix_ms: exp,
393 player_id,
394 is_new_player,
395 login_method: LoginMethod::EmailOtp as i32,
396 }))
397 } else {
398 entry.attempts += 1;
399 if entry.attempts >= 3 {
400 drop(entry);
401 self.otp_store.remove(&request_id);
402 }
403 Err(Status::unauthenticated("Invalid code"))
404 }
405 }
406 Method::Google(google_req) => {
407 let google_client = self
408 .google_client
409 .as_ref()
410 .as_ref()
411 .ok_or_else(|| Status::internal("Google OIDC not configured"))?;
412
413 let nonce = self
414 .otp_store
415 .get(&google_req.nonce_handle)
416 .and_then(|e| e.google_nonce.clone())
417 .ok_or_else(|| Status::unauthenticated("Invalid nonce_handle"))?;
418
419 let claims = google_client.verify_token(&google_req.google_id_token, &nonce)?;
420 self.otp_store.remove(&google_req.nonce_handle);
421
422 let identity = claims.subject().to_string();
424 let player_id = Self::derive_player_id("google", &identity);
425
426 let is_new_player = self.player_registry.insert(player_id.clone(), ()).is_none();
428
429 let (token, exp) = self.mint_session_token(&player_id)?;
430
431 Ok(Response::new(LoginResponse {
432 session_token: token,
433 expires_at_unix_ms: exp,
434 player_id,
435 is_new_player,
436 login_method: LoginMethod::GoogleOidc as i32,
437 }))
438 }
439 }
440 }
441
442 async fn logout(
443 &self,
444 request: Request<LogoutRequest>,
445 ) -> Result<Response<LogoutResponse>, Status> {
446 let mut jti_to_revoke = None;
447
448 let token_from_metadata = request.metadata().get("authorization").and_then(|t| {
449 t.to_str()
450 .ok()
451 .map(|s| s.trim_start_matches("Bearer ").to_string())
452 });
453
454 let token_str = token_from_metadata.or_else(|| {
455 let body = request.get_ref();
456 if body.session_token.is_empty() {
457 None
458 } else {
459 Some(body.session_token.clone())
460 }
461 });
462
463 if let Some(token_clean) = token_str
464 && let Ok(claims) =
465 PasetoParser::<V4, Local>::default().parse(&token_clean, &self.session_key)
466 && let Some(jti) = claims.get("jti").and_then(|v| v.as_str())
467 {
468 jti_to_revoke = Some(jti.to_string());
469 }
470
471 if let Some(jti) = jti_to_revoke {
472 self.session_activity.remove(&jti);
473 }
474
475 Ok(Response::new(LogoutResponse { revoked: true }))
476 }
477
478 async fn refresh_token(
479 &self,
480 request: Request<RefreshRequest>,
481 ) -> Result<Response<RefreshResponse>, Status> {
482 let req = request.into_inner();
483 let token = req.session_token;
484
485 let Ok(claims) = PasetoParser::<V4, Local>::default().parse(&token, &self.session_key)
486 else {
487 return Err(Status::unauthenticated("Invalid session token"));
488 };
489
490 let Some(jti) = claims.get("jti").and_then(|v| v.as_str()) else {
491 return Err(Status::unauthenticated("Token missing jti"));
492 };
493
494 let Some(sub) = claims.get("sub").and_then(|v| v.as_str()) else {
495 return Err(Status::unauthenticated("Token missing sub"));
496 };
497
498 if !self.is_session_authorized(jti, None) {
499 return Err(Status::unauthenticated("Session revoked or expired"));
500 }
501
502 self.session_activity.remove(jti);
504
505 let (new_token, exp) = self.mint_session_token(sub)?;
507
508 Ok(Response::new(RefreshResponse {
509 session_token: new_token,
510 expires_at_unix_ms: exp,
511 }))
512 }
513
514 async fn issue_connect_token(
515 &self,
516 request: Request<ConnectTokenRequest>,
517 ) -> Result<Response<ConnectTokenResponse>, Status> {
518 let req = request.into_inner();
519
520 let client_id = rand::random::<u64>();
521 let mut rng = rand::rng();
522 let mut nonce = [0u8; 24];
523 rng.fill(&mut nonce);
524 let server_nonce = base64::engine::general_purpose::STANDARD.encode(nonce);
525
526 let iat = Utc::now();
527 let exp = iat + Duration::minutes(5);
528
529 let token = PasetoBuilder::<V4, Local>::default()
530 .set_claim(CustomClaim::try_from(("client_id", serde_json::json!(client_id))).unwrap())
531 .set_claim(
532 CustomClaim::try_from(("server", serde_json::json!(req.server_address))).unwrap(),
533 )
534 .set_claim(
535 CustomClaim::try_from(("server_nonce", serde_json::json!(server_nonce))).unwrap(),
536 )
537 .set_claim(IssuedAtClaim::try_from(iat.to_rfc3339().as_str()).unwrap())
538 .set_claim(ExpirationClaim::try_from(exp.to_rfc3339().as_str()).unwrap())
539 .build(&self.transport_key)
540 .map_err(|e| Status::internal(format!("{e:?}")))?;
541
542 Ok(Response::new(ConnectTokenResponse {
543 token: Some(QuicConnectToken {
544 paseto: token,
545 server_address: req.server_address,
546 expires_at_unix_ms: exp.timestamp_millis() as u64,
547 client_id,
548 }),
549 }))
550 }
551
552 async fn create_google_login_nonce(
553 &self,
554 _request: Request<GoogleLoginNonceRequest>,
555 ) -> Result<Response<GoogleLoginNonceResponse>, Status> {
556 let mut nonce_bytes = [0u8; 16];
557 rand::rng().fill(&mut nonce_bytes);
558 let nonce = hex::encode(nonce_bytes);
559
560 let nonce_handle = Ulid::new().to_string();
561 let expires_at = Utc::now() + Duration::minutes(10);
562
563 self.otp_store.insert(
564 nonce_handle.clone(),
565 OtpRecord {
566 email: String::new(),
567 code_hash: Vec::new(),
568 google_nonce: Some(nonce.clone()),
569 expires_at,
570 attempts: 0,
571 },
572 );
573
574 Ok(Response::new(GoogleLoginNonceResponse {
575 nonce_handle,
576 nonce,
577 expires_at_unix_ms: expires_at.timestamp_millis() as u64,
578 }))
579 }
580}