<p align="center">
<a href="https://hudi.apache.org/">
<img src="https://hudi.apache.org/assets/images/hudi_logo_transparent_1400x600.png" alt="Hudi logo" height="120px">
</a>
</p>
<p align="center">
The native Rust implementation for Apache Hudi, with C++ & Python API bindings.
<br>
<br>
<a href="https://github.com/apache/hudi-rs/actions/workflows/ci.yml">
<img alt="hudi-rs ci" src="https://github.com/apache/hudi-rs/actions/workflows/ci.yml/badge.svg">
</a>
<a href="https://codecov.io/github/apache/hudi-rs">
<img alt="hudi-rs codecov" src="https://codecov.io/github/apache/hudi-rs/graph/badge.svg">
</a>
<a href="https://join.slack.com/t/apache-hudi/shared_invite/zt-2ggm1fub8-_yt4Reu9djwqqVRFC7X49g">
<img alt="join hudi slack" src="https://img.shields.io/badge/slack-%23hudi-72eff8?logo=slack&color=48c628">
</a>
<a href="https://x.com/apachehudi">
<img alt="follow hudi x/twitter" src="https://img.shields.io/twitter/follow/apachehudi?label=apachehudi">
</a>
<a href="https://www.linkedin.com/company/apache-hudi">
<img alt="follow hudi linkedin" src="https://img.shields.io/badge/apache%E2%80%93hudi-0077B5?logo=linkedin">
</a>
</p>
The Hudi-rs project aims to standardize the core [Apache Hudi](https://github.com/apache/hudi) APIs, and broaden the
Hudi integration in the data ecosystems for a diverse range of users and projects.
| [**PyPi.org**][pypi] | [![][pypi-badge]][pypi] | `pip install hudi` |
| [**Crates.io**][crates] | [![][crates-badge]][crates] | `cargo add hudi` |
[pypi]: https://pypi.org/project/hudi/
[pypi-badge]: https://img.shields.io/pypi/dm/hudi?style=flat-square&color=51AEF3
[crates]: https://crates.io/crates/hudi
[crates-badge]: https://img.shields.io/crates/d/hudi?style=flat-square&color=163669
## Usage Examples
> [!NOTE]
> These examples expect a Hudi table exists at `/tmp/trips_table`, created using
> the [quick start guide](https://hudi.apache.org/docs/quick-start-guide).
### Snapshot Query
Snapshot query reads the latest version of the data from the table. The table API also accepts partition filters.
#### Python
```python
from hudi import HudiTableBuilder
import pyarrow as pa
hudi_table = HudiTableBuilder.from_base_uri("/tmp/trips_table").build()
batches = hudi_table.read_snapshot(filters=[("city", "=", "san_francisco")])
# convert to PyArrow table
arrow_table = pa.Table.from_batches(batches)
result = arrow_table.select(["rider", "city", "ts", "fare"])
print(result)
```
#### Rust
```rust
use hudi::error::Result;
use hudi::table::builder::TableBuilder as HudiTableBuilder;
use arrow::compute::concat_batches;
#[tokio::main]
async fn main() -> Result<()> {
let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
let batches = hudi_table.read_snapshot(&[("city", "=", "san_francisco")]).await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let columns = vec!["rider", "city", "ts", "fare"];
for col_name in columns {
let idx = batch.schema().index_of(col_name).unwrap();
println!("{}: {}", col_name, batch.column(idx));
}
Ok(())
}
```
To run read-optimized (RO) query on Merge-on-Read (MOR) tables, set `hoodie.read.use.read_optimized.mode` when creating the table.
#### Python
```python
hudi_table = (
HudiTableBuilder
.from_base_uri("/tmp/trips_table")
.with_option("hoodie.read.use.read_optimized.mode", "true")
.build()
)
```
#### Rust
```rust
let hudi_table =
HudiTableBuilder::from_base_uri("/tmp/trips_table")
.with_option("hoodie.read.use.read_optimized.mode", "true")
.build().await?;
```
### Time-Travel Query
Time-travel query reads the data at a specific timestamp from the table. The table API also accepts partition filters.
#### Python
```python
batches = (
hudi_table
.read_snapshot_as_of("20241231123456789", filters=[("city", "=", "san_francisco")])
)
```
#### Rust
```rust
let batches =
hudi_table
.read_snapshot_as_of("20241231123456789", &[("city", "=", "san_francisco")]).await?;
```
<details>
<summary>Supported timestamp formats</summary>
The supported formats for the timestamp argument are:
- Hudi Timeline format (highest matching precedence): `yyyyMMddHHmmssSSS` or `yyyyMMddHHmmss`.
- Unix epoch time in seconds, milliseconds, microseconds, or nanoseconds.
- ISO 8601 format including but not limited to:
- `yyyy-MM-dd'T'HH:mm:ss.SSS+00:00`
- `yyyy-MM-dd'T'HH:mm:ss.SSSZ`
- `yyyy-MM-dd'T'HH:mm:ss.SSS`
- `yyyy-MM-dd'T'HH:mm:ss+00:00`
- `yyyy-MM-dd'T'HH:mm:ssZ`
- `yyyy-MM-dd'T'HH:mm:ss`
- `yyyy-MM-dd`
</details>
### Incremental Query
Incremental query reads the changed data from the table for a given time range.
#### Python
```python
# read the records between t1 (exclusive) and t2 (inclusive)
batches = hudi_table.read_incremental_records(t1, t2)
# read the records after t1
batches = hudi_table.read_incremental_records(t1)
```
#### Rust
```rust
// read the records between t1 (exclusive) and t2 (inclusive)
let batches = hudi_table.read_incremental_records(t1, Some(t2)).await?;
// read the records after t1
let batches = hudi_table.read_incremental_records(t1, None).await?;
```
*Incremental queries support the same timestamp formats as time-travel queries.*
### File Group Reading (Experimental)
File group reading allows you to read data from a specific file slice. This is useful when integrating with query
engines, where the plan provides file paths.
#### Python
```python
from hudi import HudiFileGroupReader
reader = HudiFileGroupReader(
"/table/base/path", {"hoodie.read.file_group.start_timestamp": "0"})
# Returns a PyArrow RecordBatch
record_batch = reader.read_file_slice_by_base_file_path("relative/path.parquet")
```
#### Rust
```rust
use hudi::file_group::reader::FileGroupReader;
let reader = FileGroupReader::new_with_options(
"/table/base/path", [("hoodie.read.file_group.start_timestamp", "0")])?;
// Returns an Arrow RecordBatch
let record_batch = reader.read_file_slice_by_base_file_path("relative/path.parquet").await?;
```
#### C++
```cpp
#include "cxx.h"
#include "src/lib.rs.h"
#include "arrow/c/abi.h"
auto reader = new_file_group_reader_with_options(
"/table/base/path", {"hoodie.read.file_group.start_timestamp=0"});
// Returns an ArrowArrayStream pointer
ArrowArrayStream* stream_ptr = reader->read_file_slice_by_base_file_path("relative/path.parquet");
```
## Query Engine Integration
Hudi-rs provides APIs to support integration with query engines. The sections below highlight some commonly used APIs.
### Table API
Create a Hudi table instance using its constructor or the `TableBuilder` API.
| Query planning | `get_file_slices()` | For snapshot query, get a list of file slices. |
| | `get_file_slices_splits()` | For snapshot query, get a list of file slices in splits. |
| | `get_file_slices_as_of()` | For time-travel query, get a list of file slices at a given time. |
| | `get_file_slices_splits_as_of()` | For time-travel query, get a list of file slices in splits at a given time. |
| | `get_file_slices_between()` | For incremental query, get a list of changed file slices between a time range. |
| Query execution | `create_file_group_reader_with_options()` | Create a file group reader instance with the table instance's configs. |
### File Group API
Create a Hudi file group reader instance using its constructor or the Hudi table API `create_file_group_reader_with_options()`.
| Query execution | `read_file_slice()` | Read records from a given file slice; based on the configs, read records from only base file, or from base file and log files, and merge records based on the configured strategy. |
| | `read_file_slice_by_base_file_path()` | Read records from a given base file path; log files will be ignored |
### Apache DataFusion
Enabling the `hudi` crate with `datafusion` feature will provide a [DataFusion](https://datafusion.apache.org/)
extension to query Hudi tables.
<details>
<summary>Add crate hudi with datafusion feature to your application to query a Hudi table.</summary>
```shell
cargo new my_project --bin && cd my_project
cargo add tokio@1 datafusion@45
cargo add hudi --features datafusion
```
Update `src/main.rs` with the code snippet below then `cargo run`.
</details>
```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};
use hudi::HudiDataSource;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
let hudi = HudiDataSource::new_with_options(
"/tmp/trips_table",
[("hoodie.read.input.partitions", "5")]).await?;
ctx.register_table("trips_table", Arc::new(hudi))?;
let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?;
df.show().await?;
Ok(())
}
```
### Other Integrations
Hudi is also integrated with
- [Daft](https://www.getdaft.io/projects/docs/en/stable/integrations/hudi/)
- [Ray](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_hudi.html#ray.data.read_hudi)
### Work with cloud storage
Ensure cloud storage credentials are set properly as environment variables, e.g., `AWS_*`, `AZURE_*`, or `GOOGLE_*`.
Relevant storage environment variables will then be picked up. The target table's base uri with schemes such
as `s3://`, `az://`, or `gs://` will be processed accordingly.
Alternatively, you can pass the storage configuration as options via Table APIs.
#### Python
```python
from hudi import HudiTableBuilder
hudi_table = (
HudiTableBuilder
.from_base_uri("s3://bucket/trips_table")
.with_option("aws_region", "us-west-2")
.build()
)
```
#### Rust
```rust
use hudi::table::builder::TableBuilder as HudiTableBuilder;
async fn main() -> Result<()> {
let hudi_table =
HudiTableBuilder::from_base_uri("s3://bucket/trips_table")
.with_option("aws_region", "us-west-2")
.build().await?;
}
```
## Contributing
Check out the [contributing guide](./CONTRIBUTING.md) for all the details about making contributions to the project.