from abc import ABCMeta
from collections.abc import Callable
from typing import Any
import grpc from grpc import ClientCallDetails from grpc.aio import Call, UnaryUnaryClientInterceptor
from .configuration import ClientConfiguration, SecretAccessToken
class RefreshInterceptor(UnaryUnaryClientInterceptor, metaclass=ABCMeta):
def __init__(self, client_configuration: ClientConfiguration | None = None):
if client_configuration is None:
client_configuration = ClientConfiguration.load_default()
self.configuration = client_configuration
async def _get_access_token(self) -> SecretAccessToken:
return await self.configuration.get_bearer_access_token_async()
async def intercept_unary_unary(
self,
continuation: Callable[[ClientCallDetails, Any], Call],
client_call_details: ClientCallDetails,
request: Any,
) -> Any:
authorized_metadata: list[tuple[str, str]] = []
if client_call_details.metadata is not None:
authorized_metadata = list(client_call_details.metadata)
authorized_metadata.append(("authorization", f"Bearer {await self._get_access_token()}"))
new_client_call_details = grpc.aio.ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
authorized_metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
return await continuation(new_client_call_details, request)