rust-data-processing 0.3.3

Schema-first ingestion (CSV, JSON, Parquet, Excel) into an in-memory DataSet, plus Polars-backed pipelines, SQL, profiling, validation, and map/reduce-style processing.
---
title: "Java examples — JVM bindings (Panama + JSON parity)"
---

# Java quick start and examples

This page is the **JVM counterpart** to the Python tour published as **`python/examples.html`** on the docs site (Markdown source: [`docs/python/README.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/docs/python/README.md)). The Python package calls Rust through **PyO3** with a rich in-process API. On the JVM, Phase 3 exposes a **narrower surface**: a native **`rdp_jvm_sys`** library, an **`ffi_manifest.json`** list of `extern "C"` symbols, and **JSON parity exports** (`rdp_parity_*`) you call with **Panama (FFM)** from Java.

**Canonical API detail** for symbols and calling convention: [`FFI_MANIFEST_JAVA_USAGE.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/docs/java/FFI_MANIFEST_JAVA_USAGE.md), [`FFI_API_SLICE.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/docs/java/FFI_API_SLICE.md), and [`Planning/PHASE3_EPICS.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/Planning/PHASE3_EPICS.md).

**Runnable code** in the repository: [`bindings/java/rust-data-processing-jvm-examples/README.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/java/rust-data-processing-jvm-examples/README.md) (`LoadFfiManifestExample`, `RunPytestMirrorExample`, `ParityScenariosWalkthrough`, …).

## What this page covers

Use this as a **tour of how Java integrates today** (parity JSON over FFI), not a line-for-line duplicate of every Python method. For full signatures and options on the Python side, see [`python-wrapper/API.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/python-wrapper/API.md).

| Topic | Where below |
| --- | --- |
| **Maven / native library / JVM flags** | [Prerequisites](#prerequisites) |
| **Discover symbols, ABI** | [FFI manifest and ABI](#ffi-manifest-and-abi) |
| **Invoke any parity export** | [Calling `rdp_parity_*` from Java](#calling-rdp_parity_-from-java) |
| **File ETL** (ingestion, tabular JSON) | [`rdp_parity_ingestion`](#file-etl-ingestion-and-tabular-json), [`rdp_parity_types_dataset`](#file-etl-ingestion-and-tabular-json) |
| **Ordered paths, directory scans, watermarks, Hive-style discovery** | [Ordered paths and directory scans (incremental batches)](#ordered-paths-and-directory-scans-incremental-batches) |
| **SQL & lazy plans** | [`rdp_parity_pipeline_sql`](#sql-and-dataframe-parity), [`rdp_parity_sql_suite_mirror`](#sql-and-dataframe-parity) |
| **Declarative transforms** | [`rdp_parity_transform`](#transform-and-mapping-spec) |
| **Mapping spec** | [`rdp_parity_mapping_spec_mirror`](#transform-and-mapping-spec) |
| **Row-wise processing** | [`rdp_parity_processing`](#processing-and-execution) |
| **Benchmark-style smoke** | [`rdp_parity_benchmark_smoke_mirror`](#processing-and-execution) |
| **Observability** | [`rdp_parity_observability_mirror`](#observability) |
| **Multi-scenario walkthrough** | [Runnable walkthrough class](#runnable-walkthrough-class) |
| **Large results / production layout** | [Rust-first ETL vs JVM consumption](#rust-first-etl-vs-jvm-consumption) |
| **Temp Parquet → local Spark `DataFrame`** | [`rdp_export_parquet_temp`](#temp-parquet-handoff-rdp_export_parquet_temp) |
| **All doc examples** (`docs/java/examples/`) | [Example catalog](#example-catalog), [Why these examples](#why-these-examples), [Shared fixtures](#shared-json-fixtures-testsfixtures) |
| **Phase 2 (Python `PHASE2_EXAMPLES.md`)** | [Phase 2 examples](#phase-2-examples) |
| **Database / data lake (no warehouse driver on FFI)** | [Database and data lake on the JVM](#database-and-data-lake-jvm) |
| **All connectors (Rust / Python / Java, same URLs)** | [`docs/CONNECTORS.md`](../CONNECTORS.md) |

## Why rust-data-processing on the JVM {#why-rust-data-processing-on-the-jvm}

The JVM module is deliberately **thin**: Java does not re-implement CSV parsing, Polars SQL, Excel readers, or pipeline orchestration. You load **`rdp_jvm_sys`** once, pass **UTF-8 JSON** (schemas, pipeline specs, ingest payloads) over **Panama downcalls**, and read back a small **JSON envelope** (`ok`, `interchange`, optional `error`). That design buys you:

- **One engine, three languages** — the same `tests/fixtures/<bundle>/` trees are shared by Rust (`PipelineBundle`), Python (`tests.pipeline_fixture_support`), and Java (`PipelineJsonFixtures` in these examples), so schemas and pipelines stay aligned across bindings.
- **Production-shaped control plane** — orchestration (paths, watermarks, idempotency keys) can live in Java/Kotlin while **heavy transforms stay in Rust**, with sinks writing **Parquet / XML / CSV** to disk instead of shipping millions of rows as JSON (see [Rust-first ETL vs JVM consumption](#rust-first-etl-vs-jvm-consumption)).
- **Typed contracts** — serde `Schema` JSON (`data_type`: `Int64`, `Utf8`, …) is validated at ingest time in Rust; examples decode `interchange.dataset` with `PytestMirrorAssertions` and `SerdeDatasetRows` where needed.
- **Inspectable failures** — pipeline failures return structured `error.code` / `error.stage` (ADR 006) in the same JSON envelope shape the examples parse.
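
The envelope fields named above (`ok`, `interchange`, and an optional `error` carrying `code` and `stage`) can be checked in one place. The following is a minimal sketch, assuming the envelope has already been parsed into a `Map<String, Object>` by whatever JSON library you use. `EnvelopeCheck` and `requireOk` are hypothetical names, not part of the JVM module, and the `E_DEMO` code in `main` is a placeholder rather than a real error code.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not part of rust-data-processing-jvm. */
final class EnvelopeCheck {
    private EnvelopeCheck() {}

    /**
     * Returns the "interchange" object when "ok" is true.
     * Otherwise throws, carrying error.code and error.stage in the message.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> requireOk(Map<String, Object> envelope) {
        if (Boolean.TRUE.equals(envelope.get("ok"))) {
            Object interchange = envelope.get("interchange");
            if (interchange instanceof Map) {
                return (Map<String, Object>) interchange;
            }
            return Map.of(); // ok, but no interchange payload present
        }
        String code = "unknown";
        String stage = "unknown";
        Object error = envelope.get("error");
        if (error instanceof Map) {
            Map<String, Object> fields = (Map<String, Object>) error;
            code = String.valueOf(fields.getOrDefault("code", code));
            stage = String.valueOf(fields.getOrDefault("stage", stage));
        }
        throw new IllegalStateException(
            "native call failed: code=" + code + ", stage=" + stage);
    }

    public static void main(String[] args) {
        // Placeholder failure envelope; the code and stage values are made up.
        Map<String, Object> failure =
            Map.of("ok", false, "error", Map.of("code", "E_DEMO", "stage", "ingest"));
        try {
            requireOk(failure);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Centralizing the check keeps call sites short: each FFI wrapper can return `requireOk(parsed)` and let the exception carry the stage that failed.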

**When to reach for which FFI:**

| Goal | Prefer |
| --- | --- |
| Declarative multi-step ETL (sources → SQL on `df` → sinks) | `rdp_run_pipeline_json` + bundle `pipelines/*.pipeline.json` |
| Single-shot multi-file ingest (dataset / temp Parquet / Arrow IPC) | `rdp_ingest_ordered_paths_json` + bundle `payloads/*.payload.json` |
| One file + explicit schema (CSV / JSON / Parquet / XML / Excel sheet) | `rdp_ingest_*_path` or `rdp_excel_ingest_path_sheet` |
| Pytest-shaped regression without crafting payloads | `rdp_parity_*` via `RdpNativeJson.invokeParityExport` |

## Shared JSON fixtures (`tests/fixtures/`) {#shared-json-fixtures-testsfixtures}

Every runnable class under [`docs/java/examples/`](examples/) loads committed JSON from the repo — **not** strings embedded in Java source. Layout (see also [`tests/fixtures/README.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/tests/fixtures/README.md)):

| Bundle | Schemas | Pipelines / payloads | Data files |
| --- | --- | --- | --- |
| `jvm_contract/` | `schemas/three_rows.schema.json`, `id_name.schema.json` | `pipelines/dataframe_centric_sql.pipeline.json`, `sql_query_dataset.pipeline.json`, `ordered_json_to_parquet.pipeline.json`, `payloads/ordered_paths_*.payload.json` | `data/three_rows.json`; also `../jvm_contract_three_rows.json` at fixtures root |
| `ghcn/` | `json_source`, `xml_intermediate`, `parquet_lake` under `schemas/` | `pipelines/json_to_xml.pipeline.json`, `xml_to_parquet.pipeline.json` | `ghcn_stations_sample.json` (5 NOAA stations, committed sample) |
| `people/` | `people_csv`, `people_json`, `people_flat`, … | `pipelines/csv_to_parquet.pipeline.json`, `payloads/*_path_*.json` | `../people.csv`, `../people.json`, `../people.xlsx` |
| `student_etl/` | `student_source`, `lake_grade_stats`, `postgres_courses` | `legacy_student_etl*.pipeline.json`, `payloads/ordered_ingest_*.payload.json` | `data/part-0000*.json` |
| `cloud_connectors/` | `id_name` | `platform_connectors.pipeline.json`, `object_store_sources_only.pipeline.json`, `oracle_db_read.pipeline.json` | `data/two_rows.json` |
| `file_transfer/` | `id_name` | `ftp_sources_only.pipeline.json` | `data/two_rows.json` (FTP demo) |
| `watermark/` | `schemas/events.schema.json` | `payloads/csv_watermark_ingest.body.json`, `directory_scan_two_csv.payload.json` | `../watermark_events.csv` / `.json` |
| `sql_parity/` | `join_left`, `join_right` | `queries/join_people_scores.sql.json` | `data/join_left.json`, `join_right.json` |

**Schema shape** (shared with Rust/Python):

```json
{
  "fields": [
    { "name": "id", "data_type": "Int64" },
    { "name": "active", "data_type": "Bool" },
    { "name": "score", "data_type": "Float64" }
  ]
}
```

**Pipeline shape** — placeholders `{{SOURCE_PATH}}`, `{{SINK_PATH}}`, … are substituted in Java before calling Rust (`tests/fixtures/jvm_contract/pipelines/dataframe_centric_sql.pipeline.json`):

```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": ["{{SOURCE_PATH}}"],
    "schema_ref": "schemas/three_rows.schema.json",
    "options": { "format": "json" }
  },
  "transform": {
    "sql": "SELECT id, active, (score * 2.0) AS score FROM df WHERE active = TRUE ORDER BY id"
  },
  "sinks": [{ "kind": "parquet_file", "path": "{{SINK_PATH}}" }]
}
```
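
The placeholder substitution can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the `PipelineJsonFixtures` implementation: the class name `PlaceholderSubstitution` is hypothetical, placeholder names are assumed to be upper-case with underscores, and values are JSON-escaped so that a Windows path with backslashes still yields valid JSON.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not the PipelineJsonFixtures implementation. */
final class PlaceholderSubstitution {
    // Matches {{NAME}} where NAME is upper-case letters, digits, or underscores.
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\{\\{([A-Z0-9_]+)\\}\\}");

    private PlaceholderSubstitution() {}

    /** Escapes a raw value so it is safe inside a JSON string literal. */
    static String jsonEscape(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                // Control characters become six-character hex escapes.
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Replaces every {{NAME}} in one pass; fails fast on a name with no value. */
    static String substitute(String template, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = values.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException(
                    "no value for placeholder " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(jsonEscape(value)));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String template = "{\"sinks\": [{\"kind\": \"parquet_file\", \"path\": \"{{SINK_PATH}}\"}]}";
        System.out.println(substitute(template, Map.of("SINK_PATH", "/tmp/out.parquet")));
    }
}
```

A single regex pass avoids re-scanning text that was just substituted, and the fail-fast check surfaces a misspelled placeholder in Java rather than as a path error inside the native call.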

**Sample input data** (`tests/fixtures/jvm_contract_three_rows.json` — three rows, one inactive):

```json
[
  {"id": 1, "active": true, "score": 10.0},
  {"id": 2, "active": true, "score": 20.0},
  {"id": 3, "active": false, "score": 30.0}
]
```

**Ordered-ingest payload** (`people/payloads/excel_sheet_dataset.payload.json` — `schema_ref` expanded in Rust/Java):

```json
{
  "paths": ["{{SOURCE_PATH}}"],
  "schema_ref": "schemas/people_flat.schema.json",
  "options": { "format": "excel", "sheet_name": "{{SHEET_NAME}}" },
  "response": { "mode": "dataset", "max_rows": 1000 }
}
```

**Path ingest with no extra options** — pass `PipelineJsonFixtures.defaultPathIngestOptionsJson()` (`"{}"`) to `rdp_ingest_*_path`; do not add empty per-bundle `*.options.json` files.

### Loading fixtures from Java

[`PipelineJsonFixtures`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/java/rust-data-processing-jvm/src/main/java/io/github/scorpio_datalake/rust_data_processing/fixture/PipelineJsonFixtures.java) resolves `tests/fixtures` by walking up from the working directory until `people.csv` exists (or uses `GITHUB_WORKSPACE` in CI):

```java
Path fixtures = PipelineJsonFixtures.resolveTestsFixturesDir().orElseThrow();
Path bundle = PipelineJsonFixtures.resolveBundleRoot(fixtures, "jvm_contract").orElseThrow();
String pipeline =
    PipelineJsonFixtures.resolvePipelineJson(
        bundle,
        "pipelines/dataframe_centric_sql.pipeline.json",
        Map.of(
            "SOURCE_PATH", inputJson.toAbsolutePath().normalize().toString(),
            "SINK_PATH", parquetOut.toAbsolutePath().normalize().toString()));
JSONObject root =
    RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipeline);
```

`schema_ref` inside pipelines and payloads is expanded to an inline `schema` object before the native call. **`SerdeDatasetRows`** decodes `interchange.dataset.rows` cells (serde `Value` tags like `{"Utf8":"Ada"}`).
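
The walk-up lookup described at the start of this section can be approximated with `java.nio.file` alone. This is a sketch of the idea, not the library's code: `FixturesLocator` is a hypothetical name, and checking the `GITHUB_WORKSPACE` value before walking up is an assumed order.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical helper for illustration; not the PipelineJsonFixtures implementation. */
final class FixturesLocator {
    private FixturesLocator() {}

    /** True when dir/tests/fixtures/people.csv exists as a regular file. */
    private static Optional<Path> fixturesUnder(Path dir) {
        Path candidate = dir.resolve("tests").resolve("fixtures");
        return Files.isRegularFile(candidate.resolve("people.csv"))
            ? Optional.of(candidate)
            : Optional.empty();
    }

    /**
     * Tries the workspace root first (assumed order), then walks up from
     * start until a tests/fixtures directory containing people.csv is found.
     */
    static Optional<Path> findFixturesDir(Path start, String githubWorkspace) {
        if (githubWorkspace != null && !githubWorkspace.isBlank()) {
            Optional<Path> fromWorkspace =
                fixturesUnder(Path.of(githubWorkspace).toAbsolutePath().normalize());
            if (fromWorkspace.isPresent()) {
                return fromWorkspace;
            }
        }
        for (Path dir = start.toAbsolutePath().normalize(); dir != null; dir = dir.getParent()) {
            Optional<Path> found = fixturesUnder(dir);
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<Path> dir = findFixturesDir(Path.of(""), System.getenv("GITHUB_WORKSPACE"));
        System.out.println(dir.map(Path::toString).orElse("tests/fixtures not found"));
    }
}
```

Using a marker file such as `people.csv` rather than the directory name alone avoids matching an unrelated `tests/fixtures` folder higher up the tree.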

## Why these examples {#why-these-examples}

The files under [`docs/java/examples/`](examples/) are **copy-paste tutorials** for JVM integrators. They are **not** compiled into the `rust-data-processing-jvm` JAR; you copy a class into your app module (which depends on that JAR) and run `main` with a built `rdp_jvm_sys` library.

Each example shows how to:

- Load **`RDP_JVM_SYS`** (or `-Drdp.jvm.sys.library`) and enable **`--enable-native-access=ALL-UNNAMED`**
- Resolve **`tests/fixtures/<bundle>/`** JSON (schemas, pipelines, payloads) instead of hard-coding SQL or schemas in Java
- Call the right FFI entry point (`rdp_run_pipeline_json`, `rdp_ingest_*_path`, `rdp_parity_*`, …)
- Map the flow to the **Python** tour on the same docs site ([`docs/python/README.md`](../python/README.md), [`PHASE2_EXAMPLES.md`](../python/PHASE2_EXAMPLES.md))

Class-level Javadoc in each `.java` file explains **why** that sketch exists, **what** it calls in Rust, and the Python analogue.
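
As a rough illustration of the first step in the list above, the native library location can be resolved from the system property or the environment variable before any Panama lookup. This sketch makes two assumptions that the examples themselves may not share: both settings hold a filesystem path to the library file, and the system property takes precedence. `NativeLibraryPath` is a hypothetical name.

```java
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical helper for illustration; precedence and value format are assumptions. */
final class NativeLibraryPath {
    private NativeLibraryPath() {}

    /** Returns the configured library path, preferring the system property. */
    static Optional<Path> resolve(Properties props, Map<String, String> env) {
        String fromProperty = props.getProperty("rdp.jvm.sys.library");
        if (fromProperty != null && !fromProperty.isBlank()) {
            return Optional.of(Path.of(fromProperty).toAbsolutePath().normalize());
        }
        String fromEnv = env.get("RDP_JVM_SYS");
        if (fromEnv != null && !fromEnv.isBlank()) {
            return Optional.of(Path.of(fromEnv).toAbsolutePath().normalize());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String message = resolve(System.getProperties(), System.getenv())
            .map(Path::toString)
            .orElse("rdp_jvm_sys not configured: set RDP_JVM_SYS or -Drdp.jvm.sys.library");
        System.out.println(message);
    }
}
```

Passing `Properties` and the environment map as arguments keeps the lookup testable without mutating process-wide state.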

> **JUnit:** Most runnable examples have a matching integration test in [`DocsExampleNativeIntegrationTest`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/java/rust-data-processing-jvm/src/test/java/io/github/scorpio_datalake/rust_data_processing/docexamples/DocsExampleNativeIntegrationTest.java) (when `RDP_JVM_SYS` is set in CI). Cloud connector examples use committed **`file://`** URIs in CI (no live S3/Snowflake). **Template-only in CI** (no network): [`DbReadPipelineExample.java`](examples/DbReadPipelineExample.java), [`SftpFtpConnectorsExample.java`](examples/SftpFtpConnectorsExample.java) — run `main` with real URLs locally. Documentation-only: [`ExportFilterRowsMaxUtf8Chars.java`](examples/ExportFilterRowsMaxUtf8Chars.java), [`MedianReduceAndDataFrame.java`](examples/MedianReduceAndDataFrame.java), [`ExecutionEngineNoteExample.java`](examples/ExecutionEngineNoteExample.java), [`DeltaLakeHandoff.java`](examples/DeltaLakeHandoff.java) (prerequisites only). See [Prerequisites](#prerequisites).

## Example catalog {#example-catalog}

All **37** classes in [`docs/java/examples/`](examples/):

### Pipelines, SQL, and multi-format ETL

| Example | What it demonstrates | Native entry point(s) | Fixtures |
| --- | --- | --- | --- |
| [`QuickStartIngestExample.java`](examples/QuickStartIngestExample.java) | Minimal CSV path ingest with explicit schema | `rdp_ingest_csv_path` | `people` |
| [`DataFrameCentricPipeline.java`](examples/DataFrameCentricPipeline.java) | Polars SQL on `df` inside a pipeline (filter active rows, double `score`) → Parquet sink | `rdp_run_pipeline_json` | `jvm_contract` |
| [`SQLQueries.java`](examples/SQLQueries.java) | Single-table SQL via pipeline JSON; JOIN via `rdp_parity_sql_suite_mirror` | `rdp_run_pipeline_json`, `rdp_parity_sql_suite_mirror` | `jvm_contract`, `sql_parity` |
| [`SqlJoinPipelineExample.java`](examples/SqlJoinPipelineExample.java) | JOIN-only tour via `rdp_parity_sql_suite_mirror` | `rdp_parity_sql_suite_mirror` | `sql_parity` (parity data) |
| [`GroupByAggregatesExample.java`](examples/GroupByAggregatesExample.java) | `GROUP BY` / `HAVING` via SQL suite parity | `rdp_parity_sql_suite_mirror` | parity |
| [`CookbookTransformsExample.java`](examples/CookbookTransformsExample.java) | Rename / cast / fill_null mapping spec | `rdp_parity_mapping_spec_mirror` | parity |
| [`ProcessingReduceExample.java`](examples/ProcessingReduceExample.java) | Filter / map / reduce row-wise API | `rdp_parity_processing` | parity |
| [`ProfilingExample.java`](examples/ProfilingExample.java) | Dataset profiling JSON report | `rdp_parity_profiling` | parity |
| [`OutlierDetectionExample.java`](examples/OutlierDetectionExample.java) | IQR outlier detection summary | `rdp_parity_outliers` | parity |
| [`ValidationUtf8Length.java`](examples/ValidationUtf8Length.java) | Validation summary over FFI | `rdp_parity_validation` | parity |
| [`CdcBoundaryExample.java`](examples/CdcBoundaryExample.java) | CDC event type shapes (no live connector) | `rdp_parity_cdc` | parity |
| [`ExecutionEngineNoteExample.java`](examples/ExecutionEngineNoteExample.java) | Documents Python-only `ExecutionEngine` | (none) | — |
| [`GhcnJsonXmlParquetPipeline.java`](examples/GhcnJsonXmlParquetPipeline.java) | JSON → XML → Parquet with three schemas on a NOAA station sample | `rdp_run_pipeline_json`, `rdp_ingest_xml_path`, `rdp_ingest_parquet_path` | `ghcn` |
| [`RDPOnlyETLExample.java`](examples/RDPOnlyETLExample.java) | `postgresql_url` + `lake_sink` URLs in pipeline JSON; local JSON sources | `rdp_run_pipeline_json`, `rdp_ingest_ordered_paths_json` | `student_etl` |
| [`PlatformConnectorsPipelineExample.java`](examples/PlatformConnectorsPipelineExample.java) | Full Snowflake / Databricks / Spark / S3·GCS·ABFS URLs; `object_store_uris` + Rust sinks | `rdp_run_pipeline_json` | `cloud_connectors` |
| [`ObjectStoreUrlsExample.java`](examples/ObjectStoreUrlsExample.java) | S3, GCS, Azure Blob read URIs + working local `parquet_file` sink | `rdp_run_pipeline_json` | `cloud_connectors` |
| [`SftpFtpConnectorsExample.java`](examples/SftpFtpConnectorsExample.java) | SFTP / FTP / FTPS in `file_transfer_uris` | `rdp_run_pipeline_json` | `file_transfer` |
| [`KafkaEltStreamExample.java`](examples/KafkaEltStreamExample.java) | Kafka **Extract → Load** on a broker (one row per message in integration) | `rdp_kafka_export_dataset_json`, `rdp_kafka_poll_window_loaded_json` | (broker) — build **`full,kafka`** |
| [`KafkaEltLoadExample.java`](examples/KafkaEltLoadExample.java) | Kafka **Load** from fixture JSON (no broker) | `rdp_kafka_elt_load_records_json` | `tests/fixtures/kafka/` |
| [`DbReadPipelineExample.java`](examples/DbReadPipelineExample.java) | ConnectorX `sources.db_reads` pipeline sketch (run locally) | `rdp_run_pipeline_json` | `cloud_connectors` |
| [`WarehouseExportHandoffExample.java`](examples/WarehouseExportHandoffExample.java) | Export Parquet locally → `rdp_ingest_parquet_path` | `rdp_run_pipeline_json`, `rdp_ingest_parquet_path` | `people` |
| [`SparkParquetHandoffExample.java`](examples/SparkParquetHandoffExample.java) | Working `rdp_export_parquet_temp`; documents pending `kind: spark` sink | `rdp_export_parquet_temp`, (pipeline JSON in sibling example) | `jvm_contract` (export sample) |

### People fixtures (CSV, JSON, Parquet, Excel)

| Example | What it demonstrates | Native entry point(s) | Fixtures |
| --- | --- | --- | --- |
| [`JsonParquetExcelSnippets.java`](examples/JsonParquetExcelSnippets.java) | Payload and path ingest for JSON/CSV; CSV→Parquet pipeline round-trip | `rdp_ingest_ordered_paths_json`, `rdp_ingest_*_path`, `rdp_run_pipeline_json` | `people` |
| [`InferredSchemaIngestExample.java`](examples/InferredSchemaIngestExample.java) | Excel infer-then-ingest inside Rust (`rdp_excel_ingest_path_sheet`) | `rdp_excel_ingest_path_sheet` | `people.xlsx` |
| [`ExcelSnippets.java`](examples/ExcelSnippets.java) | Excel sheet ingest via payload and `rdp_excel_ingest_path_sheet` | `rdp_ingest_ordered_paths_json`, `rdp_excel_ingest_path_sheet` | `people` (+ `people.xlsx`) |
| [`ParquetSnippets.java`](examples/ParquetSnippets.java) | CSV→Parquet pipeline, path verify, temp Parquet export handoff | `rdp_run_pipeline_json`, `rdp_ingest_parquet_path`, `rdp_export_parquet_temp` | `people` |

### Incremental batches (watermarks, directory scan)

| Example | What it demonstrates | Native entry point(s) | Fixtures |
| --- | --- | --- | --- |
| [`OrderedPaths.java`](examples/OrderedPaths.java) | Java glob → `paths` array → ingest with watermark options | `rdp_ingest_ordered_paths_json` | `watermark` |
| [`PathFromDirectoryScan.java`](examples/PathFromDirectoryScan.java) | Back-compat alias; delegates to `OrderedPaths` | (same as `OrderedPaths`) | `watermark` |
| [`PartitionDiscoveryExample.java`](examples/PartitionDiscoveryExample.java) | Hive partition discovery, globs, explicit lists | `rdp_parity_partition_discovery_mirror` | `hive_partitioned` |
| [`IngestObservabilityExample.java`](examples/IngestObservabilityExample.java) | Missing-file ingest errors / observability hooks | `rdp_parity_observability_mirror` | parity |

## Database and data lake on the JVM {#database-and-data-lake-jvm}

The JVM integration is **not** the same model as opening a warehouse connection in Java and streaming rows across the FFI boundary. On the JVM you pass **UTF-8 JSON** (pipeline specs, ingest payloads, schemas) and **local filesystem paths** for sources. Connection strings and lake catalog URIs appear **inside that JSON**; **Rust** (`rdp_jvm_sys`) interprets them and uses the crate’s own drivers (file readers, an optional PostgreSQL **sink** via libpq, optional **`sources.db_reads`** via ConnectorX when built with **`db_connectorx`**, and so on). Java does **not** pass a live database session, Spark session, or Delta catalog handle across FFI.

| What Java developers often expect | What this library does on the JVM |
| --- | --- |
| `jdbc:…` URL or `DataSource` → direct ingest FFI | **Not supported** — use **`sources.db_reads`** (`oracle://`, `mssql://`, …) or **local file** handoff |
| SQL URL in a **pipeline JSON** field → Rust runs the connector | **Supported** for **sinks** and **declared** lake metadata (see below) |
| `s3://` / `gs://` / `abfss://` in **`sources.paths`** | **Rejected** — use **`sources.object_store_uris`** (pending) + **local** `paths` for ingest |
| Read `your_lake` table via catalog API in Java | **Out of process** — export Parquet/CSV locally, then ingest (see [Your data lake](#your-data-lake-your_lake)) |

The runnable examples teach the mechanics: [`JsonParquetExcelSnippets`](examples/JsonParquetExcelSnippets.java) and [`ParquetSnippets`](examples/ParquetSnippets.java) for **local** Parquet/CSV; [`RDPOnlyETLExample`](examples/RDPOnlyETLExample.java) for **pipeline JSON** that carries **`postgresql_url`**, **`catalog_uri`**, and **`warehouse`** placeholders; [`PlatformConnectorsPipelineExample`](examples/PlatformConnectorsPipelineExample.java) and [`ObjectStoreUrlsExample`](examples/ObjectStoreUrlsExample.java) for **full JVM URLs** (Snowflake, Databricks, Spark, S3, GCS, Azure); [`SparkParquetHandoffExample`](examples/SparkParquetHandoffExample.java) for working Spark handoff FFI; [`DeltaLakeHandoff`](examples/DeltaLakeHandoff.java) for the lake → file → ingest story. Per-platform notes: [Connector cookbook](#connector-cookbook).

### Your database (`your_database`) {#your-database-your_database}

**Three patterns** (pick one; they are not interchangeable):

**1. File handoff (most common on the JVM today)** — Your app (or any ETL tool) runs warehouse SQL with **your own** database client, writes a **bounded** CSV or Parquet file to disk, then passes that **local path** into ingest FFI—the same as `people.csv` in the people fixtures:

```java
// Step A — outside rdp_jvm_sys: your app's DB client (Hikari, Spring, sqlcmd, etc.)
// ResultSet → write /tmp/your_database/nightly_accounts.parquet (or .csv)

// Step B — Rust ingest: absolute LOCAL path only
String payload =
    PipelineJsonFixtures.resolvePayloadJson(
        peopleBundle,
        "payloads/csv_path_dataset.payload.json",
        Map.of(
            "SOURCE_PATH",
            Path.of("/tmp/your_database/nightly_accounts.csv").toAbsolutePath().toString()));
JSONObject root = RdpNativeJson.invokeIngestOrderedPathsJson(linker, lookup, arena, payload);
```

See [`JsonParquetExcelSnippets`](examples/JsonParquetExcelSnippets.java), [`IngestValidateJsonlEndToEnd`](examples/IngestValidateJsonlEndToEnd.java).
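
Step A of the file handoff happens entirely in your own code. The sketch below shows one way to write a **bounded** CSV with only the JDK, taking rows from any iterator (in a real app, typically an adapter over a JDBC `ResultSet`). `BoundedCsvExport` is a hypothetical name, and writing `null` as an empty field is an assumption; check how your schema expects missing values to be represented.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; runs outside rdp_jvm_sys. */
final class BoundedCsvExport {
    private BoundedCsvExport() {}

    /** Quotes a field when it contains a comma, quote, or line break. */
    static String quote(String field) {
        if (field == null) {
            return ""; // assumption: missing values are written as empty fields
        }
        boolean needsQuotes = field.contains(",") || field.contains("\"")
            || field.contains("\n") || field.contains("\r");
        return needsQuotes ? "\"" + field.replace("\"", "\"\"") + "\"" : field;
    }

    private static String line(List<String> fields) {
        return fields.stream().map(BoundedCsvExport::quote).collect(Collectors.joining(","));
    }

    /** Writes the header plus at most maxRows rows; returns the number of rows written. */
    static int write(Path out, List<String> header, Iterator<List<String>> rows, int maxRows)
            throws IOException {
        Path target = out.toAbsolutePath().normalize();
        Files.createDirectories(target.getParent());
        int written = 0;
        try (BufferedWriter writer = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            writer.write(line(header));
            writer.write('\n');
            while (written < maxRows && rows.hasNext()) {
                writer.write(line(rows.next()));
                writer.write('\n');
                written++;
            }
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("your_database").resolve("nightly_accounts.csv");
        List<List<String>> rows = List.of(List.of("1", "Ada"), List.of("2", "Grace"));
        int written = write(out, List.of("id", "name"), rows.iterator(), 1000);
        System.out.println(written + " rows written to " + out);
    }
}
```

The row cap is what keeps the handoff bounded: the ingest call then reads a file of known maximum size instead of an open-ended result set.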

**2. SQL URL in pipeline JSON (Rust sink)** — You embed a **`postgresql://`** string in pipeline metadata; Java sends the document to `rdp_run_pipeline_json`. Rust ingests **local JSON sources**, transforms in Polars, then may **write** to Postgres using a **Rust** libpq sink (when `rdp_jvm_sys` is built with `sink_postgres`).

Committed fixture (replace host, database, and table with **your_database** values):

```json
{
  "relational_sink": {
    "postgresql_url": "postgresql://app:CHANGE_ME@db.your_database.example:5432/analytics?sslmode=require",
    "courses_teachers_table": "public.your_table"
  }
}
```

Full legacy control-plane shape (lake + DB URLs + **local** JSON source paths after substitution):

[`tests/fixtures/student_etl/pipelines/legacy_student_etl_three_paths.pipeline.json`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/tests/fixtures/student_etl/pipelines/legacy_student_etl_three_paths.pipeline.json)

```java
// RDPOnlyETLExample: bind PATH_A..C to tests/fixtures/student_etl/data/part-*.json (local files)
String pipeline =
    RDPOnlyETLExample.resolveLiveLegacyPipelineJson(fixturesDir, pathA, pathB, pathC);
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipeline);
// interchange.sink_results[] describes parquet/postgresql/delta_lake branches
```

See [`RDPOnlyETLExample.java`](examples/RDPOnlyETLExample.java), which loads `schema_postgres_courses.schema.json` and `lake_grade_stats`, then prints the conceptual `s3://` sketch from [`legacy_student_etl.pipeline.json`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/tests/fixtures/student_etl/pipelines/legacy_student_etl.pipeline.json). That sketch is production metadata only; the live demo uses the local part files.

**3. Built-in SQL → tabular read (Rust ConnectorX, Python today)** — `ingest_from_db("postgresql://…", "SELECT …", schema)` uses **ConnectorX inside the native extension** (`db_connectorx` feature). That is the close analogue to “give the library a SQL URL,” but it is exposed on **Python**, not as a JVM FFI symbol in [`ffi_manifest.json`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/jvm-sys/ffi_manifest.json). See [`python-wrapper/API.md`](../../python-wrapper/API.md) § Ingestion and [`README_DEV.md`](../../python-wrapper/README_DEV.md).

### Your data lake (`your_lake`) {#your-data-lake-your_lake}

**There is no JVM call** that says “open `your_lake` catalog and read table X.” Lake **catalog URIs** and **warehouse** paths in JSON describe **where Rust should write** (or future connectors), not a live read API from Java.

**Lake URLs in pipeline JSON (declare sink target)** — same legacy fixture family as the database:

```json
{
  "lake_sink": {
    "format": "delta_or_iceberg_tbd",
    "catalog_uri": "thrift://iceberg-catalog.your_lake.example:9083",
    "warehouse": "s3://your-lake-warehouse/",
    "namespace": "curated",
    "table_student_grades": "your_table"
  }
}
```

On typical `rdp_jvm_sys` builds, `delta_lake` / `iceberg` sink kinds return **`connector_pending`** in `sink_results` (metadata accepted; native lake write not linked yet). Use a **`parquet_file`** sink to a **local path** for a working end-to-end demo—see [`ParquetSnippets`](examples/ParquetSnippets.java) and GHCN’s `parquet_file` step in [`GhcnJsonXmlParquetPipeline`](examples/GhcnJsonXmlParquetPipeline.java).

**Lake read handoff (what works today)** — Use **your_lake**’s normal client (Spark, Databricks, `deltalake`, Trino, etc.) to materialize a slice, copy or write to a **local** Parquet directory, then ingest:

```java
// your_lake export (outside this library), then local path into Rust:
String parquetFromYourLake =
    "/data/your_lake/curated/your_table/dt=2026-05-20/part-00000.parquet";
JSONObject root =
    RdpNativeJson.invokeIngestParquetPath(
        linker, lookup, arena,
        parquetFromYourLake,
        schemaJson,
        PipelineJsonFixtures.defaultPathIngestOptionsJson());
```

**Do not** put `s3://your-bucket/...` in **`sources.paths`** — use **`sources.object_store_uris`** for cloud ingest (validated in [`CloudImportIntegrationTest`](../../integration_testing/CloudConnectors/java/src/test/java/io/github/scorpio_datalake/rust_data_processing/integration/CloudImportIntegrationTest.java) and [`cloud_pipeline.py`](../../integration_testing/scripts/cloud_pipeline.py)). For a local staging pattern, sync objects to disk first → substitute **local** absolutes in `paths`, as [`RDPOnlyETLExample`](examples/RDPOnlyETLExample.java) does for `student_etl/data/part-0000*.json`.
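
A small guard on the Java side can catch a remote URI before it reaches `sources.paths`. This is a sketch only: `LocalPathGuard` is a hypothetical name, and the check simply rejects anything that starts with a `scheme://` prefix (including `file://`), on the assumption that `paths` should hold plain local absolutes.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical pre-flight check for illustration; not part of the JVM module. */
final class LocalPathGuard {
    // A leading URI scheme such as s3://, gs://, abfss://, or file://.
    private static final Pattern URI_SCHEME = Pattern.compile("^[A-Za-z][A-Za-z0-9+.-]*://");

    private LocalPathGuard() {}

    /** Throws if any entry looks like a URI rather than a local filesystem path. */
    static void requireLocal(List<String> paths) {
        for (String path : paths) {
            if (URI_SCHEME.matcher(path).find()) {
                throw new IllegalArgumentException(
                    "sources.paths expects local paths; move this entry to "
                        + "sources.object_store_uris or stage it locally: " + path);
            }
        }
    }

    public static void main(String[] args) {
        requireLocal(List.of("/data/your_lake/part-00000.parquet")); // passes
        try {
            requireLocal(List.of("s3://your-bucket/part-00000.parquet"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```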

**Partitioned / Hive-style layouts** — Java discovers files (glob), builds a sorted `paths` array of **local** absolutes, and calls `rdp_ingest_ordered_paths_json` with watermark options—[`OrderedPaths`](examples/OrderedPaths.java). That mirrors scanning `s3://your_lake/events/dt=*/part-*.parquet` **after** you list or sync keys to local paths.
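
The discovery step described above can be sketched with `Files.walk` and a glob `PathMatcher`. This is an illustration, not the `OrderedPaths` source: `OrderedPathDiscovery` is a hypothetical name, and plain lexicographic sorting is an assumption that happens to give chronological order for `dt=YYYY-MM-DD` partition directories.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper for illustration; not the OrderedPaths example source. */
final class OrderedPathDiscovery {
    private OrderedPathDiscovery() {}

    /** Returns sorted absolute paths of regular files under root matching the glob. */
    static List<String> discover(Path root, String glob) throws IOException {
        PathMatcher matcher = root.getFileSystem().getPathMatcher("glob:" + glob);
        try (Stream<Path> walk = Files.walk(root)) {
            return walk
                .filter(Files::isRegularFile)
                // Match against the path relative to root, e.g. dt=2026-05-20/part-0.csv
                .filter(p -> matcher.matches(root.relativize(p)))
                .map(p -> p.toAbsolutePath().normalize().toString())
                .sorted()
                .collect(Collectors.toList());
        }
    }

    /** Renders the paths as a JSON array of strings for the "paths" field. */
    static String toJsonArray(List<String> paths) {
        return paths.stream()
            .map(p -> "\"" + p.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
            .collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("events");
        Files.createDirectories(root.resolve("dt=2026-05-20"));
        Files.writeString(root.resolve("dt=2026-05-20").resolve("part-0.csv"), "id\n1\n");
        System.out.println(toJsonArray(discover(root, "dt=*/part-*.csv")));
    }
}
```

Matching on the relative path keeps the glob independent of where the data was synced, so the same pattern works on a developer laptop and in CI.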

More background: [`docs/LAKE_TABLE_READ.md`](../LAKE_TABLE_READ.md), [`DeltaLakeHandoff.java`](examples/DeltaLakeHandoff.java), Python § 8 in [`PHASE2_EXAMPLES.md`](../python/PHASE2_EXAMPLES.md).

### Connector cookbook: PostgreSQL, Oracle, SQL Server, Snowflake, Databricks, Spark, object stores {#connector-cookbook}

Cross-language reference with **the same fake URLs** in Rust, Python, and Java: [`docs/CONNECTORS.md`](../CONNECTORS.md) (SFTP/FTP via `sources.file_transfer_uris` — see [`SftpFtpConnectorsExample.java`](examples/SftpFtpConnectorsExample.java)). **Docker-validated flows:** [`integration_testing/integration_testing_details.md`](../../integration_testing/integration_testing_details.md).

**Runnable JVM examples (no external CI):** [`PlatformConnectorsPipelineExample.java`](examples/PlatformConnectorsPipelineExample.java), [`ObjectStoreUrlsExample.java`](examples/ObjectStoreUrlsExample.java), [`SparkParquetHandoffExample.java`](examples/SparkParquetHandoffExample.java), [`KafkaEltStreamExample.java`](examples/KafkaEltStreamExample.java). Fixtures: `tests/fixtures/cloud_connectors/`. Build `rdp_jvm_sys` with **`cloud_connectors`** (enabled on `link-main`); add **`kafka`** for streaming. CI uses `file://`; production and **integration tests** use `s3://` / `gs://` / `azure://` in the same JSON fields — Java never opens cloud clients; Rust does I/O.

| Platform | JVM `rdp_run_pipeline_json` | JVM ingest today | Example class / fixture |
| --- | --- | --- | --- |
| **PostgreSQL** | `postgresql` sink (`postgresql://…`, libpq) or legacy `relational_sink` | Local paths; optional COPY sink | [`RDPOnlyETLExample`](examples/RDPOnlyETLExample.java) |
| **Oracle / SQL Server** | **`sources.db_reads`** (`oracle://`, `mssql://`) when **`db_connectorx`** | Else: export → local file → **`sources.paths`** | Cookbook snippets below |
| **Snowflake** | `kind: snowflake` — Rust writes stage Parquet (`stage_uri`); optional `COPY INTO` | `sources.object_store_uris` | [`PlatformConnectorsPipelineExample`](examples/PlatformConnectorsPipelineExample.java) |
| **Databricks** | `kind: databricks` — Rust writes Parquet under `warehouse` path | same | Same |
| **Apache Spark** | `kind: spark` — Rust writes `handoff_uri` Parquet (driver reads outside FFI) | same | [`SparkParquetHandoffExample`](examples/SparkParquetHandoffExample.java) |
| **S3 / GCS / Azure** | `sources.object_store_uris` + `kind: object_store` sink (Rust `object_store` crate) | URIs in JSON only | [`ObjectStoreUrlsExample`](examples/ObjectStoreUrlsExample.java) · integration: [`CloudImportIntegrationTest`](../../integration_testing/CloudConnectors/java/src/test/java/io/github/scorpio_datalake/rust_data_processing/integration/CloudImportIntegrationTest.java) |
| **SFTP / FTP** | `sources.file_transfer_uris` | URIs + env passwords | [`SftpFtpConnectorsExample`](examples/SftpFtpConnectorsExample.java) · integration: same CloudConnectors suite |
| **Kafka (streaming)** | `rdp_kafka_export_dataset_json` / `rdp_kafka_poll_window_loaded_json` | Broker config JSON | [`KafkaEltStreamExample`](examples/KafkaEltStreamExample.java) · integration: [`KafkaStreamIntegrationTest`](../../integration_testing/Kafka/java/src/test/java/io/github/scorpio_datalake/rust_data_processing/integration/KafkaStreamIntegrationTest.java) |
| **Delta / Iceberg** | `delta_lake` — Rust stages Parquet at `warehouse/.../table/`; Iceberg still pending | URIs in JSON | [`PlatformConnectorsPipelineExample`](examples/PlatformConnectorsPipelineExample.java) |

#### Two kinds of SQL on the JVM

| Kind | Where you put it | What runs it | Example |
| --- | --- | --- | --- |
| **Source / warehouse SQL** | Java `String` in your app, or Python `ingest_from_db(..., query, ...)` | **Your** DB client, **`sources.db_reads`**, or Python ConnectorX | `SELECT … FROM hr.employees` on Oracle |
| **Pipeline SQL (Polars)** | `transform.sql` and/or per-sink `"sql"` in **pipeline JSON** | **Rust** (`rdp_run_pipeline_json`) on the ingested frame `df` | `SELECT id, score FROM df WHERE active = TRUE` |

Pipeline SQL is **not** sent to Oracle or PostgreSQL as a remote query — it filters and projects rows **after** Rust has loaded a file or object-store slice. See [`SQLQueries.java`](examples/SQLQueries.java) and `tests/fixtures/jvm_contract/pipelines/sql_query_dataset.pipeline.json`.

**Source SQL in pipeline JSON (documentation):** the contract has no dedicated `source_sql` field yet. Unless you read through **`sources.db_reads`** (which needs a **`db_connectorx`** build), keep the warehouse `SELECT` in Java constants or in your orchestrator docs, and pass only **local paths** or **`object_store_uris`** in `sources`.

#### PostgreSQL

**Python — built-in read** (build with `db` feature; use ConnectorX URL form):

```python
import rust_data_processing as rdp

ds = rdp.ingest_from_db_infer(
    "postgresql://etl:CHANGE_ME@db.your_database.example:5432/analytics?cxprotocol=binary",
    "SELECT id, score FROM public.fact_scores WHERE dt = CURRENT_DATE",
)
```

**JVM — pipeline sink** (ingest stays **local** JSON/Parquet paths; URL is for **load** only):

```json
{
  "relational_sink": {
    "postgresql_url": "postgresql://app:CHANGE_ME@db.your_database.example:5432/analytics?sslmode=require",
    "courses_teachers_table": "public.fact_scores"
  }
}
```

```java
// Same substitution pattern as RDPOnlyETLExample — sources are local files, not the URL
String pipeline = RDPOnlyETLExample.resolveLiveLegacyPipelineJson(fixturesDir, pathA, pathB, pathC);
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipeline);
```

**Source SQL (warehouse)** — run in your PostgreSQL client; Rust does not execute this string on the JVM today:

```sql
SELECT id, score, posted_at
FROM public.fact_scores
WHERE dt = CURRENT_DATE;
```

**In Java** — run the query in your app, export rows to disk, then call Rust on the file path:

```java
String postgresSelect =
    """
    SELECT id, score, posted_at
    FROM public.fact_scores
    WHERE dt = CURRENT_DATE
    """;
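// Your DB client runs postgresSelect and writes /var/rdp/staging/pg_fact_scores.parquet.
// The payload fixture used below must declare the same format as the staged file.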

Path staging = Path.of("/var/rdp/staging/pg_fact_scores.parquet");
String payload =
    PipelineJsonFixtures.resolvePayloadJson(
        peopleBundle,
        "payloads/csv_path_dataset.payload.json",
        Map.of("SOURCE_PATH", staging.toAbsolutePath().toString()));
JSONObject root = RdpNativeJson.invokeIngestOrderedPathsJson(linker, lookup, arena, payload);
```

**Pipeline SQL in JSON** — Polars on `df` after ingest (same pattern as [`SQLQueries.java`](examples/SQLQueries.java)):

```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": ["{{STAGING_PARQUET}}"],
    "schema_ref": "schemas/people_flat.schema.json",
    "options": { "format": "parquet" }
  },
  "transform": {
    "sql": "SELECT id, score FROM df WHERE score > 0 ORDER BY id"
  },
  "sinks": [
    {
      "kind": "postgresql",
      "url": "postgresql://app:CHANGE_ME@db.your_database.example:5432/analytics?sslmode=require",
      "table": "public.fact_scores_curated",
      "sql": "SELECT id, score FROM df"
    }
  ]
}
```

Per-sink `"sql"` is optional; it projects the frame again before that sink writes.
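The `{{STAGING_PARQUET}}` token in the pipeline above is a placeholder that Java fills in before the JSON reaches Rust. The examples in this guide do that through `PipelineJsonFixtures.resolvePipelineJson`, whose internals are not shown here. The following is only a sketch of the idea, with a hypothetical `PlaceholderSketch` class and an assumed rule that placeholder names are upper-case identifiers.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: fills {{NAME}} tokens in a pipeline JSON template. */
final class PlaceholderSketch {
  private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([A-Z0-9_]+)\\}\\}");

  /** Replaces each {{NAME}} with its JSON-escaped binding; fails on an unbound name. */
  static String resolve(String template, Map<String, String> bindings) {
    Matcher m = PLACEHOLDER.matcher(template);
    StringBuilder out = new StringBuilder();
    while (m.find()) {
      String value = bindings.get(m.group(1));
      if (value == null) {
        throw new IllegalArgumentException("unbound placeholder: " + m.group(1));
      }
      // Escape backslashes and quotes so Windows paths stay valid inside a JSON string.
      String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
      m.appendReplacement(out, Matcher.quoteReplacement(escaped));
    }
    m.appendTail(out);
    return out.toString();
  }
}
```

Failing on an unbound name is a deliberate choice in this sketch: a literal `{{STAGING_PARQUET}}` that reaches Rust would otherwise surface later as a confusing missing-file error.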

#### Oracle

**Python — built-in read** (requires `--features db_connectorx` / Python `db`; use ConnectorX `oracle://` form):

```python
ds = rdp.ingest_from_db_infer(
    "oracle://etl:CHANGE_ME@db.your_database.example:1521/ORCLPDB1",
    "SELECT employee_id, department_id FROM hr.employees WHERE ROWNUM <= 100000",
)
```

**Source SQL (warehouse)** — Oracle dialect; pass the same text in pipeline **`sources.db_reads[].query`** (Rust ConnectorX):

```sql
SELECT employee_id, department_id, hire_date
FROM hr.employees
WHERE ROWNUM <= 100000;
```

**In Java (preferred)** — ConnectorX URL + query in pipeline JSON; build **`rdp_jvm_sys`** with **`--features db_connectorx`** (or **`full`**). Use **`oracle://`** (see [`docs/CONNECTORS.md`](../CONNECTORS.md)):

```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": [],
    "db_reads": [
      {
        "url": "oracle://etl:CHANGE_ME@db.your_database.example:1521/ORCLPDB1",
        "query": "SELECT employee_id, department_id, hire_date FROM hr.employees WHERE ROWNUM <= 100000"
      }
    ],
    "schema_ref": "schemas/your_oracle_hr.schema.json",
    "options": {}
  },
  "transform": {
    "sql": "SELECT employee_id, department_id FROM df WHERE department_id IS NOT NULL ORDER BY employee_id"
  },
  "sinks": [
    {
      "kind": "parquet_file",
      "path": "{{CURATED_PARQUET}}",
      "sql": "SELECT employee_id, department_id FROM df"
    }
  ]
}
```

```java
Path bundleRoot =
    PipelineJsonFixtures.resolveBundleRoot(fixturesDir, "your_bundle").orElseThrow();
String pipeline =
    PipelineJsonFixtures.resolvePipelineJson(
        bundleRoot,
        "pipelines/your_oracle_hr_curate.pipeline.json",
        Map.of("CURATED_PARQUET", curatedOut.toAbsolutePath().toString()));
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipeline);
```

**Fallback (no `db_connectorx` build)** — run the same SQL in your environment, write Parquet locally, then set **`sources.paths`** to that file. Expect **`DB_CONNECTORX_NOT_BUILT`** if you use **`db_reads`** without the feature. Do **not** use `jdbc:` URLs in **`db_reads`** — they are rejected at parse.
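Because `jdbc:` URLs are rejected only when Rust parses the pipeline, it can help to catch them in Java first. This is a sketch, assuming a hypothetical `DbReadUrlCheck` class. The accepted scheme set contains only the two `db_reads` schemes this cookbook shows (`oracle`, `mssql`), so extend it to match [`docs/CONNECTORS.md`](../CONNECTORS.md).

```java
import java.util.Locale;
import java.util.Set;

/** Hypothetical pre-flight check for `sources.db_reads[].url` values. */
final class DbReadUrlCheck {
  // Assumption: only the schemes this cookbook shows for db_reads.
  private static final Set<String> KNOWN_SCHEMES = Set.of("oracle", "mssql");

  /** Returns the lower-cased scheme, or throws before any native call is made. */
  static String requireConnectorxScheme(String url) {
    String lower = url.toLowerCase(Locale.ROOT);
    if (lower.startsWith("jdbc:")) {
      throw new IllegalArgumentException(
          "db_reads does not accept jdbc: URLs; use oracle:// or mssql://");
    }
    int sep = lower.indexOf("://");
    String scheme = sep > 0 ? lower.substring(0, sep) : "";
    if (!KNOWN_SCHEMES.contains(scheme)) {
      // Report only the scheme: the full URL may embed a password.
      throw new IllegalArgumentException("unrecognised db_reads URL scheme: '" + scheme + "'");
    }
    return scheme;
  }
}
```

The error messages never echo the URL itself, since ConnectorX-style URLs carry credentials inline.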

#### Microsoft SQL Server

**Python — built-in read:**

```python
ds = rdp.ingest_from_db_infer(
    "mssql://etl:CHANGE_ME@db.your_database.example:1433/warehouse?encrypt=true",
    "SELECT TOP (100000) id, amount FROM dbo.ledger WHERE posted_at >= '2026-05-01'",
)
```

**Source SQL (warehouse)** — T-SQL; run in your DB client or `sqlcmd`:

```sql
SELECT TOP (100000) id, amount, posted_at
FROM dbo.ledger
WHERE posted_at >= '2026-05-01';
```

**In Java (preferred)** — **`sources.db_reads`** with **`mssql://`** (requires **`db_connectorx`** on **`rdp_jvm_sys`**):

```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": [],
    "db_reads": [
      {
        "url": "mssql://etl:CHANGE_ME@db.your_database.example:1433/warehouse?encrypt=true",
        "query": "SELECT TOP (100000) id, amount, posted_at FROM dbo.ledger WHERE posted_at >= '2026-05-01'"
      }
    ],
    "schema_ref": "schemas/ledger.schema.json",
    "options": {}
  },
  "transform": {
    "sql": "SELECT id, amount FROM df WHERE amount > 0 ORDER BY posted_at"
  },
  "sinks": [
    { "kind": "parquet_file", "path": "{{SINK_PATH}}" }
  ]
}
```

```java
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipelineJson);
```

**Fallback** — export to local Parquet → **`sources.paths`** when the native library is not built with **`db_connectorx`**.

#### Snowflake

**JVM pipeline (full URLs, Rust executes):** run [`PlatformConnectorsPipelineExample`](examples/PlatformConnectorsPipelineExample.java) — `account_url`, `warehouse`, `stage_uri`; expect `sink_results[].status: ok` (stage write in Rust).

Stage **reads** use the same **S3 auth** as any other `s3://` URI; credentials never appear in the JSON string. Set them on the **process** that runs `rdp_jvm_sys`:

| Auth | How Rust / `object_store` gets it |
| --- | --- |
| Access key | `AWS_ACCESS_KEY_ID` + `AWS_SECRET_ACCESS_KEY` (see [`docs/CONNECTORS.md`](../CONNECTORS.md)) |
| Session token | `AWS_SESSION_TOKEN` (temporary creds) |
| IAM role | Instance profile / IRSA on the host (no keys in Java) |
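Since the credentials live in the process environment rather than in the JSON, a Java launcher can check them before handing an `s3://` URI to Rust. The sketch below uses a hypothetical `S3AuthPreflight` class and inspects only the three variables from the table. It does not reproduce the full credential resolution of the Rust `object_store` crate.

```java
import java.util.Map;

/** Hypothetical pre-flight check for the S3 auth modes listed in the table above. */
final class S3AuthPreflight {
  enum Mode { ACCESS_KEY, SESSION_TOKEN, AMBIENT_ROLE }

  static Mode detect(Map<String, String> env) {
    boolean hasKey = isSet(env, "AWS_ACCESS_KEY_ID");
    boolean hasSecret = isSet(env, "AWS_SECRET_ACCESS_KEY");
    if (hasKey != hasSecret) {
      throw new IllegalStateException(
          "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set together");
    }
    if (!hasKey) {
      return Mode.AMBIENT_ROLE; // rely on an instance profile / IRSA on the host
    }
    return isSet(env, "AWS_SESSION_TOKEN") ? Mode.SESSION_TOKEN : Mode.ACCESS_KEY;
  }

  private static boolean isSet(Map<String, String> env, String name) {
    String value = env.get(name);
    return value != null && !value.isBlank();
  }
}
```

Calling `S3AuthPreflight.detect(System.getenv())` at startup turns a half-configured key pair into an immediate, readable failure instead of an authentication error from inside the native call.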

**JVM ingest from the stage object in S3** (preferred — Rust reads `s3://`; Java only passes JSON):

```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": [],
    "object_store_uris": [
      "s3://demo-bucket-us-east-1/snowflake-stage/rdp/ledger/dt=2026-05-20/part-00000.parquet"
    ],
    "schema_ref": "schemas/your_ledger.schema.json",
    "options": { "format": "parquet" }
  },
  "sinks": [{ "kind": "parquet_file", "path": "{{LOCAL_SINK}}" }]
}
```

```java
String pipeline =
    PipelineJsonFixtures.resolvePipelineJson(bundleRoot, "pipelines/your_snowflake_ingest.pipeline.json", bindings);
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipeline);
```

**Legacy pattern (only if you sync/mount S3 to disk yourself):** local path + `invokeIngestParquetPath` — Java/your sync tool must copy the object first; credentials are still required for that S3 copy, just not passed into `rdp_ingest_parquet_path`:

```java
// After aws s3 sync / Snowflake COPY INTO external stage backed by S3:
String localSlice =
    "/data/snowflake_sync/demo-bucket-us-east-1/snowflake-stage/rdp/ledger/dt=2026-05-20/part-00000.parquet";
RdpNativeJson.invokeIngestParquetPath(linker, lookup, arena, localSlice, schemaJson, optionsJson);
```

Snowflake **warehouse** SQL (`COPY INTO @stage …`) runs in Snowflake or your ETL tool, not inside `rdp_jvm_sys`. See [`PlatformConnectorsPipelineExample`](examples/PlatformConnectorsPipelineExample.java) for `kind: snowflake` **sink** (Rust writes to `stage_uri`).

Pipeline JSON can still **declare** lake layout metadata (same as other warehouses):

```json
{
  "lake_sink": {
    "format": "delta_or_iceberg_tbd",
    "catalog_uri": "https://your_account.snowflakecomputing.com",
    "warehouse": "s3://your-snowflake-external-volume/",
    "namespace": "your_schema",
    "table_student_grades": "ledger_curated"
  }
}
```

#### Databricks (Unity Catalog / Delta)

**JVM pipeline:** [`PlatformConnectorsPipelineExample`](examples/PlatformConnectorsPipelineExample.java) — `workspace_url`, Unity `catalog_uri`, `warehouse` (`abfss://…` or `file://` in tests); Rust writes Parquet under the warehouse path.

**Read path:** use Databricks SQL, a notebook, or Spark on the cluster to write **local** Parquet (or sync cloud storage to a mounted path), then ingest; see [Your data lake](#your-data-lake-your_lake).

**PySpark export (runs on Databricks or a cluster; not in `rdp_jvm_sys`):**

```python
# spark is your Databricks / cluster session
df = spark.table("your_catalog.your_schema.your_table")
df.where("dt = '2026-05-20'").write.mode("overwrite").parquet("/local/mnt/rdp/your_table/dt=2026-05-20")
```

**JVM ingest** of that Parquet tree (single file or [`OrderedPaths`](examples/OrderedPaths.java) for many parts):

```java
// This is a glob pattern: expand it in your app, then put the matching absolute paths in the JSON.
String partGlob = "/local/mnt/rdp/your_table/dt=2026-05-20/part-00000-*.parquet";
```

**Pipeline metadata** for a future native Delta writer (today often `connector_pending`):

```json
{
  "lake_sink": {
    "format": "delta",
    "catalog_uri": "https://your-workspace.cloud.databricks.com/api/2.1/unity-catalog/iceberg",
    "warehouse": "s3://your-unity-catalog-warehouse/",
    "namespace": "your_catalog.your_schema",
    "table_student_grades": "your_table"
  }
}
```

See [`DeltaLakeHandoff.java`](examples/DeltaLakeHandoff.java) for the end-to-end story in comments.

#### Apache Spark (connector / consumer)

Spark is the usual **consumer** after Rust processing, or the **producer** that writes files Rust ingests. There is **no** `SparkSession` parameter on the FFI boundary.

**Runnable:** [`SparkParquetHandoffExample`](examples/SparkParquetHandoffExample.java) runs the platform pipeline and prints the `spark` sink (`handoff_uri` written by Rust).

**Rust → Spark (small/medium handoff)** — [`rdp_export_parquet_temp`](#temp-parquet-handoff-rdp_export_parquet_temp), module `rust-data-processing-jvm-spark` for Arrow/Parquet helpers:

```java
JSONObject envelope = RdpNativeJson.invokeExportParquetTemp(linker, lookup, arena);
String path = RdpParquetTemp.parquetPath(envelope); // helper from the integration package
SparkSession spark =
    SparkSession.builder().master("local[*]").appName("rdp-handoff").getOrCreate();
Dataset<Row> df = spark.read().parquet(path);
// delete temp file when done (see ParquetTempExportExample)
```

**Spark / Databricks → Rust (large tables)** — cluster writes Parquet; Java passes **local** absolutes into `rdp_ingest_parquet_path` or `rdp_ingest_ordered_paths_json` (same as [lake read handoff](#your-data-lake-your_lake)).

**Spark Snowflake connector** (example producer only):

```scala
// Runs in your Spark app — not shipped inside rdp_jvm_sys
val sfOptions = Map(
  "sfURL" -> "your_account.snowflakecomputing.com",
  "sfDatabase" -> "YOUR_DB",
  "sfSchema" -> "YOUR_SCHEMA",
  "sfWarehouse" -> "YOUR_WH"
)
spark.read.format("net.snowflake.spark.snowflake").options(sfOptions)
  .option("query", "select id, amount from ledger limit 1000000")
  .load()
  .write.parquet("/var/rdp/staging/snowflake_ledger.parquet")
```

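Spark's `write.parquet` creates a **directory** at `/var/rdp/staging/snowflake_ledger.parquet` that contains `part-*.parquet` files. Ingest those part files from Java: a single part as in the Snowflake subsection above, or all parts via [`OrderedPaths`](examples/OrderedPaths.java).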

### Summary for Java integrators

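- **Connection URLs** (including any credentials embedded in them) belong in **JSON documents** (and **local path** lists) you send to `rdp_run_pipeline_json`, `rdp_ingest_*_path`, or `rdp_ingest_ordered_paths_json`. Object-store and SFTP/FTP secrets come from the environment of the process that loads `rdp_jvm_sys`. Neither travels as a live database session object on the FFI boundary.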
- **Rust** performs reads/writes using library drivers; you avoid reimplementing Polars, Parquet, or SQL execution in Java.
- **Examples to copy:** file ingest → people/GHCN fixtures; pipeline URLs → [`RDPOnlyETLExample`](examples/RDPOnlyETLExample.java) + `tests/fixtures/student_etl/`; cloud platforms + object stores → [`PlatformConnectorsPipelineExample`](examples/PlatformConnectorsPipelineExample.java), [`ObjectStoreUrlsExample`](examples/ObjectStoreUrlsExample.java), [`SparkParquetHandoffExample`](examples/SparkParquetHandoffExample.java) + `tests/fixtures/cloud_connectors/`; [Connector cookbook](#connector-cookbook).

## Phase 2 examples {#phase-2-examples}

Counterpart to [`docs/python/PHASE2_EXAMPLES.md`](../python/PHASE2_EXAMPLES.md). Python often builds `rdp.DataSet(...)` in-process; JVM examples call **`rdp_parity_*`** (or file ingest) and read fields from `interchange`. Where FFI is missing, the class documents the Python API and the intended Rust function.

| § | Python topic | Example | How it runs on the JVM |
| --- | --- | --- | --- |
| 1 | JSONL + train/test indices | [`ExportJsonlTrainTest.java`](examples/ExportJsonlTrainTest.java) | `rdp_parity_export_privacy_reports` → `jsonl_preview_lines`, `train_test_indices_demo` |
| 2 | UTF-8 length row filter | [`ExportFilterRowsMaxUtf8Chars.java`](examples/ExportFilterRowsMaxUtf8Chars.java) | **Doc only** — `export_filter_rows_max_utf8_chars` not on FFI yet |
| 3 | Privacy diff after transform | [`PrivacyDiffReports.java`](examples/PrivacyDiffReports.java) | Same parity export → `privacy_report_json` |
| 4 | Truncate large UTF-8 text | [`ReportsTruncateUtf8.java`](examples/ReportsTruncateUtf8.java) | Same parity export → `reports_truncated_sample` |
| 5 | Utf8 masking `TransformSpec` | [`TransformUtf8Masking.java`](examples/TransformUtf8Masking.java) | `rdp_parity_transform` (rename/cast demo; masking steps in Python/Rust) |
| 6 | UTF-8 length validation | [`ValidationUtf8Length.java`](examples/ValidationUtf8Length.java) | `rdp_parity_validation` (summary envelope; `utf8_len_chars_between` in Python) |
| 7 | Median reduce / grouped median | [`MedianReduceAndDataFrame.java`](examples/MedianReduceAndDataFrame.java) | **Doc only** — see [`SQLQueries.java`](examples/SQLQueries.java) for Polars SQL on JVM |
| 8 | Delta / Iceberg handoff | [`DeltaLakeHandoff.java`](examples/DeltaLakeHandoff.java) | **Doc only** — lake read out-of-process; then Parquet ingest like `JsonParquetExcelSnippets` |
| 9 | Ingest → validate → JSONL | [`IngestValidateJsonlEndToEnd.java`](examples/IngestValidateJsonlEndToEnd.java) | `people.csv` ingest + validation + JSONL preview parity |
| 10 | Watermark / ordered paths | [`OrderedPaths.java`](examples/OrderedPaths.java) | (see [Incremental batches](#incremental-batches-watermarks-directory-scan) above) |

### DataFrame-centric Polars SQL (`DataFrameCentricPipeline`)

Python’s lazy `DataFrame.filter_eq(…).multiply_f64(…)` maps to **Polars SQL** on table `df` inside a pipeline. Fixture SQL filters `active = TRUE` and doubles `score`; the Parquet sink should have **2 rows**.

- **Schema:** `jvm_contract/schemas/three_rows.schema.json`
- **Pipeline:** `jvm_contract/pipelines/dataframe_centric_sql.pipeline.json`
- **Input:** `tests/fixtures/jvm_contract_three_rows.json`

### SQL queries (`SQLQueries`)

- **Single-table:** `sql_query_dataset.pipeline.json` with the same `three_rows` schema; SQL selects active rows ordered by `id DESC`.
- **JOIN:** SQL text in `sql_parity/queries/join_people_scores.sql.json`; side tables `sql_parity/data/join_left.json` and `join_right.json`. Multi-table `rdp_run_pipeline_json` is not on the JVM yet — the doc example runs the join via **`rdp_parity_sql_suite_mirror`** (same workload as Python `SqlContext` tests).

### GHCN JSON → XML → Parquet (`GhcnJsonXmlParquetPipeline`)

Demonstrates **three distinct schemas** on one small NOAA station sample (`ghcn/ghcn_stations_sample.json`):

1. **`json_to_xml.pipeline.json`** — renames columns (`id` → `stationCode`, …) and writes `xml_file`.
2. **`rdp_ingest_xml_path`** — verifies intermediate XML against `xml_intermediate.schema.json`.
3. **`xml_to_parquet.pipeline.json`** — maps to lake column names (`station_id`, `geo_lat`, …) and writes `parquet_file`.
4. **`rdp_ingest_parquet_path`** — verifies lake Parquet (5 rows; first `station_id` = `ACW00011604`).

No runtime download: the sample is committed under `tests/fixtures/ghcn/`.

### People CSV / JSON / Parquet / Excel (`JsonParquetExcelSnippets`, `ExcelSnippets`, `ParquetSnippets`)

- **Payload ingest:** `people/payloads/json_path_dataset.payload.json`, `csv_path_dataset.payload.json` → `rdp_ingest_ordered_paths_json`.
- **Path ingest:** `rdp_ingest_json_path` / `rdp_ingest_csv_path` with `people/schemas/*.schema.json` and `defaultPathIngestOptionsJson()`.
- **CSV → Parquet pipeline:** `people/pipelines/csv_to_parquet.pipeline.json` then verify with `people_flat.schema.json`.
- **Excel:** `excel_sheet_dataset.payload.json` with `{{SOURCE_PATH}}` → `tests/fixtures/people.xlsx` (generate via `python scripts/write_people_xlsx_stdlib.py` if missing).

### Incremental batches (watermarks, directory scan) {#incremental-batches-watermarks-directory-scan}

[`OrderedPaths.java`](examples/OrderedPaths.java) — Java lists files (glob) like Python `paths_from_directory_scan`; Rust ingests with watermark options from `watermark/payloads/csv_watermark_ingest.body.json`. [`PathFromDirectoryScan.java`](examples/PathFromDirectoryScan.java) delegates to the same logic for back-compat.

```json
{
  "schema_ref": "schemas/events.schema.json",
  "options": {
    "format": "csv",
    "watermark_column": "ts",
    "watermark_exclusive_above": 100
  },
  "response": { "mode": "dataset", "max_rows": 10000 }
}
```

Attach the scanned absolute paths as the `paths` array in the payload JSON before calling `rdp_ingest_ordered_paths_json`. Semantics are also covered by **`rdp_parity_watermark_mirror`**.
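One way to attach the scanned paths is sketched below with plain string handling, so that it has no dependencies. `OrderedPathsPayload` is a hypothetical name, and the sketch assumes the body is a JSON object that does not already contain a `paths` key. In code that already uses `org.json`, `new JSONObject(body).put("paths", new JSONArray(paths))` is the more robust route.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: adds a "paths" array to an ordered-ingest payload body. */
final class OrderedPathsPayload {
  /** Quotes a path as a JSON string (escapes backslash and double quote only). */
  static String jsonString(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /** Inserts "paths" as the first member of the JSON object in bodyJson. */
  static String withPaths(String bodyJson, List<String> absolutePaths) {
    String body = bodyJson.strip();
    if (!body.startsWith("{")) {
      throw new IllegalArgumentException("payload body must be a JSON object");
    }
    String array = absolutePaths.stream()
        .map(OrderedPathsPayload::jsonString)
        .collect(Collectors.joining(", ", "[", "]"));
    String rest = body.substring(1).stripLeading();
    String separator = rest.startsWith("}") ? "" : ", "; // no comma for an empty object
    return "{\"paths\": " + array + separator + rest;
  }
}
```

The list order is preserved as given, so sort the scanned paths before calling `withPaths` if the ingest relies on ordering.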

### Rust-only student ETL (`RDPOnlyETLExample`)

Shows how **database and lake connection strings live in pipeline JSON** (`postgresql_url`, `catalog_uri`, `warehouse`) while **ingest sources stay local files** — see [Your database](#your-database-your_database) and [Your data lake](#your-data-lake-your_lake). Loads [`legacy_student_etl_three_paths.pipeline.json`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/tests/fixtures/student_etl/pipelines/legacy_student_etl_three_paths.pipeline.json) with `PATH_A..C` → `student_etl/data/part-0000*.json`, plus ordered ingest payloads. Execution stays in Rust; Java substitutes paths and calls `rdp_run_pipeline_json` / `rdp_ingest_ordered_paths_json`.

## Rust-first ETL vs JVM consumption {#rust-first-etl-vs-jvm-consumption}

**Default recommendation:** do as much work as possible **inside the Rust engine** (or via the Python extension calling Rust), and **persist** results where downstream systems already read efficiently — **Parquet**, **CSV**, **database** tables, object storage, etc. Avoid pulling large **`DataSet`** / Polars materializations back over the JVM FFI when you do not need them on the JVM.

**Why:** Parity exports that return tabular data today expose **`interchange.dataset`** as **JSON** (`schema` + `rows`). That is appropriate for **contracts, tests, demos, and small control-plane payloads**. It is **not** the right default for multi‑gigabyte pipelines: memory, GC, and parse cost grow with row count on both sides.

**This guidance applies to every export that materializes a full table in `interchange`**, including (non‑exhaustive):

- **`rdp_parity_types_dataset`**, **`rdp_parity_ingestion`**, **`rdp_parity_pipeline_sql`**, **`rdp_parity_transform`** — tabular **`dataset`**.
- **`rdp_parity_benchmark_smoke_mirror`** and similar smoke paths that include large-ish synthetic **`dataset`** payloads in JSON.

For **mirrors** that mostly return **metrics, counts, flags, or small JSON** (bindings, mapping spec, SQL suite, partition discovery, watermark, validation summaries, observability, etc.), JVM JSON is usually fine.

**When you do need data on the JVM** (local Spark `DataFrame`, unit tests, small extracts):

- Use **`interchange.dataset`** JSON only for **bounded** row counts you are willing to hold in the heap.
- Prefer **file-based handoff** when you add or use dedicated FFI: e.g. Rust writes **Parquet or CSV** to a path (temp or durable), returns a **small envelope** (`path`, `row_count`, `schema`, checksum), and Java or **Spark in `local[*]` mode** reads from that path — then delete the temp file if you created one.
- Longer term, **Arrow IPC / C Data Interface** (see [`ARROW_FFI_JVM.md`](ARROW_FFI_JVM.md)) is the direction for columnar, lower-copy bridges; until then, **Rust writes files → JVM reads files** is the scalable pattern outside a data lake too.

**Summary:** treat JVM parity calls as **orchestration, validation, and small-surface results**; treat **Rust (or Python + Rust) as the place that runs ETL and lands data** to Parquet, CSV, or a database unless you have a deliberate, size-bounded reason to materialize rows in the JVM.

### Temp Parquet handoff (`rdp_export_parquet_temp`) {#temp-parquet-handoff-rdp_export_parquet_temp}

Rust writes a **small sample** `DataSet` (two rows: id/name) to a file under the OS temp directory (`…/rdp_jvm_parquet/rdp_export_<nanos>.parquet`) and returns a **small JSON envelope** (`interchange.kind` = `parquet_export_temp`, `path`, `row_count`, `schema`). No giant `rows` array crosses the boundary.

From Java, call **`RdpNativeJson.invokeExportParquetTemp`**, then read the path with **Spark in `local[*]` mode** (or any Parquet reader), then delete the file. Helpers: **`io.github.scorpio_datalake.rust_data_processing.integration.RdpParquetTemp`**. Runnable: **`ParquetTempExportExample`**.

```java
import io.github.scorpio_datalake.rust_data_processing.ffi.RdpNativeJson;
import io.github.scorpio_datalake.rust_data_processing.integration.RdpParquetTemp;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.json.JSONObject;

JSONObject root = RdpNativeJson.invokeExportParquetTemp(linker, lookup, arena);
String path = RdpParquetTemp.parquetPath(root);
SparkSession spark = SparkSession.builder().master("local[*]").appName("rdp").getOrCreate();
Dataset<Row> df = spark.read().parquet(path);
// … use df …
RdpParquetTemp.deleteQuietly(path);
```
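The example above deletes the temp file only on the happy path. A `try`/`finally` wrapper keeps the cleanup when the reader throws. This is a sketch with a hypothetical `TempHandoff` class, written with JDK calls only and not a replacement for `RdpParquetTemp.deleteQuietly`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Function;

/** Hypothetical helper: read a temp export, then always remove it. */
final class TempHandoff {
  /** Runs reader on the file and deletes the file afterwards, even if reader throws. */
  static <T> T readThenDelete(Path tempFile, Function<Path, T> reader) {
    try {
      return reader.apply(tempFile);
    } finally {
      try {
        Files.deleteIfExists(tempFile);
      } catch (IOException e) {
        // Best-effort cleanup: a leftover temp file should not mask the reader's outcome.
        System.err.println("could not delete " + tempFile + ": " + e.getMessage());
      }
    }
  }
}
```

With Spark, the reader must finish consuming the data before it returns (for example by collecting or counting), because a `Dataset<Row>` is evaluated lazily and would otherwise try to read the file after it has been deleted.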

The crate also exposes **`rust_data_processing::ingestion::export_dataset_to_parquet`** for Rust callers who write their own paths.

## Prerequisites

- **JDK 21+** (Panama FFM). The FFM API is a preview API on JDK 21 (it needs `--enable-preview`) and is final from JDK 22; see the module `pom.xml` for the flags each build uses.
- **Artifact:** `io.github.scorpio-datalake.rust-data-processing:rust-data-processing-jvm` (same version as [`bindings/java/VERSION`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/java/VERSION)).
- **Native library:** build `rdp_jvm_sys` from [`bindings/jvm-sys/`](https://github.com/scorpio-datalake/rust-data-processing/tree/main/bindings/jvm-sys) (CI: `cargo build --release --manifest-path bindings/jvm-sys/Cargo.toml --features full`). Point Java at the absolute path:
  - Environment variable **`RDP_JVM_SYS`**, or
  - System property **`-Drdp.jvm.sys.library=…`**
- **JVM flag:** `--enable-native-access=ALL-UNNAMED` (or a tighter module policy if you wire one).
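The two ways of pointing Java at the native library can be folded into one lookup that fails early with a clear message. The sketch below assumes a hypothetical `NativeLibraryLocator` class. Giving the environment variable precedence over the system property is this sketch's choice, not documented behaviour.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: resolve the rdp_jvm_sys library path from env or system property. */
final class NativeLibraryLocator {
  static Path resolve(Map<String, String> env, Properties props) {
    String fromEnv = env.get("RDP_JVM_SYS");
    String fromProp = props.getProperty("rdp.jvm.sys.library");
    // Assumption: the environment variable wins when both are set.
    String raw = (fromEnv != null && !fromEnv.isBlank()) ? fromEnv : fromProp;
    if (raw == null || raw.isBlank()) {
      throw new IllegalStateException(
          "set RDP_JVM_SYS or -Drdp.jvm.sys.library to the native library path");
    }
    Path lib = Path.of(raw);
    if (!lib.isAbsolute()) {
      throw new IllegalStateException("native library path must be absolute: " + lib);
    }
    if (!Files.isRegularFile(lib)) {
      throw new IllegalStateException("native library not found: " + lib);
    }
    return lib;
  }
}
```

In an application this would be called as `NativeLibraryLocator.resolve(System.getenv(), System.getProperties())`, and the result passed to `SymbolLookup.libraryLookup`.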

**Running an example `main`:**

```bash
# From repository root (Linux/macOS; adjust .so / .dll / .dylib)
cargo build --release --manifest-path bindings/jvm-sys/Cargo.toml --features full
export RDP_JVM_SYS="$PWD/bindings/jvm-sys/target/release/librdp_jvm_sys.so"
export JAVA_TOOL_OPTIONS='--enable-native-access=ALL-UNNAMED'
# Compile/run from your module that depends on rust-data-processing-jvm, with docs/java/examples/MyExample.java on the classpath
```

**JUnit (maintainers):** [`DocsExampleNativeIntegrationTest`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/java/rust-data-processing-jvm/src/test/java/io/github/scorpio_datalake/rust_data_processing/docexamples/DocsExampleNativeIntegrationTest.java) exercises most catalog examples when `RDP_JVM_SYS` is set; [`FfiExportedSymbolsContractTest`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/java/rust-data-processing-jvm/src/test/java/io/github/scorpio_datalake/rust_data_processing/FfiExportedSymbolsContractTest.java) covers manifest-wide symbol smoke separately.

## FFI manifest and ABI

The JAR bundles **`ffi_manifest.json`** (same content as [`bindings/jvm-sys/ffi_manifest.json`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/jvm-sys/ffi_manifest.json)). Read it from the classpath to list **`exported_symbols`** and compare **`abi_version_constant`** with a live **`rdp_ffi_abi_version`** downcall.

```java
import io.github.scorpio_datalake.rust_data_processing.ffi.RdpNativeJson;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.json.JSONObject;

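try (InputStream in = RdpNativeJson.class.getResourceAsStream(RdpNativeJson.FFI_MANIFEST_RESOURCE)) {
  if (in == null) { // getResourceAsStream returns null when the resource is missing
    throw new IllegalStateException("ffi_manifest.json is not on the classpath");
  }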
  JSONObject manifest = new JSONObject(new String(in.readAllBytes(), StandardCharsets.UTF_8));
  int abiFromJar = manifest.getInt("abi_version_constant");
  // … compare with RdpNativeJson.invokeAbiVersion(linker, lookup) when native lib is loaded
}
```

Runnable: **`io.github.scorpio_datalake.rust_data_processing.examples.LoadFfiManifestExample`**.

## Calling `rdp_parity_*` from Java

Parity exports take no Java arguments: Rust builds the scenario (fixtures, options), runs the engine, and writes a JSON envelope into an **`RdpJsonSlice`**. Use **`RdpNativeJson.invokeParityExport`** so the slice is freed correctly.

```java
import io.github.scorpio_datalake.rust_data_processing.ffi.RdpNativeJson;
import io.github.scorpio_datalake.rust_data_processing.scenario.PytestMirrorAssertions;
import java.lang.foreign.Arena;
import java.lang.foreign.Linker;
import java.lang.foreign.SymbolLookup;
import java.nio.file.Path;
import org.json.JSONObject;

Path lib = Path.of(System.getenv("RDP_JVM_SYS")); // or System.getProperty("rdp.jvm.sys.library")
Linker linker = Linker.nativeLinker();
try (Arena arena = Arena.ofConfined()) {
  SymbolLookup lookup = SymbolLookup.libraryLookup(lib, arena);
  JSONObject root =
      RdpNativeJson.invokeParityExport(linker, lookup, arena, "rdp_parity_bindings_mirror");
  PytestMirrorAssertions.validateMirrorExport("rdp_parity_bindings_mirror", root);
  JSONObject interchange = root.getJSONObject("interchange");
  // read fields …
}
```

Runnable: **`io.github.scorpio_datalake.rust_data_processing.examples.RunPytestMirrorExample`** — pass the export name as the only CLI argument.

For **how large `dataset` JSON fits into a production architecture**, see [Rust-first ETL vs JVM consumption](#rust-first-etl-vs-jvm-consumption).

## File ETL (ingestion and tabular JSON)

Python uses **`ingest_from_path`** and **`DataSet`** directly. On the JVM, see:

- **`rdp_parity_ingestion`** — CSV / ingestion path exercised in Rust; `interchange` includes tabular **`dataset`** (`schema` + `rows`) for Java-side projection.
- **`rdp_parity_types_dataset`** — tabular JSON shape for typed datasets.

Validate the envelope and `kind` the same way as in the runnable examples (see class Javadoc on [`JsonParquetExcelSnippets.java`](examples/JsonParquetExcelSnippets.java) and parity helpers in [`FFI_MANIFEST_JAVA_USAGE.md`](FFI_MANIFEST_JAVA_USAGE.md)).

For copy-pasteable classes with committed fixtures, see the [example catalog](#example-catalog). Parity-only sketches: [`rdp_parity_ingestion`](#file-etl-ingestion-and-tabular-json), [`rdp_parity_types_dataset`](#file-etl-ingestion-and-tabular-json).

<h2 id="ordered-paths-and-directory-scans-incremental-batches">Ordered paths and directory scans (incremental batches)</h2>

In Python, incremental batch patterns use **`paths_from_directory_scan`**, **`ingest_from_ordered_paths`**, watermark options, and Hive-style layout helpers — see [the same heading in the Python examples](../python/examples.html#ordered-paths-and-directory-scans-incremental-batches) on this site and [`docs/python/PHASE2_EXAMPLES.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/docs/python/PHASE2_EXAMPLES.md) § 10.

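**On the JVM today** there are no Java methods with those names. Java lists the files itself and hands them to **`rdp_ingest_ordered_paths_json`** (see [Incremental batches](#incremental-batches-watermarks-directory-scan)); the underlying **Rust capabilities** are also covered by parity exports you call over FFI: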

### Hive-style partition discovery

- **Export:** `rdp_parity_partition_discovery_mirror`
- **Role:** Mirrors pytest coverage for **`discover_hive_partitioned_files`**, globs, explicit path lists, and **`parse_partition_segment`**. The JSON `interchange` includes fields such as `discover_all_len`, `discover_events_glob_len`, `reject_non_directory_ok`, `parse_dt`, `parse_nodash_is_null`, etc. (see **`PytestMirrorAssertions.assertPartitionDiscoveryMirror`**).

```java
JSONObject root =
    RdpNativeJson.invokeParityExport(
        linker, lookup, arena, "rdp_parity_partition_discovery_mirror");
PytestMirrorAssertions.validateMirrorExport(
    "rdp_parity_partition_discovery_mirror", root);
```
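
For readers unfamiliar with Hive-style layouts, the following is a minimal Java sketch of the general `key=value` path-segment convention. It is an illustration only: the class `HivePartitionSketch` and its methods are hypothetical and are not part of the JVM bindings, and the edge cases the Rust `parse_partition_segment` accepts or rejects may differ from what is assumed here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of rust-data-processing or its JVM bindings. */
final class HivePartitionSketch {

  /**
   * Splits one path segment of the form "key=value".
   * Returns null when the segment has no '=', an empty key, or an empty value.
   * Assumption: the Rust parser may treat these edge cases differently.
   */
  static Map.Entry<String, String> parseSegment(String segment) {
    int eq = segment.indexOf('=');
    if (eq <= 0 || eq == segment.length() - 1) {
      return null;
    }
    return Map.entry(segment.substring(0, eq), segment.substring(eq + 1));
  }

  /** Collects partition columns, in path order, from a '/'-separated relative path. */
  static Map<String, String> partitionsOf(String relativePath) {
    Map<String, String> partitions = new LinkedHashMap<>();
    for (String segment : relativePath.split("/")) {
      Map.Entry<String, String> kv = parseSegment(segment);
      if (kv != null) {
        partitions.put(kv.getKey(), kv.getValue());
      }
    }
    return partitions;
  }
}
```

With this sketch, `HivePartitionSketch.partitionsOf("events/dt=2024-01-01/region=eu/part-0.parquet")` yields `dt` mapped to `2024-01-01` and `region` mapped to `eu`; segments without `=` are skipped.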

### Watermark and ordered ingestion semantics

- **Export:** `rdp_parity_watermark_mirror`
- **Role:** Exercises watermark options on fixture CSV/JSON (e.g. `watermark_column`, `watermark_exclusive_above`), row counts, and rejection of incomplete watermark options — aligned with Python incremental / watermark tests.

```java
JSONObject root =
    RdpNativeJson.invokeParityExport(linker, lookup, arena, "rdp_parity_watermark_mirror");
PytestMirrorAssertions.validateMirrorExport("rdp_parity_watermark_mirror", root);
```
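
To make the watermark idea concrete, here is a small Java sketch of what an exclusive lower bound and the rejection of incomplete options could look like. Everything in it is an assumption for illustration: `WatermarkSketch` and `Options` are hypothetical names, the watermark is modelled as a `long`, and "exclusive above" is read as "keep rows strictly greater than the threshold". The Rust implementation is the source of truth for the real semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not the crate's API. */
final class WatermarkSketch {

  /** Both fields must be set together, or both left null (no watermark). */
  record Options(String watermarkColumn, Long watermarkExclusiveAbove) {
    Options {
      if ((watermarkColumn == null) != (watermarkExclusiveAbove == null)) {
        throw new IllegalArgumentException(
            "watermark_column and watermark_exclusive_above must be set together");
      }
    }
  }

  /** Keeps rows whose watermark column is strictly greater than the threshold. */
  static List<Map<String, Object>> apply(List<Map<String, Object>> rows, Options options) {
    if (options.watermarkColumn() == null) {
      return rows; // no watermark configured: every row passes
    }
    List<Map<String, Object>> kept = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      Object value = row.get(options.watermarkColumn());
      // Assumption: rows with a missing or non-numeric watermark value are dropped.
      if (value instanceof Number n && n.longValue() > options.watermarkExclusiveAbove()) {
        kept.add(row);
      }
    }
    return kept;
  }
}
```

Under these assumptions, a threshold of `200` drops rows whose value is `200` or lower, so a rerun over the same files does not re-ingest rows already processed at the previous watermark.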

**Practical integration pattern:** implement directory listing, ordered file batches, and checkpoint storage **in Java** (or your orchestrator). Keep **heavy ingestion and transforms** in **Rust** (or Python calling Rust), and have that side **write outputs to files or a database** rather than streaming huge tables as JSON into the JVM. Use parity exports to **prove behavior** and for **small extracts**; see [Rust-first ETL vs JVM consumption](#rust-first-etl-vs-jvm-consumption).
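
The Java side of that pattern can be sketched with the standard library alone. The helper below is hypothetical (the class `OrderedBatchPlanner` and its method names are not part of the bindings) and assumes that file names sort in processing order and that the checkpoint is simply the name of the last fully processed file, stored in a small text file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical orchestration helper; not part of rust-data-processing. */
final class OrderedBatchPlanner {

  /** Lists regular files directly under dir, sorted by file name. */
  static List<Path> listOrdered(Path dir) throws IOException {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries
          .filter(Files::isRegularFile)
          .sorted(Comparator.comparing((Path p) -> p.getFileName().toString()))
          .collect(Collectors.toList());
    }
  }

  /** Keeps files whose name sorts strictly after the checkpoint; a null checkpoint keeps all. */
  static List<Path> pendingAfter(List<Path> ordered, String checkpoint) {
    List<Path> pending = new ArrayList<>();
    for (Path p : ordered) {
      if (checkpoint == null || p.getFileName().toString().compareTo(checkpoint) > 0) {
        pending.add(p);
      }
    }
    return pending;
  }

  /** Splits files into consecutive batches of at most batchSize entries. */
  static List<List<Path>> batches(List<Path> files, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<Path>> out = new ArrayList<>();
    for (int i = 0; i < files.size(); i += batchSize) {
      out.add(new ArrayList<>(files.subList(i, Math.min(i + batchSize, files.size()))));
    }
    return out;
  }

  /** Reads the last processed file name, or null if no checkpoint exists yet. */
  static String readCheckpoint(Path checkpointFile) throws IOException {
    if (!Files.exists(checkpointFile)) {
      return null;
    }
    String name = Files.readString(checkpointFile).trim();
    return name.isEmpty() ? null : name;
  }

  /** Records the last file of a batch once that batch has been fully processed. */
  static void writeCheckpoint(Path checkpointFile, Path lastProcessed) throws IOException {
    Files.writeString(checkpointFile, lastProcessed.getFileName().toString());
  }
}
```

A scheduler would call `listOrdered`, narrow the result with `pendingAfter(readCheckpoint(...))`, hand each batch to the Rust or Python job, and call `writeCheckpoint` only after that job reports success. Keeping the checkpoint file outside the scanned directory avoids listing it as input.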

## SQL and DataFrame parity

- **`rdp_parity_pipeline_sql`** — lazy SQL / Polars pipeline; `interchange` includes **`dataset`**.
- **`rdp_parity_sql_suite_mirror`** — multi-query SQL mirror (joins, errors); use **`validateMirrorExport`**.

For production-sized Polars outputs, prefer **Rust-side** materialization to **Parquet / CSV / DB** and only use JVM JSON **`dataset`** when the result set is intentionally small; see [Rust-first ETL vs JVM consumption](#rust-first-etl-vs-jvm-consumption).

## Transform and mapping spec

- **`rdp_parity_transform`** — `TransformSpec` / Polars transform parity; includes **`dataset`** in `interchange`.
- **`rdp_parity_mapping_spec_mirror`** — mapping DSL mirror; **`validateMirrorExport`**.

## Processing and execution

- **`rdp_parity_processing`** — filter / map / reduce style parity (`filtered_row_count`, `mapped_row_count`, …).
- **`rdp_parity_benchmark_smoke_mirror`** — wide dataset, processing, DataFrame group-by, and parallel filter workloads (similar in spirit to `python-wrapper/tests/test_benchmarks.py`), with results returned as JSON stats.

## Observability

- **`rdp_parity_observability_mirror`** — ingestion failure / missing path behavior surfaced as JSON.

## Runnable walkthrough class

**`io.github.scorpio_datalake.rust_data_processing.examples.ParityScenariosWalkthrough`** runs a curated list of exports (including types, bindings, mapping, transform, processing, SQL, validation, benchmark smoke) and prints short summaries, which is useful for seeing several `interchange` shapes in one JVM process.

```bash
export RDP_JVM_SYS=/absolute/path/to/librdp_jvm_sys.so
export JAVA_TOOL_OPTIONS='--enable-native-access=ALL-UNNAMED'
java -cp "rust-data-processing-jvm-examples-…jar:rust-data-processing-jvm-…jar" \
  io.github.scorpio_datalake.rust_data_processing.examples.ParityScenariosWalkthrough
```

For the exact `-cp` lines, see [`bindings/java/rust-data-processing-jvm-examples/README.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/bindings/java/rust-data-processing-jvm-examples/README.md).

## See also

- [`FFI_MANIFEST_JAVA_USAGE.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/docs/java/FFI_MANIFEST_JAVA_USAGE.md) — Maven, `java -cp`, manifest resource path.
- [`README.md` (JVM doc index)](https://github.com/scorpio-datalake/rust-data-processing/blob/main/docs/java/README.md) — Phase 3 links.
- [Python examples (`../python/examples.html`)](../python/examples.html) — full API tour on the same docs site.
- [`ARROW_FFI_JVM.md`](https://github.com/scorpio-datalake/rust-data-processing/blob/main/docs/java/ARROW_FFI_JVM.md) — future Arrow IPC direction.