use std::collections::HashMap;
use tokio::sync::{OnceCell, RwLock};
use crate::api::rest_api::RESTApi;
use crate::api::rest_util::RESTUtil;
use crate::catalog::Identifier;
use crate::common::{CatalogOptions, Options};
use crate::io::storage_oss::OSS_ENDPOINT;
use crate::io::FileIO;
use crate::Result;
use super::rest_token::RESTToken;
const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
pub struct RESTTokenFileIO {
identifier: Identifier,
path: String,
catalog_options: Options,
api: OnceCell<RESTApi>,
token: RwLock<Option<RESTToken>>,
}
impl RESTTokenFileIO {
pub fn new(identifier: Identifier, path: String, catalog_options: Options) -> Self {
Self {
identifier,
path,
catalog_options,
api: OnceCell::new(),
token: RwLock::new(None),
}
}
pub async fn build_file_io(&self) -> Result<FileIO> {
self.try_to_refresh_token().await?;
let token_guard = self.token.read().await;
match token_guard.as_ref() {
Some(token) => {
let merged_props =
RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token));
let mut builder = FileIO::from_path(&self.path)?;
builder = builder.with_props(merged_props);
builder.build()
}
None => {
FileIO::from_path(&self.path)?.build()
}
}
}
async fn try_to_refresh_token(&self) -> Result<()> {
{
let token_guard = self.token.read().await;
if let Some(token) = token_guard.as_ref() {
if !Self::is_token_expired(token) {
return Ok(());
}
}
}
{
let token_guard = self.token.write().await;
if let Some(token) = token_guard.as_ref() {
if !Self::is_token_expired(token) {
return Ok(());
}
}
}
let new_token = self.refresh_token().await?;
let mut token_guard = self.token.write().await;
*token_guard = Some(new_token);
Ok(())
}
async fn refresh_token(&self) -> Result<RESTToken> {
let api = self
.api
.get_or_try_init(|| async { RESTApi::new(self.catalog_options.clone(), false).await })
.await?;
let response = api.load_table_token(&self.identifier).await?;
let expires_at_millis =
response
.expires_at_millis
.ok_or_else(|| crate::Error::DataInvalid {
message: format!(
"Token response for table '{}' missing expires_at_millis",
self.identifier.full_name()
),
source: None,
})?;
let merged_token = self.merge_token_with_catalog_options(response.token);
Ok(RESTToken::new(merged_token, expires_at_millis))
}
fn is_token_expired(token: &RESTToken) -> bool {
let current_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
(token.expire_at_millis - current_time) < TOKEN_EXPIRATION_SAFE_TIME_MILLIS
}
fn merge_token_with_catalog_options(
&self,
token: HashMap<String, String>,
) -> HashMap<String, String> {
let mut merged = token;
if let Some(dlf_oss_endpoint) = self.catalog_options.get(CatalogOptions::DLF_OSS_ENDPOINT) {
if !dlf_oss_endpoint.trim().is_empty() {
merged.insert(OSS_ENDPOINT.to_string(), dlf_oss_endpoint.clone());
}
}
merged
}
}