hyperi-rustlib 2.8.6

There's plenty of sage advice out there about how to run Rust services in production at scale — config cascades, structured logging, masking secrets, multi-backend secrets management, Prometheus, OpenTelemetry, Kafka transports, tiered disk-spillover sinks, adaptive worker pools, graceful shutdown — but almost none of it as code you can just install and use. This is that code. Opinionated, drop-in, working out of the box. The patterns from blog posts, watercooler chats and beers with your Google mates as actual library — not a framework you assemble from twenty crates and 8 weeks of munging.
Documentation
// Project:   hyperi-rustlib
// File:      src/transport/vector_compat/source.rs
// Purpose:   Vector gRPC source compatibility wrapper
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Vector-compatible gRPC source.
//!
//! Implements the `vector.Vector` gRPC service so that legacy Vector sinks
//! can push events to a DFE service. Incoming `EventWrapper` messages are
//! converted to JSON and fed into the same receive channel as native DFE traffic.

use super::convert::event_wrapper_to_json;
use super::proto::vector;
use crate::transport::grpc::GrpcToken;
use crate::transport::types::{Message, PayloadFormat};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};

/// gRPC service that accepts `PushEvents` RPCs from Vector sinks.
///
/// Converts Vector's protobuf events to JSON and forwards them into
/// the transport's receive channel alongside native DFE messages.
pub struct VectorCompatService {
    sender: mpsc::Sender<Message<GrpcToken>>,
    sequence: Arc<AtomicU64>,
}

impl VectorCompatService {
    /// Create a new Vector compat service.
    ///
    /// Uses the same sender/sequence as the DFE transport server so
    /// both native and Vector-compat events arrive in the same channel.
    pub fn new(sender: mpsc::Sender<Message<GrpcToken>>, sequence: Arc<AtomicU64>) -> Self {
        Self { sender, sequence }
    }
}

#[tonic::async_trait]
impl vector::vector_server::Vector for VectorCompatService {
    async fn push_events(
        &self,
        request: Request<vector::PushEventsRequest>,
    ) -> Result<Response<vector::PushEventsResponse>, Status> {
        let req = request.into_inner();

        for event_wrapper in &req.events {
            // Convert Vector event to JSON (skip metrics)
            let Some(json_value) = event_wrapper_to_json(event_wrapper) else {
                continue;
            };

            let payload = serde_json::to_vec(&json_value)
                .map_err(|e| Status::internal(format!("json serialise failed: {e}")))?;

            let seq = self.sequence.fetch_add(1, Ordering::Relaxed);

            // `payload` is `Vec<u8>` from `serde_json::to_vec`; move into `Bytes`.
            let msg = Message {
                key: None, // Vector events don't carry a topic key
                payload: payload.into(),
                token: GrpcToken::new(seq),
                timestamp_ms: None,
                format: PayloadFormat::Json,
            };

            self.sender
                .send(msg)
                .await
                .map_err(|_| Status::unavailable("receiver buffer full"))?;
        }

        Ok(Response::new(vector::PushEventsResponse {}))
    }

    async fn health_check(
        &self,
        _request: Request<vector::HealthCheckRequest>,
    ) -> Result<Response<vector::HealthCheckResponse>, Status> {
        Ok(Response::new(vector::HealthCheckResponse {
            status: vector::ServingStatus::Serving.into(),
        }))
    }
}