import omniparse
import time
import sys
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Optional
def process_file(file_path: str) -> Tuple[str, Optional[dict], Optional[str]]:
    """Run omniparse on a single file and condense the outcome.

    Args:
        file_path: Path passed to ``omniparse.extract_from_path``.

    Returns:
        A ``(file_path, summary, error)`` triple. On success ``summary`` is a
        dict with the keys ``mime_type``, ``confidence``, ``content_length``
        and ``metadata_fields`` and ``error`` is ``None``. If anything raises
        an ``Exception``, ``summary`` is ``None`` and ``error`` is the
        exception converted with ``str()``.
    """
    try:
        extraction = omniparse.extract_from_path(file_path)
        mime_type = extraction.mime_type
        confidence = extraction.detection_confidence
        content = extraction.content
        metadata = extraction.metadata
        # Missing/empty content or metadata is reported as a size of zero.
        content_length = len(content) if content else 0
        metadata_fields = len(metadata) if metadata else 0
    except Exception as exc:
        # Failures become data so one bad file cannot abort a whole batch.
        return (file_path, None, str(exc))
    else:
        return (
            file_path,
            {
                'mime_type': mime_type,
                'confidence': confidence,
                'content_length': content_length,
                'metadata_fields': metadata_fields,
            },
            None,
        )
def process_sequential(files: List[str]) -> List[Tuple[str, Optional[dict], Optional[str]]]:
    """Process every file one after another in the calling thread.

    Args:
        files: Paths to hand to ``process_file``.

    Returns:
        One ``process_file`` result per input path, in input order.
    """
    return [process_file(path) for path in files]
