import asyncio
from typing import AsyncIterator, Coroutine, Any
import aiohttp
async def fetch_data(session: aiohttp.ClientSession, url: str) -> dict:
    """Issue a GET for ``url`` on ``session`` and return the decoded JSON body.

    Args:
        session: Client session used to send the request.
        url: Address to request.

    Returns:
        The value produced by ``response.json()``.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status
        (``aiohttp.ClientResponseError`` per the aiohttp documentation).
    """
    async with session.get(url) as resp:
        # Reject error statuses before spending time decoding the body.
        resp.raise_for_status()
        payload = await resp.json()
    return payload
async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch every URL concurrently over one shared client session.

    Args:
        urls: Addresses to request.

    Returns:
        The decoded JSON bodies, in the same order as ``urls``.

    Raises:
        The first exception raised by any individual fetch, as propagated
        by ``asyncio.gather``.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch_data(session, address) for address in urls)
        # gather() keeps results aligned with the order of its arguments.
        bodies = await asyncio.gather(*pending)
    return bodies
async def bounded_fetch(urls: list[str], concurrency: int = 5) -> list[dict]:
    """Fetch every URL with at most ``concurrency`` requests in flight.

    All requests share a single ``aiohttp.ClientSession`` so pooled
    connections can be reused, instead of opening and tearing down a fresh
    session for every URL.

    Args:
        urls: Addresses to request.
        concurrency: Maximum number of simultaneous requests. Should be at
            least 1: a semaphore initialised to 0 never admits a request.

    Returns:
        The decoded JSON bodies, in the same order as ``urls``.

    Raises:
        ValueError: If ``concurrency`` is negative (raised by
            ``asyncio.Semaphore``).
        The first exception raised by any individual fetch, as propagated
        by ``asyncio.gather``.
    """
    semaphore = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession() as session:

        async def fetch_with_limit(url: str) -> dict:
            # The semaphore caps how many fetches run at once; the session
            # itself is shared by all of them.
            async with semaphore:
                return await fetch_data(session, url)

        return await asyncio.gather(*(fetch_with_limit(url) for url in urls))
async def producer_consumer():
    """Run a producer and a consumer that talk through a bounded queue.

    The producer enqueues the integers 0..19, pausing 10 ms after each, and
    then enqueues ``None`` as an end-of-stream sentinel. The consumer prints
    each integer and stops when it sees the sentinel. Both run concurrently
    and the function returns once both have finished.
    """
    # The queue carries ints plus the ``None`` sentinel, so the element type
    # is ``int | None``. maxsize=10 makes put() wait when the consumer lags.
    queue: asyncio.Queue[int | None] = asyncio.Queue(maxsize=10)

    async def producer():
        for value in range(20):
            await queue.put(value)
            await asyncio.sleep(0.01)
        # Tell the consumer that no more items are coming.
        await queue.put(None)

    async def consumer():
        while (item := await queue.get()) is not None:
            print(f"Processed: {item}")
            queue.task_done()
        # The sentinel was also retrieved with get(), so mark it done too.
        queue.task_done()

    await asyncio.gather(producer(), consumer())
async def timeout_example():
    """Run ``slow_operation`` under a five-second limit.

    If the limit is exceeded, ``asyncio.wait_for`` raises
    ``asyncio.TimeoutError``; that is caught here and reported on stdout
    rather than propagated.
    """
    limit_seconds = 5.0
    try:
        pending = slow_operation()
        await asyncio.wait_for(pending, timeout=limit_seconds)
    except asyncio.TimeoutError:
        print("Operation timed out")
async def slow_operation():
    """Suspend for ten seconds without blocking the event loop."""
    duration_seconds = 10
    await asyncio.sleep(duration_seconds)
