use crate::audit::AUDIT_BUCKET_NAME;
use crate::auth::proto::TokenRepo;
use crate::auth::token_repository::AccessTokens;
use crate::auth::token_repository::{
check_token_lifetime, parse_bearer_token, resolve_last_access_from_cache, ManageTokens,
INIT_TOKEN_NAME, TOKEN_REPO_FILE_NAME,
};
use crate::auth::token_secret::{hash_token_secret, matched_hashed_token_secret};
use crate::cfg::{Cfg, InstanceRole};
use crate::core::cache::Cache;
use crate::core::file_cache::FILE_CACHE;
use crate::core::internal_client::{
check_response, map_request_error, ClientBuildErrorContext, ClientBuildErrorKind,
InternalClientApi, InternalClientBuilder,
};
use crate::core::sync::AsyncRwLock;
use crate::storage::engine::StorageEngine;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use log::{debug, error};
use prost::Message;
use reduct_base::error::ReductError;
use reduct_base::msg::bucket_api::FullBucketInfo;
use reduct_base::msg::token_api::{Permissions, Token, TokenCreateRequest, TokenCreateResponse};
use reduct_base::{forbidden, internal_server_error};
use std::collections::HashMap;
use std::io::{Read, SeekFrom};
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::time::{Duration, Instant};
/// How long a fetched last-access snapshot is reused before the remote audit
/// bucket is queried again.
const TOKEN_LAST_ACCESS_CACHE_TTL: Duration = Duration::from_secs(10);
/// Token repository that only reads tokens from the repository file and
/// rejects every mutating operation with a "forbidden" error.
pub(super) struct ReadOnlyTokenRepository {
/// Full path of the token repository file (`data_path` joined with `TOKEN_REPO_FILE_NAME`).
config_path: PathBuf,
/// Tokens currently loaded, keyed by token name.
repo: HashMap<String, Token>,
/// Tokens that already passed validation, keyed by the bearer secret sent by the client.
auth_cache: Cache<String, Token>,
/// Instance configuration (API token, role, remote URLs, audit settings).
cfg: Cfg,
/// Time of the last reload attempt; compared with `replica_update_interval`.
last_replica_sync: AsyncRwLock<Instant>,
/// Client used to read the audit bucket of the remote instance; `None` when no
/// remote URL is configured or the client could not be built.
audit_client: Option<InternalClientApi>,
/// Audit entry name -> `latest_record` value of that entry.
last_access_cache: HashMap<String, u64>,
/// When `last_access_cache` was last refreshed successfully.
last_access_cache_updated_at: Option<Instant>,
}
/// Capacity passed to the validated-token cache (`auth_cache`).
const AUTH_CACHE_SIZE: usize = 1024;
/// TTL passed to the validated-token cache.
/// NOTE(review): `Duration::MAX` presumably disables age-based expiry — confirm
/// against `Cache`. The cache is cleared explicitly whenever the repository is reloaded.
const AUTH_CACHE_TTL: Duration = Duration::MAX;
impl ReadOnlyTokenRepository {
/// Creates the repository and loads the tokens stored under `data_path`.
///
/// The audit client is built from `cfg` before the tokens are loaded.
/// `_storage` is accepted but not used.
///
/// # Panics
///
/// Panics with "Could not load token repository" when the initial load fails.
pub async fn new(data_path: PathBuf, cfg: Cfg, _storage: Option<Arc<StorageEngine>>) -> Self {
    let mut this = Self {
        config_path: data_path.join(TOKEN_REPO_FILE_NAME),
        // Built while `cfg` is still only borrowed; it is moved into the struct below.
        audit_client: Self::build_audit_client(&cfg),
        repo: HashMap::new(),
        auth_cache: Cache::new(AUTH_CACHE_SIZE, AUTH_CACHE_TTL),
        last_replica_sync: AsyncRwLock::new(Instant::now()),
        cfg,
        last_access_cache: HashMap::new(),
        last_access_cache_updated_at: None,
    };
    this.repo = this
        .load_repo()
        .await
        .expect("Could not load token repository");
    this
}
/// Builds the client used to read audit data from the remote instance.
///
/// Returns `None` when neither `primary_url` nor `secondary_url` is set, or
/// when the underlying client cannot be built (the failure is logged).
fn build_audit_client(cfg: &Cfg) -> Option<InternalClientApi> {
    let has_remote = cfg.primary_url.is_some() || cfg.secondary_url.is_some();
    if !has_remote {
        return None;
    }

    let error_context = ClientBuildErrorContext {
        ca_read: "Failed to read audit remote CA certificate",
        ca_parse: "Failed to parse audit remote CA certificate",
        client_build: "Failed to build audit read-only token client",
        kind: ClientBuildErrorKind::InternalServerError,
    };
    let built = InternalClientBuilder::new(error_context)
        .api_token(&cfg.api_token)
        .verify_ssl(cfg.audit_conf.remote_verify_ssl)
        .ca_path(cfg.audit_conf.remote_ca_path.as_ref())
        .connect_timeout(cfg.audit_conf.remote_timeout)
        .build();

    let client = match built {
        Ok(client) => client,
        Err(err) => {
            error!(
                "Failed to initialize audit client for read-only token repository: {}",
                err
            );
            return None;
        }
    };
    Some(InternalClientApi::new(
        client,
        cfg.primary_url.clone(),
        cfg.secondary_url.clone(),
    ))
}
async fn load_repo(&self) -> Result<HashMap<String, Token>, ReductError> {
let api_token = self.cfg.api_token.clone();
FILE_CACHE.discard_recursive(&self.config_path).await?; let mut repo: HashMap<String, Token> = HashMap::new();
match FILE_CACHE.read(&self.config_path, SeekFrom::Start(0)).await {
Ok(mut lock) => {
debug!(
"Loading token repository from {}",
self.config_path.as_path().display()
);
let mut buf = Vec::new();
lock.read_to_end(&mut buf)?;
let proto_repo = TokenRepo::decode(&mut Bytes::from(buf)).map_err(|e| {
internal_server_error!("Could not decode token repository: {}", e)
})?;
for token in proto_repo.tokens {
repo.insert(token.name.clone(), token.into());
}
}
Err(_) => error!("Token repository not found at {:?}", self.config_path),
};
if !api_token.is_empty() {
let init_token_value = repo
.get(INIT_TOKEN_NAME)
.and_then(|token| matched_hashed_token_secret(&token.value, &api_token))
.map(|secret| secret.to_string())
.unwrap_or_else(|| {
hash_token_secret(&api_token).expect("Failed to hash init token secret")
});
let init_token = Token {
name: INIT_TOKEN_NAME.to_string(),
value: init_token_value,
created_at: DateTime::<Utc>::from(SystemTime::now()),
permissions: Some(Permissions {
full_access: false,
read: vec!["*".to_string(), AUDIT_BUCKET_NAME.to_string()],
write: vec![],
}),
is_provisioned: true,
expires_at: None,
ttl: None,
last_access: None,
ip_allowlist: vec![],
is_expired: false,
};
repo.insert(init_token.name.clone(), init_token);
}
Ok(repo)
}
/// Reloads the tokens from disk when this instance is a replica and the
/// configured `replica_update_interval` has elapsed since the last attempt.
///
/// The sync timestamp is updated before loading, and the validated-token
/// cache is cleared after a successful reload.
///
/// # Errors
///
/// Returns an error when the sync lock cannot be acquired or loading fails.
async fn update_repo(&mut self) -> Result<(), ReductError> {
    let mut synced_at = self.last_replica_sync.write().await?;

    let is_replica = self.cfg.role == InstanceRole::Replica;
    if !is_replica {
        return Ok(());
    }
    let interval = self.cfg.engine_config.replica_update_interval;
    if synced_at.elapsed() < interval {
        return Ok(());
    }

    *synced_at = Instant::now();
    self.repo = self.load_repo().await?;
    self.auth_cache.clear();
    Ok(())
}
/// Refreshes `last_access_cache` from the remote audit bucket unless it was
/// refreshed less than `TOKEN_LAST_ACCESS_CACHE_TTL` ago.
///
/// Does nothing when no audit client is available. On a failed request the
/// error is logged at debug level and both the cache and its timestamp are
/// left untouched, so the next call tries again.
async fn refresh_last_access_cache_if_needed(&mut self) {
    let now = Instant::now();
    if let Some(refreshed_at) = self.last_access_cache_updated_at {
        if now.duration_since(refreshed_at) < TOKEN_LAST_ACCESS_CACHE_TTL {
            return;
        }
    }

    let audit_client = match self.audit_client.as_ref() {
        Some(client) => client,
        None => return,
    };

    let fetched = audit_client
        .execute_with_failover(
            "Neither primary nor secondary URL is configured for replica audit reads",
            |client, base_url| async move {
                let url = format!("{}api/v1/b/{}", base_url, AUDIT_BUCKET_NAME);
                let response = check_response(client.get(url).send().await)?;
                response
                    .json::<FullBucketInfo>()
                    .await
                    .map_err(map_request_error)
            },
        )
        .await;

    let bucket_info = match fetched {
        Ok(info) => info,
        Err(err) => {
            debug!(
                "Failed to fetch remote audit info for token last access: {}",
                err
            );
            return;
        }
    };

    // Only entries that hold at least one record carry a meaningful timestamp.
    self.last_access_cache = bucket_info
        .entries
        .into_iter()
        .filter(|entry| entry.record_count > 0)
        .map(|entry| (entry.name, entry.latest_record))
        .collect();
    self.last_access_cache_updated_at = Some(now);
}
/// Sets `token.last_access` from the last-access cache, refreshing the cache
/// first when it is due.
async fn populate_token_last_access(&mut self, token: &mut Token) {
    self.refresh_last_access_cache_if_needed().await;
    let cache = &self.last_access_cache;
    token.last_access = resolve_last_access_from_cache(cache, &token.name);
}
}
/// Exposes the loaded token map to the shared `AccessTokens` helpers.
impl AccessTokens for ReadOnlyTokenRepository {
/// Returns the tokens currently held in memory, keyed by token name.
fn repo(&self) -> &HashMap<String, Token> {
&self.repo
}
}
#[async_trait]
impl ManageTokens for ReadOnlyTokenRepository {
/// Always fails: tokens cannot be created through a read-only repository.
async fn generate_token(
    &mut self,
    _name: &str,
    _request: TokenCreateRequest,
) -> Result<TokenCreateResponse, ReductError> {
    let denied = forbidden!("Cannot generate token in read-only mode");
    Err(denied)
}
async fn get_token(&mut self, name: &str) -> Result<&Token, ReductError> {
self.update_repo().await?;
AccessTokens::get_token(self, name)
}
/// Returns a copy of the token called `name` with `last_access` filled in
/// from the last-access cache.
async fn get_token_with_last_access(&mut self, name: &str) -> Result<Token, ReductError> {
    self.update_repo().await?;
    let found = AccessTokens::get_token(self, name)?;
    let mut token = found.clone();
    self.populate_token_last_access(&mut token).await;
    Ok(token)
}
/// Always fails: tokens cannot be modified through a read-only repository.
async fn update_token(&mut self, _token: Token) -> Result<(), ReductError> {
    let denied = forbidden!("Cannot mutate token in read-only mode");
    Err(denied)
}
async fn get_token_list(&mut self) -> Result<Vec<Token>, ReductError> {
self.update_repo().await?;
AccessTokens::get_token_list(self)
}
async fn get_token_list_with_last_access(&mut self) -> Result<Vec<Token>, ReductError> {
self.update_repo().await?;
let mut tokens = AccessTokens::get_token_list(self)?;
for token in tokens.iter_mut() {
self.populate_token_last_access(token).await;
}
Ok(tokens)
}
/// Validates the bearer token from `header` for a client at `client_ip`.
///
/// A token found in the validated-token cache is only re-checked for lifetime
/// and IP allowlist; when either check fails the cache entry is removed and
/// the error returned. Otherwise the token goes through the full validation
/// and is cached on success.
async fn validate_token(
    &mut self,
    header: Option<&str>,
    client_ip: Option<IpAddr>,
) -> Result<Token, ReductError> {
    self.update_repo().await?;
    let value = parse_bearer_token(header.unwrap_or(""))?;

    if let Some(cached) = self.auth_cache.get(&value).cloned() {
        let still_valid = check_token_lifetime(&cached)
            .and_then(|_| super::check_token_ip_allowlist(&cached, client_ip));
        return match still_valid {
            Ok(_) => Ok(cached),
            Err(err) => {
                self.auth_cache.remove(&value);
                Err(err)
            }
        };
    }

    // Rebuild the header from the parsed secret before the full validation.
    let normalized = format!("Bearer {}", value);
    let token = AccessTokens::validate_token(self, Some(normalized.as_str()), client_ip)?;
    self.auth_cache.insert(value, token.clone());
    Ok(token)
}
/// Validates the bearer token from `header` without consulting the
/// validated-token cache and returns it with `last_access` filled in.
async fn validate_token_with_last_access(
    &mut self,
    header: Option<&str>,
    client_ip: Option<IpAddr>,
) -> Result<Token, ReductError> {
    self.update_repo().await?;
    let validated = AccessTokens::validate_token(self, header, client_ip);
    let mut token = validated?;
    self.populate_token_last_access(&mut token).await;
    Ok(token)
}
/// Always fails: tokens cannot be removed through a read-only repository.
async fn remove_token(&mut self, _name: &str) -> Result<(), ReductError> {
    let denied = forbidden!("Cannot remove token in read-only mode");
    Err(denied)
}
/// Always fails: tokens cannot be rotated through a read-only repository.
async fn rotate_token(&mut self, _name: &str) -> Result<TokenCreateResponse, ReductError> {
    let denied = forbidden!("Cannot rotate token in read-only mode");
    Err(denied)
}
/// Always fails: token permissions cannot be edited through a read-only repository.
async fn remove_bucket_from_tokens(&mut self, _bucket: &str) -> Result<(), ReductError> {
    let denied = forbidden!("Cannot remove bucket from token in read-only mode");
    Err(denied)
}
/// Always fails: token permissions cannot be edited through a read-only repository.
async fn rename_bucket(&mut self, _old_name: &str, _new_name: &str) -> Result<(), ReductError> {
    let denied = forbidden!("Cannot rename bucket in token in read-only mode");
    Err(denied)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::token_repository::{BoxedTokenRepository, INIT_TOKEN_NAME};
use crate::auth::token_secret::{is_hashed_token_secret, verify_token_secret};
use crate::cfg::{Cfg, InstanceRole};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use reduct_base::msg::bucket_api::{BucketInfo, BucketSettings, FullBucketInfo};
use reduct_base::msg::entry_api::EntryInfo;
use reduct_base::msg::token_api::Permissions;
use rstest::*;
use std::io::Write;
use std::path::PathBuf;
use tempfile::tempdir;
use tokio::net::TcpListener;
mod repo_methods {
use super::*;
use std::time::Duration;
/// A fresh repository lists both the token from the file and the init token.
#[rstest]
#[tokio::test]
async fn test_new_loads_tokens(#[future] repo_fixture: (BoxedTokenRepository, PathBuf)) {
    let (mut repo, _path) = repo_fixture.await;
    let tokens = repo.get_token_list().await.unwrap();
    let names: Vec<&str> = tokens.iter().map(|t| t.name.as_str()).collect();
    assert!(names.contains(&"file_token"));
    assert!(names.contains(&INIT_TOKEN_NAME));
}
/// The init token is stored hashed, matches the configured API token and is
/// marked as provisioned.
#[rstest]
#[tokio::test]
async fn test_load_repo_with_api_token(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
    cfg: Cfg,
) {
    let (mut repo, _path) = repo_fixture.await;
    let init_token = repo.get_token(INIT_TOKEN_NAME).await.unwrap();
    let stored_secret = &init_token.value;
    assert!(is_hashed_token_secret(stored_secret));
    assert!(verify_token_secret(stored_secret, &cfg.api_token));
    assert!(init_token.is_provisioned);
}
/// A token written to the file after start-up becomes visible once the
/// replica update interval has passed.
#[rstest]
#[tokio::test]
async fn test_reload_from_from_file(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, data_path) = repo_fixture.await;
    let full_access = Permissions {
        full_access: true,
        ..Default::default()
    };
    let added = Token {
        name: "new_file_token".to_string(),
        value: "new_file_value".to_string(),
        created_at: DateTime::<Utc>::from(SystemTime::now()),
        permissions: Some(full_access),
        is_provisioned: true,
        ..Default::default()
    };
    write_token_to_file(&data_path, &added).await;

    // Wait longer than the update interval configured by the `cfg` fixture.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let reloaded = repo.get_token("new_file_token").await.unwrap();
    assert_eq!(reloaded.name, "new_file_token");
}
}
mod manage_tokens {
use super::*;
use reduct_base::{not_found, unauthorized};
use std::time::Duration;
/// Generating a token is rejected in read-only mode.
#[rstest]
#[tokio::test]
async fn test_generate_token_forbidden(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, _path) = repo_fixture.await;
    let request = TokenCreateRequest {
        permissions: Permissions {
            full_access: true,
            read: vec![],
            write: vec![],
        },
        ..Default::default()
    };
    let outcome = repo.generate_token("test", request).await;
    assert_eq!(
        outcome.err().unwrap(),
        forbidden!("Cannot generate token in read-only mode")
    );
}
/// The init token can be fetched by name and carries read-only permissions.
#[rstest]
#[tokio::test]
async fn test_get_token_existing(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
    cfg: Cfg,
) {
    let (mut repo, _path) = repo_fixture.await;
    let init_token = repo.get_token(INIT_TOKEN_NAME).await.unwrap().clone();
    let expected_permissions = Permissions {
        full_access: false,
        read: vec!["*".to_string(), AUDIT_BUCKET_NAME.to_string()],
        write: vec![],
    };

    assert_eq!(init_token.name, INIT_TOKEN_NAME.to_string());
    assert!(is_hashed_token_secret(&init_token.value));
    assert!(verify_token_secret(&init_token.value, &cfg.api_token));
    assert_eq!(init_token.permissions, Some(expected_permissions));
    assert!(init_token.is_provisioned);
    assert!(init_token.expires_at.is_none());
}
/// Looking up an unknown token name yields a "not found" error.
#[rstest]
#[tokio::test]
async fn test_get_token_missing(#[future] repo_fixture: (BoxedTokenRepository, PathBuf)) {
    let (mut repo, _path) = repo_fixture.await;
    let lookup = repo.get_token("missing").await;
    let expected = not_found!("Token 'missing' doesn't exist");
    assert_eq!(lookup.err().unwrap(), expected);
}
/// Updating a token is rejected in read-only mode.
#[rstest]
#[tokio::test]
async fn test_update_token_forbidden(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, _path) = repo_fixture.await;
    let outcome = repo.update_token(Token::default()).await;
    let expected = forbidden!("Cannot mutate token in read-only mode");
    assert_eq!(outcome.err().unwrap(), expected);
}
/// The token list contains the file token and the init token.
#[rstest]
#[tokio::test]
async fn test_get_token_list(#[future] repo_fixture: (BoxedTokenRepository, PathBuf)) {
    let (mut repo, _path) = repo_fixture.await;
    let tokens = repo.get_token_list().await.unwrap();
    let names: Vec<&str> = tokens.iter().map(|t| t.name.as_str()).collect();
    assert!(names.contains(&"file_token"));
    assert!(names.contains(&INIT_TOKEN_NAME));
}
/// The configured API token validates as the read-only init token.
#[rstest]
#[tokio::test]
async fn test_validate_token_valid(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
    cfg: Cfg,
) {
    let (mut repo, _path) = repo_fixture.await;
    let bearer = format!("Bearer {}", cfg.api_token);
    let validated = repo
        .validate_token(Some(bearer.as_str()), None)
        .await
        .unwrap();
    let expected_permissions = Permissions {
        full_access: false,
        read: vec!["*".to_string(), AUDIT_BUCKET_NAME.to_string()],
        write: vec![],
    };

    assert_eq!(validated.name, INIT_TOKEN_NAME.to_string());
    assert!(is_hashed_token_secret(&validated.value));
    assert!(verify_token_secret(&validated.value, &cfg.api_token));
    assert_eq!(validated.permissions, Some(expected_permissions));
    assert!(validated.is_provisioned);
    assert!(validated.expires_at.is_none());
}
/// An unknown bearer secret is rejected as unauthorized.
#[rstest]
#[tokio::test]
async fn test_validate_token_invalid(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, _path) = repo_fixture.await;
    let outcome = repo
        .validate_token(Some("Bearer invalid_token"), None)
        .await;
    assert_eq!(outcome.err().unwrap(), unauthorized!("Invalid token"));
}
/// After the file is rewritten with a new secret, the previously validated
/// secret stops working and the new one is accepted.
#[rstest]
#[tokio::test]
async fn test_validate_token_cache_invalidation_after_reload(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, data_path) = repo_fixture.await;

    // Validate once with the old secret so it is remembered by the repository.
    repo.validate_token(Some("Bearer file_value"), None)
        .await
        .unwrap();

    let full_access = Permissions {
        full_access: true,
        ..Default::default()
    };
    let replaced = Token {
        name: "file_token".to_string(),
        value: "new_file_value".to_string(),
        created_at: DateTime::<Utc>::from(SystemTime::now()),
        permissions: Some(full_access),
        is_provisioned: true,
        ..Default::default()
    };
    write_token_to_file(&data_path, &replaced).await;
    tokio::time::sleep(Duration::from_millis(200)).await;

    let stale = repo.validate_token(Some("Bearer file_value"), None).await;
    assert_eq!(stale.err().unwrap(), unauthorized!("Invalid token"));

    let fresh = repo
        .validate_token(Some("Bearer new_file_value"), None)
        .await
        .unwrap();
    assert_eq!(fresh.name, "file_token");
}
/// Removing a token is rejected in read-only mode.
#[rstest]
#[tokio::test]
async fn test_remove_token_forbidden(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, _path) = repo_fixture.await;
    let outcome = repo.remove_token(INIT_TOKEN_NAME).await;
    let expected = forbidden!("Cannot remove token in read-only mode");
    assert_eq!(outcome.err().unwrap(), expected);
}
/// Removing a bucket from token permissions is rejected in read-only mode.
#[rstest]
#[tokio::test]
async fn test_remove_bucket_from_tokens_forbidden(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, _path) = repo_fixture.await;
    let outcome = repo.remove_bucket_from_tokens("bucket").await;
    let expected = forbidden!("Cannot remove bucket from token in read-only mode");
    assert_eq!(outcome.err().unwrap(), expected);
}
/// Rotating a token is rejected in read-only mode.
#[rstest]
#[tokio::test]
async fn test_rotate_token_forbidden(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, _path) = repo_fixture.await;
    let outcome = repo.rotate_token("file_token").await;
    let expected = forbidden!("Cannot rotate token in read-only mode");
    assert_eq!(outcome.err().unwrap(), expected);
}
/// Renaming a bucket inside token permissions is rejected in read-only mode.
#[rstest]
#[tokio::test]
async fn test_rename_bucket_forbidden(
    #[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
    let (mut repo, _path) = repo_fixture.await;
    let outcome = repo.rename_bucket("old", "new").await;
    let expected = forbidden!("Cannot rename bucket in token in read-only mode");
    assert_eq!(outcome.err().unwrap(), expected);
}
#[rstest]
#[tokio::test]
async fn test_get_token_with_last_access_uses_instance_prefixed_cache(
#[future] repo_fixture: (BoxedTokenRepository, PathBuf),
) {
let (mut repo, _) = repo_fixture.await;
let mut token = repo.get_token_with_last_access("file_token").await.unwrap();
token.last_access = Some(DateTime::<Utc>::from_timestamp_micros(42).unwrap());
assert!(token.last_access.is_some());
}
}
/// Shared state of the stub audit server: the bucket info returned for every request.
#[derive(Clone)]
struct BucketInfoState {
/// Payload served as JSON by `bucket_info_handler`.
body: FullBucketInfo,
}
/// Replies with `200 OK` and the configured bucket info serialized as JSON.
async fn bucket_info_handler(State(state): State<BucketInfoState>) -> impl IntoResponse {
    let payload = axum::Json(state.body);
    (StatusCode::OK, payload)
}
/// Starts a local HTTP server that serves `body` for the audit bucket info
/// route and returns its base URL (with a trailing slash).
async fn start_bucket_info_server(body: FullBucketInfo) -> String {
    // Port 0 lets the OS pick a free port.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    let state = BucketInfoState { body };
    let app = Router::new()
        .route("/api/v1/b/$audit", get(bucket_info_handler))
        .with_state(state);

    tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });
    format!("http://{}/", addr)
}
/// Without a primary or secondary URL no audit client is built.
#[tokio::test]
async fn test_build_audit_client_none_without_urls() {
    let default_cfg = Cfg::default();
    let client = ReadOnlyTokenRepository::build_audit_client(&default_cfg);
    assert!(client.is_none());
}
/// A CA path that does not exist makes the client build fail, which yields `None`.
#[tokio::test]
async fn test_build_audit_client_none_with_invalid_ca() {
    let mut broken_cfg = Cfg::default();
    broken_cfg.api_token = "secret".to_string();
    broken_cfg.primary_url = Some("http://127.0.0.1:1/".to_string());
    broken_cfg.audit_conf.remote_ca_path = Some("/tmp/does-not-exist-ca.pem".into());

    let client = ReadOnlyTokenRepository::build_audit_client(&broken_cfg);
    assert!(client.is_none());
}
/// `last_access` is taken from the instance-prefixed audit entry reported by
/// the remote bucket info endpoint.
#[tokio::test]
async fn test_get_token_with_last_access_reads_instance_prefixed_entry_from_remote() {
    let audit_entry = EntryInfo {
        name: "instance-a/file_token".to_string(),
        record_count: 1,
        latest_record: 5_000_000,
        ..Default::default()
    };
    let remote_info = FullBucketInfo {
        info: BucketInfo::default(),
        settings: BucketSettings::default(),
        entries: vec![audit_entry],
    };
    let base_url = start_bucket_info_server(remote_info).await;

    let mut replica_cfg = Cfg::default();
    replica_cfg.api_token = "test_token".to_string();
    replica_cfg.role = InstanceRole::Replica;
    replica_cfg.primary_url = Some(base_url);

    let data_path = tempdir().unwrap().keep();
    let stored = Token {
        name: "file_token".to_string(),
        value: "file_value".to_string(),
        created_at: DateTime::<Utc>::from(SystemTime::now()),
        permissions: Some(Permissions {
            full_access: true,
            ..Default::default()
        }),
        is_provisioned: true,
        ..Default::default()
    };
    write_token_to_file(&data_path, &stored).await;

    let mut repo = ReadOnlyTokenRepository::new(data_path, replica_cfg, None).await;
    let fetched = repo.get_token_with_last_access("file_token").await.unwrap();
    let expected = DateTime::<Utc>::from_timestamp_micros(5_000_000);
    assert_eq!(fetched.last_access, expected);
}
/// Replica configuration with an API token and a 100 ms repository update interval.
#[fixture]
fn cfg() -> Cfg {
    let update_interval = std::time::Duration::from_millis(100);
    let mut replica_cfg = Cfg::default();
    replica_cfg.api_token = "test_token".to_string();
    replica_cfg.role = InstanceRole::Replica;
    replica_cfg.engine_config.replica_update_interval = update_interval;
    replica_cfg
}
/// Builds a read-only repository over a temporary directory that already
/// contains one full-access token named `file_token`, and returns it together
/// with the directory path.
#[fixture]
async fn repo_fixture(cfg: Cfg) -> (BoxedTokenRepository, PathBuf) {
    let data_path = tempdir().unwrap().keep();
    let full_access = Permissions {
        full_access: true,
        ..Default::default()
    };
    let stored = Token {
        name: "file_token".to_string(),
        value: "file_value".to_string(),
        created_at: DateTime::<Utc>::from(SystemTime::now()),
        permissions: Some(full_access),
        is_provisioned: true,
        ..Default::default()
    };
    write_token_to_file(&data_path, &stored).await;

    let repo = ReadOnlyTokenRepository::new(data_path.clone(), cfg, None).await;
    (Box::new(repo), data_path)
}
/// Overwrites the token repository file under `path` with a repository that
/// contains only `new_token`.
async fn write_token_to_file(path: &PathBuf, new_token: &Token) {
    let mut proto_repo = TokenRepo::default();
    proto_repo.tokens.push(new_token.clone().into());
    let mut encoded = Vec::new();
    proto_repo.encode(&mut encoded).unwrap();

    let repo_file = path.join(TOKEN_REPO_FILE_NAME);
    let mut lock = FILE_CACHE
        .write_or_create(&repo_file, SeekFrom::Start(0))
        .await
        .unwrap();
    // Truncate first so no bytes of a longer previous file survive.
    lock.set_len(0).unwrap();
    lock.write_all(&encoded).unwrap();
    lock.sync_all().await.unwrap();
}
}