use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PythonSdkConfig {
pub base_url: String,
pub version: String,
pub include_rbac: bool,
pub include_conditional_permissions: bool,
pub include_audit: bool,
pub client_name: String,
pub async_support: bool,
pub type_hints: bool,
}
impl Default for PythonSdkConfig {
fn default() -> Self {
Self {
base_url: "https://api.example.com".to_string(),
version: "v1".to_string(),
include_rbac: true,
include_conditional_permissions: true,
include_audit: true,
client_name: "AuthFrameworkClient".to_string(),
async_support: true,
type_hints: true,
}
}
}
pub struct PythonSdkGenerator {
config: PythonSdkConfig,
}
impl PythonSdkGenerator {
pub fn new(config: PythonSdkConfig) -> Self {
Self { config }
}
pub fn generate_sdk(&self) -> Result<HashMap<String, String>, Box<dyn std::error::Error>> {
let mut files = HashMap::new();
files.insert("client.py".to_string(), self.generate_base_client()?);
if self.config.type_hints {
files.insert("types.py".to_string(), self.generate_types()?);
}
if self.config.include_rbac {
files.insert("rbac.py".to_string(), self.generate_rbac_module()?);
}
if self.config.include_conditional_permissions {
files.insert(
"conditional.py".to_string(),
self.generate_conditional_module()?,
);
}
if self.config.include_audit {
files.insert("audit.py".to_string(), self.generate_audit_module()?);
}
files.insert("utils.py".to_string(), self.generate_utils()?);
files.insert("__init__.py".to_string(), self.generate_init()?);
files.insert("setup.py".to_string(), self.generate_setup()?);
files.insert(
"requirements.txt".to_string(),
self.generate_requirements()?,
);
files.insert("README.md".to_string(), self.generate_readme()?);
Ok(files)
}
fn generate_base_client(&self) -> Result<String, Box<dyn std::error::Error>> {
let client_code = format!(
r#"""
Enhanced {} with RBAC Support
Generated by AuthFramework SDK Generator
"""
import asyncio
import json
import time
from typing import Dict, Any, Optional, Union
{}
import aiohttp
import requests
from urllib.parse import urljoin, urlencode
class HttpError(Exception):
"""HTTP error with status code and response data"""
def __init__(self, status: int, message: str, data: Optional[Dict[str, Any]] = None):
self.status = status
self.message = message
self.data = data
super().__init__(f"HTTP {{status}}: {{message}}")
class ApiResponse:
"""Wrapper for API responses"""
def __init__(self, success: bool, data: Optional[Any] = None,
error: Optional[str] = None, message: Optional[str] = None):
self.success = success
self.data = data
self.error = error
self.message = message
class {}:
"""Enhanced AuthFramework client with comprehensive RBAC support"""
def __init__(self, base_url: str, access_token: Optional[str] = None,
api_key: Optional[str] = None, timeout: int = 30,
retry_attempts: int = 3):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.retry_attempts = retry_attempts
self.headers = {{
'Content-Type': 'application/json',
'User-Agent': 'AuthFramework-Python-SDK/1.0.0'
}}
if access_token:
self.headers['Authorization'] = f'Bearer {{access_token}}'
if api_key:
self.headers['X-API-Key'] = api_key
def set_access_token(self, token: str) -> None:
"""Set authentication token"""
self.headers['Authorization'] = f'Bearer {{token}}'
def clear_access_token(self) -> None:
"""Clear authentication token"""
self.headers.pop('Authorization', None)
def set_api_key(self, api_key: str) -> None:
"""Set API key"""
self.headers['X-API-Key'] = api_key
def _make_request_sync(self, method: str, path: str, data: Optional[Dict] = None,
headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Make synchronous HTTP request with retry logic"""
url = urljoin(self.base_url, path)
request_headers = {{**self.headers, **(headers or {{}})}}
last_error = None
for attempt in range(self.retry_attempts + 1):
try:
response = requests.request(
method=method,
url=url,
headers=request_headers,
json=data,
timeout=self.timeout
)
response_data = response.json() if response.content else {{}}
if not response.ok:
raise HttpError(
status=response.status_code,
message=response.reason or 'Request failed',
data=response_data
)
return ApiResponse(
success=response_data.get('success', True),
data=response_data.get('data'),
error=response_data.get('error'),
message=response_data.get('message')
)
except (requests.RequestException, HttpError) as e:
last_error = e
# Don't retry on client errors (4xx)
if isinstance(e, HttpError) and 400 <= e.status < 500:
raise e
# Wait before retry (exponential backoff)
if attempt < self.retry_attempts:
time.sleep(2 ** attempt)
raise last_error or Exception('Request failed after all retries')
async def _make_request_async(self, method: str, path: str, data: Optional[Dict] = None,
headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Make asynchronous HTTP request with retry logic"""
url = urljoin(self.base_url, path)
request_headers = {{**self.headers, **(headers or {{}})}}
last_error = None
for attempt in range(self.retry_attempts + 1):
try:
timeout = aiohttp.ClientTimeout(total=self.timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.request(
method=method,
url=url,
headers=request_headers,
json=data
) as response:
response_data = await response.json() if response.content_length else {{}}
if not response.ok:
raise HttpError(
status=response.status,
message=response.reason or 'Request failed',
data=response_data
)
return ApiResponse(
success=response_data.get('success', True),
data=response_data.get('data'),
error=response_data.get('error'),
message=response_data.get('message')
)
except (aiohttp.ClientError, HttpError) as e:
last_error = e
# Don't retry on client errors (4xx)
if isinstance(e, HttpError) and 400 <= e.status < 500:
raise e
# Wait before retry (exponential backoff)
if attempt < self.retry_attempts:
await asyncio.sleep(2 ** attempt)
raise last_error or Exception('Request failed after all retries')
def get(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous GET request"""
return self._make_request_sync('GET', path, headers=headers)
def post(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous POST request"""
return self._make_request_sync('POST', path, data=data, headers=headers)
def put(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous PUT request"""
return self._make_request_sync('PUT', path, data=data, headers=headers)
def delete(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous DELETE request"""
return self._make_request_sync('DELETE', path, headers=headers)
async def get_async(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous GET request"""
return await self._make_request_async('GET', path, headers=headers)
async def post_async(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous POST request"""
return await self._make_request_async('POST', path, data=data, headers=headers)
async def put_async(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous PUT request"""
return await self._make_request_async('PUT', path, data=data, headers=headers)
async def delete_async(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous DELETE request"""
return await self._make_request_async('DELETE', path, headers=headers)
"#,
self.config.client_name,
if self.config.type_hints { ", List" } else { "" },
self.config.client_name
);
Ok(client_code)
}
fn generate_types(&self) -> Result<String, Box<dyn std::error::Error>> {
let types_code = r#"""
Type definitions for AuthFramework RBAC
"""
from typing import Dict, List, Optional, Union, Any
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
class TimeOfDay(Enum):
"""Time of day classification"""
BUSINESS_HOURS = "business_hours"
AFTER_HOURS = "after_hours"
WEEKEND = "weekend"
HOLIDAY = "holiday"
class DayType(Enum):
"""Day type classification"""
WEEKDAY = "weekday"
WEEKEND = "weekend"
HOLIDAY = "holiday"
class DeviceType(Enum):
"""Device type classification"""
DESKTOP = "desktop"
MOBILE = "mobile"
TABLET = "tablet"
UNKNOWN = "unknown"
class ConnectionType(Enum):
"""Connection type classification"""
DIRECT = "direct"
VPN = "vpn"
PROXY = "proxy"
TOR = "tor"
CORPORATE = "corporate"
UNKNOWN = "unknown"
class SecurityLevel(Enum):
"""Security level assessment"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Role:
"""Role definition"""
id: str
name: str
description: Optional[str] = None
parent_id: Optional[str] = None
permissions: List[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
def __post_init__(self):
if self.permissions is None:
self.permissions = []
@dataclass
class Permission:
"""Permission definition"""
id: str
action: str
resource: str
conditions: Optional[Dict[str, str]] = None
created_at: Optional[datetime] = None
@dataclass
class UserRole:
"""User role assignment"""
role_id: str
role_name: str
assigned_at: datetime
assigned_by: Optional[str] = None
expires_at: Optional[datetime] = None
@dataclass
class UserRolesResponse:
"""User roles and effective permissions"""
user_id: str
roles: List[UserRole]
effective_permissions: List[str]
@dataclass
class CreateRoleRequest:
"""Request to create a new role"""
name: str
description: Optional[str] = None
parent_id: Optional[str] = None
permissions: Optional[List[str]] = None
@dataclass
class UpdateRoleRequest:
"""Request to update an existing role"""
name: Optional[str] = None
description: Optional[str] = None
parent_id: Optional[str] = None
@dataclass
class AssignRoleRequest:
"""Request to assign a role to a user"""
role_id: str
expires_at: Optional[datetime] = None
reason: Optional[str] = None
@dataclass
class BulkAssignment:
"""Single assignment in bulk operation"""
user_id: str
role_id: str
expires_at: Optional[datetime] = None
@dataclass
class BulkAssignRequest:
"""Request for bulk role assignment"""
assignments: List[BulkAssignment]
@dataclass
class ElevateRoleRequest:
"""Request for role elevation"""
target_role: str
duration_minutes: Optional[int] = None
justification: str = ""
@dataclass
class PermissionCheckRequest:
"""Request to check permissions"""
action: str
resource: str
context: Optional[Dict[str, str]] = None
@dataclass
class PermissionCheckResponse:
"""Response from permission check"""
granted: bool
reason: str
required_roles: List[str]
missing_permissions: List[str]
@dataclass
class AuditEntry:
"""Audit log entry"""
id: str
user_id: Optional[str]
action: str
resource: Optional[str]
result: str
context: Dict[str, str]
timestamp: datetime
@dataclass
class AuditLogResponse:
"""Response containing audit logs"""
entries: List[AuditEntry]
total_count: int
page: int
per_page: int
@dataclass
class AuditQuery:
"""Query parameters for audit logs"""
user_id: Optional[str] = None
action: Optional[str] = None
resource: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
page: Optional[int] = None
per_page: Optional[int] = None
@dataclass
class ConditionalContext:
"""Context for conditional permissions"""
time_of_day: Optional[TimeOfDay] = None
day_type: Optional[DayType] = None
device_type: Optional[DeviceType] = None
connection_type: Optional[ConnectionType] = None
security_level: Optional[SecurityLevel] = None
risk_score: Optional[int] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
custom_attributes: Optional[Dict[str, str]] = None
@dataclass
class RoleListQuery:
"""Query parameters for listing roles"""
page: Optional[int] = None
per_page: Optional[int] = None
parent_id: Optional[str] = None
include_permissions: Optional[bool] = None
"#;
Ok(types_code.to_string())
}
fn generate_rbac_module(&self) -> Result<String, Box<dyn std::error::Error>> {
let rbac_code = format!(
r#"""
RBAC (Role-Based Access Control) Module
Provides comprehensive role and permission management
"""
from typing import Dict, List, Optional, Union
{}
from .types import (
Role, CreateRoleRequest, UpdateRoleRequest, UserRolesResponse,
AssignRoleRequest, BulkAssignRequest, ElevateRoleRequest,
PermissionCheckRequest, PermissionCheckResponse, RoleListQuery
)
from .client import ApiResponse
class RbacManager:
"""RBAC management client"""
def __init__(self, client):
self.client = client
# ============================================================================
# ROLE MANAGEMENT
# ============================================================================
def create_role(self, request: CreateRoleRequest) -> ApiResponse:
"""Create a new role"""
data = {{
'name': request.name,
'description': request.description,
'parent_id': request.parent_id,
'permissions': request.permissions
}}
return self.client.post('/{}/rbac/roles', data)
def get_role(self, role_id: str) -> ApiResponse:
"""Get role by ID"""
return self.client.get(f'/{}/rbac/roles/{{role_id}}')
def list_roles(self, query: Optional[RoleListQuery] = None) -> ApiResponse:
"""List all roles with pagination"""
path = '/{}/rbac/roles'
if query:
params = []
if query.page is not None:
params.append(f'page={{query.page}}')
if query.per_page is not None:
params.append(f'per_page={{query.per_page}}')
if query.parent_id is not None:
params.append(f'parent_id={{query.parent_id}}')
if query.include_permissions is not None:
params.append(f'include_permissions={{str(query.include_permissions).lower()}}')
if params:
path += '?' + '&'.join(params)
return self.client.get(path)
def update_role(self, role_id: str, request: UpdateRoleRequest) -> ApiResponse:
"""Update an existing role"""
data = {{}}
if request.name is not None:
data['name'] = request.name
if request.description is not None:
data['description'] = request.description
if request.parent_id is not None:
data['parent_id'] = request.parent_id
return self.client.put(f'/{}/rbac/roles/{{role_id}}', data)
def delete_role(self, role_id: str) -> ApiResponse:
"""Delete a role"""
return self.client.delete(f'/{}/rbac/roles/{{role_id}}')
# ============================================================================
# USER ROLE ASSIGNMENTS
# ============================================================================
def assign_user_role(self, user_id: str, request: AssignRoleRequest) -> ApiResponse:
"""Assign role to user"""
data = {{
'role_id': request.role_id,
'expires_at': request.expires_at.isoformat() if request.expires_at else None,
'reason': request.reason
}}
return self.client.post(f'/{}/rbac/users/{{user_id}}/roles', data)
def revoke_user_role(self, user_id: str, role_id: str) -> ApiResponse:
"""Revoke role from user"""
return self.client.delete(f'/{}/rbac/users/{{user_id}}/roles/{{role_id}}')
def get_user_roles(self, user_id: str) -> ApiResponse:
"""Get user's roles and effective permissions"""
return self.client.get(f'/{}/rbac/users/{{user_id}}/roles')
def bulk_assign_roles(self, request: BulkAssignRequest) -> ApiResponse:
"""Bulk assign roles to multiple users"""
data = {{
'assignments': [
{{
'user_id': assignment.user_id,
'role_id': assignment.role_id,
'expires_at': assignment.expires_at.isoformat() if assignment.expires_at else None
}}
for assignment in request.assignments
]
}}
return self.client.post('/{}/rbac/bulk/assign', data)
# ============================================================================
# PERMISSION CHECKING
# ============================================================================
def check_permission(self, request: PermissionCheckRequest) -> ApiResponse:
"""Check if current user has permission"""
data = {{
'action': request.action,
'resource': request.resource,
'context': request.context
}}
return self.client.post('/{}/rbac/check-permission', data)
def has_permission(self, action: str, resource: str, context: Optional[Dict[str, str]] = None) -> bool:
"""Quick permission check (returns boolean)"""
try:
request = PermissionCheckRequest(action=action, resource=resource, context=context)
response = self.check_permission(request)
return response.data and response.data.get('granted', False)
except Exception:
return False
def elevate_role(self, request: ElevateRoleRequest) -> ApiResponse:
"""Request role elevation"""
data = {{
'target_role': request.target_role,
'duration_minutes': request.duration_minutes,
'justification': request.justification
}}
return self.client.post('/{}/rbac/elevate', data)
# ============================================================================
# ASYNC METHODS
# ============================================================================
async def create_role_async(self, request: CreateRoleRequest) -> ApiResponse:
"""Create a new role (async)"""
data = {{
'name': request.name,
'description': request.description,
'parent_id': request.parent_id,
'permissions': request.permissions
}}
return await self.client.post_async('/{}/rbac/roles', data)
async def get_role_async(self, role_id: str) -> ApiResponse:
"""Get role by ID (async)"""
return await self.client.get_async(f'/{}/rbac/roles/{{role_id}}')
async def has_permission_async(self, action: str, resource: str, context: Optional[Dict[str, str]] = None) -> bool:
"""Quick permission check (async, returns boolean)"""
try:
request = PermissionCheckRequest(action=action, resource=resource, context=context)
data = {{
'action': request.action,
'resource': request.resource,
'context': request.context
}}
response = await self.client.post_async('/{}/rbac/check-permission', data)
return response.data and response.data.get('granted', False)
except Exception:
return False
# ============================================================================
# CONVENIENCE METHODS
# ============================================================================
def user_has_any_role(self, user_id: str, role_names: List[str]) -> bool:
"""Check if user has any of the specified roles"""
try:
response = self.get_user_roles(user_id)
if not response.data:
return False
user_role_names = [role['role_name'] for role in response.data.get('roles', [])]
return any(role in user_role_names for role in role_names)
except Exception:
return False
def user_has_all_roles(self, user_id: str, role_names: List[str]) -> bool:
"""Check if user has all of the specified roles"""
try:
response = self.get_user_roles(user_id)
if not response.data:
return False
user_role_names = [role['role_name'] for role in response.data.get('roles', [])]
return all(role in user_role_names for role in role_names)
except Exception:
return False
def get_role_hierarchy(self) -> Dict[str, List[str]]:
"""Get role hierarchy (parent-child relationships)"""
try:
response = self.list_roles(RoleListQuery(include_permissions=False))
if not response.data:
return {{}}
hierarchy = {{}}
for role in response.data:
parent_id = role.get('parent_id')
if parent_id:
if parent_id not in hierarchy:
hierarchy[parent_id] = []
hierarchy[parent_id].append(role['id'])
return hierarchy
except Exception:
return {{}}
def get_child_roles(self, parent_role_id: str) -> List[Role]:
"""Get all child roles for a given parent role"""
try:
response = self.list_roles(RoleListQuery(parent_id=parent_role_id))
return response.data or []
except Exception:
return []
"#,
"", self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version
);
Ok(rbac_code)
}
fn generate_conditional_module(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok(format!(
r#""""Conditional permission helpers for auth-framework Python SDK v{version}."""
from __future__ import annotations
from typing import Any, Callable, List, Optional
class ConditionalPermission:
"""A permission that is only granted when a runtime condition is satisfied."""
def __init__(self, permission: str, condition: Callable[..., bool]) -> None:
self.permission = permission
self._condition = condition
def is_granted(self, **context: Any) -> bool:
"""Return True only when the attached condition evaluates to True."""
return self._condition(**context)
class PermissionGate:
"""Evaluates a list of :class:`ConditionalPermission` items against a context."""
def __init__(self, permissions: Optional[List[ConditionalPermission]] = None) -> None:
self._permissions: List[ConditionalPermission] = permissions or []
def add(self, cp: ConditionalPermission) -> None:
self._permissions.append(cp)
def evaluate(self, **context: Any) -> List[str]:
"""Return the names of all permissions granted in the given context."""
return [
cp.permission for cp in self._permissions if cp.is_granted(**context)
]
def has(self, permission: str, **context: Any) -> bool:
"""Return True if *permission* is granted in *context*."""
return permission in self.evaluate(**context)
"#,
version = self.config.version
))
}
fn generate_audit_module(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok(format!(
r#"""Audit log client for auth-framework Python SDK v{version}."""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class AuditEvent:
"""Represents a single audit log entry."""
event_type: str
user_id: str
resource: str
outcome: str
timestamp: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
class AuditLog:
"""Lightweight in-process audit log buffer."""
def __init__(self, max_size: int = 10_000) -> None:
self._events: List[AuditEvent] = []
self._max_size = max_size
def record(self, event: AuditEvent) -> None:
if len(self._events) >= self._max_size:
self._events.pop(0)
self._events.append(event)
def query(
self,
*,
user_id: Optional[str] = None,
event_type: Optional[str] = None,
outcome: Optional[str] = None,
) -> List[AuditEvent]:
results = self._events
if user_id is not None:
results = [e for e in results if e.user_id == user_id]
if event_type is not None:
results = [e for e in results if e.event_type == event_type]
if outcome is not None:
results = [e for e in results if e.outcome == outcome]
return results
def clear(self) -> None:
self._events.clear()
"#,
version = self.config.version
))
}
fn generate_utils(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok(format!(
r#"""Utility helpers for auth-framework Python SDK v{version}."""
from __future__ import annotations
import base64
import time
from typing import Any, Dict, Optional
def decode_jwt_payload(token: str) -> Dict[str, Any]:
"""Decode JWT payload without verifying the signature.
.. warning::
This does **not** verify the token. Use the server-side validation
endpoints for security-sensitive operations.
"""
try:
parts = token.split(".")
if len(parts) < 2:
raise ValueError("Not a valid JWT")
padding = "=" * (4 - len(parts[1]) % 4)
import json
return json.loads(base64.urlsafe_b64decode(parts[1] + padding))
except Exception as exc:
raise ValueError(f"Failed to decode JWT payload: {{exc}}") from exc
def is_token_expired(token: str, leeway: int = 0) -> bool:
"""Return True if the JWT *exp* claim is in the past."""
try:
payload = decode_jwt_payload(token)
exp: Optional[int] = payload.get("exp")
if exp is None:
return False
return time.time() > (exp + leeway)
except ValueError:
return True
def constant_time_compare(a: str, b: str) -> bool:
"""Compare two strings in constant time to prevent timing attacks."""
if len(a) != len(b):
return False
result = 0
for x, y in zip(a, b):
result |= ord(x) ^ ord(y)
return result == 0
"#,
version = self.config.version
))
}
fn generate_init(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok(format!(
r#"""auth-framework Python SDK v{version}.
Usage::
from auth_framework import AuthClient
client = AuthClient(base_url="https://auth.example.com", api_key="...")
token = client.authenticate(username="alice", password="secret")
"""
from .client import AuthClient
from .models import AuthToken, UserInfo
from .exceptions import AuthError, TokenExpiredError, PermissionDeniedError
__all__ = [
"AuthClient",
"AuthToken",
"UserInfo",
"AuthError",
"TokenExpiredError",
"PermissionDeniedError",
]
__version__ = "{version}"
"#,
version = self.config.version
))
}
fn generate_setup(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok(format!(
r#""""Setup configuration for auth-framework Python SDK."""
from setuptools import setup, find_packages
setup(
name="auth-framework-sdk",
version="{version}",
description="Python SDK for auth-framework authentication and authorization",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
python_requires=">=3.9",
packages=find_packages(),
install_requires=[
"aiohttp>=3.8.0",
"requests>=2.28.0",
],
extras_require={{
"async": ["aiohttp>=3.8.0"],
}},
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Topic :: Security",
],
)
"#,
version = self.config.version,
))
}
fn generate_requirements(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("aiohttp>=3.8.0\nrequests>=2.28.0".to_string())
}
fn generate_readme(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok(format!(
r#"# auth-framework-sdk — AuthFramework Python SDK v{version}
A Python SDK for integrating with the AuthFramework authentication and authorization server.
## Installation
```bash
pip install auth-framework-sdk
```
## Quick Start
```python
from auth_framework import {client_name}
client = {client_name}(
base_url="{base_url}",
api_key="your-api-key",
)
# Authenticate a user
token = client.authenticate(username="alice", password="secret")
print(token.access_token)
# Validate a token
user = client.validate_token(token.access_token)
print(user.user_id)
```
## Features
- **Token Authentication**: Username/password, API keys, OAuth 2.0
- **RBAC**: Role-based access control with permission checks
- **MFA**: Multi-factor authentication (TOTP, Email, WebAuthn)
- **Audit Logging**: Built-in audit log client
- **Async Support**: Full async/await support via `aiohttp`
## License
MIT
"#,
version = self.config.version,
client_name = self.config.client_name,
base_url = self.config.base_url,
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_python_sdk_config_default() {
let config = PythonSdkConfig::default();
assert_eq!(config.base_url, "https://api.example.com");
assert_eq!(config.version, "v1");
assert!(config.include_rbac);
assert!(config.include_conditional_permissions);
assert!(config.include_audit);
assert_eq!(config.client_name, "AuthFrameworkClient");
assert!(config.async_support);
assert!(config.type_hints);
}
#[test]
fn test_generate_sdk_default_config() {
let generator = PythonSdkGenerator::new(PythonSdkConfig::default());
let files = generator.generate_sdk().unwrap();
assert!(files.contains_key("client.py"));
assert!(files.contains_key("types.py"));
assert!(files.contains_key("rbac.py"));
assert!(files.contains_key("conditional.py"));
assert!(files.contains_key("audit.py"));
assert!(files.contains_key("utils.py"));
assert!(files.contains_key("__init__.py"));
assert!(files.contains_key("setup.py"));
assert!(files.contains_key("requirements.txt"));
assert!(files.contains_key("README.md"));
}
#[test]
fn test_generate_sdk_without_optional_modules() {
let config = PythonSdkConfig {
include_rbac: false,
include_conditional_permissions: false,
include_audit: false,
type_hints: false,
..PythonSdkConfig::default()
};
let generator = PythonSdkGenerator::new(config);
let files = generator.generate_sdk().unwrap();
assert!(files.contains_key("client.py"));
assert!(!files.contains_key("types.py"));
assert!(!files.contains_key("rbac.py"));
assert!(!files.contains_key("conditional.py"));
assert!(!files.contains_key("audit.py"));
}
#[test]
fn test_generated_client_contains_class_name() {
let config = PythonSdkConfig {
client_name: "MyClient".to_string(),
..PythonSdkConfig::default()
};
let generator = PythonSdkGenerator::new(config);
let files = generator.generate_sdk().unwrap();
let client_code = &files["client.py"];
assert!(client_code.contains("MyClient"));
}
}