import io
import json
import mimetypes
import tempfile
from datetime import datetime, timezone
from pathlib import Path

import boto3
import numpy as np
from PIL import Image
class Rs3gwPreprocessingClient:
    """Store preprocessing pipelines in an S3-compatible bucket and apply them to images.

    Pipeline definitions are JSON documents saved under ``pipelines/<id>.json``
    in the ``ml-datasets`` bucket. :meth:`apply_pipeline` fetches a pipeline
    and a source image from the bucket, runs each step with Pillow, and writes
    the result back as a PNG tagged with the pipeline id/version and source key.
    """

    # Step types understood by apply_pipeline, mapped to the handler method name.
    _STEP_HANDLERS = {
        'image_resize': '_resize_image',
        'image_normalization': '_normalize_image',
        'data_augmentation': '_augment_image',
    }

    def __init__(self, endpoint_url='http://localhost:9000',
                 access_key='minioadmin', secret_key='minioadmin'):
        """Create the boto3 S3 client and make sure the target bucket exists.

        Args:
            endpoint_url: URL of the S3-compatible endpoint.
            access_key: Access key id used for authentication.
            secret_key: Secret access key used for authentication.

        Raises:
            botocore ClientError: if checking the bucket fails for any reason
                other than the bucket not existing (e.g. access denied), or if
                creating the bucket fails.
        """
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        )
        self.bucket = 'ml-datasets'
        try:
            self.s3.head_bucket(Bucket=self.bucket)
        except self.s3.exceptions.ClientError as err:
            # head_bucket reports a missing bucket as a 404 ClientError. Any
            # other error (e.g. 403 Forbidden) must not be masked by a blind
            # create_bucket attempt.
            code = str(err.response.get('Error', {}).get('Code', ''))
            if code not in ('404', 'NoSuchBucket', 'NotFound'):
                raise
            self.s3.create_bucket(Bucket=self.bucket)

    @staticmethod
    def _pipeline_key(pipeline_id):
        """Return the object key under which pipeline *pipeline_id* is stored."""
        return f'pipelines/{pipeline_id}.json'

    def create_pipeline(self, pipeline_id, name, steps, version='1.0.0'):
        """Build a pipeline document, store it as JSON in the bucket, and return it.

        Args:
            pipeline_id: Identifier; the document is stored at ``pipelines/<id>.json``.
            name: Human-readable name, also used to derive the description.
            steps: List of step dicts, each with at least a ``step_type`` key
                (and optionally ``config``), as consumed by :meth:`apply_pipeline`.
            version: Version string recorded in the document and object metadata.

        Returns:
            The pipeline dict that was serialized and uploaded.
        """
        pipeline = {
            'id': pipeline_id,
            'name': name,
            'version': version,
            'description': f'{name} preprocessing pipeline',
            'steps': steps,
            'metadata': {
                # Real UTC creation time (was a hard-coded placeholder date).
                'created_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
                'author': 'ml-engineer',
            },
        }
        self.s3.put_object(
            Bucket=self.bucket,
            Key=self._pipeline_key(pipeline_id),
            Body=json.dumps(pipeline, indent=2),
            ContentType='application/json',
            # boto3 prepends "x-amz-meta-" to user metadata keys itself;
            # including the prefix here would double it.
            Metadata={
                'pipeline-id': str(pipeline_id),
                'pipeline-version': str(version),
            },
        )
        print(f"✓ Created pipeline '{name}' (ID: {pipeline_id})")
        return pipeline

    def upload_image(self, image_path, object_key):
        """Upload a local image file to the bucket under *object_key*.

        The Content-Type is guessed from the file extension and falls back to
        ``image/png`` when the extension is unknown.

        Args:
            image_path: Path (str or path-like) of the local file to upload.
            object_key: Destination key in the bucket.
        """
        content_type = mimetypes.guess_type(str(image_path))[0] or 'image/png'
        with open(image_path, 'rb') as f:
            self.s3.put_object(
                Bucket=self.bucket,
                Key=object_key,
                Body=f.read(),
                ContentType=content_type,
            )
        print(f"✓ Uploaded {image_path} to {object_key}")

    def apply_pipeline(self, pipeline_id, input_key, output_key):
        """Run a stored pipeline over the image at *input_key* and upload the result.

        Args:
            pipeline_id: Id of a pipeline previously stored by :meth:`create_pipeline`.
            input_key: Bucket key of the source image.
            output_key: Bucket key the processed PNG is written to.

        Returns:
            The processed ``PIL.Image.Image``.

        Raises:
            ValueError: if the pipeline contains a step type this client does
                not implement (checked before the image is downloaded).
        """
        response = self.s3.get_object(Bucket=self.bucket, Key=self._pipeline_key(pipeline_id))
        pipeline = json.loads(response['Body'].read())
        steps = pipeline['steps']

        # Fail fast on typos instead of silently producing an unprocessed image.
        unknown = sorted({step['step_type'] for step in steps
                          if step['step_type'] not in self._STEP_HANDLERS})
        if unknown:
            raise ValueError(
                f"Pipeline '{pipeline_id}' has unsupported step type(s): {', '.join(unknown)}"
            )

        response = self.s3.get_object(Bucket=self.bucket, Key=input_key)
        img = Image.open(io.BytesIO(response['Body'].read()))
        print(f"Processing with pipeline '{pipeline['name']}'...")
        for step in steps:
            handler = getattr(self, self._STEP_HANDLERS[step['step_type']])
            # "config": null in the JSON is treated like a missing config.
            img = handler(img, step.get('config') or {})

        output_buffer = io.BytesIO()
        img.save(output_buffer, format='PNG')
        self.s3.put_object(
            Bucket=self.bucket,
            Key=output_key,
            Body=output_buffer.getvalue(),
            ContentType='image/png',
            # Keys without "x-amz-meta-": boto3 adds that prefix itself.
            Metadata={
                'pipeline-id': str(pipeline_id),
                'pipeline-version': str(pipeline['version']),
                'source-key': input_key,
            },
        )
        print(f"✓ Processed image saved to {output_key}")
        return img

    @staticmethod
    def _resample_filter(name):
        """Map a pipeline ``filter`` name to a Pillow resampling constant.

        Raises:
            ValueError: for an unrecognized filter name.
        """
        filters = {
            'nearest': Image.NEAREST,
            'box': Image.BOX,
            'bilinear': Image.BILINEAR,
            'hamming': Image.HAMMING,
            'bicubic': Image.BICUBIC,
            'lanczos': Image.LANCZOS,
            'lanczos3': Image.LANCZOS,  # Pillow's LANCZOS uses a 3-lobe kernel
        }
        try:
            return filters[str(name).lower()]
        except KeyError:
            raise ValueError(f"Unknown resize filter: {name!r}") from None

    def _resize_image(self, img, config):
        """Resize *img* according to an ``image_resize`` step config.

        Config keys: ``width``/``height`` (default 224), ``filter`` (default
        ``lanczos3``) and ``mode``:

        * ``exact`` -- stretch to exactly width x height;
        * ``fit``   -- shrink (never enlarge) to fit inside width x height,
          keeping aspect ratio;
        * ``fill``  -- scale to cover width x height, then center-crop.

        Raises:
            ValueError: for non-positive dimensions, an unknown mode or filter.
        """
        width = int(config.get('width', 224))
        height = int(config.get('height', 224))
        if width <= 0 or height <= 0:
            raise ValueError(f"Resize dimensions must be positive, got {width}x{height}")
        mode = config.get('mode', 'fit')
        resample = self._resample_filter(config.get('filter', 'lanczos3'))

        if mode == 'exact':
            img = img.resize((width, height), resample)
        elif mode == 'fit':
            # thumbnail() works in place; copy so the caller's image is untouched.
            img = img.copy()
            img.thumbnail((width, height), resample)
        elif mode == 'fill':
            target_ratio = width / height
            img_ratio = img.width / img.height
            # max() guards against float rounding leaving the scaled image a
            # pixel short of the crop box (which would pad with black).
            if img_ratio > target_ratio:
                new_width, new_height = max(width, round(height * img_ratio)), height
            else:
                new_width, new_height = width, max(height, round(width / img_ratio))
            img = img.resize((new_width, new_height), resample)
            left = (new_width - width) // 2
            top = (new_height - height) // 2
            img = img.crop((left, top, left + width, top + height))
        else:
            raise ValueError(f"Unknown resize mode: {mode!r}")
        print(f" - Resized to {img.width}x{img.height} (mode: {mode})")
        return img

    def _normalize_image(self, img, config):
        """Placeholder for an ``image_normalization`` step.

        Reads ``mean``/``std`` from *config* (ImageNet statistics by default)
        and logs them, but returns *img* unchanged: the result is saved as an
        8-bit PNG, which cannot hold mean/std-normalized float values.
        """
        mean = config.get('mean', [0.485, 0.456, 0.406])
        std = config.get('std', [0.229, 0.224, 0.225])
        print(f" - Normalized (mean={mean[:2]}..., std={std[:2]}...)")
        return img

    def _augment_image(self, img, config):
        """Placeholder for a ``data_augmentation`` step.

        Reads ``horizontal_flip_prob`` and ``brightness_range`` from *config*
        and logs them; no augmentation is applied and *img* is returned as-is.
        """
        h_flip = config.get('horizontal_flip_prob', 0.5)
        brightness = config.get('brightness_range', (0.8, 1.2))
        print(f" - Augmentation (flip_prob={h_flip}, brightness={brightness})")
        return img
