use std::sync::Arc;
use nameth::NamedEnumValues as _;
use nameth::nameth;
use scopeguard::defer;
use terrazzo::http::StatusCode;
use tonic::body::Body as BoxBody;
use tonic::client::GrpcService;
use tonic::codegen::Bytes;
use tonic::codegen::StdError;
use tonic::transport::Body;
use tracing::Instrument as _;
use tracing::debug;
use tracing::info_span;
use trz_gateway_common::http_error::IsHttpError;
use crate::backend::Server;
use crate::backend::client_service::convert::Impossible;
use crate::backend::client_service::routing::DistributedCallback;
use crate::backend::client_service::routing::DistributedCallbackError;
use crate::backend::protos::terrazzo::shared::ClientAddress;
use crate::backend::protos::terrazzo::shared::Empty;
use crate::backend::protos::terrazzo::terminal::AckRequest;
use crate::backend::protos::terrazzo::terminal::terminal_service_client::TerminalServiceClient;
use crate::backend::throttling_stream;
pub fn ack(
server: &Arc<Server>,
client_address: &[impl AsRef<str>],
request: AckRequest,
) -> impl Future<Output = Result<(), AckError>> {
let terminal_id = request
.terminal
.as_ref()
.map(|t| t.terminal_id.as_str())
.unwrap_or_default();
let span = info_span!("Ack", %terminal_id);
async move {
debug!("Start");
defer!(debug!("Done"));
Ok(AckCallback::process(server, client_address, request).await?)
}
.instrument(span)
}
struct AckCallback;
impl DistributedCallback for AckCallback {
type Request = AckRequest;
type Response = ();
type LocalError = Impossible;
type RemoteError = tonic::Status;
async fn local(_: Option<&Arc<Server>>, request: AckRequest) -> Result<(), Impossible> {
let terminal_id = request.terminal.unwrap_or_default().terminal_id.into();
throttling_stream::ack(&terminal_id, request.ack as usize);
Ok(())
}
async fn remote<T>(
channel: T,
client_address: &[impl AsRef<str>],
mut request: AckRequest,
) -> Result<(), tonic::Status>
where
T: GrpcService<BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
request.terminal.get_or_insert_default().via = Some(ClientAddress::of(client_address));
let mut client = TerminalServiceClient::new(channel);
let Empty {} = client.ack(request).await?.into_inner();
Ok(())
}
}
#[nameth]
#[derive(thiserror::Error, Debug)]
pub enum AckError {
#[error("[{n}] {0}", n = self.name())]
AckError(#[from] DistributedCallbackError<Impossible, tonic::Status>),
}
impl IsHttpError for AckError {
fn status_code(&self) -> StatusCode {
match self {
Self::AckError(error) => error.status_code(),
}
}
}
impl From<AckError> for tonic::Status {
fn from(error: AckError) -> Self {
match error {
AckError::AckError(error) => error.into(),
}
}
}