terrazzo-terminal 0.2.7

A simple web-based terminal emulator built on Terrazzo.
use std::sync::Arc;

use nameth::NamedEnumValues as _;
use nameth::nameth;
use scopeguard::defer;
use terrazzo::http::StatusCode;
use terrazzo_pty::OpenProcessError;
use terrazzo_pty::ProcessIO;
use tonic::body::Body as BoxBody;
use tonic::client::GrpcService;
use tonic::codegen::Bytes;
use tonic::codegen::StdError;
use tonic::transport::Body;
use tracing::Instrument as _;
use tracing::info;
use tracing::info_span;
use trz_gateway_common::http_error::IsHttpError;
use trz_gateway_common::id::ClientName;

use crate::api::shared::terminal_schema::RegisterTerminalMode;
use crate::api::shared::terminal_schema::STREAMING_WINDOW_SIZE;
use crate::api::shared::terminal_schema::TerminalDef;
use crate::backend::Server;
use crate::backend::client_service::routing::DistributedCallback;
use crate::backend::client_service::routing::DistributedCallbackError;
use crate::backend::protos::terrazzo::shared::ClientAddress;
use crate::backend::protos::terrazzo::terminal::RegisterTerminalRequest;
use crate::backend::protos::terrazzo::terminal::terminal_service_client::TerminalServiceClient;
use crate::backend::throttling_stream::ThrottleProcessOutput;
use crate::processes;
use crate::processes::io::HybridReader;

pub async fn register(
    my_client_name: Option<ClientName>,
    server: &Arc<Server>,
    mut request: RegisterTerminalRequest,
) -> Result<HybridReader, RegisterStreamError> {
    let terminal_def = request.def.get_or_insert_default();
    let client_address = terminal_def.client_address().to_vec();
    async move {
        info!("Start");
        defer!(info!("Done"));
        let stream =
            RegisterCallback::process(server, &client_address, (my_client_name, request)).await;
        let stream = stream.map_err(|error| error.map_local(Box::new))?;
        Ok(stream)
    }
    .instrument(info_span!("Register"))
    .await
}

struct RegisterCallback;

impl DistributedCallback for RegisterCallback {
    type Request = (Option<ClientName>, RegisterTerminalRequest);
    type Response = HybridReader;
    type LocalError = RegisterStreamError;
    type RemoteError = tonic::Status;

    async fn local(
        server: Option<&Arc<Server>>,
        (my_client_name, request): (Option<ClientName>, RegisterTerminalRequest),
    ) -> Result<HybridReader, RegisterStreamError> {
        let server =
            server.ok_or_else(|| RegisterStreamError::Grpc(tonic::Status::internal("server")))?;
        let mode = request.mode().try_into()?;
        let def = request
            .def
            .ok_or_else(|| tonic::Status::invalid_argument("def"))?;
        let def = TerminalDef::from(def);
        let terminal_id = def.address.id.clone();
        let stream = processes::stream::open_stream(
            server,
            def,
            mode == RegisterTerminalMode::Create,
            |_| async move {
                match mode {
                    RegisterTerminalMode::Create => {
                        let terminal_shell = server
                            .config()
                            .server
                            .with(|server_config| server_config.terminal_shell.clone());
                        ProcessIO::open(
                            my_client_name.map(|s| s.to_string()),
                            STREAMING_WINDOW_SIZE,
                            terminal_shell,
                        )
                        .await
                    }
                    RegisterTerminalMode::Reopen => Err(OpenProcessError::NotFound),
                }
            },
        )
        .await;
        let stream = stream.map_err(|error| tonic::Status::internal(error.to_string()))?;
        let stream = ThrottleProcessOutput::new(terminal_id, stream);
        return Ok(HybridReader::Local(stream));
    }

    async fn remote<T>(
        channel: T,
        client_address: &[impl AsRef<str>],
        (_, mut request): (Option<ClientName>, RegisterTerminalRequest),
    ) -> Result<HybridReader, tonic::Status>
    where
        T: GrpcService<BoxBody>,
        T::Error: Into<StdError>,
        T::ResponseBody: Body<Data = Bytes> + Send + 'static,
        <T::ResponseBody as Body>::Error: Into<StdError> + Send,
    {
        let def = request.def.as_mut();
        let def = def.ok_or_else(|| tonic::Status::invalid_argument("def"))?;
        let address = def.address.get_or_insert_default();
        address.via = Some(ClientAddress::of(client_address));
        let mut client = TerminalServiceClient::new(channel);
        let stream = client.register(request).await?.into_inner();
        Ok(HybridReader::Remote(Box::new(stream)))
    }
}

#[nameth]
#[derive(thiserror::Error, Debug)]
pub enum RegisterStreamError {
    #[error("[{n}] {0}", n = self.name())]
    RegisterStreamError(#[from] DistributedCallbackError<Box<RegisterStreamError>, tonic::Status>),

    #[error("[{n}] {0}", n = self.name())]
    Grpc(#[from] tonic::Status),
}

impl IsHttpError for RegisterStreamError {
    fn status_code(&self) -> StatusCode {
        match self {
            Self::RegisterStreamError(error) => error.status_code(),
            Self::Grpc(error) => error.status_code(),
        }
    }
}

impl From<RegisterStreamError> for tonic::Status {
    fn from(error: RegisterStreamError) -> Self {
        match error {
            RegisterStreamError::RegisterStreamError(error) => error.into(),
            RegisterStreamError::Grpc(status) => status,
        }
    }
}

impl From<Box<RegisterStreamError>> for tonic::Status {
    fn from(error: Box<RegisterStreamError>) -> Self {
        tonic::Status::from(*error)
    }
}