def process_parallel(files: List[str], max_workers: int = 4) -> List[Tuple[str, Optional[dict], Optional[str]]]:
    """Process files concurrently on a thread pool.

    Results are returned in the same order as ``files`` so the output lines
    up with ``process_sequential`` and is reproducible between runs,
    regardless of which worker happens to finish first.

    Args:
        files: Paths to hand to ``process_file``.
        max_workers: Size of the ``ThreadPoolExecutor``.

    Returns:
        One ``process_file`` result per input path, in input order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything up front so all workers stay busy, then collect
        # in submission order; waiting on an earlier future does not stop
        # later ones from running.
        futures = [executor.submit(process_file, file_path) for file_path in files]
        return [future.result() for future in futures]
def process_parallel_with_progress(files: List[str], max_workers: int = 4) -> List[Tuple[str, Optional[dict], Optional[str]]]:
    """Process files on a thread pool, printing one progress line per file.

    Args:
        files: Paths to hand to ``process_file``.
        max_workers: Size of the ``ThreadPoolExecutor``.

    Returns:
        The ``process_file`` results in the order the work completed, which
        is not necessarily the order of ``files``.
    """
    total = len(files)
    print(f"Processing {total} files with {max_workers} workers...")

    collected = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(process_file, path) for path in files]
        # as_completed yields each future as soon as it finishes, which is
        # what lets the counter advance in real time.
        for done_count, finished in enumerate(as_completed(pending), start=1):
            outcome = finished.result()
            collected.append(outcome)
            path, summary, _ = outcome
            if summary:
                marker = "✓"
            else:
                marker = "✗"
            print(f"[{done_count}/{total}] {marker} {Path(path).name}")
    return collected
def display_results(results: List[Tuple[str, Optional[dict], Optional[str]]], duration: float) -> None:
    """Print a summary report for a batch of ``process_file`` results.

    An entry counts as failed when its summary is ``None``; every failed
    entry is listed, even if its error message is empty. Up to five
    successful entries are shown as a sample.

    Args:
        results: ``(file_path, summary, error)`` triples. A non-``None``
            summary must provide the keys ``mime_type``, ``confidence``,
            ``content_length`` and ``metadata_fields``.
        duration: Elapsed processing time in seconds. A value of zero or
            less is reported as an unmeasurable throughput instead of
            raising ``ZeroDivisionError``.
    """
    total = len(results)
    failures = [(path, error) for path, summary, error in results if summary is None]
    successful = total - len(failures)

    print(f"\n{'='*60}")
    print("Processing Summary")
    print('='*60)
    print(f"Total files: {total}")
    print(f"Successful: {successful}")
    print(f"Failed: {len(failures)}")
    print(f"Duration: {duration:.2f} seconds")
    if duration > 0:
        print(f"Throughput: {total/duration:.2f} files/second")
    else:
        # A coarse clock can measure a very fast run as exactly zero seconds.
        print("Throughput: n/a (duration too short to measure)")

    if failures:
        print("\nFailed files:")
        for path, error in failures:
            # Some exceptions stringify to "", so fall back to a placeholder.
            print(f" ✗ {Path(path).name}: {error or 'unknown error'}")

    print("\nSuccessful extractions (sample):")
    sample = [(path, summary) for path, summary, _ in results if summary][:5]
    for path, summary in sample:
        print(f" ✓ {Path(path).name}")
        print(f" MIME: {summary['mime_type']}")
        print(f" Confidence: {summary['confidence']:.2%}")
        print(f" Content length: {summary['content_length']} chars")
        print(f" Metadata fields: {summary['metadata_fields']}")
def collect_test_files() -> List[str]:
    """Gather the sample files found directly inside the known test folders.

    Looks in ``test_data/document``, ``test_data/text``, ``test_data/image``
    and ``test_data/archive`` relative to the current working directory.
    Folders that do not exist are skipped, as are sub-directories and
    entries whose name starts with a dot.

    Returns:
        The matching paths as strings, grouped by folder in the order above.
    """
    search_roots = (
        "test_data/document",
        "test_data/text",
        "test_data/image",
        "test_data/archive",
    )
    collected: List[str] = []
    for root in map(Path, search_roots):
        if not root.exists():
            continue
        collected.extend(
            str(entry)
            for entry in root.glob("*")
            if entry.is_file() and not entry.name.startswith('.')
        )
    return collected
def _timed(func, *args, **kwargs):
    """Call ``func`` and return ``(result, elapsed_seconds)``."""
    # perf_counter is monotonic and high-resolution, unlike time.time().
    started = time.perf_counter()
    outcome = func(*args, **kwargs)
    return outcome, time.perf_counter() - started


def main() -> None:
    """Run the five batch-processing demonstrations on the local test data.

    Collects the sample files, then runs in turn: sequential processing,
    parallel processing with 4 workers, parallel processing with progress
    output, a comparison across worker counts, and processing of only the
    files stored in a ``document`` directory.

    Exits the process with status 1 when no test files are found.
    """
    print("Omniparse Python Bindings - Batch Processing Example\n")
    files = collect_test_files()
    if not files:
        print("No test files found. Please ensure test_data directory exists.")
        sys.exit(1)
    print(f"Found {len(files)} test files\n")

    print(f"{'='*60}")
    print("Example 1: Sequential Processing")
    print('='*60)
    results_seq, duration_seq = _timed(process_sequential, files)
    display_results(results_seq, duration_seq)

    print(f"\n{'='*60}")
    print("Example 2: Parallel Processing (4 workers)")
    print('='*60)
    results_par, duration_par = _timed(process_parallel, files, max_workers=4)
    display_results(results_par, duration_par)
    speedup = duration_seq / duration_par if duration_par > 0 else 0
    print(f"\nSpeedup: {speedup:.2f}x faster than sequential")

    print(f"\n{'='*60}")
    print("Example 3: Parallel Processing with Progress Tracking")
    print('='*60)
    # Only the elapsed time is reported here; progress lines are printed by
    # the callee itself.
    _, duration_progress = _timed(process_parallel_with_progress, files, max_workers=4)
    print(f"\nCompleted in {duration_progress:.2f} seconds")

    print(f"\n{'='*60}")
    print("Example 4: Performance Comparison (Different Worker Counts)")
    print('='*60)
    worker_counts = [1, 2, 4, 8]
    for workers in worker_counts:
        results, duration = _timed(process_parallel, files, max_workers=workers)
        # Same zero-duration guard as the speedup calculation above.
        throughput = len(results) / duration if duration > 0 else 0.0
        print(f"Workers: {workers:2d} | Duration: {duration:6.2f}s | Throughput: {throughput:6.2f} files/s")

    print(f"\n{'='*60}")
    print("Example 5: Processing Specific File Types")
    print('='*60)
    # Match on the containing directory rather than a substring of the whole
    # path, so a file merely named e.g. "document_notes.txt" in another
    # folder is not picked up.
    doc_files = [f for f in files if Path(f).parent.name == 'document']
    if doc_files:
        print(f"\nProcessing {len(doc_files)} document files...")
        results_docs, duration_docs = _timed(process_parallel, doc_files, max_workers=4)
        display_results(results_docs, duration_docs)

    print(f"\n{'='*60}")
    print("Batch processing examples complete!")
    print('='*60)
# Run the examples only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()