async def periodic_task(
    interval: float,
    # Quoted so the annotation is not evaluated when the ``def`` runs.
    task: "Callable[[], Coroutine[Any, Any, None]] | Coroutine[Any, Any, None]",
):
    """Run ``task`` repeatedly, sleeping ``interval`` seconds between runs.

    A coroutine object can be awaited only once, so repetition requires a
    zero-argument callable (for example an ``async def`` function) that
    returns a fresh coroutine on every call. The loop never ends on its own;
    stop it by cancelling the enclosing task or by letting ``task`` raise.

    Args:
        interval: Seconds to sleep after each run.
        task: Preferably a zero-argument callable returning a coroutine. A
            bare coroutine object is still accepted for backward
            compatibility, but it is awaited once and then ``RuntimeError``
            is raised because it cannot be re-awaited.

    Raises:
        RuntimeError: If ``task`` is a coroutine object rather than a
            callable, after its single run.
        Any exception raised by ``task`` itself.
    """
    if not callable(task):
        # Legacy call style: run the one-shot coroutine, then fail with an
        # explicit message instead of the interpreter's
        # "cannot reuse already awaited coroutine".
        await task
        raise RuntimeError(
            "periodic_task cannot repeat a coroutine object; pass a "
            "zero-argument callable that returns a new coroutine each time"
        )

    while True:
        await task()
        await asyncio.sleep(interval)
class AsyncContextManager:
    """Demonstration async context manager.

    Entering and exiting each print a message and then sleep for 0.1 s.
    ``__aexit__`` returns ``None``, so an exception raised inside the
    ``async with`` body is not suppressed.
    """

    async def _announce(self, message: str) -> None:
        """Print ``message`` and then yield to the event loop for 0.1 s."""
        print(message)
        await asyncio.sleep(0.1)

    async def __aenter__(self) -> "AsyncContextManager":
        await self._announce("Entering context")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self._announce("Exiting context")
async def retry_async(
    # The annotations are quoted so they are not evaluated when the ``def``
    # runs; they name ``Callable`` and ``T``, which must not be required to
    # exist at that point.
    func: "Callable[..., Coroutine[Any, Any, T]]",
    max_attempts: int = 3,
    delay: float = 1.0,
) -> "T":
    """Call ``func`` until it succeeds or ``max_attempts`` is used up.

    After the k-th failed attempt (1-based) the function sleeps for
    ``delay * k`` seconds before trying again, i.e. a linear back-off.

    Args:
        func: Zero-argument callable returning a fresh coroutine per call.
        max_attempts: Total number of attempts to make.
        delay: Base number of seconds for the back-off.

    Returns:
        The result of the first successful ``await func()``.

    Raises:
        Exception: The exception from the final attempt, re-raised unchanged.
        RuntimeError: If ``max_attempts`` is less than 1, so that no attempt
            was made at all.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_attempts:
                # Out of attempts: surface the real failure to the caller.
                raise
            await asyncio.sleep(delay * attempt)
    # Only reachable when the loop body never ran (max_attempts < 1).
    raise RuntimeError("Max retries exceeded")
async def async_generator_example() -> AsyncIterator[int]:
    """Asynchronously yield the squares of 0 through 9.

    Each value is preceded by a 0.1 s sleep, so consumers receive one
    square roughly every tenth of a second.
    """
    base = 0
    while base < 10:
        await asyncio.sleep(0.1)
        yield base ** 2
        base += 1
async def gather_with_exceptions(
    # The annotations are quoted so they are not evaluated when the ``def``
    # runs; they name ``T``, which must not be required to exist yet.
    *coros: "Coroutine[Any, Any, T]",
) -> "list[T | Exception]":
    """Run all coroutines concurrently, collecting results and failures.

    Each coroutine is wrapped in a task so that they all start running
    immediately. The returned list is aligned with the argument order:
    position ``i`` holds either the result of ``coros[i]`` or the
    ``Exception`` instance it raised.

    Args:
        *coros: Coroutines to run.

    Returns:
        One entry per coroutine, in input order. An empty call returns
        ``[]``.

    Raises:
        asyncio.CancelledError: Not captured, because only ``Exception``
            subclasses are collected; a cancelled task therefore propagates
            its cancellation to the caller.
    """
    tasks = [asyncio.create_task(coro) for coro in coros]
    results: list[Any] = []
    # Every task is already scheduled, so awaiting them one by one in input
    # order does not serialise the work; it only fixes the result order.
    for pending in tasks:
        try:
            results.append(await pending)
        except Exception as exc:
            results.append(exc)
    return results
# NOTE(review): this import and ``T`` sit below the functions whose
# annotations name them (retry_async, gather_with_exceptions). Unquoted
# annotations are evaluated when the ``def`` statement runs, so these
# definitions belong at the top of the module.
from typing import Callable, TypeVar
# Generic type variable standing for the result type of a wrapped coroutine.
T = TypeVar('T')