def example_imagenet_pipeline():
    """Register the ImageNet preprocessing pipeline and return its document.

    Connects with the default client settings, then stores a two-step pipeline
    (224x224 "fit" resize, then ImageNet mean/std normalization) under the id
    ``imagenet-preprocessing``.
    """
    client = Rs3gwPreprocessingClient()
    print("\n=== ImageNet Preprocessing Pipeline ===\n")

    resize_step = {
        'id': 'resize',
        'step_type': 'image_resize',
        'config': {'width': 224, 'height': 224, 'mode': 'fit', 'filter': 'lanczos3'},
        'cache_results': False,
        'description': 'Resize to 224x224 for ImageNet models',
    }
    normalize_step = {
        'id': 'normalize',
        'step_type': 'image_normalization',
        'config': dict(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
            normalize_range=True,
        ),
        'cache_results': True,
        'description': 'ImageNet normalization',
    }

    return client.create_pipeline(
        pipeline_id='imagenet-preprocessing',
        name='ImageNet Preprocessing',
        steps=[resize_step, normalize_step],
    )
def example_augmentation_pipeline():
    """Register the training-augmentation pipeline (version 1.1.0) and return it.

    The pipeline resizes to 256x256 in "fill" mode, applies the configured
    random augmentations, and normalizes with mean/std of 0.5 per channel.
    """
    client = Rs3gwPreprocessingClient()
    print("\n=== Training Augmentation Pipeline ===\n")

    resize = {
        'id': 'resize',
        'step_type': 'image_resize',
        'config': {'width': 256, 'height': 256, 'mode': 'fill', 'filter': 'bilinear'},
        'cache_results': False,
    }

    augment_config = {
        'horizontal_flip_prob': 0.5,
        'vertical_flip_prob': 0.0,
        'rotation_range': 15.0,
        'brightness_range': [0.8, 1.2],
        'contrast_range': [0.8, 1.2],
        'saturation_range': None,
        'random_crop_size': None,
    }
    augment = {
        'id': 'augment',
        'step_type': 'data_augmentation',
        'config': augment_config,
        'cache_results': False,
        'description': 'Random augmentation for training robustness',
    }

    normalize = {
        'id': 'normalize',
        'step_type': 'image_normalization',
        'config': {'mean': [0.5] * 3, 'std': [0.5] * 3, 'normalize_range': True},
        'cache_results': False,
    }

    return client.create_pipeline(
        pipeline_id='training-augmentation',
        name='Training Augmentation',
        steps=[resize, augment, normalize],
        version='1.1.0',
    )
