use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, TryStreamExt};
use opentelemetry::global;
use opentelemetry::propagation::Injector;
use quickwit_proto::{tonic, LeafSearchStreamResponse};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Channel;
use tonic::Request;
use tracing::*;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::error::parse_grpc_error;
use crate::SearchService;
struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);
impl<'a> Injector for MetadataMap<'a> {
fn set(&mut self, key: &str, value: String) {
if let Ok(metadata_key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
if let Ok(metadata_value) = tonic::metadata::MetadataValue::try_from(&value) {
self.0.insert(metadata_key, metadata_value);
}
}
}
}
#[derive(Clone)]
enum SearchServiceClientImpl {
Local(Arc<dyn SearchService>),
Grpc(quickwit_proto::search_service_client::SearchServiceClient<Channel>),
}
#[derive(Clone)]
pub struct SearchServiceClient {
client_impl: SearchServiceClientImpl,
grpc_addr: SocketAddr,
}
impl fmt::Debug for SearchServiceClient {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
match &self.client_impl {
SearchServiceClientImpl::Local(_service) => {
write!(formatter, "Local({:?})", self.grpc_addr)
}
SearchServiceClientImpl::Grpc(_grpc_client) => {
write!(formatter, "Grpc({:?})", self.grpc_addr)
}
}
}
}
impl SearchServiceClient {
pub fn from_grpc_client(
client: quickwit_proto::search_service_client::SearchServiceClient<Channel>,
grpc_addr: SocketAddr,
) -> Self {
SearchServiceClient {
client_impl: SearchServiceClientImpl::Grpc(client),
grpc_addr,
}
}
pub fn from_service(service: Arc<dyn SearchService>, grpc_addr: SocketAddr) -> Self {
SearchServiceClient {
client_impl: SearchServiceClientImpl::Local(service),
grpc_addr,
}
}
pub fn grpc_addr(&self) -> SocketAddr {
self.grpc_addr
}
pub async fn root_search(
&mut self,
request: quickwit_proto::SearchRequest,
) -> crate::Result<quickwit_proto::SearchResponse> {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let tonic_request = Request::new(request);
let tonic_response = grpc_client
.root_search(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
Ok(tonic_response.into_inner())
}
SearchServiceClientImpl::Local(service) => service.root_search(request).await,
}
}
pub async fn leaf_search(
&mut self,
request: quickwit_proto::LeafSearchRequest,
) -> crate::Result<quickwit_proto::LeafSearchResponse> {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let mut tonic_request = Request::new(request);
global::get_text_map_propagator(|propagator| {
propagator.inject_context(
&tracing::Span::current().context(),
&mut MetadataMap(tonic_request.metadata_mut()),
)
});
let tonic_response = grpc_client
.leaf_search(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
Ok(tonic_response.into_inner())
}
SearchServiceClientImpl::Local(service) => service.leaf_search(request).await,
}
}
pub async fn leaf_search_stream(
&mut self,
request: quickwit_proto::LeafSearchStreamRequest,
) -> UnboundedReceiverStream<crate::Result<LeafSearchStreamResponse>> {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let mut grpc_client_clone = grpc_client.clone();
let span = info_span!(
"client:leaf_search_stream",
grpc_addr=?self.grpc_addr()
);
let mut tonic_request = Request::new(request);
global::get_text_map_propagator(|propagator| {
propagator.inject_context(
&tracing::Span::current().context(),
&mut MetadataMap(tonic_request.metadata_mut()),
)
});
let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(
async move {
let tonic_result = grpc_client_clone
.leaf_search_stream(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error));
if let Err(error) = tonic_result {
let _ = result_sender.send(Err(error));
return;
}
let mut results_stream = tonic_result
.unwrap()
.into_inner()
.map_err(|tonic_error| parse_grpc_error(&tonic_error));
while let Some(search_result) = results_stream.next().await {
let send_result = result_sender.send(search_result);
if send_result.is_err() {
break;
}
}
}
.instrument(span),
);
UnboundedReceiverStream::new(result_receiver)
}
SearchServiceClientImpl::Local(service) => {
let stream_result = service.leaf_search_stream(request).await;
stream_result.unwrap_or_else(|error| {
let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();
let _ = result_sender.send(Err(error));
UnboundedReceiverStream::new(result_receiver)
})
}
}
}
pub async fn fetch_docs(
&mut self,
request: quickwit_proto::FetchDocsRequest,
) -> crate::Result<quickwit_proto::FetchDocsResponse> {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let mut tonic_request = Request::new(request);
global::get_text_map_propagator(|propagator| {
propagator.inject_context(
&tracing::Span::current().context(),
&mut MetadataMap(tonic_request.metadata_mut()),
)
});
let tonic_response = grpc_client
.fetch_docs(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
Ok(tonic_response.into_inner())
}
SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await,
}
}
}