1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//! gRPC → HTTP — full builder showcase for both connectors.
//!
//! The gRPC source uses a request body, Bearer-token metadata auth, TLS, and
//! a records path. The HTTP sink exercises:
//!
//! - `.method(...)` — POST/PUT/PATCH/etc. (default POST)
//! - `.headers(...)` — extra headers
//! - `.auth(...)` — Bearer / Basic / Custom (default None)
//! - `.batch_mode(...)` — `Individual` (one request per record) or `Array`
//! - `.max_retries(...)` / `.concurrency(...)` — throughput tuning
//! - `.with_batch_size(...)` — records per outbound request in `Array` mode
//! (`0` = forward each upstream `StreamPage` as a single request)
//!
//! Query params aren't a separate knob — bake them into the URL.
//! The request body is always the source record(s).
//!
//! Run:
//! ```bash
//! cargo run -p faucet-stream --example grpc_to_http \
//! --features "source-grpc sink-http"
//! ```
use faucet_stream::sink::http::{HttpBatchMode, HttpSink, HttpSinkAuth, HttpSinkConfig};
use faucet_stream::source::grpc::{GrpcAuth, GrpcStream, GrpcStreamConfig};
use faucet_stream::{Pipeline, json};
use reqwest::header::{HeaderMap, HeaderValue};
/// Builds a gRPC source and an HTTP sink, runs a pipeline between them, and
/// prints how many records were written.
///
/// Bearer tokens are read from the `GRPC_TOKEN` (source) and `INGEST_TOKEN`
/// (sink) environment variables.
///
/// # Errors
///
/// Returns an error if either environment variable cannot be read (the error
/// message names the offending variable), if `GrpcStream::new` fails, or if
/// the pipeline run fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Source: one `ListMetrics` call on `metrics.MetricsService`, with a JSON
    // request body, bearer auth, TLS, and a records path into the response.
    let source = GrpcStream::new(
        GrpcStreamConfig::new(
            "https://grpc.example.com:443",
            "metrics.MetricsService",
            "ListMetrics",
            "proto/metrics.bin",
        )
        .request(json!({ "window": "1h" }))
        .auth(GrpcAuth::Bearer {
            token: required_env("GRPC_TOKEN")?,
        })
        .tls(true)
        .records_path("$.metrics[*]"),
    )?;

    // Extra headers attached to the sink configuration below.
    let mut headers = HeaderMap::new();
    headers.insert("X-Source", HeaderValue::from_static("faucet-stream"));
    headers.insert("Content-Type", HeaderValue::from_static("application/json"));

    // Sink: query parameters are baked into the URL rather than configured
    // separately.
    let sink = HttpSink::new(
        HttpSinkConfig::new("https://ingest.example.com/v1/events?tenant=acme")
            .method(reqwest::Method::POST)
            .auth(HttpSinkAuth::Bearer {
                token: required_env("INGEST_TOKEN")?,
            })
            .headers(headers)
            .batch_mode(HttpBatchMode::Array)
            .max_retries(3)
            .concurrency(8)
            .with_batch_size(500),
    );

    let result = Pipeline::new(&source, &sink).run().await?;
    println!(
        "forwarded {} records to ingest endpoint",
        result.records_written
    );
    Ok(())
}

/// Reads the environment variable `name`, prefixing the variable name to the
/// error when it is unset or not valid Unicode.
///
/// A bare `VarError` propagated out of `main` is reported as
/// `Error: NotPresent`, which does not say which variable is missing.
fn required_env(name: &str) -> Result<String, Box<dyn std::error::Error>> {
    std::env::var(name).map_err(|err| format!("{}: {}", name, err).into())
}