from contextlib import contextmanager, asynccontextmanager
from typing import Generator, AsyncGenerator, TypeVar, Optional
import time
import threading
import tempfile
import os
# Generic type variable; none of the definitions visible in this section reference it.
T = TypeVar('T')
@contextmanager
def timer_context(operation_name: str = "Operation") -> Generator[None, None, None]:
    """Measure how long the enclosed ``with`` body runs and print the result.

    Args:
        operation_name: Label placed at the start of the printed message.

    Yields:
        None. The manager exists only for its timing side effect.
    """
    began_at = time.perf_counter()
    try:
        yield
    finally:
        # Reported from ``finally`` so the duration is printed even when the
        # body raises; the exception itself is not handled here.
        duration = time.perf_counter() - began_at
        print(f"{operation_name} took {duration:.4f}s")
@contextmanager
def managed_file(filepath: str, mode: str = "r") -> Generator[Optional[object], None, None]:
    """Open ``filepath`` and guarantee it is closed when the block exits.

    If the file cannot be opened, a message is printed and ``None`` is
    yielded instead of a file object, so callers must check for ``None``.

    Only failures of the ``open`` call itself are reported this way.
    Exceptions raised inside the ``with`` body propagate to the caller
    unchanged; handling them here would make the generator yield a second
    time, which ``contextmanager`` rejects with ``RuntimeError``.

    Args:
        filepath: Path of the file to open.
        mode: Mode string passed straight to ``open``.

    Yields:
        The open file object, or ``None`` when opening failed.
    """
    try:
        file = open(filepath, mode)
    except FileNotFoundError:
        # Must precede OSError: FileNotFoundError is a subclass of it.
        print(f"File {filepath} not found")
        file = None
    except OSError as e:
        print(f"IO error: {e}")
        file = None

    if file is None:
        yield None
        return

    try:
        yield file
    finally:
        file.close()
@contextmanager
def temp_database() -> Generator[dict, None, None]:
    """Provide a dict standing in for a database session for one block.

    The dict has a ``"connection"`` entry (``"active"`` while the block runs,
    ``None`` afterwards) and a ``"transactions"`` list. If the body raises an
    ``Exception``, the failure is printed, ``"rollback"`` is appended to the
    transaction list, and the exception is re-raised.

    Yields:
        The mutable state dict.
    """
    state = {"connection": None, "transactions": []}
    try:
        state["connection"] = "active"
        print("Database connection opened")
        yield state
    except Exception as err:
        print(f"Transaction failed: {err}")
        state["transactions"].append("rollback")
        raise
    finally:
        # Runs on both the success and the failure path.
        state["connection"] = None
        print("Database connection closed")
@contextmanager
def atomic_write(filepath: str) -> Generator[object, None, None]:
    """Write to ``filepath`` via a temporary sibling file.

    Content is written to ``filepath + ".tmp"``. When the block finishes
    without error, the temporary file is closed and moved over ``filepath``.
    If the block (or the final move) fails, the temporary file is removed and
    ``filepath`` is left as it was.

    Args:
        filepath: Destination path. The temporary file is created next to it.

    Yields:
        The temporary file, opened in text write mode.

    Raises:
        Whatever the body raises, or ``OSError`` from opening, closing, or
        replacing the file.
    """
    temp_path = filepath + ".tmp"
    file = open(temp_path, "w")
    try:
        yield file
        file.close()
        # os.replace overwrites an existing destination on every platform;
        # os.rename raises on Windows when the destination already exists.
        os.replace(temp_path, filepath)
    except BaseException:
        # BaseException so KeyboardInterrupt/SystemExit also release the
        # handle and remove the partial file.
        file.close()
        try:
            os.unlink(temp_path)
        except FileNotFoundError:
            # Already gone; do not let that mask the original error.
            pass
        raise
@contextmanager
def lock_context(lock: threading.Lock, timeout: float = 1.0) -> Generator[bool, None, None]:
    """Try to take ``lock`` for the duration of the block.

    The block runs whether or not the lock was obtained, so callers must
    check the yielded flag before touching the protected resource.

    Args:
        lock: The lock to acquire.
        timeout: Value passed to ``lock.acquire(timeout=...)``.

    Yields:
        The result of ``lock.acquire``: ``True`` if the lock is held.
    """
    got_lock = lock.acquire(timeout=timeout)
    if not got_lock:
        # Nothing to release on this path.
        yield got_lock
        return
    try:
        yield got_lock
    finally:
        lock.release()
