vgi-rpc 0.2.0

Transport-agnostic RPC framework built on Apache Arrow IPC

vgi-rpc


Rust library for the vgi-rpc transport-agnostic RPC framework built on Apache Arrow IPC. It is byte-for-byte wire-compatible with the canonical Python implementation and the Go port: this crate passes the complete 868-test Python conformance suite across the pipe, subprocess, HTTP, Unix-socket, externalised-upload, and shared-memory pipe transports.

Builds on the stock arrow-rs 58.x dependency tree. MSRV is Rust 1.86, and no [patch.crates-io] overrides are required.

Highlights

  • Server-side dispatch for unary, producer, and exchange stream methods over stdio / Unix socket / HTTP / shared memory.
  • Introspection via the built-in __describe__ method (DESCRIBE_VERSION = "4", slim wire format plus protocol_hash).
  • HTTP surface — axum-backed server with HMAC-signed stateless stream-state tokens, CORS + preflight, zstd request/response compression, configurable URL prefix, landing / describe / health pages, RFC 9728 Protected Resource Metadata, configurable response / externalised-response caps.
  • Auth — bearer (constant-time compare), mTLS via RFC 8705 x-forwarded-client-cert, OAuth 2 Protected Resource Metadata; JWKS-backed JWT (single-flight refresh) and OAuth 2 PKCE primitives behind Cargo features.
  • Observability via DispatchHook + CallStatistics, schema-validated structured access logs, real tracing::Span emission for tracing-opentelemetry, and two-tier Sentry integration (tracing-only or full SDK).
  • Transport lifecycle via TransportKind and an on_serve_start hook, so workers can tailor startup work to the bound transport (pipe / HTTP / unix / shared-memory).
  • External locations — transparent upload of oversized batches to pluggable ExternalStorage backends (vgi-rpc-s3 / vgi-rpc-gcs), SHA-256 integrity, optional zstd, post-decompression size cap.
  • DoS-resilience by construction — the wire reader rejects oversized IPC schema length prefixes (MAX_IPC_SCHEMA_BYTES = 16 MiB) and oversized per-message flatbuffer bodyLength values (MAX_IPC_MESSAGE_BYTES = 256 MiB) before allocating, blocking the trivial 4-byte and 50-byte OOM payloads found by the fuzz/wire_stream_reader fuzz target.
  • Graceful shutdown on SIGTERM / SIGINT for both HTTP and Unix listeners.
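The size-cap idea in the DoS-resilience highlight comes down to ordering: validate a length prefix before sizing any buffer from it. The following is a minimal std-only sketch, not the crate's reader. The helpers read_metadata_len, read_metadata, and check_body_len are hypothetical, and the sketch assumes the standard Arrow IPC encapsulated-message layout (an optional 0xFFFFFFFF continuation marker followed by a little-endian i32 metadata length).

```rust
use std::io::{self, Read};

// Caps as stated in the highlight above; the crate's real definitions may differ in type.
const MAX_IPC_SCHEMA_BYTES: usize = 16 * 1024 * 1024; // 16 MiB
const MAX_IPC_MESSAGE_BYTES: u64 = 256 * 1024 * 1024; // 256 MiB

// Arrow IPC encapsulated messages normally start with this continuation marker.
const CONTINUATION_MARKER: u32 = 0xFFFF_FFFF;

fn invalid(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg.to_string())
}

/// Hypothetical helper: read a message's metadata-length prefix and reject
/// negative or oversized values before any buffer is allocated.
fn read_metadata_len<R: Read>(r: &mut R, cap: usize) -> io::Result<usize> {
    let mut word = [0u8; 4];
    r.read_exact(&mut word)?;
    if u32::from_le_bytes(word) == CONTINUATION_MARKER {
        // Current format: the little-endian i32 length follows the marker.
        r.read_exact(&mut word)?;
    } // Legacy format: the first word already is the length.
    let raw = i32::from_le_bytes(word);
    if raw < 0 {
        return Err(invalid("negative IPC metadata length"));
    }
    let len = raw as usize;
    if len > cap {
        return Err(invalid("IPC metadata length exceeds cap"));
    }
    Ok(len)
}

/// Read the metadata bytes; the Vec is sized only after the cap check passes.
fn read_metadata<R: Read>(r: &mut R, cap: usize) -> io::Result<Vec<u8>> {
    let len = read_metadata_len(r, cap)?;
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}

/// Validate a bodyLength taken from the decoded flatbuffer header (an i64 there).
fn check_body_len(body_len: i64) -> io::Result<usize> {
    if body_len < 0 || body_len as u64 > MAX_IPC_MESSAGE_BYTES {
        return Err(invalid("IPC body length out of range"));
    }
    Ok(body_len as usize)
}
```

Because the Vec is allocated only after the check, a prefix claiming a length of roughly 2 GiB is rejected without allocating anything.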

Cargo features

feature              enables
http                 Axum HTTP server + external-location helpers + stream-state tokens.
macros               #[service], #[unary], #[producer], #[exchange], #[derive(VgiArrow)], #[derive(StreamState)].
jwt                  JwtConfig / Jwks types only; bring your own verifier.
jwt-jsonwebtoken     Bundled JWT verifier (jsonwebtoken + reqwest JWKS fetcher).
oauth-pkce           auth::pkce cookie + verifier crypto primitives.
oauth-pkce-server    OAuth 2 PKCE server-side redirect / token exchange (reqwest).
mtls-pem             PEM certificate parsing helpers (x509-parser). XFCC parsing is always on.
otel                 OtelHook: a real tracing::Span per RPC for tracing-opentelemetry.
sentry-tracing       TracingSentryHook: lightweight, no extra dependencies.
sentry-sdk           SentrySdkHook: full Sentry SDK integration (sentry 0.46).
shm                  POSIX shared-memory transport (zero-copy producer / exchange).
test-utils           external::InMemoryStorage fixture for downstream tests.

Minimal example

use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
use arrow_array::{Array, RecordBatch, StringArray}; // `Array` brings `as_any()` into scope
use vgi_rpc::{MethodInfo, RpcServer};
use vgi_rpc::http::HttpState;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let params_schema: Arc<Schema> =
        Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, false)]));
    let result_schema: Arc<Schema> =
        Arc::new(Schema::new(vec![Field::new("result", DataType::Utf8, false)]));

    let mut srv = RpcServer::builder()
        .protocol_name("Echo")
        .enable_describe(true)
        .build();

    // Give the handler closure its own handle to the result schema.
    let rs = result_schema.clone();
    srv.register(
        MethodInfo::unary("echo", params_schema, result_schema.clone(), move |req, _ctx| {
            let col = req.column("value").unwrap()
                .as_any().downcast_ref::<StringArray>().unwrap();
            Ok(Some(RecordBatch::try_new(
                rs.clone(),
                vec![Arc::new(StringArray::from(vec![col.value(0).to_string()]))],
            )?))
        })
        .doc("Echo a string")
        .param_type("value", "str"),
    );

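    // Expose the server over HTTP (the "http" feature), with CORS open to any
    // origin and response compression at level 3.
    let state = HttpState::builder()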
        .server(Arc::new(srv))
        .cors_origins("*")
        .response_compression_level(3)
        .build();
    let app = vgi_rpc::http::build_router(state);
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
    axum::serve(listener, app).await?;
    Ok(())
}

Public surface cheat sheet

use vgi_rpc::{
    // Core
    RpcServer, RpcServerBuilder, MethodInfo, MethodType, CallContext, Request,
    AuthContext, AuthRequest, Authenticate, chain_authenticate, chain_all,
    OAuthResourceMetadata,
    DispatchHook, DispatchInfo, CallStatistics, ChainHook, SharedHook,
    AccessLogHook, RetryConfig,
    LogLevel, LogMessage, RpcError, Result,
    DESCRIBE_METHOD_NAME, DESCRIBE_VERSION,
    // Transport lifecycle
    TransportKind, TransportCapabilities, ServeStartHook,
    // Stream state (for hand-written stream handlers; macros generate these for you)
    ExchangeState, OutputCollector, ProducerState, StreamResult,
};

// Module paths; the trailing comment names the Cargo feature each needs ("always on" = none):
use vgi_rpc::auth::bearer::bearer_authenticate_static;      // always on
use vgi_rpc::auth::mtls::mtls_authenticate_fingerprint;     // always on
use vgi_rpc::http::{HttpState, HttpStateBuilder};            // "http"
use vgi_rpc::external::{ExternalLocationConfig, Compression}; // "http"
// use vgi_rpc::auth::jwt::jwt_authenticate;                 // "jwt"
// use vgi_rpc::auth::pkce::generate_pkce_pair;               // "oauth-pkce"
// use vgi_rpc::{OtelConfig, OtelHook, OtelMetrics};         // "otel"
// use vgi_rpc::{TracingSentryConfig, TracingSentryHook};    // "sentry-tracing"
// use vgi_rpc::{SentrySdkConfig, SentrySdkHook};            // "sentry-sdk"
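The Auth highlight says bearer checks use a constant-time compare, and bearer_authenticate_static is listed as always on. Below is a minimal std-only sketch of that technique, assuming the static token arrives as an `Authorization: Bearer <token>` header value. constant_time_eq and bearer_matches are hypothetical names, not the crate's API.

```rust
/// Hypothetical sketch: compare two byte strings without an early exit,
/// so the time taken does not reveal how many leading bytes matched.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    // A length mismatch is reported immediately; token length is not treated as secret here.
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; any difference leaves a non-zero bit.
    let mut diff: u8 = 0;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Hypothetical helper: take the token from an `Authorization` header value
/// and check it against the expected static token.
fn bearer_matches(header: &str, expected: &str) -> bool {
    match header.strip_prefix("Bearer ") {
        Some(token) => constant_time_eq(token.as_bytes(), expected.as_bytes()),
        None => false,
    }
}
```

An XOR-accumulate loop avoids the early exit of `==`, but the optimizer gives no hard guarantee, so production code usually relies on a vetted crate such as subtle. The scheme match here is also case-sensitive for brevity, whereas HTTP authentication schemes are case-insensitive.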

Macros (default-on macros feature)

use std::sync::Arc;
use vgi_rpc::{service, unary, RpcServer, Result};

pub struct Calc;

#[service]
impl Calc {
    /// Echo a string back, prefixed.
    #[unary]
    fn echo(&self, value: String) -> Result<String> {
        Ok(format!("echo: {value}"))
    }
}

fn main() {
    let mut srv = RpcServer::builder().protocol_name("Calc").build();
    // Register the methods that #[service] generated for Calc on this server.
    Calc::register_with(&mut srv, Arc::new(Calc));
}

#[derive(VgiArrow)] derives the Arrow Struct<fields> data type for a struct whose fields each implement VgiArrow themselves (mirroring Python's ArrowSerializableDataclass). #[derive(StreamState)] automatically implements StreamStateCodec, using bincode, for stream-state types.
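To make the derive rule concrete: a struct's Arrow type is assembled from the types its fields already report, recursively. The sketch below is a dependency-free illustration of that composition only. It assumes a hypothetical ArrowTypeName trait that returns a textual type name; the real VgiArrow derive produces an arrow DataType, and its generated code is not shown here.

```rust
// Hypothetical stand-in for VgiArrow: each type reports its Arrow type as text.
trait ArrowTypeName {
    fn arrow_type() -> String;
}

// Leaf types map directly to Arrow primitive type names.
impl ArrowTypeName for i64 {
    fn arrow_type() -> String {
        "Int64".to_string()
    }
}
impl ArrowTypeName for f64 {
    fn arrow_type() -> String {
        "Float64".to_string()
    }
}
impl ArrowTypeName for String {
    fn arrow_type() -> String {
        "Utf8".to_string()
    }
}

/// Render a Struct<...> type from (field name, field type) pairs.
fn struct_type(fields: &[(&str, String)]) -> String {
    let inner: Vec<String> = fields.iter().map(|(n, t)| format!("{n}: {t}")).collect();
    format!("Struct<{}>", inner.join(", "))
}

#[allow(dead_code)]
struct Point {
    x: f64,
    y: f64,
}

#[allow(dead_code)]
struct Labeled {
    id: i64,
    name: String,
    at: Point,
}

// Roughly what a derive would emit: one entry per field, delegating to the field's own impl.
impl ArrowTypeName for Point {
    fn arrow_type() -> String {
        struct_type(&[("x", f64::arrow_type()), ("y", f64::arrow_type())])
    }
}

// A nested struct field works because Point itself implements the trait.
impl ArrowTypeName for Labeled {
    fn arrow_type() -> String {
        struct_type(&[
            ("id", i64::arrow_type()),
            ("name", String::arrow_type()),
            ("at", Point::arrow_type()),
        ])
    }
}
```

A field whose type lacks an impl simply fails to compile, which is the same constraint the derive imposes.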

Interoperability notes

The wire protocol matches the canonical Python vgi_rpc implementation:

  • Pointer-batch schema is empty; location metadata is the payload.
  • __describe__ version is "4"; method_type collapses Producer / Exchange / Dynamic into "stream". The describe response carries a protocol_hash SHA-256 digest computed identically to Python's algorithm.
  • Access log records carry logger: "vgi_rpc.access" and validate cleanly against Python's vgi_rpc.access_log_conformance JSON Schema. request_data is truncated at INFO level (replaced with original_request_bytes + truncated: true) — opt in via AccessLogHook::with_verbose(true) to get full payloads.
  • HTTP stream-state tokens are HMAC-SHA256-signed (token format v3, identical byte layout to Python's _state_token.py); set an explicit signing key with HttpStateBuilder::signing_key_* so tokens survive across worker restarts and load balancing.
  • PyArrow's default nullability is honored: list items, map values, and dynamic schema fields are nullable=true.
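The method_type collapse in the __describe__ note amounts to a single match. MethodKind here is a hypothetical stand-in for the crate's exported MethodType (whose actual variants are not listed in this README), and the "unary" label for unary methods is an assumption; only the collapse of Producer, Exchange, and Dynamic into "stream" comes from the note above.

```rust
/// Hypothetical mirror of the method kinds named in the interoperability notes.
#[derive(Debug, Clone, Copy)]
enum MethodKind {
    Unary,
    Producer,
    Exchange,
    Dynamic,
}

/// Wire-level method_type string for describe version "4".
/// Every streaming kind collapses to "stream"; the "unary" label is assumed.
fn describe_method_type(kind: MethodKind) -> &'static str {
    match kind {
        MethodKind::Unary => "unary",
        MethodKind::Producer | MethodKind::Exchange | MethodKind::Dynamic => "stream",
    }
}
```

Collapsing the kinds means a describe client only learns whether a method streams, not which streaming shape it uses.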

License

Apache-2.0.