from __future__ import annotations
from typing import Any
from ._base import BaseClient, RequestConfig
class OAuthService:
def __init__(self, client: BaseClient) -> None:
self._client = client
async def authorize(
self,
client_id: str,
redirect_uri: str,
scope: str,
state: str | None = None,
code_challenge: str | None = None,
code_challenge_method: str | None = None,
) -> dict[str, Any]:
params: dict[str, Any] = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": scope,
"response_type": "code",
}
if state:
params["state"] = state
if code_challenge:
params["code_challenge"] = code_challenge
if code_challenge_method:
params["code_challenge_method"] = code_challenge_method
config = RequestConfig(params=params)
return await self._client.make_request("GET", "/oauth/authorize", config=config)
async def token(
self,
grant_type: str,
client_id: str,
client_secret: str | None = None,
code: str | None = None,
redirect_uri: str | None = None,
refresh_token: str | None = None,
username: str | None = None,
password: str | None = None,
scope: str | None = None,
code_verifier: str | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"grant_type": grant_type,
"client_id": client_id,
}
if client_secret:
data["client_secret"] = client_secret
if code:
data["code"] = code
if redirect_uri:
data["redirect_uri"] = redirect_uri
if refresh_token:
data["refresh_token"] = refresh_token
if username:
data["username"] = username
if password:
data["password"] = password
if scope:
data["scope"] = scope
if code_verifier:
data["code_verifier"] = code_verifier
config = RequestConfig(form_data=data)
response = await self._client.make_request(
"POST", "/oauth/token", config=config
)
if "access_token" in response:
self._client.set_access_token(response["access_token"])
return response
async def revoke(
self,
token: str,
client_id: str,
client_secret: str | None = None,
token_type_hint: str | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"token": token,
"client_id": client_id,
}
if client_secret:
data["client_secret"] = client_secret
if token_type_hint:
data["token_type_hint"] = token_type_hint
config = RequestConfig(form_data=data)
response = await self._client.make_request(
"POST", "/oauth/revoke", config=config
)
if token == self._client.get_access_token():
self._client.clear_access_token()
return response
async def introspect(
self,
token: str,
client_id: str,
client_secret: str | None = None,
token_type_hint: str | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"token": token,
"client_id": client_id,
}
if client_secret:
data["client_secret"] = client_secret
if token_type_hint:
data["token_type_hint"] = token_type_hint
config = RequestConfig(form_data=data)
return await self._client.make_request(
"POST", "/oauth/introspect", config=config
)
async def get_userinfo(self) -> dict[str, Any]:
return await self._client.make_request("GET", "/oauth/userinfo")