def example_batch_processing():
    """Register a 128x128 thumbnail pipeline and run it over three generated images.

    Each sample is a solid random-colour 512x512 RGB PNG. It is written to a
    private temporary directory, uploaded as ``raw/image_<i>.png`` and
    processed into ``processed/thumbnail_<i>.png``. The temporary directory
    and its files are removed afterwards, even if an upload fails.
    """
    client = Rs3gwPreprocessingClient()
    print("\n=== Batch Processing Example ===\n")
    steps = [
        {
            'id': 'resize',
            'step_type': 'image_resize',
            'config': {'width': 128, 'height': 128, 'mode': 'exact', 'filter': 'lanczos3'},
            'cache_results': True,
        }
    ]
    client.create_pipeline(
        pipeline_id='thumbnail',
        name='Thumbnail Generator',
        steps=steps,
    )
    print("Creating sample images...")
    # A temp dir instead of a hard-coded /tmp path: portable and self-cleaning.
    with tempfile.TemporaryDirectory() as tmp_dir:
        for i in range(3):
            # Plain ints rather than numpy scalars for Pillow's colour argument.
            color = tuple(int(c) for c in np.random.randint(0, 256, 3))
            temp_path = Path(tmp_dir) / f'sample_{i}.png'
            Image.new('RGB', (512, 512), color).save(temp_path)
            input_key = f'raw/image_{i}.png'
            output_key = f'processed/thumbnail_{i}.png'
            client.upload_image(temp_path, input_key)
            client.apply_pipeline('thumbnail', input_key, output_key)
    print("\n✓ Batch processing complete!")
