import asyncio
try:
    from dissolve import replace_me
except ModuleNotFoundError:
    import functools
    import warnings

    def replace_me(since=None, remove_in=None, message=None):
        """Fallback deprecation decorator used when ``dissolve`` is not installed.

        Args:
            since: Optional version string appended as " since <since>".
            remove_in: Optional version string appended as
                " and will be removed in <remove_in>".
            message: Optional custom text appended after the notice; when
                omitted, a hint to run 'dissolve migrate' is appended instead.

        Returns:
            A decorator whose wrapper emits a ``DeprecationWarning`` on every
            call and then delegates to the wrapped callable.
        """
        def decorator(func):
            # functools.wraps keeps __name__, __doc__ and __wrapped__ so that
            # introspection, logging and help() still describe the original.
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                msg = f"{func.__name__} has been deprecated"
                if since:
                    msg += f" since {since}"
                if remove_in:
                    msg += f" and will be removed in {remove_in}"
                if message:
                    msg += f". {message}"
                else:
                    msg += ". Consider running 'dissolve migrate' to automatically update your code."
                # stacklevel=2 attributes the warning to the caller's line
                # rather than to this wrapper.
                warnings.warn(msg, DeprecationWarning, stacklevel=2)
                # The wrapper is a plain function: for an ``async def`` target
                # the warning fires at call time and the un-awaited coroutine
                # is returned to the caller.
                return func(*args, **kwargs)
            return wrapper
        return decorator
async def fetch_data(url, timeout=30, headers=None):
    """Simulate fetching ``url`` and return a response-like dict.

    Args:
        url: Address echoed back in the result and embedded in ``data``.
        timeout: Value echoed back under ``"timeout"``.
        headers: Mapping echoed back under ``"headers"``; a fresh empty dict
            is used when ``None`` is given.

    Returns:
        A dict with the keys ``url``, ``timeout``, ``headers``, ``data`` and
        ``status`` (always 200).
    """
    effective_headers = {} if headers is None else headers
    # Stand-in for network latency.
    await asyncio.sleep(0.1)
    payload = {"url": url, "timeout": timeout, "headers": effective_headers}
    payload["data"] = f"Data from {url}"
    payload["status"] = 200
    return payload
async def fetch_json(url, timeout=30, parse_dates=False):
    """Fetch ``url`` with a JSON ``Accept`` header and wrap the payload.

    Args:
        url: Address passed through to ``fetch_data``.
        timeout: Timeout passed through to ``fetch_data``.
        parse_dates: When truthy, the result additionally carries
            ``"dates_parsed": True``.

    Returns:
        A dict with ``parsed`` (always True) and ``content`` (the ``data``
        field of the fetched response), plus ``dates_parsed`` when requested.
    """
    accept_json = {"Accept": "application/json"}
    raw = await fetch_data(url, timeout=timeout, headers=accept_json)
    body = raw["data"]
    optional_flags = {"dates_parsed": True} if parse_dates else {}
    return {"parsed": True, "content": body, **optional_flags}
async def upload_file(file_path, destination_url, chunk_size=8192):
    """Simulate uploading a file and return a summary of the request.

    Args:
        file_path: Path echoed back under ``"file_path"``.
        destination_url: Target echoed back under ``"destination"``.
        chunk_size: Value echoed back under ``"chunk_size"``.

    Returns:
        A dict describing the upload, with ``status`` set to ``"uploaded"``.
    """
    # Stand-in for transfer latency. The sleep and the return must be separate
    # statements; fused on one line they are a SyntaxError.
    await asyncio.sleep(0.2)
    return {
        "file_path": file_path,
        "destination": destination_url,
        "chunk_size": chunk_size,
        "status": "uploaded",
    }
# Deprecated alias (since 3.0.0, removal announced for 4.0.0): forwards `url`
# to fetch_data with timeout=30 and returns its result.
# NOTE(review): presumably dissolve derives the replacement call from this
# single return expression, so keep the body a bare `return` — confirm against
# the dissolve documentation.
@replace_me(since="3.0.0", remove_in="4.0.0")
async def old_fetch_data(url):
    return await fetch_data(url, timeout=30)
# Deprecated alias (since 2.5.0): builds "https://api.example.com/<endpoint>"
# and forwards it to fetch_json with timeout=60 and parse_dates=True.
# NOTE(review): presumably dissolve derives the replacement call from this
# single return expression, so keep the body a bare `return` — confirm against
# the dissolve documentation.
@replace_me(since="2.5.0")
async def get_json(endpoint):
    return await fetch_json(f"https://api.example.com/{endpoint}", timeout=60, parse_dates=True)
# Deprecated alias (since 1.0.0): forwards `path` and `url` positionally to
# upload_file with chunk_size=4096.
# NOTE(review): presumably dissolve derives the replacement call from this
# single return expression, so keep the body a bare `return` — confirm against
# the dissolve documentation.
@replace_me(since="1.0.0")
async def legacy_upload(path, url):
    return await upload_file(path, url, chunk_size=4096)
# Deprecated alias (since 3.1.0): forwards `urls` to batch_fetch with
# delay_seconds set to twice `delay` and timeout=45.
# NOTE(review): presumably dissolve derives the replacement call from this
# single return expression, so keep the body a bare `return` — confirm against
# the dissolve documentation.
@replace_me(since="3.1.0")
async def old_batch_fetch(urls, delay=1):
    return await batch_fetch(urls, delay_seconds=delay * 2, timeout=45)
async def batch_fetch(urls, delay_seconds=0.5, timeout=30):
    """Fetch each URL in turn, pausing after every fetch.

    Args:
        urls: Iterable of addresses, fetched sequentially in iteration order.
        delay_seconds: Pause awaited after each fetch; skipped when it is not
            greater than zero.
        timeout: Timeout passed through to ``fetch_data`` for every URL.

    Returns:
        A list of the ``fetch_data`` results, in the same order as ``urls``.
    """
    collected = []
    for target in urls:
        collected.append(await fetch_data(target, timeout=timeout))
        if not delay_seconds > 0:
            continue
        await asyncio.sleep(delay_seconds)
    return collected