class DatabaseConnection:
    """Simulated database connection usable as a context manager.

    Entering the ``with`` block connects; leaving it disconnects. Exceptions
    from the block are reported with a printed message and then propagate.
    """

    def __init__(self, dsn: str) -> None:
        """Remember the data source name; the connection starts closed."""
        self.is_connected = False
        self.dsn = dsn

    def connect(self) -> None:
        """Mark the connection as open and announce it."""
        self.is_connected = True
        print(f"Connected to {self.dsn}")

    def disconnect(self) -> None:
        """Mark the connection as closed and announce it."""
        self.is_connected = False
        print("Disconnected")

    def execute(self, query: str) -> list[dict]:
        """Return a single-row result echoing ``query``.

        Raises:
            RuntimeError: If the connection is not open.
        """
        if self.is_connected:
            return [{"result": query}]
        raise RuntimeError("Not connected")

    def __enter__(self) -> "DatabaseConnection":
        """Connect and hand back this instance."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Report any in-flight exception, then disconnect.

        Returns ``None``, so the exception is not suppressed.
        """
        if exc_type:
            failure = f"{exc_type.__name__}: {exc_val}"
            print(f"Rolling back due to {failure}")
        self.disconnect()
@contextmanager
def suppress(*exceptions: type[BaseException]) -> Generator[None, None, None]:
    """Silently discard the listed exception types raised by the block.

    Args:
        *exceptions: Exception classes to swallow. With none given, nothing
            is suppressed.

    Yields:
        None.
    """
    try:
        yield
    except exceptions:
        # Ending the generator normally tells the caller the exception was
        # handled; any other exception type propagates untouched.
        return
@contextmanager
def working_directory(path: str) -> Generator[str, None, None]:
    """Switch the process working directory to ``path`` for one block.

    Args:
        path: Directory to change into.

    Yields:
        ``path``, exactly as passed in.
    """
    import os

    previous_dir = os.getcwd()
    try:
        os.chdir(path)
        yield path
    finally:
        # Restored even when the body, or the chdir above, raises.
        os.chdir(previous_dir)
@contextmanager
def environment_variable(key: str, value: str) -> Generator[None, None, None]:
    """Set ``os.environ[key]`` to ``value`` for the duration of the block.

    On exit the variable returns to its earlier value, or is removed if it
    was not set beforehand.

    Args:
        key: Name of the environment variable.
        value: Value to assign while the block runs.

    Yields:
        None.
    """
    import os

    previous = os.environ.get(key)
    try:
        os.environ[key] = value
        yield
    finally:
        if previous is not None:
            os.environ[key] = previous
        else:
            # Was unset before; the default keeps pop from raising if the
            # body already removed the key.
            os.environ.pop(key, None)
@contextmanager
def multiple_contexts(*managers) -> Generator[tuple, None, None]:
    """Enter several context managers together and exit them as a group.

    Args:
        *managers: Context manager objects, entered in the order given.

    Yields:
        A tuple holding each manager's ``__enter__`` result, in the same
        order as ``managers``.
    """
    from contextlib import ExitStack

    with ExitStack() as stack:
        entered = []
        for manager in managers:
            # The stack takes over responsibility for exiting each manager.
            entered.append(stack.enter_context(manager))
        yield tuple(entered)
@asynccontextmanager
async def async_timer_context(name: str = "Async operation") -> AsyncGenerator[None, None]:
    """Time the enclosed ``async with`` body using the event loop's clock.

    Args:
        name: Label placed at the start of the printed message.

    Yields:
        None. The manager exists only for its timing side effect.
    """
    import asyncio

    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine. Looking it up once means the start and end readings come
    # from the same clock.
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        yield
    finally:
        # Printed from ``finally`` so failures in the body are timed too.
        elapsed = loop.time() - start
        print(f"{name} took {elapsed:.4f}s")
@contextmanager
def retry_context(max_attempts: int = 3, delay: float = 1.0):
    """Yield a helper object that tracks retry attempts for the caller.

    The manager does not retry anything itself: the caller invokes
    ``should_retry`` after each failure and loops accordingly.

    Args:
        max_attempts: Attempt count at which ``should_retry`` starts
            returning ``False``.
        delay: Base pause in seconds; the pause before a retry is
            ``delay * attempt``.

    Yields:
        A ``RetryContext`` with ``max_attempts``, ``delay`` and ``attempt``
        attributes and a ``should_retry`` method.
    """

    class RetryContext:
        """Attempt counter with a growing pause between retries."""

        def __init__(self, max_attempts: int, delay: float):
            self.attempt = 0
            self.max_attempts = max_attempts
            self.delay = delay

        def should_retry(self, exception: Exception) -> bool:
            """Record one failed attempt; sleep and return True if more remain.

            ``exception`` is accepted but not inspected.
            """
            self.attempt += 1
            if self.attempt >= self.max_attempts:
                return False
            pause = self.delay * self.attempt
            time.sleep(pause)
            return True

    yield RetryContext(max_attempts, delay)