from __future__ import annotations
import copy
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, TypeVar
# Generic type variable for annotations. It is not referenced in the visible
# part of this module; it may be used by code outside this view.
T = TypeVar("T")
class IsolationLevel(str, Enum):
    """Transaction isolation levels, spelled as camelCase strings.

    Members mix in ``str``, so each one compares equal to its raw string
    value (for example ``IsolationLevel.SERIALIZABLE == "serializable"``) and
    can be looked up from that string with ``IsolationLevel("serializable")``.

    NOTE(review): the camelCase values are presumably the spellings expected
    by whatever consumes them -- confirm against the caller before renaming.
    """

    READ_UNCOMMITTED = "readUncommitted"
    READ_COMMITTED = "readCommitted"
    REPEATABLE_READ = "repeatableRead"
    SERIALIZABLE = "serializable"
class TransactionClient:
    """Client view that tags every RPC with a fixed transaction id.

    On construction, each delegate found in ``parent._delegates`` is
    shallow-copied, the copy's ``_client`` attribute is pointed at this
    instance, and the copy is exposed as an attribute under the same name.
    The parent's own delegates are left untouched.

    RPC calls are forwarded to the parent after ``"transactionId"`` has been
    added to the parameters.
    """

    def __init__(self, parent: Any, transaction_id: str) -> None:
        """Bind to *parent* and clone its delegates for this transaction.

        Args:
            parent: Object providing ``_delegates`` (a mapping of name to
                delegate), ``_rpc`` and ``_sync_rpc``.
            transaction_id: Identifier attached to every forwarded call.
        """
        self._parent = parent
        self._transaction_id = transaction_id
        for name, delegate in parent._delegates.items():
            # Shallow copy so rebinding ``_client`` does not affect the
            # delegate instance still owned by the parent.
            clone = copy.copy(delegate)
            clone._client = self
            setattr(self, name, clone)

    async def _rpc(self, method: str, params: Dict[str, Any]) -> Any:
        """Forward an async call to the parent with the transaction id added.

        A new dict is built, so the caller's *params* is not mutated. Any
        ``"transactionId"`` key already in *params* is overridden.
        """
        params = {**params, "transactionId": self._transaction_id}
        return await self._parent._rpc(method, params)

    def _sync_rpc(self, method: str, params: Dict[str, Any]) -> Any:
        """Forward a blocking call to the parent with the transaction id added.

        A new dict is built, so the caller's *params* is not mutated. Any
        ``"transactionId"`` key already in *params* is overridden.
        """
        params = {**params, "transactionId": self._transaction_id}
        return self._parent._sync_rpc(method, params)

    def get_delegate(self, name: str) -> Any:
        """Return the attribute called *name* on this client.

        Raises:
            AttributeError: If no attribute with that name exists.
        """
        return getattr(self, name)

    def register_delegate(self, name: str, delegate: Any) -> None:
        """Accept a delegate registration and do nothing with it.

        NOTE(review): this is a no-op; presumably delegates are fixed at
        construction time from the parent -- confirm this is intentional.
        """
        pass