# Examples Cookbook
Copy-pasteable recipes for common dataprof tasks, organized by interface.
These examples focus on the current Rust crate and Python package; pre-0.8 CLI
usage lives in the archived guide.
## Python Recipes
### Basic file profiling
```python
import dataprof as dp
report = dp.profile("data.csv")
print(f"{report.rows} rows, {report.columns} columns")
print(f"Quality: {report.quality_score}")
```
### Access columns directly
```python
import dataprof as dp
report = dp.profile("data.csv")
# Dict-like access
col = report["age"]
print(f"mean={col.mean}, nulls={col.null_percentage}%")
# Check if a column exists
if "email" in report:
print(f"email patterns: {report['email'].patterns}")
# Iterate column names
for name in report:
print(name, report[name].data_type)
```
### Profile a pandas DataFrame
```python
import pandas as pd
import dataprof as dp
df = pd.read_csv("data.csv")
report = dp.profile(df)
for name in report:
col = report[name]
print(f"{col.name}: {col.data_type}, {col.null_percentage:.1f}% null")
```
### Profile ad-hoc notebook data
```python
import dataprof as dp
# Column-oriented scratch data, like pandas.DataFrame({...})
report = dp.profile({"age": [31, 42, 29], "city": ["Rome", "Milan", "Rome"]})
# Row-oriented records, common after API calls or lightweight transforms
report = dp.profile([
{"age": 31, "city": "Rome"},
{"age": 42, "city": "Milan"},
])
print(report.rows, report.columns, report["city"].unique_count)
```
### Profile in-memory file bytes
```python
import io
import dataprof as dp
csv_bytes = b"age,city\n31,Rome\n42,Milan\n"
# The sync byte path needs an explicit format because there is no filename.
report = dp.profile(io.BytesIO(csv_bytes), format="csv")
```
### Profile a Polars DataFrame
```python
import polars as pl
import dataprof as dp
df = pl.read_csv("data.csv")
report = dp.profile(df)
```
### Profile a PyArrow table
```python
import pyarrow.parquet as pq
import dataprof as dp
table = pq.read_table("data.parquet")
report = dp.profile(table)
```
### Use sampling to limit processing
```python
import dataprof as dp
from dataprof import SamplingStrategy
# Reservoir sampling: statistically representative subset
report = dp.profile("huge.csv", sampling=SamplingStrategy.reservoir(50000))
print(f"Sampled {report.rows} rows, sampling_ratio: {report.sampling_ratio}")
# Systematic: every 10th row
report = dp.profile("huge.csv", sampling=SamplingStrategy.systematic(10))
```
### Early termination with stop conditions
```python
import dataprof as dp
from dataprof import StopCondition
# Stop after 10k rows or 50 MB
stop = StopCondition.max_rows(10000) | StopCondition.max_bytes(50_000_000)
report = dp.profile("stream.csv", stop_condition=stop)
print(f"Truncated: {report.truncation_reason}")
```
### Selective profiling for speed
```python
import dataprof as dp
# Compute only basic statistics, skip patterns and quality
report = dp.profile("huge.csv", metrics=["schema", "statistics"])
# Use the Profiler builder
report = (dp.Profiler()
.metrics(["schema", "statistics"])
.max_rows(10000)
.profile("huge.csv"))
```
### Boolean data analysis
```python
import dataprof as dp
report = dp.profile("data.csv")
active_col = report["is_active"]
if active_col.data_type == "boolean":
print(f"True: {active_col.true_count} ({active_col.true_ratio:.1%})")
print(f"False: {active_col.false_count}")
```
### Progress callbacks
```python
import dataprof as dp
def show_progress(event):
if event.percentage is not None:
print(f"\r{event.percentage:.0f}% ({event.rows_processed} rows)", end="")
report = dp.profile("large.csv", on_progress=show_progress)
print() # newline after progress
```
### Selective quality dimensions
```python
import dataprof as dp
# Only compute completeness and uniqueness (faster)
report = dp.profile("data.csv", quality_dimensions=["completeness", "uniqueness"])
print(report.quality.completeness) # has data
print(report.quality.consistency) # None (not requested)
```
### Quick pandas-like summary with describe()
```python
import dataprof as dp
report = dp.profile("data.csv")
print(report.describe())
# col_a col_b col_c
# count 1000 1000 1000
# null% 0.0 2.1 0.0
# unique 45 800 3
# mean 34.5 None None
# std 12.1 None None
# ...
```
### Export to pandas, polars, or Arrow
```python
import polars as pl  # only needed for the pl.col(...) filter below
import dataprof as dp
report = dp.profile("data.csv")
# pandas DataFrame with all stats
df = report.to_dataframe()
print(df[["name", "data_type", "null_percentage", "mean", "std_dev"]])
# polars DataFrame (no pandas needed)
pl_df = report.to_polars()
high_null = pl_df.filter(pl.col("null_percentage") > 5.0)
# PyArrow Table (no pandas needed)
table = report.to_arrow()
```
### Save and reload reports
```python
import json
import dataprof as dp
report = dp.profile("data.csv")
# Save as JSON (full report)
report.save("report.json")
# Save column profiles as CSV (no extra deps)
report.save("profiles.csv")
# Save column profiles as Parquet (requires pyarrow)
report.save("profiles.parquet")
# Later: reload JSON and inspect
with open("report.json") as f:
data = json.load(f)
print(data["execution"]["rows_processed"])
```
### Track quality over time
```python
from pathlib import Path
import pandas as pd
import dataprof as dp
files = sorted(Path("warehouse/daily/").glob("orders_*.csv"))
rows = [dp.profile(str(f)).quality_summary() for f in files]
history = pd.DataFrame(rows)
print(history[["source", "rows", "quality_score", "completeness"]])
```
### Async file profiling
```python
import asyncio
from dataprof.asyncio import profile_file
async def main():
report = await profile_file("data.csv", max_rows=10000)
print(f"{report.rows} rows")
asyncio.run(main())
```
### Async URL profiling (source build)
Build the Python extension from source with the `python-async,async-streaming` features enabled before using this recipe.
```python
import asyncio
from dataprof.asyncio import profile_url
async def main():
report = await profile_url("https://example.com/data.csv", format="csv")
print(f"{report.rows} rows from remote source")
asyncio.run(main())
```
### Database profiling from Python (source build)
Build the Python extension from source with the `python-async,database` features, plus the connector feature you need, before using this recipe.
```python
import asyncio
import dataprof as dp
async def main():
report = await dp.analyze_database_async(
"postgres://user:pass@localhost/mydb",
"SELECT * FROM users WHERE active = true",
batch_size=5000,
calculate_quality=True,
)
print(f"{report.rows} rows, quality: {report.quality_score}")
asyncio.run(main())
```
### Fast schema and row count
```python
import dataprof as dp
schema = dp.infer_schema("data.csv")
print(f"Columns: {schema.column_names}")
print(f"Sampled {schema.rows_sampled} rows in {schema.inference_time_ms}ms")
count = dp.quick_row_count("data.parquet")
print(f"{count.count} rows ({'exact' if count.exact else 'estimated'})")
```
---
## Data Engineering Integration Recipes
### Validate incoming data in an ETL pipeline
```python
import sys
import dataprof as dp
report = dp.profile("s3_landing/orders_2024-03-17.csv")
# 0.8.0: bail out early on tiny samples — the score is computed but not
# statistically meaningful below 10 rows.
if report.low_sample_warning:
print(f"WARN: sample too small ({report.rows} rows), skipping quality gate")
sys.exit(0)
score = report.quality_score or 0.0
if score < 90.0:
q = report.quality
failures = []
if q.missing_values_ratio > 10.0:
failures.append(f"missing values: {q.missing_values_ratio:.1f}%")
if q.duplicate_rows > 0:
failures.append(f"{q.duplicate_rows} duplicate rows")
if q.outlier_ratio > 5.0:
failures.append(f"outlier ratio: {q.outlier_ratio:.1f}%")
print(f"REJECTED (score={score:.2f}): {', '.join(failures)}")
sys.exit(1)
print(f"ACCEPTED (score={score:.2f}), {report.rows} rows")
```
### Per-column outlier triage (0.8.0)
The global `accuracy.outlier_ratio` tells you *something* is suspicious, but
per-column `outlier_count` tells you *where* to look.
```python
import dataprof as dp
report = dp.profile("sensor_readings.csv")
# Surface the worst offenders
suspicious = sorted(
(
(name, report[name].outlier_count, report[name].min, report[name].max)
for name in report
if report[name].outlier_count
),
key=lambda row: row[1],
reverse=True,
)
for name, n_outliers, lo, hi in suspicious[:5]:
print(f"{name:30} {n_outliers:>4} outliers range=[{lo}, {hi}]")
```
### Domain hints for IDs and positive values (0.8.0)
```python
import dataprof as dp
report = dp.profile(
"orders.csv",
identifier_columns=["order_id", "customer_id"],
positive_columns=["total_amount"],
)
assert report["order_id"].data_type == "identifier"
print(report.quality.negative_values_in_positive)
```
### Serialize a single column (0.8.0)
For piping individual column profiles into Airflow XCom, Prometheus, a
warehouse, etc., without dragging the whole report along:
```python
import json
import dataprof as dp
report = dp.profile("data.csv")
payload = dp.column_to_dict(report["amount"])
print(json.dumps(payload, indent=2))
```
The shape is identical to one element of `report.to_dict()["columns"]`.
### Profile before and after a cleaning step
```python
import pandas as pd
import dataprof as dp
df = pd.read_csv("raw_transactions.csv")
raw = dp.profile(df, name="raw_transactions")
# Standard cleaning
df = df.dropna(subset=["account_id", "amount"])
df = df.drop_duplicates(subset=["txn_id"])
df["amount"] = df["amount"].clip(lower=0)
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
clean = dp.profile(df, name="clean_transactions")
print(f"Before: {raw.quality_score:.2f} After: {clean.quality_score:.2f}")
print(f"Rows: {raw.rows} -> {clean.rows} ({raw.rows - clean.rows} dropped)")
```
### Monitor table quality across multiple CSVs
```python
from pathlib import Path
import dataprof as dp
files = sorted(Path("warehouse/daily/").glob("orders_*.csv"))
for f in files:
r = dp.profile(str(f), quality_dimensions=["completeness", "uniqueness"])
q = r.quality
print(f"{f.name:40s} rows={r.rows:>8d} "
f"complete={q.complete_records_ratio:.1f}% "
f"dupes={q.duplicate_rows}")
```
### Profile a Polars DataFrame from a Parquet source
```python
import polars as pl
import dataprof as dp
df = pl.scan_parquet("events/*.parquet").collect()
report = dp.profile(df, engine="columnar")
# Turn the profile into a Polars DataFrame natively
profile_df = report.to_polars()
problematic = profile_df.filter(pl.col("null_percentage") > 5.0)
print(f"{len(problematic)} columns with >5% nulls:")
print(problematic.select(["name", "null_percentage", "unique_count"]))
```
### Profile NumPy feature matrices via pandas
```python
import numpy as np
import pandas as pd
import dataprof as dp
X_train = np.load("features/X_train.npy")
feature_names = [f"feat_{i}" for i in range(X_train.shape[1])]
df = pd.DataFrame(X_train, columns=feature_names)
report = dp.profile(df, name="X_train")
# Check for ML-readiness issues
for name in report:
col = report[name]
issues = []
if col.null_count > 0:
issues.append(f"{col.null_percentage:.1f}% null")
if col.std_dev is not None and col.std_dev == 0:
issues.append("zero variance")
if col.skewness is not None and abs(col.skewness) > 2:
issues.append(f"skew={col.skewness:.1f}")
if issues:
print(f" {col.name}: {', '.join(issues)}")
```
### Profile a PyArrow table (zero-copy from Parquet)
```python
import pyarrow.parquet as pq
import dataprof as dp
table = pq.read_table("events.parquet")
report = dp.profile(table)
print(f"{report.rows} rows, {report.columns} columns, {report.execution_time_ms}ms")
```
### Async profiling in a FastAPI endpoint
```python
from fastapi import FastAPI, UploadFile
from dataprof.asyncio import profile_bytes
app = FastAPI()
@app.post("/profile")
async def profile_upload(file: UploadFile):
data = await file.read()
fmt = "csv" if file.filename.endswith(".csv") else "json"
report = await profile_bytes(data, format=fmt, max_rows=100_000)
return {
"rows": report.rows,
"columns": report.columns,
"quality_score": report.quality_score,
"column_profiles": report.to_dict()["columns"],
}
```
### Compare database vs file quality (source build)
Build the Python extension from source with the `python-async,database` features, plus the connector feature you need, before using this recipe.
```python
import asyncio
import dataprof as dp
async def main():
# Profile the same logical dataset from two sources
file_report = dp.profile("exports/users.csv")
db_report = await dp.analyze_database_async(
"postgres://ro:pass@prod/app",
"SELECT * FROM users",
calculate_quality=True,
)
print(f"File: {file_report.rows} rows, quality={file_report.quality_score:.2f}")
print(f"DB: {db_report.rows} rows, quality={db_report.quality_score:.2f}")
print(f"Row diff: {abs(file_report.rows - db_report.rows)}")
asyncio.run(main())
```
---
## Rust Recipes
### Basic file profiling
```rust
use dataprof::Profiler;
let report = Profiler::new().analyze_file("data.csv")?;
println!("Rows: {}", report.execution.rows_processed);
println!("Columns: {}", report.execution.columns_detected);
```
### Explicit engine selection
```rust
use dataprof::{Profiler, EngineType};
// Force the incremental (streaming) engine
let report = Profiler::new()
.engine(EngineType::Incremental)
.analyze_file("large.csv")?;
// Force the columnar (Arrow) engine
let report = Profiler::new()
.engine(EngineType::Columnar)
.analyze_file("data.parquet")?;
```
### Streaming with stop conditions
```rust
use dataprof::{Profiler, StopCondition};
let report = Profiler::new()
.stop_when(StopCondition::MaxRows(50_000))
.analyze_file("huge.csv")?;
println!("Exhausted: {}", report.execution.source_exhausted);
println!("Truncation: {:?}", report.execution.truncation_reason);
```
### Custom CSV configuration
```rust
use dataprof::Profiler;
let report = Profiler::new()
.csv_delimiter(b';')
.csv_flexible(true)
.analyze_file("european_data.csv")?;
```
### Sampling
```rust
use dataprof::{Profiler, SamplingStrategy};
let report = Profiler::new()
.sampling(SamplingStrategy::Reservoir { size: 10_000 })
.analyze_file("huge.csv")?;
println!("Sampling applied: {}", report.execution.sampling_applied);
```
### Memory limits
```rust
use dataprof::Profiler;
let report = Profiler::new()
.memory_limit_mb(256)
.analyze_file("data.csv")?;
```
### Progress callbacks
```rust
use dataprof::{Profiler, ProgressSink, ProgressEvent};
use std::sync::Arc;
let report = Profiler::new()
.progress_sink(ProgressSink::Callback(Arc::new(|event: ProgressEvent| {
if let ProgressEvent::ChunkProcessed { rows_processed, percentage, .. } = event {
if let Some(pct) = percentage {
println!("{:.0}% ({} rows)", pct, rows_processed);
}
}
})))
.analyze_file("data.csv")?;
```
### Selective quality dimensions
```rust
use dataprof::{Profiler, QualityDimension};
let report = Profiler::new()
.quality_dimensions(vec![
QualityDimension::Completeness,
QualityDimension::Uniqueness,
])
.analyze_file("data.csv")?;
```
### Schema inference and row counting
```rust
use dataprof::{infer_schema, quick_row_count};
let schema = infer_schema("data.csv")?;
for col in &schema.columns {
println!("{}: {:?}", col.name, col.data_type);
}
let count = quick_row_count("data.parquet")?;
println!("{} rows (exact: {})", count.count, count.exact);
```
### One-liner quality check
```rust
use dataprof::quick_quality_check;
let score = quick_quality_check("data.csv")?;
println!("Quality: {:.1}%", score);
```
### Async stream profiling
```rust
use dataprof::{Profiler, BytesSource, AsyncSourceInfo, FileFormat};
let csv_bytes: bytes::Bytes = download_from_somewhere().await;
let source = BytesSource::new(csv_bytes, AsyncSourceInfo {
label: "upload.csv".into(),
format: FileFormat::Csv,
size_hint: None,
source_system: None,
});
let report = Profiler::new().profile_stream(source).await?;
```
### Async URL profiling
```rust
use dataprof::Profiler;
let report = Profiler::new()
.profile_url("https://example.com/data.parquet")
.await?;
```
### Database query profiling
```rust
use dataprof::{Profiler, DatabaseConfig};
let report = Profiler::new()
.connection_string("postgres://user:pass@localhost/mydb")
.analyze_query("SELECT * FROM users")
.await?;
```