use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PythonSdkConfig {
pub base_url: String,
pub version: String,
pub include_rbac: bool,
pub include_conditional_permissions: bool,
pub include_audit: bool,
pub client_name: String,
pub async_support: bool,
pub type_hints: bool,
}
impl Default for PythonSdkConfig {
fn default() -> Self {
Self {
base_url: "https://api.example.com".to_string(),
version: "v1".to_string(),
include_rbac: true,
include_conditional_permissions: true,
include_audit: true,
client_name: "AuthFrameworkClient".to_string(),
async_support: true,
type_hints: true,
}
}
}
pub struct PythonSdkGenerator {
config: PythonSdkConfig,
}
impl PythonSdkGenerator {
pub fn new(config: PythonSdkConfig) -> Self {
Self { config }
}
pub fn generate_sdk(&self) -> Result<HashMap<String, String>, Box<dyn std::error::Error>> {
let mut files = HashMap::new();
files.insert("client.py".to_string(), self.generate_base_client()?);
if self.config.type_hints {
files.insert("types.py".to_string(), self.generate_types()?);
}
if self.config.include_rbac {
files.insert("rbac.py".to_string(), self.generate_rbac_module()?);
}
if self.config.include_conditional_permissions {
files.insert(
"conditional.py".to_string(),
self.generate_conditional_module()?,
);
}
if self.config.include_audit {
files.insert("audit.py".to_string(), self.generate_audit_module()?);
}
files.insert("utils.py".to_string(), self.generate_utils()?);
files.insert("__init__.py".to_string(), self.generate_init()?);
files.insert("setup.py".to_string(), self.generate_setup()?);
files.insert(
"requirements.txt".to_string(),
self.generate_requirements()?,
);
files.insert("README.md".to_string(), self.generate_readme()?);
Ok(files)
}
fn generate_base_client(&self) -> Result<String, Box<dyn std::error::Error>> {
let client_code = format!(
r#"""
Enhanced {} with RBAC Support
Generated by AuthFramework SDK Generator
"""
import asyncio
import json
import time
from typing import Dict, Any, Optional, Union
{}
import aiohttp
import requests
from urllib.parse import urljoin, urlencode
class HttpError(Exception):
"""HTTP error with status code and response data"""
def __init__(self, status: int, message: str, data: Optional[Dict[str, Any]] = None):
self.status = status
self.message = message
self.data = data
super().__init__(f"HTTP {{status}}: {{message}}")
class ApiResponse:
"""Wrapper for API responses"""
def __init__(self, success: bool, data: Optional[Any] = None,
error: Optional[str] = None, message: Optional[str] = None):
self.success = success
self.data = data
self.error = error
self.message = message
class {}:
"""Enhanced AuthFramework client with comprehensive RBAC support"""
def __init__(self, base_url: str, access_token: Optional[str] = None,
api_key: Optional[str] = None, timeout: int = 30,
retry_attempts: int = 3):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.retry_attempts = retry_attempts
self.headers = {{
'Content-Type': 'application/json',
'User-Agent': 'AuthFramework-Python-SDK/1.0.0'
}}
if access_token:
self.headers['Authorization'] = f'Bearer {{access_token}}'
if api_key:
self.headers['X-API-Key'] = api_key
def set_access_token(self, token: str) -> None:
"""Set authentication token"""
self.headers['Authorization'] = f'Bearer {{token}}'
def clear_access_token(self) -> None:
"""Clear authentication token"""
self.headers.pop('Authorization', None)
def set_api_key(self, api_key: str) -> None:
"""Set API key"""
self.headers['X-API-Key'] = api_key
def _make_request_sync(self, method: str, path: str, data: Optional[Dict] = None,
headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Make synchronous HTTP request with retry logic"""
url = urljoin(self.base_url, path)
request_headers = {{**self.headers, **(headers or {{}})}}
last_error = None
for attempt in range(self.retry_attempts + 1):
try:
response = requests.request(
method=method,
url=url,
headers=request_headers,
json=data,
timeout=self.timeout
)
response_data = response.json() if response.content else {{}}
if not response.ok:
raise HttpError(
status=response.status_code,
message=response.reason or 'Request failed',
data=response_data
)
return ApiResponse(
success=response_data.get('success', True),
data=response_data.get('data'),
error=response_data.get('error'),
message=response_data.get('message')
)
except (requests.RequestException, HttpError) as e:
last_error = e
# Don't retry on client errors (4xx)
if isinstance(e, HttpError) and 400 <= e.status < 500:
raise e
# Wait before retry (exponential backoff)
if attempt < self.retry_attempts:
time.sleep(2 ** attempt)
raise last_error or Exception('Request failed after all retries')
async def _make_request_async(self, method: str, path: str, data: Optional[Dict] = None,
headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Make asynchronous HTTP request with retry logic"""
url = urljoin(self.base_url, path)
request_headers = {{**self.headers, **(headers or {{}})}}
last_error = None
for attempt in range(self.retry_attempts + 1):
try:
timeout = aiohttp.ClientTimeout(total=self.timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.request(
method=method,
url=url,
headers=request_headers,
json=data
) as response:
response_data = await response.json() if response.content_length else {{}}
if not response.ok:
raise HttpError(
status=response.status,
message=response.reason or 'Request failed',
data=response_data
)
return ApiResponse(
success=response_data.get('success', True),
data=response_data.get('data'),
error=response_data.get('error'),
message=response_data.get('message')
)
except (aiohttp.ClientError, HttpError) as e:
last_error = e
# Don't retry on client errors (4xx)
if isinstance(e, HttpError) and 400 <= e.status < 500:
raise e
# Wait before retry (exponential backoff)
if attempt < self.retry_attempts:
await asyncio.sleep(2 ** attempt)
raise last_error or Exception('Request failed after all retries')
def get(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous GET request"""
return self._make_request_sync('GET', path, headers=headers)
def post(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous POST request"""
return self._make_request_sync('POST', path, data=data, headers=headers)
def put(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous PUT request"""
return self._make_request_sync('PUT', path, data=data, headers=headers)
def delete(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Synchronous DELETE request"""
return self._make_request_sync('DELETE', path, headers=headers)
async def get_async(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous GET request"""
return await self._make_request_async('GET', path, headers=headers)
async def post_async(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous POST request"""
return await self._make_request_async('POST', path, data=data, headers=headers)
async def put_async(self, path: str, data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous PUT request"""
return await self._make_request_async('PUT', path, data=data, headers=headers)
async def delete_async(self, path: str, headers: Optional[Dict[str, str]] = None) -> ApiResponse:
"""Asynchronous DELETE request"""
return await self._make_request_async('DELETE', path, headers=headers)
"#,
self.config.client_name,
if self.config.type_hints { ", List" } else { "" },
self.config.client_name
);
Ok(client_code)
}
fn generate_types(&self) -> Result<String, Box<dyn std::error::Error>> {
let types_code = r#"""
Type definitions for AuthFramework RBAC
"""
from typing import Dict, List, Optional, Union, Any
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
class TimeOfDay(Enum):
"""Time of day classification"""
BUSINESS_HOURS = "business_hours"
AFTER_HOURS = "after_hours"
WEEKEND = "weekend"
HOLIDAY = "holiday"
class DayType(Enum):
"""Day type classification"""
WEEKDAY = "weekday"
WEEKEND = "weekend"
HOLIDAY = "holiday"
class DeviceType(Enum):
"""Device type classification"""
DESKTOP = "desktop"
MOBILE = "mobile"
TABLET = "tablet"
UNKNOWN = "unknown"
class ConnectionType(Enum):
"""Connection type classification"""
DIRECT = "direct"
VPN = "vpn"
PROXY = "proxy"
TOR = "tor"
CORPORATE = "corporate"
UNKNOWN = "unknown"
class SecurityLevel(Enum):
"""Security level assessment"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Role:
"""Role definition"""
id: str
name: str
description: Optional[str] = None
parent_id: Optional[str] = None
permissions: List[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
def __post_init__(self):
if self.permissions is None:
self.permissions = []
@dataclass
class Permission:
"""Permission definition"""
id: str
action: str
resource: str
conditions: Optional[Dict[str, str]] = None
created_at: Optional[datetime] = None
@dataclass
class UserRole:
"""User role assignment"""
role_id: str
role_name: str
assigned_at: datetime
assigned_by: Optional[str] = None
expires_at: Optional[datetime] = None
@dataclass
class UserRolesResponse:
"""User roles and effective permissions"""
user_id: str
roles: List[UserRole]
effective_permissions: List[str]
@dataclass
class CreateRoleRequest:
"""Request to create a new role"""
name: str
description: Optional[str] = None
parent_id: Optional[str] = None
permissions: Optional[List[str]] = None
@dataclass
class UpdateRoleRequest:
"""Request to update an existing role"""
name: Optional[str] = None
description: Optional[str] = None
parent_id: Optional[str] = None
@dataclass
class AssignRoleRequest:
"""Request to assign a role to a user"""
role_id: str
expires_at: Optional[datetime] = None
reason: Optional[str] = None
@dataclass
class BulkAssignment:
"""Single assignment in bulk operation"""
user_id: str
role_id: str
expires_at: Optional[datetime] = None
@dataclass
class BulkAssignRequest:
"""Request for bulk role assignment"""
assignments: List[BulkAssignment]
@dataclass
class ElevateRoleRequest:
"""Request for role elevation"""
target_role: str
duration_minutes: Optional[int] = None
justification: str = ""
@dataclass
class PermissionCheckRequest:
"""Request to check permissions"""
action: str
resource: str
context: Optional[Dict[str, str]] = None
@dataclass
class PermissionCheckResponse:
"""Response from permission check"""
granted: bool
reason: str
required_roles: List[str]
missing_permissions: List[str]
@dataclass
class AuditEntry:
"""Audit log entry"""
id: str
user_id: Optional[str]
action: str
resource: Optional[str]
result: str
context: Dict[str, str]
timestamp: datetime
@dataclass
class AuditLogResponse:
"""Response containing audit logs"""
entries: List[AuditEntry]
total_count: int
page: int
per_page: int
@dataclass
class AuditQuery:
"""Query parameters for audit logs"""
user_id: Optional[str] = None
action: Optional[str] = None
resource: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
page: Optional[int] = None
per_page: Optional[int] = None
@dataclass
class ConditionalContext:
"""Context for conditional permissions"""
time_of_day: Optional[TimeOfDay] = None
day_type: Optional[DayType] = None
device_type: Optional[DeviceType] = None
connection_type: Optional[ConnectionType] = None
security_level: Optional[SecurityLevel] = None
risk_score: Optional[int] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
custom_attributes: Optional[Dict[str, str]] = None
@dataclass
class RoleListQuery:
"""Query parameters for listing roles"""
page: Optional[int] = None
per_page: Optional[int] = None
parent_id: Optional[str] = None
include_permissions: Optional[bool] = None
"#;
Ok(types_code.to_string())
}
fn generate_rbac_module(&self) -> Result<String, Box<dyn std::error::Error>> {
let rbac_code = format!(
r#"""
RBAC (Role-Based Access Control) Module
Provides comprehensive role and permission management
"""
from typing import Dict, List, Optional, Union
{}
from .types import (
Role, CreateRoleRequest, UpdateRoleRequest, UserRolesResponse,
AssignRoleRequest, BulkAssignRequest, ElevateRoleRequest,
PermissionCheckRequest, PermissionCheckResponse, RoleListQuery
)
from .client import ApiResponse
class RbacManager:
"""RBAC management client"""
def __init__(self, client):
self.client = client
# ============================================================================
# ROLE MANAGEMENT
# ============================================================================
def create_role(self, request: CreateRoleRequest) -> ApiResponse:
"""Create a new role"""
data = {{
'name': request.name,
'description': request.description,
'parent_id': request.parent_id,
'permissions': request.permissions
}}
return self.client.post('/{}/rbac/roles', data)
def get_role(self, role_id: str) -> ApiResponse:
"""Get role by ID"""
return self.client.get(f'/{}/rbac/roles/{{role_id}}')
def list_roles(self, query: Optional[RoleListQuery] = None) -> ApiResponse:
"""List all roles with pagination"""
path = '/{}/rbac/roles'
if query:
params = []
if query.page is not None:
params.append(f'page={{query.page}}')
if query.per_page is not None:
params.append(f'per_page={{query.per_page}}')
if query.parent_id is not None:
params.append(f'parent_id={{query.parent_id}}')
if query.include_permissions is not None:
params.append(f'include_permissions={{str(query.include_permissions).lower()}}')
if params:
path += '?' + '&'.join(params)
return self.client.get(path)
def update_role(self, role_id: str, request: UpdateRoleRequest) -> ApiResponse:
"""Update an existing role"""
data = {{}}
if request.name is not None:
data['name'] = request.name
if request.description is not None:
data['description'] = request.description
if request.parent_id is not None:
data['parent_id'] = request.parent_id
return self.client.put(f'/{}/rbac/roles/{{role_id}}', data)
def delete_role(self, role_id: str) -> ApiResponse:
"""Delete a role"""
return self.client.delete(f'/{}/rbac/roles/{{role_id}}')
# ============================================================================
# USER ROLE ASSIGNMENTS
# ============================================================================
def assign_user_role(self, user_id: str, request: AssignRoleRequest) -> ApiResponse:
"""Assign role to user"""
data = {{
'role_id': request.role_id,
'expires_at': request.expires_at.isoformat() if request.expires_at else None,
'reason': request.reason
}}
return self.client.post(f'/{}/rbac/users/{{user_id}}/roles', data)
def revoke_user_role(self, user_id: str, role_id: str) -> ApiResponse:
"""Revoke role from user"""
return self.client.delete(f'/{}/rbac/users/{{user_id}}/roles/{{role_id}}')
def get_user_roles(self, user_id: str) -> ApiResponse:
"""Get user's roles and effective permissions"""
return self.client.get(f'/{}/rbac/users/{{user_id}}/roles')
def bulk_assign_roles(self, request: BulkAssignRequest) -> ApiResponse:
"""Bulk assign roles to multiple users"""
data = {{
'assignments': [
{{
'user_id': assignment.user_id,
'role_id': assignment.role_id,
'expires_at': assignment.expires_at.isoformat() if assignment.expires_at else None
}}
for assignment in request.assignments
]
}}
return self.client.post('/{}/rbac/bulk/assign', data)
# ============================================================================
# PERMISSION CHECKING
# ============================================================================
def check_permission(self, request: PermissionCheckRequest) -> ApiResponse:
"""Check if current user has permission"""
data = {{
'action': request.action,
'resource': request.resource,
'context': request.context
}}
return self.client.post('/{}/rbac/check-permission', data)
def has_permission(self, action: str, resource: str, context: Optional[Dict[str, str]] = None) -> bool:
"""Quick permission check (returns boolean)"""
try:
request = PermissionCheckRequest(action=action, resource=resource, context=context)
response = self.check_permission(request)
return response.data and response.data.get('granted', False)
except Exception:
return False
def elevate_role(self, request: ElevateRoleRequest) -> ApiResponse:
"""Request role elevation"""
data = {{
'target_role': request.target_role,
'duration_minutes': request.duration_minutes,
'justification': request.justification
}}
return self.client.post('/{}/rbac/elevate', data)
# ============================================================================
# ASYNC METHODS
# ============================================================================
async def create_role_async(self, request: CreateRoleRequest) -> ApiResponse:
"""Create a new role (async)"""
data = {{
'name': request.name,
'description': request.description,
'parent_id': request.parent_id,
'permissions': request.permissions
}}
return await self.client.post_async('/{}/rbac/roles', data)
async def get_role_async(self, role_id: str) -> ApiResponse:
"""Get role by ID (async)"""
return await self.client.get_async(f'/{}/rbac/roles/{{role_id}}')
async def has_permission_async(self, action: str, resource: str, context: Optional[Dict[str, str]] = None) -> bool:
"""Quick permission check (async, returns boolean)"""
try:
request = PermissionCheckRequest(action=action, resource=resource, context=context)
data = {{
'action': request.action,
'resource': request.resource,
'context': request.context
}}
response = await self.client.post_async('/{}/rbac/check-permission', data)
return response.data and response.data.get('granted', False)
except Exception:
return False
# ============================================================================
# CONVENIENCE METHODS
# ============================================================================
def user_has_any_role(self, user_id: str, role_names: List[str]) -> bool:
"""Check if user has any of the specified roles"""
try:
response = self.get_user_roles(user_id)
if not response.data:
return False
user_role_names = [role['role_name'] for role in response.data.get('roles', [])]
return any(role in user_role_names for role in role_names)
except Exception:
return False
def user_has_all_roles(self, user_id: str, role_names: List[str]) -> bool:
"""Check if user has all of the specified roles"""
try:
response = self.get_user_roles(user_id)
if not response.data:
return False
user_role_names = [role['role_name'] for role in response.data.get('roles', [])]
return all(role in user_role_names for role in role_names)
except Exception:
return False
def get_role_hierarchy(self) -> Dict[str, List[str]]:
"""Get role hierarchy (parent-child relationships)"""
try:
response = self.list_roles(RoleListQuery(include_permissions=False))
if not response.data:
return {{}}
hierarchy = {{}}
for role in response.data:
parent_id = role.get('parent_id')
if parent_id:
if parent_id not in hierarchy:
hierarchy[parent_id] = []
hierarchy[parent_id].append(role['id'])
return hierarchy
except Exception:
return {{}}
def get_child_roles(self, parent_role_id: str) -> List[Role]:
"""Get all child roles for a given parent role"""
try:
response = self.list_roles(RoleListQuery(parent_id=parent_role_id))
return response.data or []
except Exception:
return []
"#,
"", self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version,
self.config.version
);
Ok(rbac_code)
}
fn generate_conditional_module(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("# Conditional permissions module - placeholder".to_string())
}
fn generate_audit_module(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("# Audit module - placeholder".to_string())
}
fn generate_utils(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("# Utils module - placeholder".to_string())
}
fn generate_init(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("# Package init - placeholder".to_string())
}
fn generate_setup(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("# Setup.py - placeholder".to_string())
}
fn generate_requirements(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("aiohttp>=3.8.0\nrequests>=2.28.0".to_string())
}
fn generate_readme(&self) -> Result<String, Box<dyn std::error::Error>> {
Ok("# AuthFramework Python SDK - placeholder".to_string())
}
}