use std::time::SystemTime;
use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use tonic::Request;
use tonic::metadata::MetadataValue;
use tonic::service::Interceptor;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
use tower::service_fn;
use crate::env::{ENV_HOST_SERVICE_SOCKET, ENV_HOST_SERVICE_TOKEN};
use crate::generated::v1::{
self as pb, runtime_log_host_client::RuntimeLogHostClient as ProtoRuntimeLogHostClient,
};
type RuntimeLogHostTransport = InterceptedService<Channel, RelayTokenInterceptor>;
pub const ENV_RUNTIME_SESSION_ID: &str = "GESTALT_RUNTIME_SESSION_ID";
const RUNTIME_LOG_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(i32)]
pub enum RuntimeLogStream {
#[default]
Unspecified = 0,
Stdout = 1,
Stderr = 2,
Runtime = 3,
}
impl RuntimeLogStream {
const fn as_i32(self) -> i32 {
self as i32
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RuntimeLogEntry {
pub stream: RuntimeLogStream,
pub message: String,
pub observed_at: SystemTime,
pub source_seq: i64,
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct AppendRuntimeLogsRequest {
pub session_id: String,
pub logs: Vec<RuntimeLogEntry>,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct AppendRuntimeLogsResponse {
pub last_seq: i64,
}
fn append_logs_request_to_proto(request: AppendRuntimeLogsRequest) -> pb::AppendRuntimeLogsRequest {
pb::AppendRuntimeLogsRequest {
session_id: request.session_id,
logs: request
.logs
.into_iter()
.map(|entry| pb::RuntimeLogEntry {
stream: entry.stream.as_i32(),
message: entry.message,
observed_at: Some(crate::protocol::timestamp_from_system_time(
entry.observed_at,
)),
source_seq: entry.source_seq,
})
.collect(),
}
}
fn append_logs_response_from_proto(
response: pb::AppendRuntimeLogsResponse,
) -> AppendRuntimeLogsResponse {
AppendRuntimeLogsResponse {
last_seq: response.last_seq,
}
}
#[derive(Debug, thiserror::Error)]
pub enum RuntimeLogHostError {
#[error("{0}")]
Transport(#[from] tonic::transport::Error),
#[error("{0}")]
Status(#[from] tonic::Status),
#[error("{0}")]
Env(String),
}
pub struct RuntimeLogHost {
client: ProtoRuntimeLogHostClient<RuntimeLogHostTransport>,
source_seq: i64,
}
impl RuntimeLogHost {
pub async fn connect() -> std::result::Result<Self, RuntimeLogHostError> {
let socket_path = std::env::var(ENV_HOST_SERVICE_SOCKET).map_err(|_| {
RuntimeLogHostError::Env(format!("{ENV_HOST_SERVICE_SOCKET} is not set"))
})?;
let relay_token = std::env::var(ENV_HOST_SERVICE_TOKEN).unwrap_or_default();
let channel = match parse_runtime_log_host_target(&socket_path)? {
RuntimeLogHostTarget::Unix(path) => {
Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(move |_: Uri| {
let path = path.clone();
async move { UnixStream::connect(path).await.map(TokioIo::new) }
}))
.await?
}
RuntimeLogHostTarget::Tcp(address) => {
Endpoint::from_shared(format!("http://{address}"))?
.connect()
.await?
}
RuntimeLogHostTarget::Tls(address) => {
Endpoint::from_shared(format!("https://{address}"))?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect()
.await?
}
};
Ok(Self {
client: ProtoRuntimeLogHostClient::with_interceptor(
channel,
relay_token_interceptor(relay_token.trim())?,
),
source_seq: 0,
})
}
pub async fn append_logs(
&mut self,
request: AppendRuntimeLogsRequest,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
Ok(append_logs_response_from_proto(
self.client
.append_logs(append_logs_request_to_proto(request))
.await?
.into_inner(),
))
}
pub async fn append(
&mut self,
session_id: impl Into<String>,
stream: RuntimeLogStream,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.source_seq += 1;
let source_seq = self.source_seq;
self.append_entry(session_id, stream, message, None, source_seq)
.await
}
pub async fn append_current(
&mut self,
stream: RuntimeLogStream,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append(runtime_session_id()?, stream, message).await
}
pub async fn append_entry(
&mut self,
session_id: impl Into<String>,
stream: RuntimeLogStream,
message: impl Into<String>,
observed_at: Option<SystemTime>,
source_seq: i64,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.source_seq = self.source_seq.max(source_seq);
self.append_logs(AppendRuntimeLogsRequest {
session_id: session_id.into(),
logs: vec![RuntimeLogEntry {
stream,
message: message.into(),
observed_at: observed_at.unwrap_or_else(SystemTime::now),
source_seq,
}],
})
.await
}
pub async fn append_current_entry(
&mut self,
stream: RuntimeLogStream,
message: impl Into<String>,
observed_at: Option<SystemTime>,
source_seq: i64,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append_entry(
runtime_session_id()?,
stream,
message,
observed_at,
source_seq,
)
.await
}
pub async fn append_stdout(
&mut self,
session_id: impl Into<String>,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append(session_id, RuntimeLogStream::Stdout, message)
.await
}
pub async fn append_current_stdout(
&mut self,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append_current(RuntimeLogStream::Stdout, message).await
}
pub async fn append_stderr(
&mut self,
session_id: impl Into<String>,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append(session_id, RuntimeLogStream::Stderr, message)
.await
}
pub async fn append_current_stderr(
&mut self,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append_current(RuntimeLogStream::Stderr, message).await
}
pub async fn append_runtime(
&mut self,
session_id: impl Into<String>,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append(session_id, RuntimeLogStream::Runtime, message)
.await
}
pub async fn append_current_runtime(
&mut self,
message: impl Into<String>,
) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
self.append_current(RuntimeLogStream::Runtime, message)
.await
}
}
#[derive(Clone)]
struct RelayTokenInterceptor {
token: Option<MetadataValue<tonic::metadata::Ascii>>,
}
impl Interceptor for RelayTokenInterceptor {
fn call(
&mut self,
mut request: Request<()>,
) -> std::result::Result<Request<()>, tonic::Status> {
if let Some(token) = self.token.clone() {
request
.metadata_mut()
.insert(RUNTIME_LOG_RELAY_TOKEN_HEADER, token);
}
Ok(request)
}
}
fn relay_token_interceptor(
token: &str,
) -> std::result::Result<RelayTokenInterceptor, RuntimeLogHostError> {
let trimmed = token.trim();
let token = if trimmed.is_empty() {
None
} else {
Some(MetadataValue::try_from(trimmed).map_err(|err| {
RuntimeLogHostError::Env(format!(
"runtime log host: invalid relay token metadata: {err}"
))
})?)
};
Ok(RelayTokenInterceptor { token })
}
enum RuntimeLogHostTarget {
Unix(String),
Tcp(String),
Tls(String),
}
fn parse_runtime_log_host_target(
raw: &str,
) -> std::result::Result<RuntimeLogHostTarget, RuntimeLogHostError> {
let target = raw.trim();
if target.is_empty() {
return Err(RuntimeLogHostError::Env(
"runtime log host: transport target is required".to_string(),
));
}
if let Some(address) = target.strip_prefix("tcp://") {
let address = address.trim();
if address.is_empty() {
return Err(RuntimeLogHostError::Env(format!(
"runtime log host: tcp target {raw:?} is missing host:port"
)));
}
return Ok(RuntimeLogHostTarget::Tcp(address.to_string()));
}
if let Some(address) = target.strip_prefix("tls://") {
let address = address.trim();
if address.is_empty() {
return Err(RuntimeLogHostError::Env(format!(
"runtime log host: tls target {raw:?} is missing host:port"
)));
}
return Ok(RuntimeLogHostTarget::Tls(address.to_string()));
}
if let Some(path) = target.strip_prefix("unix://") {
let path = path.trim();
if path.is_empty() {
return Err(RuntimeLogHostError::Env(format!(
"runtime log host: unix target {raw:?} is missing a socket path"
)));
}
return Ok(RuntimeLogHostTarget::Unix(path.to_string()));
}
if target.contains("://") {
return Err(RuntimeLogHostError::Env(format!(
"runtime log host: unsupported target scheme in {raw:?}"
)));
}
Ok(RuntimeLogHostTarget::Unix(target.to_string()))
}
pub fn runtime_session_id() -> std::result::Result<String, RuntimeLogHostError> {
let session_id = std::env::var(ENV_RUNTIME_SESSION_ID)
.unwrap_or_default()
.trim()
.to_string();
if session_id.is_empty() {
return Err(RuntimeLogHostError::Env(format!(
"runtime session: {ENV_RUNTIME_SESSION_ID} is not set"
)));
}
Ok(session_id)
}