use super::convert::event_wrapper_to_json;
use super::proto::vector;
use crate::transport::grpc::GrpcToken;
use crate::transport::types::{Message, PayloadFormat};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
/// gRPC service implementing the `vector::vector_server::Vector` trait.
///
/// Events received through `push_events` are converted to JSON and forwarded
/// as [`Message`] values over an mpsc channel.
pub struct VectorCompatService {
/// Sending half of the channel that receives one `Message` per forwarded event.
sender: mpsc::Sender<Message<GrpcToken>>,
/// Shared counter; each forwarded event takes the next value to build its `GrpcToken`.
sequence: Arc<AtomicU64>,
}
impl VectorCompatService {
pub fn new(sender: mpsc::Sender<Message<GrpcToken>>, sequence: Arc<AtomicU64>) -> Self {
Self { sender, sequence }
}
}
#[tonic::async_trait]
impl vector::vector_server::Vector for VectorCompatService {
    /// Forwards every event of the request to the channel as a JSON message.
    ///
    /// Each event is converted with `event_wrapper_to_json`; events for which
    /// that conversion yields `None` are skipped without error. Every
    /// forwarded event is serialised to JSON bytes, tagged with the next
    /// sequence number, and sent on the channel. The call waits whenever the
    /// channel has no free capacity.
    ///
    /// # Errors
    ///
    /// * `Status::internal` if an event's JSON value cannot be serialised.
    /// * `Status::unavailable` if the receiving half of the channel is closed.
    ///
    /// Processing stops at the first error; events that precede the failing
    /// one in the batch have already been sent on the channel.
    async fn push_events(
        &self,
        request: Request<vector::PushEventsRequest>,
    ) -> Result<Response<vector::PushEventsResponse>, Status> {
        let req = request.into_inner();
        for event_wrapper in &req.events {
            let Some(json_value) = event_wrapper_to_json(event_wrapper) else {
                continue;
            };
            let payload = serde_json::to_vec(&json_value)
                .map_err(|e| Status::internal(format!("json serialise failed: {e}")))?;
            // The counter only hands out distinct numbers; it does not guard
            // any other memory, so relaxed ordering is sufficient.
            let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
            let msg = Message {
                key: None,
                payload,
                token: GrpcToken::new(seq),
                timestamp_ms: None,
                format: PayloadFormat::Json,
            };
            // `Sender::send` waits for capacity when the buffer is full; it
            // fails only once the receiver has been closed or dropped, so the
            // error reported to the client must say so.
            self.sender
                .send(msg)
                .await
                .map_err(|_| Status::unavailable("receiver closed"))?;
        }
        Ok(Response::new(vector::PushEventsResponse {}))
    }

    /// Reports the service as serving; the request content is ignored.
    async fn health_check(
        &self,
        _request: Request<vector::HealthCheckRequest>,
    ) -> Result<Response<vector::HealthCheckResponse>, Status> {
        Ok(Response::new(vector::HealthCheckResponse {
            status: vector::ServingStatus::Serving.into(),
        }))
    }
}