import sys
import os
import asyncio
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from library.utils import old_add, old_multiply, square, old_format_string
from library.models import OldDataProcessor, LegacyUser
from library.async_ops import old_fetch_data, get_json
from library.processors import ProcessingEngine
from library.config import OLD_API_URL, OLD_TIMEOUT, AppConfig
from library.containers import LegacyContainer, SmartList
class DataAnalyzer:
def __init__(self, data_source):
self.processor = OldDataProcessor(data_source, cache_size=100)
self.api_url = OLD_API_URL
self.timeout = OLD_TIMEOUT
self.page_size = AppConfig.LEGACY_PAGE_SIZE
def calculate_metrics(self, values):
total = 0
squares_sum = 0
for value in values:
total = old_add(total, value)
square_val = square(value)
squares_sum = old_add(squares_sum, square_val)
mean = total / len(values) if values else 0
variance = squares_sum / len(values) if values else 0
return {
"total": total,
"mean": mean,
"variance": variance,
"count": len(values)
}
def process_user_data(self, user_info):
user = LegacyUser(user_info["name"], user_info["email"])
display_name = user.get_name()
user.set_profile_data({"age": user_info.get("age", 0)})
is_active = user.is_enabled
return {
"display_name": display_name,
"is_active": is_active,
"profile": user._user.get_profile()
}
def format_report(self, template, data):
return old_format_string(
template,
data.get("title", "Report"),
data.get("date", "Today"),
uppercase=data.get("header_caps", False)
)
def validate_inputs(self, inputs):
for item in inputs:
is_valid = ProcessingEngine.old_validate_data([item])
if not is_valid:
return False
return True
async def fetch_external_data(self, endpoint):
if endpoint.startswith("json/"):
return await get_json(endpoint[5:]) else:
full_url = f"{self.api_url}/{endpoint}"
return await old_fetch_data(full_url)
class ReportGenerator:
def __init__(self):
self.data_container = LegacyContainer()
self.results_list = SmartList()
def add_data_point(self, data):
current_size = self.data_container.size()
self.data_container._items.append(data)
has_data = self.data_container.contains(data)
if has_data:
result_count = self.results_list.get_count()
self.results_list.append(f"Data point #{result_count + 1}: {data}")
def generate_summary(self):
total_items = self.data_container.size()
results_count = self.results_list.length()
is_empty = self.results_list.is_empty()
if is_empty:
return "No data to summarize"
sample_data = []
for i in range(min(3, total_items)):
item = self.data_container.get_item(i)
sample_data.append(str(item))
return {
"total_data_points": total_items,
"total_results": results_count,
"sample_data": sample_data,
"has_data": not is_empty
}
def process_financial_data(transactions):
balance = 0
fees_total = 0
for transaction in transactions:
amount = transaction.get("amount", 0)
fee = transaction.get("fee", 0)
if transaction.get("type") == "deposit":
balance = old_add(balance, amount)
elif transaction.get("type") == "withdrawal":
balance = old_add(balance, -amount)
compound_fee = old_multiply(fee, 1.1) fees_total = old_add(fees_total, compound_fee)
return {
"final_balance": balance,
"total_fees": fees_total,
"transaction_count": len(transactions)
}
async def main():
print("=== Data Processing Example ===")
print("This module shows realistic usage of deprecated functions.")
analyzer = DataAnalyzer("financial_data.db")
sample_values = [10, 20, 30, 40, 50]
metrics = analyzer.calculate_metrics(sample_values)
print(f"Calculated metrics: {metrics}")
user_info = {"name": "alice_smith", "email": "alice@example.com", "age": 28}
user_data = analyzer.process_user_data(user_info)
print(f"Processed user data: {user_data}")
report_data = {"title": "Monthly Analysis", "date": "2024-01-15", "header_caps": True}
report = analyzer.format_report("Report: {} - Generated on {}", report_data)
print(f"Formatted report: {report}")
test_inputs = ["valid_data", "more_data"]
is_valid = analyzer.validate_inputs(test_inputs)
print(f"Input validation result: {is_valid}")
report_gen = ReportGenerator()
report_gen.add_data_point("Sample data 1")
report_gen.add_data_point("Sample data 2")
report_gen.add_data_point("Sample data 3")
summary = report_gen.generate_summary()
print(f"Report summary: {summary}")
transactions = [
{"type": "deposit", "amount": 100, "fee": 1},
{"type": "withdrawal", "amount": 30, "fee": 0.5},
{"type": "deposit", "amount": 50, "fee": 1}
]
financial_result = process_financial_data(transactions)
print(f"Financial processing result: {financial_result}")
try:
print("\nFetching external data...")
external_data = await analyzer.fetch_external_data("json/test_data")
print(f"External data fetched: {external_data}")
except Exception as e:
print(f"Error fetching external data: {e}")
print("\n=== Migration Instructions ===")
print("To migrate this code:")
print("1. Run: dissolve migrate consumer/data_processing.py")
print("2. Review the changes")
print("3. Run: dissolve migrate --write consumer/data_processing.py")
if __name__ == "__main__":
asyncio.run(main())