use std::time::Duration;
use serde::{Deserialize, Serialize};
use tonic::{
Request, Status,
metadata::MetadataValue,
service::{Interceptor, interceptor::InterceptedService},
transport::Channel,
};
use tracing::info;
use crate::{
error::{ChapatyResult, TransportError},
generated::chapaty::bq_exporter::v1::exporter_service_client::ExporterServiceClient,
impl_from_primitive,
};
/// Exporter-service gRPC client whose requests pass through
/// [`ApiKeyInterceptor`] before being sent over the underlying [`Channel`].
pub(super) type ChapatyClient =
ExporterServiceClient<InterceptedService<Channel, ApiKeyInterceptor>>;
/// Newtype around a `String` holding an endpoint address.
///
/// The wrapped string is stored as given; nothing in this type validates it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Url(pub String);
// NOTE(review): `impl_from_primitive!` is defined elsewhere in the crate;
// presumably it generates the `Url` <-> `String` conversions — confirm.
impl_from_primitive!(Url, String);
impl From<&str> for Url {
fn from(value: &str) -> Self {
Url(value.to_string())
}
}
/// Newtype around a `String` holding an API key.
///
/// NOTE(review): `Debug` and `Serialize` are derived, so the raw key text is
/// part of debug output and of any serialized form of this type.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ApiKey(pub String);
// NOTE(review): `impl_from_primitive!` is defined elsewhere in the crate;
// presumably it generates the `ApiKey` <-> `String` conversions — confirm.
impl_from_primitive!(ApiKey, String);
impl From<&str> for ApiKey {
fn from(value: &str) -> Self {
ApiKey(value.to_string())
}
}
/// A list of items paired with the [`DataSource`] they are associated with.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceGroup<T> {
/// The data source this group is bound to.
pub source: DataSource,
/// The items collected for `source`, in insertion order.
pub items: Vec<T>,
}
impl<T> SourceGroup<T> {
    /// Creates a group for `source` that holds no items yet.
    pub fn new(source: DataSource) -> Self {
        let items = Vec::new();
        Self { source, items }
    }

    /// Appends `item` to the end of this group's item list.
    pub fn add(&mut self, item: T) {
        let items = &mut self.items;
        items.push(item);
    }
}
/// Identifies the gRPC endpoint that data is requested from.
///
/// Defaults to [`DataSource::Chapaty`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum DataSource {
/// The built-in Chapaty endpoint. Its API key is read from the
/// `CHAPATY_API_KEY` environment variable when that variable is set.
#[default]
Chapaty,
/// A caller-specified endpoint.
Rpc {
/// Address handed to the gRPC channel builder.
endpoint: Url,
/// Optional key sent as the `api-key` request metadata entry.
api_key: Option<ApiKey>,
},
}
impl DataSource {
#[tracing::instrument(skip(self), fields(endpoint), err)]
pub(crate) async fn connect(&self) -> ChapatyResult<ChapatyClient> {
let (endpoint, api_key) = self.resolve_connection_params();
info!(%endpoint, has_api_key = api_key.is_some(), "Establishing gRPC connection");
let channel = Channel::from_shared(endpoint.clone())
.map_err(|_| TransportError::Connection("Invalid URI".into()))?
.http2_keep_alive_interval(Duration::from_secs(30))
.keep_alive_timeout(Duration::from_secs(10))
.keep_alive_while_idle(true)
.timeout(Duration::from_secs(600))
.tcp_keepalive(Some(Duration::from_secs(60)))
.connect_timeout(Duration::from_secs(30))
.initial_connection_window_size(Some(1024 * 1024)) .initial_stream_window_size(Some(1024 * 1024)) .connect()
.await
.map_err(|e| TransportError::Connection(e.to_string()))?;
let interceptor = ApiKeyInterceptor::new(api_key);
let client = ExporterServiceClient::with_interceptor(channel, interceptor);
info!(%endpoint, "gRPC connection established with long-running configuration");
Ok(client)
}
}
impl DataSource {
fn resolve_connection_params(&self) -> (String, Option<ApiKey>) {
match self {
Self::Chapaty => {
let endpoint = "grpc.chapaty.com".to_string();
let api_key = std::env::var("CHAPATY_API_KEY").ok().map(ApiKey);
(endpoint, api_key)
}
Self::Rpc { endpoint, api_key } => (endpoint.0.clone(), api_key.clone()),
}
}
}
/// Request interceptor that adds an `api-key` metadata entry to outgoing
/// requests when a key is configured.
#[derive(Clone)]
pub(crate) struct ApiKeyInterceptor {
// Key already parsed into an ASCII metadata value; `None` means requests are
// passed through without an `api-key` entry.
api_key: Option<MetadataValue<tonic::metadata::Ascii>>,
}
impl ApiKeyInterceptor {
fn new(api_key: Option<ApiKey>) -> Self {
let metadata_value = api_key.map(|key| {
key.0
.parse()
.expect("API key contains invalid characters for metadata")
});
Self {
api_key: metadata_value,
}
}
}
impl Interceptor for ApiKeyInterceptor {
    /// Inserts the configured key under the `api-key` metadata name, if one is
    /// set, and passes the request on. This never rejects a request.
    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
        match self.api_key.as_ref() {
            Some(value) => {
                req.metadata_mut().insert("api-key", value.clone());
            }
            None => {}
        }
        Ok(req)
    }
}