def example_pipeline_versioning():
    """Store two versions of a model-preparation pipeline side by side.

    v1.0.0 (``model-prep-v1``) only resizes with a bilinear filter. v2.0.0
    (``model-prep-v2``) resizes with lanczos3 and then applies ImageNet
    normalization. A short summary is printed afterwards.
    """
    client = Rs3gwPreprocessingClient()
    print("\n=== Pipeline Versioning Example ===\n")

    def resize_step(filter_name):
        # Both versions share the same 224x224 "fit" resize; only the filter differs.
        return {
            'id': 'resize',
            'step_type': 'image_resize',
            'config': {'width': 224, 'height': 224, 'mode': 'fit', 'filter': filter_name},
            'cache_results': True,
        }

    normalize_step = {
        'id': 'normalize',
        'step_type': 'image_normalization',
        'config': {
            'mean': [0.485, 0.456, 0.406],
            'std': [0.229, 0.224, 0.225],
            'normalize_range': True,
        },
        'cache_results': True,
    }

    releases = (
        ('model-prep-v1', 'Model Preparation v1', [resize_step('bilinear')], '1.0.0'),
        ('model-prep-v2', 'Model Preparation v2', [resize_step('lanczos3'), normalize_step], '2.0.0'),
    )
    for release_id, release_name, release_steps, release_version in releases:
        client.create_pipeline(
            pipeline_id=release_id,
            name=release_name,
            steps=release_steps,
            version=release_version,
        )

    print("\n✓ Created pipeline versions for reproducibility")
    print(" - v1.0.0: Resize only (for older models)")
    print(" - v2.0.0: Resize + Normalization (for new models)")
def main():
    """Run all examples in order and return a process exit status.

    Returns:
        0 if every example completed; 1 if any raised. In that case the error
        is printed together with a hint that rs3gw must be reachable on
        localhost:9000.
    """
    banner = "=" * 60
    print(banner)
    print("rs3gw Dataset Preprocessing Pipeline Examples")
    print(banner)
    try:
        for run_example in (
            example_imagenet_pipeline,
            example_augmentation_pipeline,
            example_batch_processing,
            example_pipeline_versioning,
        ):
            run_example()
        print("\n" + banner)
        print("✓ All examples completed successfully!")
        print(banner)
        for line in (
            "\nNext steps:",
            "1. View pipelines: Check bucket 'ml-datasets/pipelines/'",
            "2. View processed images: Check bucket 'ml-datasets/processed/'",
            "3. Integrate with your ML training workflow",
            "4. Use pipeline versioning for reproducible experiments",
        ):
            print(line)
    except Exception as e:
        # Top-level boundary: report any failure and signal it via the exit code.
        print(f"\n✗ Error: {e}")
        print("\nMake sure rs3gw is running on localhost:9000")
        return 1
    return 0
if __name__ == '__main__':
    # SystemExit instead of the site-injected ``exit`` helper, which is absent
    # under ``python -S`` and in some embedded/frozen interpreters.
    raise SystemExit(main())