import boto3
import json
import sys
from botocore.exceptions import ClientError
# EC2 limits how many values a single filter may carry in one request
# (200 per the EC2 filtering documentation), so instance IDs are batched.
_MAX_FILTER_VALUES = 200


def _running_instance_ids(ec2_client):
    """Return the IDs of all running instances visible to *ec2_client*.

    Uses the ``describe_instances`` paginator so that regions with more
    instances than fit in one response page are not silently truncated.
    """
    paginator = ec2_client.get_paginator('describe_instances')
    pages = paginator.paginate(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
    )
    instance_ids = []
    for page in pages:
        for reservation in page.get('Reservations', []):
            for instance in reservation.get('Instances', []):
                instance_ids.append(instance['InstanceId'])
    return instance_ids


def _attached_volume_records(ec2_client, region, instance_ids):
    """Return one record per attachment of a volume to one of *instance_ids*.

    Args:
        ec2_client: EC2 client bound to *region*.
        region: Region name copied into every record.
        instance_ids: IDs of the running instances to look up volumes for.

    Returns:
        A list of dicts with the keys ``volume_id``, ``region``,
        ``instance_id``, ``device``, ``state``, ``volume_type``, ``size_gb``
        and ``encrypted``.
    """
    paginator = ec2_client.get_paginator('describe_volumes')
    records = []
    for start in range(0, len(instance_ids), _MAX_FILTER_VALUES):
        batch = instance_ids[start:start + _MAX_FILTER_VALUES]
        wanted = set(batch)
        pages = paginator.paginate(
            Filters=[{'Name': 'attachment.instance-id', 'Values': batch}]
        )
        for page in pages:
            for volume in page.get('Volumes', []):
                for attachment in volume.get('Attachments', []):
                    # A volume can list several attachments; keep only the
                    # ones that belong to an instance in this batch. This
                    # drops attachments to non-running instances and keeps
                    # each (volume, instance) pair from being reported by
                    # more than one batch.
                    if attachment.get('InstanceId') not in wanted:
                        continue
                    records.append({
                        'volume_id': volume.get('VolumeId', 'unknown'),
                        'region': region,
                        'instance_id': attachment.get('InstanceId', 'unknown'),
                        'device': attachment.get('Device', 'unknown'),
                        'state': attachment.get('State', 'unknown'),
                        'volume_type': volume.get('VolumeType', 'unknown'),
                        'size_gb': volume.get('Size', 0),
                        'encrypted': volume.get('Encrypted', False),
                    })
    return records


def main():
    """Report EBS volumes attached to running EC2 instances in every region.

    Lists the regions returned by ``describe_regions``, and for each one
    collects the running instances and the volumes attached to them. The
    result is printed to stdout as a single JSON document with the keys
    ``status``, ``query``, ``count`` and ``results``. Progress and skip
    diagnostics go to stderr so stdout stays parseable as JSON.

    A region whose API calls raise ``ClientError`` is skipped (best effort)
    after a note on stderr; it contributes no records. Any other exception
    aborts the run: an error JSON document (``status``, ``error``,
    ``query``) is printed to stdout instead.

    Raises:
        SystemExit: always; exit code 0 on success, 1 on failure.
    """
    query = 'EBS volumes attached to live EC2 instances'
    try:
        session = boto3.Session()
        print("session started", file=sys.stderr)
        ec2 = session.client('ec2')
        regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]
        print("regions", regions, file=sys.stderr)

        results = []
        for region in regions:
            try:
                regional_ec2 = session.client('ec2', region_name=region)
                instance_ids = _running_instance_ids(regional_ec2)
                if not instance_ids:
                    continue
                # Collected per region before extending, so a failure
                # part-way through a region adds none of its records.
                results.extend(
                    _attached_volume_records(regional_ec2, region, instance_ids)
                )
            except ClientError as err:
                # Best effort: one failing region must not abort the scan,
                # but make the skip visible.
                code = err.response.get('Error', {}).get('Code', 'Unknown')
                print("skipping region", region, "-", code, file=sys.stderr)
                continue

        output = {
            'status': 'success',
            'query': query,
            'count': len(results),
            'results': results,
        }
        print(json.dumps(output, indent=2))
        sys.stdout.flush()
        # SystemExit is not an Exception subclass, so the handler below
        # does not intercept this exit.
        sys.exit(0)
    except Exception as e:
        error_output = {
            'status': 'error',
            'error': str(e),
            'query': query,
        }
        print(json.dumps(error_output, indent=2))
        sys.stdout.flush()
        sys.exit(1)
# Run the scan only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()